eshanized_polaris_core/
task.rs

1//! Task model and lifecycle management.
2//!
3//! This module defines the task abstraction used throughout Polaris.
4
5use crate::errors::{PolarisError, PolarisResult};
6use bytes::Bytes;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::fmt;
10use std::sync::Arc;
11use std::time::Duration;
12use uuid::Uuid;
13
/// Unique identifier for a task.
///
/// A newtype wrapper around a [`Uuid`]. The inner value is private; use
/// `TaskId::new` to generate a random ID, `From<Uuid>` to wrap an existing
/// UUID, and `as_uuid` to borrow it back. The type is `Copy` and `Hash`, so it
/// can be passed by value and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TaskId(Uuid);
17
18impl TaskId {
19    /// Create a new random task ID
20    pub fn new() -> Self {
21        Self(Uuid::new_v4())
22    }
23
24    /// Get the inner UUID
25    pub fn as_uuid(&self) -> &Uuid {
26        &self.0
27    }
28}
29
30impl Default for TaskId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl fmt::Display for TaskId {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
42impl From<Uuid> for TaskId {
43    fn from(uuid: Uuid) -> Self {
44        Self(uuid)
45    }
46}
47
/// Task execution status.
///
/// `Task::transition_to` decides which moves between these states are legal.
/// `TaskHandle::is_complete` treats `Completed`, `Failed`, `Cancelled` and
/// `TimedOut` as finished states.
///
/// The enum is `#[non_exhaustive]`: code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum TaskStatus {
    /// Task is pending scheduling (the status assigned by `Task::new`)
    Pending,
    /// Task is scheduled and queued
    Scheduled,
    /// Task is currently running
    Running,
    /// Task completed successfully
    Completed,
    /// Task failed
    Failed,
    /// Task was cancelled
    Cancelled,
    /// Task timed out
    TimedOut,
}
67
68impl fmt::Display for TaskStatus {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        match self {
71            Self::Pending => write!(f, "pending"),
72            Self::Scheduled => write!(f, "scheduled"),
73            Self::Running => write!(f, "running"),
74            Self::Completed => write!(f, "completed"),
75            Self::Failed => write!(f, "failed"),
76            Self::Cancelled => write!(f, "cancelled"),
77            Self::TimedOut => write!(f, "timed_out"),
78        }
79    }
80}
81
/// Task priority level.
///
/// `PartialOrd`/`Ord` are derived, so variants compare in declaration order:
/// `Low < Normal < High < Critical`. The explicit discriminants (0..=3) follow
/// the same order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum TaskPriority {
    /// Low priority
    Low = 0,
    /// Normal priority (default)
    Normal = 1,
    /// High priority
    High = 2,
    /// Critical priority
    Critical = 3,
}
94
95impl Default for TaskPriority {
96    fn default() -> Self {
97        Self::Normal
98    }
99}
100
/// A task to be executed in the cluster.
///
/// Built with `Task::new` and the `with_*` builder methods. Status changes are
/// meant to go through `Task::transition_to`, which rejects illegal moves; the
/// fields are public, so direct assignment bypasses that check.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
    /// Unique task identifier
    pub id: TaskId,

    /// Task name/label
    pub name: String,

    /// Task payload (serialized input)
    pub payload: Bytes,

    /// Task priority
    pub priority: TaskPriority,

    /// Maximum execution time.
    ///
    /// (De)serialized through the `humantime_serde` module rather than serde's
    /// default `Duration` representation.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,

    /// Maximum retry attempts
    pub max_retries: u32,

    /// Current retry attempt (compared against `max_retries` by `can_retry`)
    pub retry_count: u32,

    /// Task dependencies (must complete before this task runs)
    pub dependencies: Vec<TaskId>,

    /// Task metadata (timestamps, executing node, user tags)
    pub metadata: TaskMetadata,

    /// Task status
    pub status: TaskStatus,

    /// Task result (if completed); `None` for a newly created task
    pub result: Option<TaskResult>,
}
138
139impl Task {
140    /// Create a new task with the given name and payload
141    pub fn new(name: impl Into<String>, payload: Bytes) -> Self {
142        Self {
143            id: TaskId::new(),
144            name: name.into(),
145            payload,
146            priority: TaskPriority::default(),
147            timeout: Duration::from_secs(300), // 5 minutes default
148            max_retries: 3,
149            retry_count: 0,
150            dependencies: Vec::new(),
151            metadata: TaskMetadata::default(),
152            status: TaskStatus::Pending,
153            result: None,
154        }
155    }
156
157    /// Set task priority
158    pub fn with_priority(mut self, priority: TaskPriority) -> Self {
159        self.priority = priority;
160        self
161    }
162
163    /// Set task timeout
164    pub fn with_timeout(mut self, timeout: Duration) -> Self {
165        self.timeout = timeout;
166        self
167    }
168
169    /// Set maximum retries
170    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
171        self.max_retries = max_retries;
172        self
173    }
174
175    /// Add a dependency
176    pub fn with_dependency(mut self, task_id: TaskId) -> Self {
177        self.dependencies.push(task_id);
178        self
179    }
180
181    /// Check if task can be retried
182    pub fn can_retry(&self) -> bool {
183        self.retry_count < self.max_retries
184    }
185
186    /// Increment retry count
187    pub fn increment_retry(&mut self) {
188        self.retry_count += 1;
189    }
190
191    /// Transition to a new status
192    pub fn transition_to(&mut self, new_status: TaskStatus) -> PolarisResult<()> {
193        use TaskStatus::*;
194
195        let valid = match (&self.status, &new_status) {
196            (Pending, Scheduled | Cancelled) => true,
197            (Scheduled, Running | Cancelled) => true,
198            (Running, Completed | Failed | TimedOut | Cancelled) => true,
199            (Failed, Scheduled) if self.can_retry() => true,
200            _ => false,
201        };
202
203        if valid {
204            self.status = new_status;
205            Ok(())
206        } else {
207            Err(PolarisError::InvalidStateTransition {
208                from: self.status.to_string(),
209                to: new_status.to_string(),
210            })
211        }
212    }
213}
214
/// Task metadata: lifecycle timestamps, executing node and user tags.
///
/// `Default` is derived, so a default value has every timestamp and the node
/// ID set to `None` and an empty tag map.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TaskMetadata {
    /// When the task was created
    pub created_at: Option<DateTime<Utc>>,

    /// When the task was scheduled
    pub scheduled_at: Option<DateTime<Utc>>,

    /// When the task started execution
    pub started_at: Option<DateTime<Utc>>,

    /// When the task completed
    pub completed_at: Option<DateTime<Utc>>,

    /// Node ID where task is/was executed
    pub node_id: Option<String>,

    /// Custom user-defined metadata, as free-form string key/value pairs
    pub tags: std::collections::HashMap<String, String>,
}
236
/// Task execution result.
///
/// `TaskHandle::set_result` uses `success` to decide whether the handle ends
/// up `Completed` or `Failed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Whether the task succeeded
    pub success: bool,

    /// Result payload (serialized output)
    pub output: Option<Bytes>,

    /// Error message if failed
    pub error: Option<String>,

    /// Execution duration.
    ///
    /// (De)serialized through the `humantime_serde` module rather than serde's
    /// default `Duration` representation.
    #[serde(with = "humantime_serde")]
    pub duration: Duration,
}
253
/// Handle to a submitted task.
///
/// The handle is `Clone`, and the state sits behind an `Arc`, so every clone
/// observes the same status and result.
#[derive(Debug, Clone)]
pub struct TaskHandle {
    /// Task ID
    pub id: TaskId,

