1use crate::{InstanceId, NodeId, TaskId};
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::time::Duration;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
11pub struct TaskDef {
12 pub id: TaskId,
14 pub name: String,
16 pub schedule: Schedule,
18 pub config: TaskConfig,
20 pub enabled: bool,
22 pub hlc_timestamp: u64,
24 pub version: u64,
26 pub created_at: DateTime<Utc>,
28 pub updated_at: DateTime<Utc>,
30}
31
32impl TaskDef {
33 pub fn new(name: impl Into<String>, schedule: Schedule) -> Self {
35 let now = Utc::now();
36 Self {
37 id: TaskId::new(),
38 name: name.into(),
39 schedule,
40 config: TaskConfig::default(),
41 enabled: true,
42 hlc_timestamp: 0,
43 version: 0,
44 created_at: now,
45 updated_at: now,
46 }
47 }
48
49 pub fn with_config(mut self, config: TaskConfig) -> Self {
51 self.config = config;
52 self
53 }
54
55 pub fn with_retry(mut self, retry: RetryPolicy) -> Self {
57 self.config.retry = retry;
58 self
59 }
60
61 pub fn with_timeout(mut self, timeout: Duration) -> Self {
63 self.config.timeout = timeout;
64 self
65 }
66
67 pub fn with_payload(mut self, payload: Bytes) -> Self {
69 self.config.payload = Some(payload);
70 self
71 }
72}
73
74#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
76pub enum Schedule {
77 Cron(String),
79 Interval(Duration),
81 Once(DateTime<Utc>),
83}
84
85impl Schedule {
86 pub fn cron(expr: impl Into<String>) -> Self {
88 Schedule::Cron(expr.into())
89 }
90
91 pub fn interval(duration: Duration) -> Self {
93 Schedule::Interval(duration)
94 }
95
96 pub fn once(at: DateTime<Utc>) -> Self {
98 Schedule::Once(at)
99 }
100
101 pub fn delay(duration: Duration) -> Self {
103 Schedule::Once(Utc::now() + chrono::Duration::from_std(duration).unwrap())
104 }
105}
106
107#[derive(Clone, Debug, Serialize, Deserialize)]
109pub struct TaskConfig {
110 pub timeout: Duration,
112 pub retry: RetryPolicy,
114 pub payload: Option<Bytes>,
116 pub handler_id: String,
118}
119
120impl Default for TaskConfig {
121 fn default() -> Self {
122 Self {
123 timeout: Duration::from_secs(300), retry: RetryPolicy::default(),
125 payload: None,
126 handler_id: String::new(),
127 }
128 }
129}
130
131#[derive(Clone, Debug, Serialize, Deserialize)]
133pub struct RetryPolicy {
134 pub max_attempts: u32,
136 pub initial_delay: Duration,
138 pub max_delay: Duration,
140 pub multiplier: f64,
142}
143
144impl Default for RetryPolicy {
145 fn default() -> Self {
146 Self {
147 max_attempts: 3,
148 initial_delay: Duration::from_secs(1),
149 max_delay: Duration::from_secs(300),
150 multiplier: 2.0,
151 }
152 }
153}
154
155impl RetryPolicy {
156 pub fn none() -> Self {
158 Self {
159 max_attempts: 0,
160 ..Default::default()
161 }
162 }
163
164 pub fn exponential(max_attempts: u32, initial_delay: Duration) -> Self {
166 Self {
167 max_attempts,
168 initial_delay,
169 multiplier: 2.0,
170 ..Default::default()
171 }
172 }
173
174 pub fn fixed(max_attempts: u32, delay: Duration) -> Self {
176 Self {
177 max_attempts,
178 initial_delay: delay,
179 max_delay: delay,
180 multiplier: 1.0,
181 }
182 }
183
184 pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
186 if attempt == 0 {
187 return Duration::ZERO;
188 }
189 let delay = self.initial_delay.as_secs_f64() * self.multiplier.powi(attempt as i32 - 1);
190 let delay = Duration::from_secs_f64(delay);
191 std::cmp::min(delay, self.max_delay)
192 }
193}
194
195#[derive(Clone, Debug, Serialize, Deserialize)]
197pub struct TaskInstance {
198 pub id: InstanceId,
200 pub task_id: TaskId,
202 pub scheduled_at: DateTime<Utc>,
204 pub status: TaskStatus,
206 pub claimed_by: Option<NodeId>,
208 pub claim_version: u64,
210 pub started_at: Option<DateTime<Utc>>,
212 pub completed_at: Option<DateTime<Utc>>,
214 pub result: Option<Bytes>,
216 pub error: Option<String>,
218 pub attempt: u32,
220}
221
222impl TaskInstance {
223 pub fn new(task_id: TaskId, scheduled_at: DateTime<Utc>) -> Self {
225 Self {
226 id: InstanceId::new(),
227 task_id,
228 scheduled_at,
229 status: TaskStatus::Pending,
230 claimed_by: None,
231 claim_version: 0,
232 started_at: None,
233 completed_at: None,
234 result: None,
235 error: None,
236 attempt: 0,
237 }
238 }
239
240 pub fn is_ready(&self) -> bool {
242 self.status == TaskStatus::Pending && self.scheduled_at <= Utc::now()
243 }
244
245 pub fn can_retry(&self, policy: &RetryPolicy) -> bool {
247 self.status == TaskStatus::Failed && self.attempt < policy.max_attempts
248 }
249}
250
251#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
253pub enum TaskStatus {
254 Pending,
256 Claimed,
258 Running,
260 Success,
262 Failed,
264 Cancelled,
266}
267
268impl TaskStatus {
269 pub fn is_terminal(&self) -> bool {
271 matches!(self, TaskStatus::Success | TaskStatus::Cancelled)
272 }
273
274 pub fn as_str(&self) -> &'static str {
276 match self {
277 TaskStatus::Pending => "pending",
278 TaskStatus::Claimed => "claimed",
279 TaskStatus::Running => "running",
280 TaskStatus::Success => "success",
281 TaskStatus::Failed => "failed",
282 TaskStatus::Cancelled => "cancelled",
283 }
284 }
285}
286
287impl std::str::FromStr for TaskStatus {
288 type Err = String;
289
290 fn from_str(s: &str) -> Result<Self, Self::Err> {
291 match s {
292 "pending" => Ok(TaskStatus::Pending),
293 "claimed" => Ok(TaskStatus::Claimed),
294 "running" => Ok(TaskStatus::Running),
295 "success" => Ok(TaskStatus::Success),
296 "failed" => Ok(TaskStatus::Failed),
297 "cancelled" => Ok(TaskStatus::Cancelled),
298 _ => Err(format!("unknown task status: {}", s)),
299 }
300 }
301}
302
303#[derive(Clone, Debug)]
305pub struct TaskContext {
306 pub task_id: TaskId,
308 pub instance_id: InstanceId,
310 pub scheduled_at: DateTime<Utc>,
312 pub attempt: u32,
314 payload: Bytes,
316}
317
318impl TaskContext {
319 pub fn new(
321 task_id: TaskId,
322 instance_id: InstanceId,
323 scheduled_at: DateTime<Utc>,
324 attempt: u32,
325 payload: Bytes,
326 ) -> Self {
327 Self {
328 task_id,
329 instance_id,
330 scheduled_at,
331 attempt,
332 payload,
333 }
334 }
335
336 pub fn payload_bytes(&self) -> &Bytes {
338 &self.payload
339 }
340
341 pub fn payload<T: serde::de::DeserializeOwned>(&self) -> Result<T, bincode::Error> {
343 bincode::deserialize(&self.payload)
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Exponential policy starting at 1s: attempt 0 has no delay, then 1s, 2s, 4s.
    #[test]
    fn test_retry_policy_delay() {
        let policy = RetryPolicy::exponential(5, Duration::from_secs(1));

        let expectations = [
            (0, Duration::ZERO),
            (1, Duration::from_secs(1)),
            (2, Duration::from_secs(2)),
            (3, Duration::from_secs(4)),
        ];
        for (attempt, expected) in expectations {
            assert_eq!(policy.delay_for_attempt(attempt), expected);
        }
    }

    /// The lowercase string forms parse back into the matching variants.
    #[test]
    fn test_task_status_parse() {
        let pending: TaskStatus = "pending".parse().unwrap();
        let running: TaskStatus = "running".parse().unwrap();

        assert_eq!(pending, TaskStatus::Pending);
        assert_eq!(running, TaskStatus::Running);
    }

    /// A pending instance is ready once its scheduled time has passed, and
    /// not before.
    #[test]
    fn test_task_instance_ready() {
        let task_id = TaskId::new();
        let an_hour_ago = Utc::now() - chrono::Duration::hours(1);
        let in_an_hour = Utc::now() + chrono::Duration::hours(1);

        let overdue = TaskInstance::new(task_id.clone(), an_hour_ago);
        let upcoming = TaskInstance::new(task_id, in_an_hour);

        assert!(overdue.is_ready());
        assert!(!upcoming.is_ready());
    }
}