Skip to main content

zenobuf_core/
executor.rs

1//! Callback executor for processing subscriber callbacks
2//!
3//! This module provides a simple callback queue that allows subscribers to enqueue
4//! callbacks for later processing by the node's spin methods.
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9
10/// A callback that can be executed by the executor
11pub type Callback = Box<dyn FnOnce() + Send>;
12
13/// A simple executor that queues callbacks for later processing
14///
15/// The executor provides a thread-safe way to enqueue callbacks from subscriber
16/// threads and process them in the node's spin loop.
17#[derive(Clone)]
18pub struct CallbackExecutor {
19    callbacks: Arc<Mutex<VecDeque<Callback>>>,
20    shutdown: Arc<AtomicBool>,
21    notify: Arc<tokio::sync::Notify>,
22}
23
24impl Default for CallbackExecutor {
25    fn default() -> Self {
26        Self::new()
27    }
28}
29
30impl CallbackExecutor {
31    /// Creates a new callback executor
32    pub fn new() -> Self {
33        Self {
34            callbacks: Arc::new(Mutex::new(VecDeque::new())),
35            shutdown: Arc::new(AtomicBool::new(false)),
36            notify: Arc::new(tokio::sync::Notify::new()),
37        }
38    }
39
40    /// Enqueues a callback for later processing
41    ///
42    /// This method is thread-safe and can be called from subscriber callbacks.
43    pub fn enqueue(&self, callback: Callback) {
44        if !self.is_shutdown() {
45            self.callbacks.lock().unwrap().push_back(callback);
46            self.notify.notify_one();
47        }
48    }
49
50    /// Processes all pending callbacks
51    ///
52    /// Returns the number of callbacks that were processed.
53    pub fn process_pending(&self) -> usize {
54        let callbacks: Vec<Callback> = {
55            let mut queue = self.callbacks.lock().unwrap();
56            queue.drain(..).collect()
57        };
58
59        let count = callbacks.len();
60        for callback in callbacks {
61            callback();
62        }
63
64        count
65    }
66
67    /// Returns the number of pending callbacks
68    pub fn pending_count(&self) -> usize {
69        self.callbacks.lock().unwrap().len()
70    }
71
72    /// Returns a future that resolves when a callback is enqueued or shutdown is signaled.
73    /// Must be called (to register interest) before draining the queue.
74    pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
75        self.notify.notified()
76    }
77
78    /// Signals the executor to shutdown
79    ///
80    /// After shutdown, no new callbacks will be accepted.
81    pub fn shutdown(&self) {
82        self.shutdown.store(true, Ordering::SeqCst);
83        self.notify.notify_one();
84    }
85
86    /// Returns true if the executor has been shutdown
87    pub fn is_shutdown(&self) -> bool {
88        self.shutdown.load(Ordering::SeqCst)
89    }
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95    use std::sync::atomic::AtomicUsize;
96
97    #[test]
98    fn test_enqueue_and_process() {
99        let executor = CallbackExecutor::new();
100        let counter = Arc::new(AtomicUsize::new(0));
101
102        // Enqueue some callbacks
103        for _ in 0..5 {
104            let counter = counter.clone();
105            executor.enqueue(Box::new(move || {
106                counter.fetch_add(1, Ordering::SeqCst);
107            }));
108        }
109
110        assert_eq!(executor.pending_count(), 5);
111
112        // Process callbacks
113        let processed = executor.process_pending();
114
115        assert_eq!(processed, 5);
116        assert_eq!(counter.load(Ordering::SeqCst), 5);
117        assert_eq!(executor.pending_count(), 0);
118    }
119
120    #[test]
121    fn test_shutdown() {
122        let executor = CallbackExecutor::new();
123        let counter = Arc::new(AtomicUsize::new(0));
124
125        // Enqueue before shutdown
126        let counter1 = counter.clone();
127        executor.enqueue(Box::new(move || {
128            counter1.fetch_add(1, Ordering::SeqCst);
129        }));
130
131        // Shutdown
132        executor.shutdown();
133        assert!(executor.is_shutdown());
134
135        // Enqueue after shutdown should be ignored
136        let counter2 = counter.clone();
137        executor.enqueue(Box::new(move || {
138            counter2.fetch_add(1, Ordering::SeqCst);
139        }));
140
141        // Only the first callback should be in the queue
142        assert_eq!(executor.pending_count(), 1);
143
144        // Process should still work for queued callbacks
145        executor.process_pending();
146        assert_eq!(counter.load(Ordering::SeqCst), 1);
147    }
148
149    #[test]
150    fn test_clone() {
151        let executor = CallbackExecutor::new();
152        let executor2 = executor.clone();
153        let counter = Arc::new(AtomicUsize::new(0));
154
155        // Enqueue via first executor
156        let counter1 = counter.clone();
157        executor.enqueue(Box::new(move || {
158            counter1.fetch_add(1, Ordering::SeqCst);
159        }));
160
161        // Process via second executor
162        let processed = executor2.process_pending();
163
164        assert_eq!(processed, 1);
165        assert_eq!(counter.load(Ordering::SeqCst), 1);
166    }
167}