1use chrono::{DateTime, Utc};
2use serde::de::DeserializeOwned;
3use serde::{Deserialize, Serialize};
4use sha2::{Digest, Sha256};
5
6use crate::priority::Priority;
7
8pub const MAX_PAYLOAD_BYTES: usize = 1_048_576;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "snake_case")]
14pub enum TaskStatus {
15 Pending,
16 Running,
17 Paused,
18}
19
20impl TaskStatus {
21 pub fn as_str(self) -> &'static str {
22 match self {
23 Self::Pending => "pending",
24 Self::Running => "running",
25 Self::Paused => "paused",
26 }
27 }
28}
29
30impl std::str::FromStr for TaskStatus {
31 type Err = String;
32
33 fn from_str(s: &str) -> Result<Self, Self::Err> {
34 match s {
35 "pending" => Ok(Self::Pending),
36 "running" => Ok(Self::Running),
37 "paused" => Ok(Self::Paused),
38 other => Err(format!("unknown TaskStatus: {other}")),
39 }
40 }
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(rename_all = "snake_case")]
46pub enum HistoryStatus {
47 Completed,
48 Failed,
49}
50
51impl HistoryStatus {
52 pub fn as_str(self) -> &'static str {
53 match self {
54 Self::Completed => "completed",
55 Self::Failed => "failed",
56 }
57 }
58}
59
60impl std::str::FromStr for HistoryStatus {
61 type Err = String;
62
63 fn from_str(s: &str) -> Result<Self, Self::Err> {
64 match s {
65 "completed" => Ok(Self::Completed),
66 "failed" => Ok(Self::Failed),
67 other => Err(format!("unknown HistoryStatus: {other}")),
68 }
69 }
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct TaskRecord {
75 pub id: i64,
76 pub task_type: String,
77 pub key: String,
78 pub priority: Priority,
79 pub status: TaskStatus,
80 pub payload: Option<Vec<u8>>,
81 pub expected_read_bytes: i64,
82 pub expected_write_bytes: i64,
83 pub retry_count: i32,
84 pub last_error: Option<String>,
85 pub created_at: DateTime<Utc>,
86 pub started_at: Option<DateTime<Utc>>,
87 pub requeue: bool,
88 pub requeue_priority: Option<Priority>,
89}
90
91impl TaskRecord {
92 pub fn deserialize_payload<T: serde::de::DeserializeOwned>(
96 &self,
97 ) -> Result<Option<T>, serde_json::Error> {
98 match &self.payload {
99 Some(bytes) => serde_json::from_slice(bytes).map(Some),
100 None => Ok(None),
101 }
102 }
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct TaskHistoryRecord {
108 pub id: i64,
109 pub task_type: String,
110 pub key: String,
111 pub priority: Priority,
112 pub status: HistoryStatus,
113 pub payload: Option<Vec<u8>>,
114 pub expected_read_bytes: i64,
115 pub expected_write_bytes: i64,
116 pub actual_read_bytes: Option<i64>,
117 pub actual_write_bytes: Option<i64>,
118 pub retry_count: i32,
119 pub last_error: Option<String>,
120 pub created_at: DateTime<Utc>,
121 pub started_at: Option<DateTime<Utc>>,
122 pub completed_at: DateTime<Utc>,
123 pub duration_ms: Option<i64>,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct TaskResult {
129 pub actual_read_bytes: i64,
130 pub actual_write_bytes: i64,
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct TaskError {
136 pub message: String,
137 pub retryable: bool,
138 pub actual_read_bytes: i64,
139 pub actual_write_bytes: i64,
140}
141
142impl std::fmt::Display for TaskError {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 write!(f, "{}", self.message)
145 }
146}
147
148impl std::error::Error for TaskError {}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum SubmitOutcome {
153 Inserted(i64),
155 Upgraded(i64),
157 Requeued(i64),
159 Duplicate,
161}
162
163impl SubmitOutcome {
164 pub fn id(&self) -> Option<i64> {
166 match self {
167 Self::Inserted(id) | Self::Upgraded(id) | Self::Requeued(id) => Some(*id),
168 Self::Duplicate => None,
169 }
170 }
171
172 pub fn is_inserted(&self) -> bool {
174 matches!(self, Self::Inserted(_))
175 }
176}
177
178pub fn generate_dedup_key(task_type: &str, payload: Option<&[u8]>) -> String {
183 let mut hasher = Sha256::new();
184 hasher.update(task_type.as_bytes());
185 hasher.update(b":");
186 if let Some(p) = payload {
187 hasher.update(p);
188 }
189 format!("{:x}", hasher.finalize())
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct TaskSubmission {
195 pub task_type: String,
196 pub key: Option<String>,
200 pub priority: Priority,
201 pub payload: Option<Vec<u8>>,
202 pub expected_read_bytes: i64,
203 pub expected_write_bytes: i64,
204}
205
206impl TaskSubmission {
207 pub fn effective_key(&self) -> String {
213 match &self.key {
214 Some(k) => generate_dedup_key(&self.task_type, Some(k.as_bytes())),
215 None => generate_dedup_key(&self.task_type, self.payload.as_deref()),
216 }
217 }
218
219 pub fn with_payload<T: serde::Serialize>(
224 task_type: &str,
225 priority: Priority,
226 data: &T,
227 expected_read_bytes: i64,
228 expected_write_bytes: i64,
229 ) -> Result<Self, serde_json::Error> {
230 let payload = serde_json::to_vec(data)?;
231 Ok(Self {
232 task_type: task_type.to_string(),
233 key: None,
234 priority,
235 payload: Some(payload),
236 expected_read_bytes,
237 expected_write_bytes,
238 })
239 }
240}
241
242pub trait TypedTask: Serialize + DeserializeOwned + Send + 'static {
265 const TASK_TYPE: &'static str;
267
268 fn expected_read_bytes(&self) -> i64 {
270 0
271 }
272
273 fn expected_write_bytes(&self) -> i64 {
275 0
276 }
277
278 fn priority(&self) -> Priority {
280 Priority::NORMAL
281 }
282}
283
284impl TaskSubmission {
285 pub fn from_typed<T: TypedTask>(task: &T) -> Result<Self, serde_json::Error> {
288 let payload = serde_json::to_vec(task)?;
289 Ok(Self {
290 task_type: T::TASK_TYPE.to_string(),
291 key: None,
292 priority: task.priority(),
293 payload: Some(payload),
294 expected_read_bytes: task.expected_read_bytes(),
295 expected_write_bytes: task.expected_write_bytes(),
296 })
297 }
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
307#[serde(tag = "location", content = "record")]
308pub enum TaskLookup {
309 Active(TaskRecord),
311 History(TaskHistoryRecord),
314 NotFound,
316}
317
318#[derive(Debug, Clone, Default, Serialize, Deserialize)]
320pub struct TypeStats {
321 pub count: i64,
322 pub avg_duration_ms: f64,
323 pub avg_read_bytes: f64,
324 pub avg_write_bytes: f64,
325 pub failure_rate: f64,
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331
332 #[derive(Serialize, Deserialize, Debug, PartialEq)]
333 struct Thumbnail {
334 path: String,
335 size: u32,
336 }
337
338 impl TypedTask for Thumbnail {
339 const TASK_TYPE: &'static str = "thumbnail";
340
341 fn expected_read_bytes(&self) -> i64 {
342 4096
343 }
344
345 fn expected_write_bytes(&self) -> i64 {
346 1024
347 }
348 }
349
350 #[test]
351 fn typed_task_to_submission() {
352 let task = Thumbnail {
353 path: "/photos/a.jpg".into(),
354 size: 256,
355 };
356 let sub = TaskSubmission::from_typed(&task).unwrap();
357
358 assert_eq!(sub.task_type, "thumbnail");
359 assert_eq!(sub.priority, Priority::NORMAL);
360 assert_eq!(sub.expected_read_bytes, 4096);
361 assert_eq!(sub.expected_write_bytes, 1024);
362 assert!(sub.key.is_none());
363
364 let recovered: Thumbnail = serde_json::from_slice(sub.payload.as_ref().unwrap()).unwrap();
366 assert_eq!(recovered, task);
367 }
368
369 #[test]
370 fn typed_task_custom_priority() {
371 #[derive(Serialize, Deserialize)]
372 struct Urgent {
373 id: u64,
374 }
375
376 impl TypedTask for Urgent {
377 const TASK_TYPE: &'static str = "urgent";
378
379 fn priority(&self) -> Priority {
380 Priority::HIGH
381 }
382 }
383
384 let sub = TaskSubmission::from_typed(&Urgent { id: 42 }).unwrap();
385 assert_eq!(sub.priority, Priority::HIGH);
386 assert_eq!(sub.task_type, "urgent");
387 }
388
389 #[test]
390 fn typed_task_defaults() {
391 #[derive(Serialize, Deserialize)]
392 struct Minimal;
393
394 impl TypedTask for Minimal {
395 const TASK_TYPE: &'static str = "minimal";
396 }
397
398 let sub = TaskSubmission::from_typed(&Minimal).unwrap();
399 assert_eq!(sub.expected_read_bytes, 0);
400 assert_eq!(sub.expected_write_bytes, 0);
401 assert_eq!(sub.priority, Priority::NORMAL);
402 }
403}