1use crate::errors::{PolarisError, PolarisResult};
6use bytes::Bytes;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::fmt;
10use std::sync::Arc;
11use std::time::Duration;
12use uuid::Uuid;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
16pub struct TaskId(Uuid);
17
18impl TaskId {
19 pub fn new() -> Self {
21 Self(Uuid::new_v4())
22 }
23
24 pub fn as_uuid(&self) -> &Uuid {
26 &self.0
27 }
28}
29
30impl Default for TaskId {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl fmt::Display for TaskId {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
42impl From<Uuid> for TaskId {
43 fn from(uuid: Uuid) -> Self {
44 Self(uuid)
45 }
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[non_exhaustive]
51pub enum TaskStatus {
52 Pending,
54 Scheduled,
56 Running,
58 Completed,
60 Failed,
62 Cancelled,
64 TimedOut,
66}
67
68impl fmt::Display for TaskStatus {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 match self {
71 Self::Pending => write!(f, "pending"),
72 Self::Scheduled => write!(f, "scheduled"),
73 Self::Running => write!(f, "running"),
74 Self::Completed => write!(f, "completed"),
75 Self::Failed => write!(f, "failed"),
76 Self::Cancelled => write!(f, "cancelled"),
77 Self::TimedOut => write!(f, "timed_out"),
78 }
79 }
80}
81
82#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
84pub enum TaskPriority {
85 Low = 0,
87 Normal = 1,
89 High = 2,
91 Critical = 3,
93}
94
95impl Default for TaskPriority {
96 fn default() -> Self {
97 Self::Normal
98 }
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct Task {
104 pub id: TaskId,
106
107 pub name: String,
109
110 pub payload: Bytes,
112
113 pub priority: TaskPriority,
115
116 #[serde(with = "humantime_serde")]
118 pub timeout: Duration,
119
120 pub max_retries: u32,
122
123 pub retry_count: u32,
125
126 pub dependencies: Vec<TaskId>,
128
129 pub metadata: TaskMetadata,
131
132 pub status: TaskStatus,
134
135 pub result: Option<TaskResult>,
137}
138
139impl Task {
140 pub fn new(name: impl Into<String>, payload: Bytes) -> Self {
142 Self {
143 id: TaskId::new(),
144 name: name.into(),
145 payload,
146 priority: TaskPriority::default(),
147 timeout: Duration::from_secs(300), max_retries: 3,
149 retry_count: 0,
150 dependencies: Vec::new(),
151 metadata: TaskMetadata::default(),
152 status: TaskStatus::Pending,
153 result: None,
154 }
155 }
156
157 pub fn with_priority(mut self, priority: TaskPriority) -> Self {
159 self.priority = priority;
160 self
161 }
162
163 pub fn with_timeout(mut self, timeout: Duration) -> Self {
165 self.timeout = timeout;
166 self
167 }
168
169 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
171 self.max_retries = max_retries;
172 self
173 }
174
175 pub fn with_dependency(mut self, task_id: TaskId) -> Self {
177 self.dependencies.push(task_id);
178 self
179 }
180
181 pub fn can_retry(&self) -> bool {
183 self.retry_count < self.max_retries
184 }
185
186 pub fn increment_retry(&mut self) {
188 self.retry_count += 1;
189 }
190
191 pub fn transition_to(&mut self, new_status: TaskStatus) -> PolarisResult<()> {
193 use TaskStatus::*;
194
195 let valid = match (&self.status, &new_status) {
196 (Pending, Scheduled | Cancelled) => true,
197 (Scheduled, Running | Cancelled) => true,
198 (Running, Completed | Failed | TimedOut | Cancelled) => true,
199 (Failed, Scheduled) if self.can_retry() => true,
200 _ => false,
201 };
202
203 if valid {
204 self.status = new_status;
205 Ok(())
206 } else {
207 Err(PolarisError::InvalidStateTransition {
208 from: self.status.to_string(),
209 to: new_status.to_string(),
210 })
211 }
212 }
213}
214
215#[derive(Debug, Clone, Default, Serialize, Deserialize)]
217pub struct TaskMetadata {
218 pub created_at: Option<DateTime<Utc>>,
220
221 pub scheduled_at: Option<DateTime<Utc>>,
223
224 pub started_at: Option<DateTime<Utc>>,
226
227 pub completed_at: Option<DateTime<Utc>>,
229
230 pub node_id: Option<String>,
232
233 pub tags: std::collections::HashMap<String, String>,
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize)]
239pub struct TaskResult {
240 pub success: bool,
242
243 pub output: Option<Bytes>,
245
246 pub error: Option<String>,
248
249 #[serde(with = "humantime_serde")]
251 pub duration: Duration,
252}
253
254#[derive(Debug, Clone)]
256pub struct TaskHandle {
257 pub id: TaskId,
259
260 state: Arc<parking_lot::RwLock<TaskHandleState>>,
262}
263
264#[derive(Debug)]
265struct TaskHandleState {
266 status: TaskStatus,
267 result: Option<TaskResult>,
268}
269
270impl TaskHandle {
271 pub fn new(id: TaskId) -> Self {
273 Self {
274 id,
275 state: Arc::new(parking_lot::RwLock::new(TaskHandleState {
276 status: TaskStatus::Pending,
277 result: None,
278 })),
279 }
280 }
281
282 pub fn status(&self) -> TaskStatus {
284 self.state.read().status
285 }
286
287 pub fn is_complete(&self) -> bool {
289 matches!(
290 self.status(),
291 TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled | TaskStatus::TimedOut
292 )
293 }
294
295 pub async fn result(&self) -> PolarisResult<TaskResult> {
297 loop {
299 if self.is_complete() {
300 let state = self.state.read();
301 return state
302 .result
303 .clone()
304 .ok_or_else(|| PolarisError::other("Task complete but no result available"));
305 }
306 tokio::time::sleep(Duration::from_millis(100)).await;
307 }
308 }
309
310 pub(crate) fn update_status(&self, status: TaskStatus) {
312 self.state.write().status = status;
313 }
314
315 pub(crate) fn set_result(&self, result: TaskResult) {
317 let mut state = self.state.write();
318 let success = result.success;
319 state.result = Some(result);
320 state.status = if success {
321 TaskStatus::Completed
322 } else {
323 TaskStatus::Failed
324 };
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generated identifiers must not compare equal.
    #[test]
    fn test_task_id_creation() {
        let first = TaskId::new();
        let second = TaskId::new();
        assert_ne!(first, second);
    }

    /// `Task::new` keeps the name and starts pending with no retries recorded.
    #[test]
    fn test_task_creation() {
        let created = Task::new("test_task", Bytes::from("payload"));

        assert_eq!(created.name, "test_task");
        assert_eq!(created.status, TaskStatus::Pending);
        assert_eq!(created.retry_count, 0);
    }

    /// Builder methods override the corresponding defaults.
    #[test]
    fn test_task_builder() {
        let timeout = Duration::from_secs(60);
        let built = Task::new("test", Bytes::new())
            .with_priority(TaskPriority::High)
            .with_timeout(timeout)
            .with_max_retries(5);

        assert_eq!(built.priority, TaskPriority::High);
        assert_eq!(built.timeout, Duration::from_secs(60));
        assert_eq!(built.max_retries, 5);
    }

    /// Pending -> Scheduled -> Running -> Completed is accepted; leaving
    /// `Completed` is rejected.
    #[test]
    fn test_task_state_transitions() {
        let mut task = Task::new("test", Bytes::new());

        assert!(task.transition_to(TaskStatus::Scheduled).is_ok());
        assert_eq!(task.status, TaskStatus::Scheduled);

        assert!(task.transition_to(TaskStatus::Running).is_ok());
        assert!(task.transition_to(TaskStatus::Completed).is_ok());

        // A completed task cannot go back to running.
        assert!(task.transition_to(TaskStatus::Running).is_err());
    }

    /// `can_retry` turns false once `retry_count` reaches `max_retries`.
    #[test]
    fn test_task_retry_logic() {
        let mut task = Task::new("test", Bytes::new()).with_max_retries(2);
        assert!(task.can_retry());

        task.increment_retry();
        assert!(task.can_retry());

        task.increment_retry();
        assert!(!task.can_retry());
    }

    /// A new handle is pending and not complete; `update_status` is visible
    /// through `status`.
    #[tokio::test]
    async fn test_task_handle() {
        let handle = TaskHandle::new(TaskId::new());
        assert_eq!(handle.status(), TaskStatus::Pending);
        assert!(!handle.is_complete());

        handle.update_status(TaskStatus::Running);
        assert_eq!(handle.status(), TaskStatus::Running);
    }
}