Skip to main content

durable_execution_sdk_testing/checkpoint_server/
scheduler.rs

1//! Scheduler implementations for test execution orchestration.
2//!
3//! This module provides scheduler implementations that manage when handler
4//! re-invocations occur during test execution. Two implementations are provided:
5//!
6//! - `QueueScheduler`: For time-skipping mode, processes functions in FIFO order
7//! - `TimerScheduler`: For real-time mode, respects actual timestamps using tokio timers
8//!
9//! # Requirements
10//!
11//! - 17.1: WHEN time skipping is enabled, THE Scheduler SHALL use a queue-based
12//!   approach that processes operations in FIFO order
13//! - 17.2: WHEN time skipping is disabled, THE Scheduler SHALL use timer-based
14//!   scheduling that respects actual timestamps
15//! - 17.3: WHEN a function is scheduled, THE Scheduler SHALL execute any checkpoint
16//!   updates before invoking the handler
17//! - 17.4: WHEN the scheduler has pending functions, THE Scheduler SHALL report
18//!   that scheduled functions exist via has_scheduled_function()
19//! - 17.5: WHEN execution completes, THE Scheduler SHALL flush any remaining
20//!   scheduled functions
21
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::error::TestError;
33
/// Type alias for a boxed, one-shot closure that, when called, produces a
/// pinned `Send` future resolving to `()`.
///
/// This is the type of `ScheduledFunction::start_invocation`.
pub type BoxedAsyncFn = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
36
/// Type alias for a boxed, one-shot, `Send` callback that receives a `TestError`.
///
/// The schedulers in this module call it when a checkpoint update returns `Err`.
pub type ErrorHandler = Box<dyn FnOnce(TestError) + Send>;
39
/// Type alias for a boxed, one-shot closure that produces a pinned `Send`
/// future resolving to `Result<(), TestError>`.
///
/// The schedulers in this module await that future before starting the
/// invocation, and skip the invocation when it resolves to `Err`.
pub type CheckpointUpdateFn =
    Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = Result<(), TestError>> + Send>> + Send>;
43
44/// A scheduled function with its metadata.
45pub struct ScheduledFunction {
46    /// The function to execute
47    pub start_invocation: BoxedAsyncFn,
48    /// Error handler for failures
49    pub on_error: ErrorHandler,
50    /// Optional timestamp for when to execute (used by TimerScheduler)
51    pub timestamp: Option<DateTime<Utc>>,
52    /// Optional checkpoint update to run before invocation
53    pub update_checkpoint: Option<CheckpointUpdateFn>,
54}
55
56impl std::fmt::Debug for ScheduledFunction {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("ScheduledFunction")
59            .field("timestamp", &self.timestamp)
60            .field("has_update_checkpoint", &self.update_checkpoint.is_some())
61            .finish()
62    }
63}
64
/// Trait for scheduling handler invocations.
///
/// Schedulers manage when handler re-invocations occur during test execution.
/// Different implementations provide different scheduling strategies: in this
/// module, `QueueScheduler` runs functions one at a time in FIFO order when
/// `process_next` is called, while `TimerScheduler` spawns a tokio task per
/// function that sleeps until the requested timestamp.
pub trait Scheduler: Send {
    /// Schedule a function to be executed.
    ///
    /// # Arguments
    ///
    /// * `start_invocation` - The function to execute
    /// * `on_error` - Called with the error if `update_checkpoint` fails; the
    ///   function is then not executed
    /// * `timestamp` - Optional timestamp for when to execute (ignored by QueueScheduler)
    /// * `update_checkpoint` - Optional checkpoint update to run before invocation
    fn schedule_function(
        &mut self,
        start_invocation: BoxedAsyncFn,
        on_error: ErrorHandler,
        timestamp: Option<DateTime<Utc>>,
        update_checkpoint: Option<CheckpointUpdateFn>,
    );

