1use crate::{Scheduler, SimTime};
12use std::any::Any;
13use std::fmt;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicU64, Ordering};
16use tracing::{debug, info, instrument, trace, warn};
17use uuid::Uuid;
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct TaskId(pub Uuid);
22
23impl TaskId {
24 pub fn new() -> Self {
30 static NEXT_TASK_ID: AtomicU64 = AtomicU64::new(0);
31 let counter = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) + 1;
32 let id = crate::ids::deterministic_uuid(0, crate::ids::UUID_DOMAIN_MANUAL_TASK, counter);
33 Self(id)
34 }
35}
36
37impl Default for TaskId {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl fmt::Display for TaskId {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 write!(f, "Task({})", self.0)
46 }
47}
48
49#[derive(Debug, Clone, Copy)]
51pub struct TaskHandle<T> {
52 id: TaskId,
53 _marker: PhantomData<T>,
54}
55
56impl<T> TaskHandle<T> {
57 pub(crate) fn new(id: TaskId) -> Self {
59 Self {
60 id,
61 _marker: PhantomData,
62 }
63 }
64
65 pub fn id(&self) -> TaskId {
67 self.id
68 }
69}
70
71pub trait Task: 'static {
73 type Output: 'static;
75
76 fn execute(self, scheduler: &mut Scheduler) -> Self::Output;
78}
79
80pub(crate) trait TaskExecution {
82 fn execute(self: Box<Self>, scheduler: &mut Scheduler) -> Box<dyn Any>;
84}
85
86pub(crate) struct TaskWrapper<T: Task> {
91 task: T,
92}
93
94impl<T: Task> TaskWrapper<T> {
95 pub fn new(task: T) -> Self {
96 Self { task }
97 }
98}
99
100impl<T: Task> TaskExecution for TaskWrapper<T> {
101 fn execute(self: Box<Self>, scheduler: &mut Scheduler) -> Box<dyn Any> {
102 let result = self.task.execute(scheduler);
103 Box::new(result)
104 }
105}
106
107pub struct ClosureTask<F, R> {
109 closure: F,
110 _marker: PhantomData<R>,
111}
112
113impl<F, R> ClosureTask<F, R>
114where
115 F: FnOnce(&mut Scheduler) -> R + 'static,
116 R: 'static,
117{
118 pub fn new(closure: F) -> Self {
120 Self {
121 closure,
122 _marker: PhantomData,
123 }
124 }
125}
126
127impl<F, R> Task for ClosureTask<F, R>
128where
129 F: FnOnce(&mut Scheduler) -> R + 'static,
130 R: 'static,
131{
132 type Output = R;
133
134 #[instrument(skip(self, scheduler), fields(task_type = "ClosureTask"))]
135 fn execute(self, scheduler: &mut Scheduler) -> Self::Output {
136 debug!("Executing closure task");
137 let result = (self.closure)(scheduler);
138 trace!("Closure task completed");
139 result
140 }
141}
142
143pub struct TimeoutTask<F> {
145 callback: F,
146}
147
148impl<F> TimeoutTask<F>
149where
150 F: FnOnce(&mut Scheduler) + 'static,
151{
152 pub fn new(callback: F) -> Self {
154 Self { callback }
155 }
156}
157
158impl<F> Task for TimeoutTask<F>
159where
160 F: FnOnce(&mut Scheduler) + 'static,
161{
162 type Output = ();
163
164 #[instrument(skip(self, scheduler), fields(task_type = "TimeoutTask"))]
165 fn execute(self, scheduler: &mut Scheduler) -> Self::Output {
166 debug!("Executing timeout task");
167 (self.callback)(scheduler);
168 trace!("Timeout task completed");
169 }
170}
171
172pub struct RetryTask<F, R, E> {
174 operation: F,
175 max_attempts: u32,
176 current_attempt: u32,
177 base_delay: SimTime,
178 _marker: PhantomData<(R, E)>,
179}
180
181impl<F, R, E> RetryTask<F, R, E>
182where
183 F: Fn(&mut Scheduler) -> Result<R, E> + 'static,
184 R: 'static,
185 E: 'static,
186{
187 pub fn new(operation: F, max_attempts: u32, base_delay: SimTime) -> Self {
189 Self {
190 operation,
191 max_attempts,
192 current_attempt: 0,
193 base_delay,
194 _marker: PhantomData,
195 }
196 }
197}
198
199impl<F, R, E> Task for RetryTask<F, R, E>
200where
201 F: Fn(&mut Scheduler) -> Result<R, E> + 'static,
202 R: 'static,
203 E: 'static,
204{
205 type Output = Result<R, E>;
206
207 #[instrument(skip(self, scheduler), fields(
208 task_type = "RetryTask",
209 attempt = self.current_attempt + 1,
210 max_attempts = self.max_attempts
211 ))]
212 fn execute(mut self, scheduler: &mut Scheduler) -> Self::Output {
213 self.current_attempt += 1;
214
215 debug!("Executing retry task");
216
217 match (self.operation)(scheduler) {
218 Ok(result) => {
219 info!(attempt = self.current_attempt, "Retry task succeeded");
220 Ok(result)
221 }
222 Err(error) => {
223 if self.current_attempt >= self.max_attempts {
224 warn!(
225 attempt = self.current_attempt,
226 max_attempts = self.max_attempts,
227 "Retry task failed - max attempts reached"
228 );
229 Err(error)
230 } else {
231 let delay = self.base_delay * (2_u64.pow(self.current_attempt - 1));
233 debug!(
234 attempt = self.current_attempt,
235 next_delay = ?delay,
236 "Retry task failed - scheduling retry"
237 );
238
239 let task_id = scheduler.executing_task_id().unwrap_or_default();
240 let wrapper = TaskWrapper::new(self);
241 scheduler.schedule_task_at(
242 scheduler.time() + delay,
243 task_id,
244 Box::new(wrapper),
245 );
246
247 Err(error)
250 }
251 }
252 }
253 }
254}
255
256#[derive(Clone)]
258pub struct PeriodicTask<F> {
259 callback: F,
260 interval: SimTime,
261 remaining_executions: Option<u32>,
262}
263
264impl<F> PeriodicTask<F>
265where
266 F: Fn(&mut Scheduler) + Clone + 'static,
267{
268 pub fn new(callback: F, interval: SimTime) -> Self {
270 Self {
271 callback,
272 interval,
273 remaining_executions: None,
274 }
275 }
276
277 pub fn with_count(callback: F, interval: SimTime, count: u32) -> Self {
279 Self {
280 callback,
281 interval,
282 remaining_executions: Some(count),
283 }
284 }
285}
286
287impl<F> Task for PeriodicTask<F>
288where
289 F: Fn(&mut Scheduler) + Clone + 'static,
290{
291 type Output = ();
292
293 #[instrument(skip(self, scheduler), fields(
294 task_type = "PeriodicTask",
295 interval = ?self.interval,
296 remaining = ?self.remaining_executions
297 ))]
298 fn execute(mut self, scheduler: &mut Scheduler) -> Self::Output {
299 debug!("Executing periodic task");
300
301 (self.callback)(scheduler);
303
304 if let Some(remaining) = &mut self.remaining_executions {
306 *remaining -= 1;
307 if *remaining == 0 {
308 info!("Periodic task completed - no more executions");
309 return; }
311 debug!(remaining = *remaining, "Periodic task continuing");
312 } else {
313 trace!("Periodic task continuing indefinitely");
314 }
315
316 let task_id = scheduler.executing_task_id().unwrap_or_default();
319 let interval = self.interval;
320 let wrapper = TaskWrapper::new(self);
321 scheduler.schedule_task_at(scheduler.time() + interval, task_id, Box::new(wrapper));
322
323 debug!(
324 next_execution_time = ?(scheduler.time() + interval),
325 "Scheduled next periodic task execution"
326 );
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333 use std::sync::{Arc, Mutex};
334 use std::time::Duration;
335
336 #[test]
337 fn test_task_id_creation() {
338 let id1 = TaskId::new();
339 let id2 = TaskId::new();
340 assert_ne!(id1, id2);
341
342 let id3 = TaskId::default();
343 assert_ne!(id1, id3);
344 }
345
346 #[test]
347 fn test_task_handle() {
348 let id = TaskId::new();
349 let handle: TaskHandle<i32> = TaskHandle::new(id);
350 assert_eq!(handle.id(), id);
351 }
352
353 #[test]
354 fn test_task_handle_exposes_stable_id() {
355 let task_id = TaskId::new();
357 let handle: TaskHandle<i32> = TaskHandle::new(task_id);
358 assert_eq!(handle.id(), task_id);
359 assert_eq!(handle.id(), task_id);
360 }
361
362 #[test]
363 fn test_closure_task() {
364 let executed = Arc::new(Mutex::new(false));
365 let executed_clone = executed.clone();
366
367 let task = ClosureTask::new(move |_scheduler| {
368 *executed_clone.lock().unwrap() = true;
369 42
370 });
371
372 let mut scheduler = Scheduler::default();
373 let result = task.execute(&mut scheduler);
374
375 assert_eq!(result, 42);
376 assert!(*executed.lock().unwrap());
377 }
378
379 #[test]
380 fn test_timeout_task() {
381 let executed = Arc::new(Mutex::new(false));
382 let executed_clone = executed.clone();
383
384 let task = TimeoutTask::new(move |_scheduler| {
385 *executed_clone.lock().unwrap() = true;
386 });
387
388 let mut scheduler = Scheduler::default();
389 task.execute(&mut scheduler);
390
391 assert!(*executed.lock().unwrap());
392 }
393
394 #[test]
395 fn test_periodic_task_with_count() {
396 let counter = Arc::new(Mutex::new(0));
397 let counter_clone = counter.clone();
398
399 let task = PeriodicTask::with_count(
400 move |_scheduler| {
401 *counter_clone.lock().unwrap() += 1;
402 },
403 SimTime::from_duration(Duration::from_millis(100)),
404 3,
405 );
406
407 use crate::EventEntry;
408
409 let mut scheduler = Scheduler::default();
410 let handle = scheduler.schedule_task(SimTime::zero(), task);
411
412 let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
414 panic!("expected task event");
415 };
416 assert_eq!(entry.task_id, handle.id());
417 assert!(scheduler.execute_task(entry.task_id));
418
419 assert_eq!(*counter.lock().unwrap(), 1);
420
421 assert!(scheduler.peek().is_some());
423 }
424
425 #[test]
426 fn test_retry_task_success() {
427 use crate::EventEntry;
428
429 let attempt_count = Arc::new(Mutex::new(0));
430 let attempt_count_clone = attempt_count.clone();
431
432 let task = RetryTask::new(
433 move |_scheduler| {
434 let mut count = attempt_count_clone.lock().unwrap();
435 *count += 1;
436 if *count >= 2 {
437 Ok(42)
438 } else {
439 Err("Not ready yet")
440 }
441 },
442 3,
443 SimTime::from_duration(Duration::from_millis(100)),
444 );
445
446 let mut scheduler = Scheduler::default();
447 let handle = scheduler.schedule_task(SimTime::zero(), task);
448
449 let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
451 panic!("expected task event");
452 };
453 assert!(scheduler.execute_task(entry.task_id));
454 assert_eq!(*attempt_count.lock().unwrap(), 1);
455
456 let intermediate: Option<Result<i32, &str>> = scheduler.get_task_result(handle);
458 assert!(intermediate.is_none());
459
460 let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
462 panic!("expected task event");
463 };
464 assert!(scheduler.execute_task(entry.task_id));
465 assert_eq!(*attempt_count.lock().unwrap(), 2);
466
467 let final_result: Option<Result<i32, &str>> = scheduler.get_task_result(handle);
468 assert_eq!(final_result, Some(Ok(42)));
469 }
470
471 #[test]
472 fn test_retry_task_max_attempts() {
473 use crate::EventEntry;
474
475 let attempt_count = Arc::new(Mutex::new(0));
476 let attempt_count_clone = attempt_count.clone();
477
478 let task = RetryTask::new(
479 move |_scheduler| -> Result<i32, &'static str> {
480 let mut count = attempt_count_clone.lock().unwrap();
481 *count += 1;
482 Err("Always fails")
483 },
484 2, SimTime::from_duration(Duration::from_millis(100)),
486 );
487
488 let mut scheduler = Scheduler::default();
489 let handle = scheduler.schedule_task(SimTime::zero(), task);
490
491 let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
493 panic!("expected task event");
494 };
495 assert!(scheduler.execute_task(entry.task_id));
496 assert_eq!(*attempt_count.lock().unwrap(), 1);
497
498 let intermediate: Option<Result<i32, &'static str>> = scheduler.get_task_result(handle);
499 assert!(intermediate.is_none());
500
501 let EventEntry::Task(entry) = scheduler.pop().unwrap() else {
503 panic!("expected task event");
504 };
505 assert!(scheduler.execute_task(entry.task_id));
506 assert_eq!(*attempt_count.lock().unwrap(), 2);
507
508 let final_result: Option<Result<i32, &'static str>> = scheduler.get_task_result(handle);
509 assert_eq!(final_result, Some(Err("Always fails")));
510 }
511}