Skip to main content

descartes_core/
task.rs

1//! Task system for short-lived operations in DES
2//!
3//! This module provides a lightweight alternative to Components for operations that:
4//! - Execute once and complete (timeouts, callbacks)
5//! - Don't need persistent state
6//! - Should auto-cleanup after execution
7//!
8//! Tasks are scheduled through the Scheduler and automatically cleaned up after execution,
9//! avoiding the overhead and complexity of full Components for simple operations.
10
11use crate::{Scheduler, SimTime};
12use std::any::Any;
13use std::fmt;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicU64, Ordering};
16use tracing::{debug, info, instrument, trace, warn};
17use uuid::Uuid;
18
19/// Unique identifier for tasks
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct TaskId(pub Uuid);
22
23impl TaskId {
24    /// Create a new task ID.
25    ///
26    /// This is deterministic (derived from a process-local counter) so that tests
27    /// and simulations are reproducible without relying on wall-clock UUIDs.
28    /// During normal simulation execution, task IDs are assigned by the scheduler.
29    pub fn new() -> Self {
30        static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(0);
31        let counter = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) + 1;
32        let id = crate::ids::deterministic_uuid(0, crate::ids::UUID_DOMAIN_MANUAL_TASK, counter);
33        Self(id)
34    }
35}
36
37impl Default for TaskId {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl fmt::Display for TaskId {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        write!(f, "Task({})", self.0)
46    }
47}
48
49/// Handle for a scheduled task, allowing cancellation and type-safe result retrieval
50#[derive(Debug, Clone, Copy)]
51pub struct TaskHandle<T> {
52    id: TaskId,
53    _marker: PhantomData<T>,
54}
55
56impl<T> TaskHandle<T> {
57    /// Create a new task handle
58    pub(crate) fn new(id: TaskId) -> Self {
59        Self {
60            id,
61            _marker: PhantomData,
62        }
63    }
64
65    /// Get the task ID
66    pub fn id(&self) -> TaskId {
67        self.id
68    }
69}
70
71/// Trait for tasks that can be executed by the scheduler
72pub trait Task: 'static {
73    /// The type returned by this task
74    type Output: 'static;
75
76    /// Execute the task
77    fn execute(self, scheduler: &mut Scheduler) -> Self::Output;
78}
79
80/// Type-erased task execution trait
81pub(crate) trait TaskExecution {
82    /// Execute the task and return type-erased result.
83    fn execute(self: Box<Self>, scheduler: &mut Scheduler) -> Box<dyn Any>;
84}
85
86/// Wrapper that implements `TaskExecution` for any `Task`.
87///
88/// Note: the scheduler already tracks task identity via the `TaskId` key used
89/// in `pending_tasks`, so the wrapper itself does not store the ID.
90pub(crate) struct TaskWrapper<T: Task> {
91    task: T,
92}
93
94impl<T: Task> TaskWrapper<T> {
95    pub fn new(task: T) -> Self {
96        Self { task }
97    }
98}
99
100impl<T: Task> TaskExecution for TaskWrapper<T> {
101    fn execute(self: Box<Self>, scheduler: &mut Scheduler) -> Box<dyn Any> {
102        let result = self.task.execute(scheduler);
103        Box::new(result)
104    }
105}
106
107/// A task that executes a closure
108pub struct ClosureTask<F, R> {
109    closure: F,
110    _marker: PhantomData<R>,
111}
112
113impl<F, R> ClosureTask<F, R>
114where
115    F: FnOnce(&mut Scheduler) -> R + 'static,
116    R: 'static,
117{
118    /// Create a new closure task
119    pub fn new(closure: F) -> Self {
120        Self {
121            closure,
122            _marker: PhantomData,
123        }
124    }
125}
126
127impl<F, R> Task for ClosureTask<F, R>
128where
129    F: FnOnce(&mut Scheduler) -> R + 'static,
130    R: 'static,
131{
132    type Output = R;
133
134    #[instrument(skip(self, scheduler), fields(task_type = "ClosureTask"))]
135    fn execute(self, scheduler: &mut Scheduler) -> Self::Output {
136        debug!("Executing closure task");
137        let result = (self.closure)(scheduler);
138        trace!("Closure task completed");
139        result
140    }
141}
142
143/// A task that executes after a timeout
144pub struct TimeoutTask<F> {
145    callback: F,
146}
147
148impl<F> TimeoutTask<F>
149where
150    F: FnOnce(&mut Scheduler) + 'static,
151{
152    /// Create a new timeout task
153    pub fn new(callback: F) -> Self {
154        Self { callback }
155    }
156}
157
158impl<F> Task for TimeoutTask<F>
159where
160    F: FnOnce(&mut Scheduler) + 'static,
161{
162    type Output = ();
163
164    #[instrument(skip(self, scheduler), fields(task_type = "TimeoutTask"))]
165    fn execute(self, scheduler: &mut Scheduler) -> Self::Output {
166        debug!("Executing timeout task");
167        (self.callback)(scheduler);
168        trace!("Timeout task completed");
169    }
170}
171
172/// A task that retries an operation with exponential backoff
173pub struct RetryTask<F, R, E> {
174    operation: F,
175    max_attempts: u32,
176    current_attempt: u32,
177    base_delay: SimTime,
178    _marker: PhantomData<(R, E)>,
179}
180
181impl<F, R, E> RetryTask<F, R, E>
182where
183    F: Fn(&mut Scheduler) -> Result<R, E> + 'static,
184    R: 'static,
185    E: 'static,
186{
187    /// Create a new retry task
188    pub fn new(operation: F, max_attempts: u32, base_delay: SimTime) -> Self {
189        Self {
190            operation,
191            max_attempts,
192            current_attempt: 0,
193            base_delay,
194            _marker: PhantomData,
195        }
196    }
197}
198
199impl<F, R, E> Task for RetryTask<F, R, E>
200where
201    F: Fn(&mut Scheduler) -> Result<R, E> + 'static,
202    R: 'static,
203    E: 'static,
204{
205    type Output = Result<R, E>;
206
207    #[instrument(skip(self, scheduler), fields(
208        task_type = "RetryTask",
209        attempt = self.current_attempt + 1,
210        max_attempts = self.max_attempts
211    ))]
212    fn execute(mut self, scheduler: &mut Scheduler) -> Self::Output {
213        self.current_attempt += 1;
214
215        debug!("Executing retry task");
216
217        match (self.operation)(scheduler) {
218            Ok(result) => {
219                info!(attempt = self.current_attempt, "Retry task succeeded");
220                Ok(result)
221            }
222            Err(error) => {
223                if self.current_attempt >= self.max_attempts {
224                    warn!(
225                        attempt = self.current_attempt,
226                        max_attempts = self.max_attempts,
227                        "Retry task failed - max attempts reached"
228                    );
229                    Err(error)
230                } else {
231                    // Schedule retry with exponential backoff
232                    let delay = self.base_delay * (2_u64.pow(self.current_attempt - 1));
233                    debug!(
234                        attempt = self.current_attempt,
235                        next_delay = ?delay,
236                        "Retry task failed - scheduling retry"
237                    );
238
239                    let task_id = scheduler.executing_task_id().unwrap_or_default();
240                    let wrapper = TaskWrapper::new(self);
241                    scheduler.schedule_task_at(
242                        scheduler.time() + delay,
243                        task_id,
244                        Box::new(wrapper),
245                    );
246
247                    // The retry chain continues under the same task ID so the original
248                    // TaskHandle will observe the eventual success/failure.
249                    Err(error)
250                }
251            }
252        }
253    }
254}
255
256/// A task that executes periodically
257#[derive(Clone)]
258pub struct PeriodicTask<F> {
259    callback: F,
260    interval: SimTime,
261    remaining_executions: Option<u32>,
262}
263
264impl<F> PeriodicTask<F>
265where
266    F: Fn(&mut Scheduler) + Clone + 'static,
267{
268    /// Create a new periodic task that runs indefinitely
269    pub fn new(callback: F, interval: SimTime) -> Self {
270        Self {
271            callback,
272            interval,
273            remaining_executions: None,
274        }
275    }
276
277    /// Create a new periodic task that runs a limited number of times
278    pub fn with_count(callback: F, interval: SimTime, count: u32) -> Self {
279        Self {
280            callback,
281            interval,
282            remaining_executions: Some(count),
283        }
284    }
285}
286
287impl<F> Task for PeriodicTask<F>
288where
289    F: Fn(&mut Scheduler) + Clone + 'static,
290{
291    type Output = ();
292
293    #[instrument(skip(self, scheduler), fields(
294        task_type = "PeriodicTask",
295        interval = ?self.interval,
296        remaining = ?self.remaining_executions
297    ))]
298    fn execute(mut self, scheduler: &mut Scheduler) -> Self::Output {
299        debug!("Executing periodic task");
300
301        // Execute the callback
302        (self.callback)(scheduler);
303
304        // Schedule next execution if needed
305        if let Some(remaining) = &mut self.remaining_executions {
306            *remaining -= 1;
307            if *remaining == 0 {
308                info!("Periodic task completed - no more executions");
309                return; // No more executions
310            }
311            debug!(remaining = *remaining, "Periodic task continuing");
312        } else {
313            trace!("Periodic task continuing indefinitely");
314        }
315
316        // Schedule next execution under the same task ID so the original handle can
317        // cancel the whole periodic chain.
318        let task_id = scheduler.executing_task_id().unwrap_or_default();
319        let interval = self.interval;
320        let wrapper = TaskWrapper::new(self);
321        scheduler.schedule_task_at(scheduler.time() + interval, task_id, Box::new(wrapper));
322
323        debug!(
324            next_execution_time = ?(scheduler.time() + interval),
325            "Scheduled next periodic task execution"
326        );
327    }
328}
329
330#[cfg(test)]
331mod tests {
332    use super::*;
333    use std::sync::{Arc, Mutex};
334    use std::time::Duration;
335
336    #[test]
337    fn test_task_id_creation() {
338        let id1 = TaskId::new();
339        let id2 = TaskId::new();
340        assert_ne!(id1, id2);
341
342        let id3 = TaskId::default();
343        assert_ne!(id1, id3);
344    }
345
346    #[test]
347    fn test_task_handle() {
348        let id = TaskId::new();
349        let handle: TaskHandle<i32> = TaskHandle::new(id);
350        assert_eq!(handle.id(), id);
351    }
352
353    #[test]
354    fn test_task_handle_exposes_stable_id() {
355        // TaskHandle should always return the same ID.
356        let task_id = TaskId::new();
357        let handle: TaskHandle<i32> = TaskHandle::new(task_id);
358        assert_eq!(handle.id(), task_id);
359        assert_eq!(handle.id(), task_id);
360    }
361
362    #[test]
363    fn test_closure_task() {
364        let executed = Arc::new(Mutex::new(false));
365        let executed_clone = executed.clone();
366
367        let task = ClosureTask::new(move |_scheduler| {
368            *executed_clone.lock().unwrap() = true;
369            42
370        });
371
372        let mut scheduler = Scheduler::default();
373        let result = task.execute(&mut scheduler);
374
375        assert_eq!(result, 42);
376        assert!(*executed.lock().unwrap());
377    }
378
379    #[test]
380    fn test_timeout_task() {
381        let executed = Arc::new(Mutex::new(false));
382        let executed_clone = executed.clone();
383
384        let task = TimeoutTask::new(move |_scheduler| {
385            *executed_clone.lock().unwrap() = true;
386        });
387
388        let mut scheduler = Scheduler::default();
389        task.execute(&mut scheduler);
390
391        assert!(*executed.lock().unwrap());
392    }
393
394    #[test]
395    fn test_periodic_task_with_count() {
396        let counter = Arc::new(Mutex::new(0));
397        let counter_clone = counter.clone();
398
399        let task = PeriodicTask::with_count(
400            move |_scheduler| {
401                *counter_clone.lock().unwrap() += 1;
402            },
403            SimTime::from_duration(Duration::from_millis(100)),
404            3,
405        );
406
407        use crate::EventEntry;
408
409        let mut scheduler = Scheduler::default();
410        let handle = scheduler.schedule_task(SimTime::zero(), task);
411
412        // First execution should schedule the next execution under the same task ID.
413        let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
414            panic!("expected task event");
415        };
416        assert_eq!(entry.task_id, handle.id());
417        assert!(scheduler.execute_task(entry.task_id));
418
419        assert_eq!(*counter.lock().unwrap(), 1);
420
421        // There should be a scheduled event for the next execution.
422        assert!(scheduler.peek().is_some());
423    }
424
425    #[test]
426    fn test_retry_task_success() {
427        use crate::EventEntry;
428
429        let attempt_count = Arc::new(Mutex::new(0));
430        let attempt_count_clone = attempt_count.clone();
431
432        let task = RetryTask::new(
433            move |_scheduler| {
434                let mut count = attempt_count_clone.lock().unwrap();
435                *count += 1;
436                if *count >= 2 {
437                    Ok(42)
438                } else {
439                    Err("Not ready yet")
440                }
441            },
442            3,
443            SimTime::from_duration(Duration::from_millis(100)),
444        );
445
446        let mut scheduler = Scheduler::default();
447        let handle = scheduler.schedule_task(SimTime::zero(), task);
448
449        // First attempt
450        let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
451            panic!("expected task event");
452        };
453        assert!(scheduler.execute_task(entry.task_id));
454        assert_eq!(*attempt_count.lock().unwrap(), 1);
455
456        // Result should not be available yet (retry scheduled under same ID)
457        let intermediate: Option<Result<i32, &str>> = scheduler.get_task_result(handle);
458        assert!(intermediate.is_none());
459
460        // Second attempt
461        let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
462            panic!("expected task event");
463        };
464        assert!(scheduler.execute_task(entry.task_id));
465        assert_eq!(*attempt_count.lock().unwrap(), 2);
466
467        let final_result: Option<Result<i32, &str>> = scheduler.get_task_result(handle);
468        assert_eq!(final_result, Some(Ok(42)));
469    }
470
471    #[test]
472    fn test_retry_task_max_attempts() {
473        use crate::EventEntry;
474
475        let attempt_count = Arc::new(Mutex::new(0));
476        let attempt_count_clone = attempt_count.clone();
477
478        let task = RetryTask::new(
479            move |_scheduler| -> Result<i32, &'static str> {
480                let mut count = attempt_count_clone.lock().unwrap();
481                *count += 1;
482                Err("Always fails")
483            },
484            2, // Max 2 attempts
485            SimTime::from_duration(Duration::from_millis(100)),
486        );
487
488        let mut scheduler = Scheduler::default();
489        let handle = scheduler.schedule_task(SimTime::zero(), task);
490
491        // First attempt
492        let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
493            panic!("expected task event");
494        };
495        assert!(scheduler.execute_task(entry.task_id));
496        assert_eq!(*attempt_count.lock().unwrap(), 1);
497
498        let intermediate: Option<Result<i32, &'static str>> = scheduler.get_task_result(handle);
499        assert!(intermediate.is_none());
500
501        // Second attempt (max attempts reached)
502        let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
503            panic!("expected task event");
504        };
505        assert!(scheduler.execute_task(entry.task_id));
506        assert_eq!(*attempt_count.lock().unwrap(), 2);
507
508        let final_result: Option<Result<i32, &'static str>> = scheduler.get_task_result(handle);
509        assert_eq!(final_result, Some(Err("Always fails")));
510    }
511}