    /// Check if there are scheduled functions pending.
    ///
    /// `QueueScheduler` reports `true` while its queue is non-empty;
    /// `TimerScheduler` reports `true` while any tracked task has not finished.
    fn has_scheduled_function(&self) -> bool;

    /// Flush all scheduled functions without executing them.
    ///
    /// `QueueScheduler` clears its queue; `TimerScheduler` aborts its tracked tasks.
    fn flush_timers(&mut self);

    /// Process the next scheduled function (if any).
    ///
    /// For `QueueScheduler`, returns `true` if a function was taken from the
    /// queue and processed (including the case where its checkpoint update
    /// failed), and `false` if the queue is empty.
    ///
    /// For `TimerScheduler`, nothing is executed here: finished tasks are
    /// pruned and the result is `true` while unfinished tasks remain.
    fn process_next(&mut self) -> Pin<Box<dyn Future<Output = bool> + Send + '_>>;
}
97
98/// Queue-based scheduler for time-skipping mode.
99///
100/// Executes functions sequentially in FIFO order, ignoring timestamps.
101/// This is used when time skipping is enabled to process operations
102/// as quickly as possible without waiting for actual time to pass.
103pub struct QueueScheduler {
104    /// Queue of scheduled functions
105    function_queue: VecDeque<ScheduledFunction>,
106    /// Flag indicating if processing is in progress
107    is_processing: Arc<AtomicBool>,
108}
109
110impl QueueScheduler {
111    /// Create a new queue scheduler.
112    pub fn new() -> Self {
113        Self {
114            function_queue: VecDeque::new(),
115            is_processing: Arc::new(AtomicBool::new(false)),
116        }
117    }
118
119    /// Check if the scheduler is currently processing.
120    pub fn is_processing(&self) -> bool {
121        self.is_processing.load(Ordering::SeqCst)
122    }
123
124    /// Get the number of queued functions.
125    pub fn queue_len(&self) -> usize {
126        self.function_queue.len()
127    }
128}
129
130impl Default for QueueScheduler {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136impl Scheduler for QueueScheduler {
137    fn schedule_function(
138        &mut self,
139        start_invocation: BoxedAsyncFn,
140        on_error: ErrorHandler,
141        timestamp: Option<DateTime<Utc>>,
142        update_checkpoint: Option<CheckpointUpdateFn>,
143    ) {
144        let scheduled = ScheduledFunction {
145            start_invocation,
146            on_error,
147            timestamp,
148            update_checkpoint,
149        };
150        self.function_queue.push_back(scheduled);
151    }
152
153    fn has_scheduled_function(&self) -> bool {
154        !self.function_queue.is_empty()
155    }
156
157    fn flush_timers(&mut self) {
158        self.function_queue.clear();
159    }
160
161    fn process_next(&mut self) -> Pin<Box<dyn Future<Output = bool> + Send + '_>> {
162        Box::pin(async move {
163            if let Some(scheduled) = self.function_queue.pop_front() {
164                self.is_processing.store(true, Ordering::SeqCst);
165
166                // Execute checkpoint update if provided
167                if let Some(update_fn) = scheduled.update_checkpoint {
168                    let update_future = update_fn();
169                    if let Err(e) = update_future.await {
170                        (scheduled.on_error)(e);
171                        self.is_processing.store(false, Ordering::SeqCst);
172                        return true;
173                    }
174                }
175
176                // Execute the scheduled function
177                let invocation_future = (scheduled.start_invocation)();
178                invocation_future.await;
179
180                self.is_processing.store(false, Ordering::SeqCst);
181                true
182            } else {
183                false
184            }
185        })
186    }
187}
188
189/// Timer-based scheduler for real-time mode.
190///
191/// Respects actual timestamps using tokio timers. Functions are scheduled
192/// to execute at their specified timestamps, with earlier timestamps
193/// executing first.
194pub struct TimerScheduler {
195    /// Handles to spawned timer tasks
196    scheduled_tasks: Vec<JoinHandle<()>>,
197    /// Shared state for tracking pending functions
198    pending_count: Arc<Mutex<usize>>,
199}
200
201impl TimerScheduler {
202    /// Create a new timer scheduler.
203    pub fn new() -> Self {
204        Self {
205            scheduled_tasks: Vec::new(),
206            pending_count: Arc::new(Mutex::new(0)),
207        }
208    }
209
210    /// Get the number of pending tasks.
211    pub async fn pending_count(&self) -> usize {
212        *self.pending_count.lock().await
213    }
214}
215
216impl Default for TimerScheduler {
217    fn default() -> Self {
218        Self::new()
219    }
220}
221
222impl Scheduler for TimerScheduler {
223    fn schedule_function(
224        &mut self,
225        start_invocation: BoxedAsyncFn,
226        on_error: ErrorHandler,
227        timestamp: Option<DateTime<Utc>>,
228        update_checkpoint: Option<CheckpointUpdateFn>,
229    ) {
230        let pending_count = Arc::clone(&self.pending_count);
231
232        // Increment pending count
233        let pending_count_clone = Arc::clone(&pending_count);
234        tokio::spawn(async move {
235            let mut count = pending_count_clone.lock().await;
236            *count += 1;
237        });
238
239        let handle = tokio::spawn(async move {
240            // Calculate delay if timestamp is provided
241            if let Some(ts) = timestamp {
242                let now = Utc::now();
243                if ts > now {
244                    let duration = (ts - now).to_std().unwrap_or_default();
245                    tokio::time::sleep(duration).await;
246                }
247            }
248
249            // Execute checkpoint update if provided
250            if let Some(update_fn) = update_checkpoint {
251                let update_future = update_fn();
252                if let Err(e) = update_future.await {
253                    (on_error)(e);
254                    // Decrement pending count
255                    let mut count = pending_count.lock().await;
256                    *count = count.saturating_sub(1);
257                    return;
258                }
259            }
260
261            // Execute the scheduled function
262            let invocation_future = start_invocation();
263            invocation_future.await;
264
265            // Decrement pending count
266            let mut count = pending_count.lock().await;
267            *count = count.saturating_sub(1);
268        });
269
270        self.scheduled_tasks.push(handle);
271    }
272
273    fn has_scheduled_function(&self) -> bool {
274        // Check if any tasks are still running
275        self.scheduled_tasks.iter().any(|h| !h.is_finished())
276    }
277
278    fn flush_timers(&mut self) {
279        // Abort all pending tasks
280        for handle in self.scheduled_tasks.drain(..) {
281            handle.abort();
282        }
283    }
284
285    fn process_next(&mut self) -> Pin<Box<dyn Future<Output = bool> + Send + '_>> {
286        // TimerScheduler processes functions asynchronously via spawned tasks,
287        // so this method just cleans up finished tasks and returns whether
288        // there are still pending tasks.
289        Box::pin(async move {
290            // Remove finished tasks
291            self.scheduled_tasks.retain(|h| !h.is_finished());
292            !self.scheduled_tasks.is_empty()
293        })
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    /// Error handler that discards whatever error it is given.
    fn ignore_errors() -> ErrorHandler {
        Box::new(|_: TestError| {})
    }

    /// Invocation whose future adds one to `counter` when it runs.
    fn bump(counter: &Arc<AtomicUsize>) -> BoxedAsyncFn {
        let counter = Arc::clone(counter);
        Box::new(move || -> Pin<Box<dyn Future<Output = ()> + Send>> {
            Box::pin(async move {
                counter.fetch_add(1, Ordering::SeqCst);
            })
        })
    }

    /// Pause long enough for already-spawned immediate tasks to run.
    async fn settle() {
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    }

    #[tokio::test]
    async fn test_queue_scheduler_new() {
        let scheduler = QueueScheduler::new();

        assert!(!scheduler.has_scheduled_function());
        assert!(!scheduler.is_processing());
        assert_eq!(scheduler.queue_len(), 0);
    }

    #[tokio::test]
    async fn test_queue_scheduler_schedule_and_process() {
        let mut scheduler = QueueScheduler::new();
        let hits = Arc::new(AtomicUsize::new(0));

        scheduler.schedule_function(bump(&hits), ignore_errors(), None, None);

        // Queued but not yet run.
        assert!(scheduler.has_scheduled_function());
        assert_eq!(scheduler.queue_len(), 1);

        let processed = scheduler.process_next().await;

        assert!(processed);
        assert_eq!(hits.load(Ordering::SeqCst), 1);
        assert!(!scheduler.has_scheduled_function());
    }

    #[tokio::test]
    async fn test_queue_scheduler_fifo_order() {
        let mut scheduler = QueueScheduler::new();
        let seen = Arc::new(Mutex::new(Vec::new()));

        // Each function records its own index when it runs.
        for index in 0..3 {
            let sink = Arc::clone(&seen);
            scheduler.schedule_function(
                Box::new(move || {
                    Box::pin(async move {
                        sink.lock().await.push(index);
                    })
                }),
                ignore_errors(),
                None,
                None,
            );
        }

        // Drain the queue.
        while scheduler.process_next().await {}

        // Recorded order must match scheduling order.
        let recorded = seen.lock().await;
        assert_eq!(*recorded, vec![0, 1, 2]);
    }

    #[tokio::test]
    async fn test_queue_scheduler_with_checkpoint_update() {
        let mut scheduler = QueueScheduler::new();
        let checkpoint_ran = Arc::new(AtomicBool::new(false));
        let invocation_ran = Arc::new(AtomicBool::new(false));

        let invocation_flag = Arc::clone(&invocation_ran);
        let checkpoint_flag = Arc::clone(&checkpoint_ran);

        scheduler.schedule_function(
            Box::new(move || {
                Box::pin(async move {
                    invocation_flag.store(true, Ordering::SeqCst);
                })
            }),
            ignore_errors(),
            None,
            Some(Box::new(move || {
                Box::pin(async move {
                    checkpoint_flag.store(true, Ordering::SeqCst);
                    Ok(())
                })
            })),
        );

        scheduler.process_next().await;

        assert!(checkpoint_ran.load(Ordering::SeqCst));
        assert!(invocation_ran.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_queue_scheduler_flush() {
        let mut scheduler = QueueScheduler::new();

        (0..5).for_each(|_| {
            scheduler.schedule_function(
                Box::new(|| Box::pin(async {})),
                ignore_errors(),
                None,
                None,
            );
        });
        assert_eq!(scheduler.queue_len(), 5);

        scheduler.flush_timers();

        assert_eq!(scheduler.queue_len(), 0);
        assert!(!scheduler.has_scheduled_function());
    }

    #[tokio::test]
    async fn test_timer_scheduler_new() {
        let scheduler = TimerScheduler::new();

        assert!(!scheduler.has_scheduled_function());
    }

    #[tokio::test]
    async fn test_timer_scheduler_schedule_immediate() {
        let mut scheduler = TimerScheduler::new();
        let hits = Arc::new(AtomicUsize::new(0));

        // No timestamp means the task runs as soon as it is polled.
        scheduler.schedule_function(bump(&hits), ignore_errors(), None, None);

        settle().await;

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_timer_scheduler_flush() {
        let mut scheduler = TimerScheduler::new();
        let hits = Arc::new(AtomicUsize::new(0));

        // Far enough in the future that it cannot fire during the test.
        let run_at = Utc::now() + chrono::Duration::seconds(10);
        scheduler.schedule_function(bump(&hits), ignore_errors(), Some(run_at), None);

        // Cancel it before the timer elapses.
        scheduler.flush_timers();

        settle().await;

        // The flushed function must never have run.
        assert_eq!(hits.load(Ordering::SeqCst), 0);
    }
}