Skip to main content

taskmill/
task.rs

1use chrono::{DateTime, Utc};
2use serde::de::DeserializeOwned;
3use serde::{Deserialize, Serialize};
4use sha2::{Digest, Sha256};
5
6use crate::priority::Priority;
7
/// Largest allowed task payload, in bytes: 1 MiB (1024 × 1024).
pub const MAX_PAYLOAD_BYTES: usize = 1024 * 1024;
10
/// Lifecycle state of a task in the active queue.
///
/// Serde serializes the variants in `snake_case` (`"pending"`, `"running"`,
/// `"paused"`); these are the same strings that [`TaskStatus::as_str`]
/// returns and that the [`FromStr`](std::str::FromStr) impl accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TaskStatus {
    Pending,
    Running,
    Paused,
}
19
20impl TaskStatus {
21    pub fn as_str(self) -> &'static str {
22        match self {
23            Self::Pending => "pending",
24            Self::Running => "running",
25            Self::Paused => "paused",
26        }
27    }
28}
29
30impl std::str::FromStr for TaskStatus {
31    type Err = String;
32
33    fn from_str(s: &str) -> Result<Self, Self::Err> {
34        match s {
35            "pending" => Ok(Self::Pending),
36            "running" => Ok(Self::Running),
37            "paused" => Ok(Self::Paused),
38            other => Err(format!("unknown TaskStatus: {other}")),
39        }
40    }
41}
42
/// Terminal state of a task in history.
///
/// Serde serializes the variants in `snake_case` (`"completed"`, `"failed"`);
/// these are the same strings that [`HistoryStatus::as_str`] returns and that
/// the [`FromStr`](std::str::FromStr) impl accepts.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HistoryStatus {
    Completed,
    Failed,
}
50
51impl HistoryStatus {
52    pub fn as_str(self) -> &'static str {
53        match self {
54            Self::Completed => "completed",
55            Self::Failed => "failed",
56        }
57    }
58}
59
60impl std::str::FromStr for HistoryStatus {
61    type Err = String;
62
63    fn from_str(s: &str) -> Result<Self, Self::Err> {
64        match s {
65            "completed" => Ok(Self::Completed),
66            "failed" => Ok(Self::Failed),
67            other => Err(format!("unknown HistoryStatus: {other}")),
68        }
69    }
70}
71
/// A task in the active queue (pending, running, or paused).
///
/// `status` is a [`TaskStatus`], so terminal states cannot be represented
/// here; finished tasks are described by [`TaskHistoryRecord`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRecord {
    pub id: i64,
    pub task_type: String,
    // Dedup key. Presumably the value produced by
    // `TaskSubmission::effective_key` — confirm against the store.
    pub key: String,
    pub priority: Priority,
    pub status: TaskStatus,
    // Raw payload bytes; `deserialize_payload` decodes them as JSON.
    pub payload: Option<Vec<u8>>,
    pub expected_read_bytes: i64,
    pub expected_write_bytes: i64,
    pub retry_count: i32,
    pub last_error: Option<String>,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    // NOTE(review): `requeue` / `requeue_priority` look like the state behind
    // `SubmitOutcome::Requeued` — confirm where they are written.
    pub requeue: bool,
    pub requeue_priority: Option<Priority>,
}
90
91impl TaskRecord {
92    /// Deserialize the payload blob into a typed value.
93    ///
94    /// Returns `None` if the payload is absent, or an error if deserialization fails.
95    pub fn deserialize_payload<T: serde::de::DeserializeOwned>(
96        &self,
97    ) -> Result<Option<T>, serde_json::Error> {
98        match &self.payload {
99            Some(bytes) => serde_json::from_slice(bytes).map(Some),
100            None => Ok(None),
101        }
102    }
103}
104
/// A task that has completed or permanently failed.
///
/// Carries the same descriptive fields as [`TaskRecord`], with a terminal
/// [`HistoryStatus`] in place of the active status, plus the measured IO
/// byte counts, the completion timestamp, and the duration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskHistoryRecord {
    pub id: i64,
    pub task_type: String,
    pub key: String,
    pub priority: Priority,
    pub status: HistoryStatus,
    pub payload: Option<Vec<u8>>,
    pub expected_read_bytes: i64,
    pub expected_write_bytes: i64,
    // Optional here although `TaskResult` / `TaskError` report plain `i64`s;
    // presumably absent when nothing was reported — TODO confirm.
    pub actual_read_bytes: Option<i64>,
    pub actual_write_bytes: Option<i64>,
    pub retry_count: i32,
    pub last_error: Option<String>,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    // Always present, unlike `started_at`.
    pub completed_at: DateTime<Utc>,
    // Milliseconds, going by the name; presumably `None` when `started_at`
    // is unknown — TODO confirm.
    pub duration_ms: Option<i64>,
}
125
/// Reported by the executor on successful completion.
///
/// Holds the byte counts the task actually read and wrote; the matching
/// estimates are the `expected_*_bytes` fields of the task record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    pub actual_read_bytes: i64,
    pub actual_write_bytes: i64,
}
132
/// Reported by the executor on failure.
///
/// Implements [`std::error::Error`]; its `Display` output is `message` alone,
/// so `retryable` and the byte counts must be read from the fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskError {
    pub message: String,
    // NOTE(review): presumably tells the scheduler whether to retry the task
    // rather than move it to history as failed — confirm against the caller.
    pub retryable: bool,
    pub actual_read_bytes: i64,
    pub actual_write_bytes: i64,
}
141
142impl std::fmt::Display for TaskError {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        write!(f, "{}", self.message)
145    }
146}
147
148impl std::error::Error for TaskError {}
149
/// Result of a task submission attempt.
///
/// The three data-carrying variants hold an `i64` task ID; `Duplicate`
/// carries none.
// NOTE(review): the `Upgraded` and `Requeued` docs below both mention paused
// tasks. Which outcome a paused duplicate actually produces is decided by the
// submitting code, which is not visible here — confirm and tighten one of the
// two descriptions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubmitOutcome {
    /// Task was inserted as new.
    Inserted(i64),
    /// Duplicate key existed; its priority was upgraded (pending/paused tasks only).
    Upgraded(i64),
    /// Duplicate key existed and is running/paused; marked for re-queue after completion.
    Requeued(i64),
    /// Duplicate key existed; no changes were made.
    Duplicate,
}
162
163impl SubmitOutcome {
164    /// Returns the task ID if the task was inserted, upgraded, or requeued.
165    pub fn id(&self) -> Option<i64> {
166        match self {
167            Self::Inserted(id) | Self::Upgraded(id) | Self::Requeued(id) => Some(*id),
168            Self::Duplicate => None,
169        }
170    }
171
172    /// Returns `true` if a new task was inserted.
173    pub fn is_inserted(&self) -> bool {
174        matches!(self, Self::Inserted(_))
175    }
176}
177
178/// Generate a dedup key by hashing the task type and payload.
179///
180/// Produces a hex-encoded SHA-256 digest of `task_type` concatenated with
181/// the payload bytes (or an empty slice when there is no payload).
182pub fn generate_dedup_key(task_type: &str, payload: Option<&[u8]>) -> String {
183    let mut hasher = Sha256::new();
184    hasher.update(task_type.as_bytes());
185    hasher.update(b":");
186    if let Some(p) = payload {
187        hasher.update(p);
188    }
189    format!("{:x}", hasher.finalize())
190}
191
/// Parameters for submitting a new task.
///
/// Build one directly, or use [`TaskSubmission::with_payload`] /
/// [`TaskSubmission::from_typed`] to get a JSON-serialized payload with
/// `key` left unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSubmission {
    pub task_type: String,
    /// Optional dedup key. When `None`, the key is auto-generated by hashing
    /// `task_type` and `payload`, so two submissions with the same type and
    /// payload are deduplicated automatically.
    ///
    /// When `Some`, [`TaskSubmission::effective_key`] hashes this value
    /// together with `task_type` instead of the payload.
    pub key: Option<String>,
    pub priority: Priority,
    // Opaque bytes as far as this struct is concerned; the constructors in
    // this file always store JSON.
    pub payload: Option<Vec<u8>>,
    pub expected_read_bytes: i64,
    pub expected_write_bytes: i64,
}
205
206impl TaskSubmission {
207    /// Resolve the effective dedup key. Always incorporates the task type
208    /// so different task types never collide, even with the same logical key.
209    ///
210    /// - Explicit key: `hash(task_type + ":" + key)`
211    /// - No key: `hash(task_type + ":" + payload)`
212    pub fn effective_key(&self) -> String {
213        match &self.key {
214            Some(k) => generate_dedup_key(&self.task_type, Some(k.as_bytes())),
215            None => generate_dedup_key(&self.task_type, self.payload.as_deref()),
216        }
217    }
218
219    /// Create a submission with a typed payload serialized to JSON bytes.
220    ///
221    /// The dedup key is auto-generated from the task type and serialized payload.
222    /// Use `TaskRecord::deserialize_payload()` on the executor side to recover the type.
223    pub fn with_payload<T: serde::Serialize>(
224        task_type: &str,
225        priority: Priority,
226        data: &T,
227        expected_read_bytes: i64,
228        expected_write_bytes: i64,
229    ) -> Result<Self, serde_json::Error> {
230        let payload = serde_json::to_vec(data)?;
231        Ok(Self {
232            task_type: task_type.to_string(),
233            key: None,
234            priority,
235            payload: Some(payload),
236            expected_read_bytes,
237            expected_write_bytes,
238        })
239    }
240}
241
/// A strongly-typed task that bundles serialization, task type name, and default
/// IO estimates.
///
/// Implementing this trait collapses the 6 fields of [`TaskSubmission`] into a
/// derive-friendly pattern. Use [`Scheduler::submit_typed`] to submit and
/// [`TaskContext::deserialize_typed`] on the executor side.
///
/// Only [`TypedTask::TASK_TYPE`] is required; every method has a default.
/// [`TaskSubmission::from_typed`] turns an implementor into a submission and
/// leaves the explicit dedup key unset, so deduplication is based on the task
/// type plus the serialized value.
///
/// # Example
///
/// ```ignore
/// use serde::{Serialize, Deserialize};
/// use taskmill::{TypedTask, Priority};
///
/// #[derive(Serialize, Deserialize)]
/// struct Thumbnail { path: String, size: u32 }
///
/// impl TypedTask for Thumbnail {
///     const TASK_TYPE: &'static str = "thumbnail";
///     fn expected_read_bytes(&self) -> i64 { 4096 }
///     fn expected_write_bytes(&self) -> i64 { 1024 }
/// }
/// ```
pub trait TypedTask: Serialize + DeserializeOwned + Send + 'static {
    /// Unique name used to register and look up the executor.
    ///
    /// Becomes the `task_type` of the submission built by
    /// [`TaskSubmission::from_typed`].
    const TASK_TYPE: &'static str;

    /// Estimated bytes this task will read. Default: 0.
    fn expected_read_bytes(&self) -> i64 {
        0
    }

    /// Estimated bytes this task will write. Default: 0.
    fn expected_write_bytes(&self) -> i64 {
        0
    }

    /// Scheduling priority. Default: [`Priority::NORMAL`].
    fn priority(&self) -> Priority {
        Priority::NORMAL
    }
}
283
284impl TaskSubmission {
285    /// Create a submission from a [`TypedTask`], serializing the payload and
286    /// pulling task type, priority, and IO estimates from the trait.
287    pub fn from_typed<T: TypedTask>(task: &T) -> Result<Self, serde_json::Error> {
288        let payload = serde_json::to_vec(task)?;
289        Ok(Self {
290            task_type: T::TASK_TYPE.to_string(),
291            key: None,
292            priority: task.priority(),
293            payload: Some(payload),
294            expected_read_bytes: task.expected_read_bytes(),
295            expected_write_bytes: task.expected_write_bytes(),
296        })
297    }
298}
299
/// Unified lookup result for querying a task by its dedup inputs.
///
/// Returned by [`TaskStore::task_lookup`] and [`Scheduler::task_lookup`].
/// Tells the caller whether a task is currently active (pending, running,
/// or paused) or has finished (completed or failed), without requiring
/// them to manually compute the dedup key or query two tables.
///
/// Serialized in serde's adjacently tagged form: the variant name is stored
/// under `"location"` and the wrapped record under `"record"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "location", content = "record")]
pub enum TaskLookup {
    /// Task is in the active queue (pending, running, or paused).
    Active(TaskRecord),
    /// Task has finished and is in the history table.
    /// Contains the most recent history entry for that key.
    History(TaskHistoryRecord),
    /// No task with this key exists in either table.
    NotFound,
}
317
/// Aggregate statistics for a task type from history.
///
/// The derived `Default` sets every field to zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TypeStats {
    pub count: i64,
    pub avg_duration_ms: f64,
    pub avg_read_bytes: f64,
    pub avg_write_bytes: f64,
    // NOTE(review): presumably a fraction in 0.0..=1.0 rather than a
    // percentage — confirm against the query that fills it.
    pub failure_rate: f64,
}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture task that overrides both IO estimates and keeps the default priority.
    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct Thumbnail {
        path: String,
        size: u32,
    }

    impl TypedTask for Thumbnail {
        const TASK_TYPE: &'static str = "thumbnail";

        fn expected_read_bytes(&self) -> i64 {
            4096
        }

        fn expected_write_bytes(&self) -> i64 {
            1024
        }
    }

    /// `from_typed` copies type name, priority and IO estimates from the trait
    /// and stores a JSON payload that decodes back to the original value.
    #[test]
    fn typed_task_to_submission() {
        let original = Thumbnail {
            path: String::from("/photos/a.jpg"),
            size: 256,
        };
        let submission = TaskSubmission::from_typed(&original).unwrap();

        assert_eq!(submission.task_type, "thumbnail");
        assert_eq!(submission.priority, Priority::NORMAL);
        assert_eq!(submission.expected_read_bytes, 4096);
        assert_eq!(submission.expected_write_bytes, 1024);
        assert!(submission.key.is_none());

        // The stored bytes decode back into an equal value.
        let stored = submission.payload.as_ref().unwrap();
        let decoded: Thumbnail = serde_json::from_slice(stored).unwrap();
        assert_eq!(decoded, original);
    }

    /// An overridden `priority()` is carried into the submission.
    #[test]
    fn typed_task_custom_priority() {
        #[derive(Serialize, Deserialize)]
        struct Urgent {
            id: u64,
        }

        impl TypedTask for Urgent {
            const TASK_TYPE: &'static str = "urgent";

            fn priority(&self) -> Priority {
                Priority::HIGH
            }
        }

        let task = Urgent { id: 42 };
        let submission = TaskSubmission::from_typed(&task).unwrap();

        assert_eq!(submission.priority, Priority::HIGH);
        assert_eq!(submission.task_type, "urgent");
    }

    /// With nothing overridden, the trait defaults are used.
    #[test]
    fn typed_task_defaults() {
        #[derive(Serialize, Deserialize)]
        struct Minimal;

        impl TypedTask for Minimal {
            const TASK_TYPE: &'static str = "minimal";
        }

        let submission = TaskSubmission::from_typed(&Minimal).unwrap();

        assert_eq!(submission.expected_read_bytes, 0);
        assert_eq!(submission.expected_write_bytes, 0);
        assert_eq!(submission.priority, Priority::NORMAL);
    }
}