Skip to main content

rust_pipe/schema/
mod.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use uuid::Uuid;
4
5/// A unit of work dispatched to a worker.
6#[derive(Debug, Clone, Serialize, Deserialize)]
7#[serde(rename_all = "camelCase")]
8pub struct Task {
9    pub id: Uuid,
10    pub task_type: String,
11    pub payload: serde_json::Value,
12    pub metadata: TaskMetadata,
13}
14
15/// Metadata attached to every task (timeouts, priority, retries).
16#[derive(Debug, Clone, Serialize, Deserialize)]
17#[serde(rename_all = "camelCase")]
18pub struct TaskMetadata {
19    pub created_at: DateTime<Utc>,
20    pub timeout_ms: u64,
21    pub priority: Priority,
22    pub retry_count: u32,
23    pub max_retries: u32,
24    pub trace_id: Option<String>,
25}
26
27/// Task priority level. Higher priority tasks are dispatched first.
28#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
29pub enum Priority {
30    Low = 0,
31    Normal = 1,
32    High = 2,
33    Critical = 3,
34}
35
36/// Result returned by a worker after executing a task.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct TaskResult {
40    pub task_id: Uuid,
41    pub status: TaskStatus,
42    pub payload: Option<serde_json::Value>,
43    pub error: Option<TaskError>,
44    pub duration_ms: u64,
45    pub worker_id: String,
46}
47
48/// Lifecycle state of a task.
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
50pub enum TaskStatus {
51    Pending,
52    Dispatched,
53    Running,
54    Completed,
55    Failed,
56    TimedOut,
57    Cancelled,
58}
59
60/// Error information from a failed task.
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
62pub struct TaskError {
63    pub code: String,
64    pub message: String,
65    pub retryable: bool,
66}
67
68impl Task {
69    pub fn new(task_type: impl Into<String>, payload: serde_json::Value) -> Self {
70        Self {
71            id: Uuid::new_v4(),
72            task_type: task_type.into(),
73            payload,
74            metadata: TaskMetadata {
75                created_at: Utc::now(),
76                timeout_ms: 300_000,
77                priority: Priority::Normal,
78                retry_count: 0,
79                max_retries: 3,
80                trace_id: None,
81            },
82        }
83    }
84
85    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
86        self.metadata.timeout_ms = timeout_ms;
87        self
88    }
89
90    pub fn with_priority(mut self, priority: Priority) -> Self {
91        self.metadata.priority = priority;
92        self
93    }
94
95    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
96        self.metadata.max_retries = max_retries;
97        self
98    }
99
100    pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
101        self.metadata.trace_id = Some(trace_id.into());
102        self
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use super::*;
109    use serde_json::json;
110
111    #[test]
112    fn test_task_new_sets_uuid() {
113        let task = Task::new("test", json!({}));
114        assert_ne!(task.id, Uuid::nil());
115    }
116
117    #[test]
118    fn test_task_new_sets_task_type() {
119        let task = Task::new("scan-target", json!({}));
120        assert_eq!(task.task_type, "scan-target");
121    }
122
123    #[test]
124    fn test_task_new_stores_payload() {
125        let payload = json!({"key": "value", "num": 42});
126        let task = Task::new("t", payload.clone());
127        assert_eq!(task.payload, payload);
128    }
129
130    #[test]
131    fn test_task_new_default_timeout() {
132        let task = Task::new("t", json!({}));
133        assert_eq!(task.metadata.timeout_ms, 300_000);
134    }
135
136    #[test]
137    fn test_task_new_default_priority() {
138        let task = Task::new("t", json!({}));
139        assert_eq!(task.metadata.priority, Priority::Normal);
140    }
141
142    #[test]
143    fn test_task_new_default_retry_count() {
144        let task = Task::new("t", json!({}));
145        assert_eq!(task.metadata.retry_count, 0);
146    }
147
148    #[test]
149    fn test_task_new_default_max_retries() {
150        let task = Task::new("t", json!({}));
151        assert_eq!(task.metadata.max_retries, 3);
152    }
153
154    #[test]
155    fn test_task_new_default_trace_id_none() {
156        let task = Task::new("t", json!({}));
157        assert_eq!(task.metadata.trace_id, None);
158    }
159
160    #[test]
161    fn test_task_new_created_at_is_recent() {
162        let before = Utc::now();
163        let task = Task::new("t", json!({}));
164        let after = Utc::now();
165        assert!(task.metadata.created_at >= before);
166        assert!(task.metadata.created_at <= after);
167    }
168
169    #[test]
170    fn test_task_with_timeout() {
171        let task = Task::new("t", json!({})).with_timeout(5000);
172        assert_eq!(task.metadata.timeout_ms, 5000);
173    }
174
175    #[test]
176    fn test_task_with_priority() {
177        let task = Task::new("t", json!({})).with_priority(Priority::Critical);
178        assert_eq!(task.metadata.priority, Priority::Critical);
179    }
180
181    #[test]
182    fn test_task_with_max_retries() {
183        let task = Task::new("t", json!({})).with_max_retries(10);
184        assert_eq!(task.metadata.max_retries, 10);
185    }
186
187    #[test]
188    fn test_task_with_trace_id() {
189        let task = Task::new("t", json!({})).with_trace_id("abc-123");
190        assert_eq!(task.metadata.trace_id, Some("abc-123".to_string()));
191    }
192
193    #[test]
194    fn test_task_builder_chaining() {
195        let task = Task::new("scan", json!({"url": "http://x.com"}))
196            .with_timeout(1000)
197            .with_priority(Priority::High)
198            .with_max_retries(5)
199            .with_trace_id("trace-1");
200        assert_eq!(task.task_type, "scan");
201        assert_eq!(task.metadata.timeout_ms, 1000);
202        assert_eq!(task.metadata.priority, Priority::High);
203        assert_eq!(task.metadata.max_retries, 5);
204        assert_eq!(task.metadata.trace_id, Some("trace-1".to_string()));
205    }
206
207    #[test]
208    fn test_task_with_timeout_zero() {
209        let task = Task::new("t", json!({})).with_timeout(0);
210        assert_eq!(task.metadata.timeout_ms, 0);
211    }
212
213    #[test]
214    fn test_task_with_max_retries_zero() {
215        let task = Task::new("t", json!({})).with_max_retries(0);
216        assert_eq!(task.metadata.max_retries, 0);
217    }
218
219    #[test]
220    fn test_task_new_from_owned_string() {
221        let task = Task::new(String::from("owned-type"), json!({}));
222        assert_eq!(task.task_type, "owned-type");
223    }
224
225    #[test]
226    fn test_task_serde_roundtrip() {
227        let task = Task::new("roundtrip", json!({"a": 1})).with_trace_id("t1");
228        let json = serde_json::to_string(&task).unwrap();
229        let deserialized: Task = serde_json::from_str(&json).unwrap();
230        assert_eq!(deserialized.id, task.id);
231        assert_eq!(deserialized.task_type, task.task_type);
232        assert_eq!(deserialized.payload, task.payload);
233        assert_eq!(deserialized.metadata.timeout_ms, task.metadata.timeout_ms);
234        assert_eq!(deserialized.metadata.trace_id, task.metadata.trace_id);
235    }
236
237    #[test]
238    fn test_task_result_serde_roundtrip() {
239        let result = TaskResult {
240            task_id: Uuid::new_v4(),
241            status: TaskStatus::Completed,
242            payload: Some(json!({"vulns": 3})),
243            error: None,
244            duration_ms: 1500,
245            worker_id: "w1".to_string(),
246        };
247        let json = serde_json::to_string(&result).unwrap();
248        let de: TaskResult = serde_json::from_str(&json).unwrap();
249        assert_eq!(de.task_id, result.task_id);
250        assert_eq!(de.status, TaskStatus::Completed);
251        assert_eq!(de.duration_ms, 1500);
252    }
253
254    #[test]
255    fn test_task_result_serde_null_optionals() {
256        let result = TaskResult {
257            task_id: Uuid::new_v4(),
258            status: TaskStatus::Failed,
259            payload: None,
260            error: None,
261            duration_ms: 0,
262            worker_id: "w".to_string(),
263        };
264        let json = serde_json::to_string(&result).unwrap();
265        assert!(json.contains("null"));
266        let de: TaskResult = serde_json::from_str(&json).unwrap();
267        assert_eq!(de.payload, None);
268        assert_eq!(de.error, None);
269    }
270
271    #[test]
272    fn test_task_error_serde_roundtrip() {
273        let err = TaskError {
274            code: "E1".into(),
275            message: "fail".into(),
276            retryable: true,
277        };
278        let json = serde_json::to_string(&err).unwrap();
279        let de: TaskError = serde_json::from_str(&json).unwrap();
280        assert_eq!(de.code, "E1");
281        assert!(de.retryable);
282    }
283
284    #[test]
285    fn test_priority_ordering() {
286        assert!(Priority::Low < Priority::Normal);
287        assert!(Priority::Normal < Priority::High);
288        assert!(Priority::High < Priority::Critical);
289    }
290
291    #[test]
292    fn test_priority_serde_all_variants() {
293        for p in [
294            Priority::Low,
295            Priority::Normal,
296            Priority::High,
297            Priority::Critical,
298        ] {
299            let json = serde_json::to_string(&p).unwrap();
300            let de: Priority = serde_json::from_str(&json).unwrap();
301            assert_eq!(de, p);
302        }
303    }
304
305    #[test]
306    fn test_task_status_serde_all_variants() {
307        let variants = [
308            TaskStatus::Pending,
309            TaskStatus::Dispatched,
310            TaskStatus::Running,
311            TaskStatus::Completed,
312            TaskStatus::Failed,
313            TaskStatus::TimedOut,
314            TaskStatus::Cancelled,
315        ];
316        for v in variants {
317            let json = serde_json::to_string(&v).unwrap();
318            let de: TaskStatus = serde_json::from_str(&json).unwrap();
319            assert_eq!(de, v);
320        }
321    }
322
323    #[test]
324    fn test_task_json_uses_camel_case() {
325        let task = Task::new("t", json!({}));
326        let json = serde_json::to_string(&task).unwrap();
327        assert!(json.contains("taskType"));
328        assert!(json.contains("createdAt"));
329        assert!(json.contains("timeoutMs"));
330        assert!(json.contains("retryCount"));
331        assert!(json.contains("maxRetries"));
332        assert!(json.contains("traceId"));
333        assert!(!json.contains("task_type"));
334        assert!(!json.contains("created_at"));
335    }
336
337    #[test]
338    fn test_task_result_json_uses_camel_case() {
339        let result = TaskResult {
340            task_id: Uuid::new_v4(),
341            status: TaskStatus::Completed,
342            payload: None,
343            error: None,
344            duration_ms: 100,
345            worker_id: "w".to_string(),
346        };
347        let json = serde_json::to_string(&result).unwrap();
348        assert!(json.contains("taskId"));
349        assert!(json.contains("durationMs"));
350        assert!(json.contains("workerId"));
351        assert!(!json.contains("task_id"));
352        assert!(!json.contains("duration_ms"));
353    }
354
355    #[test]
356    fn test_task_payload_nested_json() {
357        let payload = json!({"a": {"b": {"c": [1, 2, {"d": true}]}}});
358        let task = Task::new("t", payload.clone());
359        let json = serde_json::to_string(&task).unwrap();
360        let de: Task = serde_json::from_str(&json).unwrap();
361        assert_eq!(de.payload, payload);
362    }
363
364    #[test]
365    fn test_task_payload_empty_object() {
366        let task = Task::new("t", json!({}));
367        let json = serde_json::to_string(&task).unwrap();
368        let de: Task = serde_json::from_str(&json).unwrap();
369        assert_eq!(de.payload, json!({}));
370    }
371
372    #[test]
373    fn test_task_payload_array() {
374        let task = Task::new("t", json!([1, 2, 3]));
375        let json = serde_json::to_string(&task).unwrap();
376        let de: Task = serde_json::from_str(&json).unwrap();
377        assert_eq!(de.payload, json!([1, 2, 3]));
378    }
379}