    /// Shared task state (status + optional result) guarded by a read/write lock
    state: Arc<parking_lot::RwLock<TaskHandleState>>,
}
263
/// Mutable state shared between all clones of a `TaskHandle`.
#[derive(Debug)]
struct TaskHandleState {
    /// Last status recorded through the handle
    status: TaskStatus,
    /// Result recorded by `TaskHandle::set_result`, if any
    result: Option<TaskResult>,
}
269
270impl TaskHandle {
271    /// Create a new task handle
272    pub fn new(id: TaskId) -> Self {
273        Self {
274            id,
275            state: Arc::new(parking_lot::RwLock::new(TaskHandleState {
276                status: TaskStatus::Pending,
277                result: None,
278            })),
279        }
280    }
281
282    /// Get current task status
283    pub fn status(&self) -> TaskStatus {
284        self.state.read().status
285    }
286
287    /// Check if task is complete
288    pub fn is_complete(&self) -> bool {
289        matches!(
290            self.status(),
291            TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled | TaskStatus::TimedOut
292        )
293    }
294
295    /// Get task result (blocks until complete)
296    pub async fn result(&self) -> PolarisResult<TaskResult> {
297        // Poll for completion
298        loop {
299            if self.is_complete() {
300                let state = self.state.read();
301                return state
302                    .result
303                    .clone()
304                    .ok_or_else(|| PolarisError::other("Task complete but no result available"));
305            }
306            tokio::time::sleep(Duration::from_millis(100)).await;
307        }
308    }
309
310    /// Update task status (internal use)
311    pub(crate) fn update_status(&self, status: TaskStatus) {
312        self.state.write().status = status;
313    }
314
315    /// Set task result (internal use)
316    pub(crate) fn set_result(&self, result: TaskResult) {
317        let mut state = self.state.write();
318        let success = result.success;
319        state.result = Some(result);
320        state.status = if success {
321            TaskStatus::Completed
322        } else {
323            TaskStatus::Failed
324        };
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Two freshly generated IDs must differ.
    #[test]
    fn test_task_id_creation() {
        let first = TaskId::new();
        let second = TaskId::new();
        assert_ne!(first, second);
    }

    /// A new task keeps its name and starts pending with no retries used.
    #[test]
    fn test_task_creation() {
        let payload = Bytes::from("payload");
        let created = Task::new("test_task", payload);

        assert_eq!(created.name, "test_task");
        assert_eq!(created.status, TaskStatus::Pending);
        assert_eq!(created.retry_count, 0);
    }

    /// Each builder method stores the value it was given.
    #[test]
    fn test_task_builder() {
        let one_minute = Duration::from_secs(60);
        let built = Task::new("test", Bytes::new())
            .with_priority(TaskPriority::High)
            .with_timeout(one_minute)
            .with_max_retries(5);

        assert_eq!(built.priority, TaskPriority::High);
        assert_eq!(built.timeout, one_minute);
        assert_eq!(built.max_retries, 5);
    }

    /// The happy path is accepted; leaving a completed state is rejected.
    #[test]
    fn test_task_state_transitions() {
        let mut task = Task::new("test", Bytes::new());

        let to_scheduled = task.transition_to(TaskStatus::Scheduled);
        assert!(to_scheduled.is_ok());
        assert_eq!(task.status, TaskStatus::Scheduled);

        let to_running = task.transition_to(TaskStatus::Running);
        assert!(to_running.is_ok());
        let to_completed = task.transition_to(TaskStatus::Completed);
        assert!(to_completed.is_ok());

        // Invalid transition
        let back_to_running = task.transition_to(TaskStatus::Running);
        assert!(back_to_running.is_err());
    }

    /// Retries stay available until the counter reaches `max_retries`.
    #[test]
    fn test_task_retry_logic() {
        let mut task = Task::new("test", Bytes::new()).with_max_retries(2);

        for _ in 0..2 {
            assert!(task.can_retry());
            task.increment_retry();
        }
        assert!(!task.can_retry());
    }

    /// A new handle is pending and not complete; status updates are visible.
    #[tokio::test]
    async fn test_task_handle() {
        let id = TaskId::new();
        let handle = TaskHandle::new(id);
        assert_eq!(handle.status(), TaskStatus::Pending);
        assert!(!handle.is_complete());

        handle.update_status(TaskStatus::Running);
        assert_eq!(handle.status(), TaskStatus::Running);
    }
}