1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9
10pub type Callback = Box<dyn FnOnce() + Send>;
12
13#[derive(Clone)]
18pub struct CallbackExecutor {
19 callbacks: Arc<Mutex<VecDeque<Callback>>>,
20 shutdown: Arc<AtomicBool>,
21 notify: Arc<tokio::sync::Notify>,
22}
23
24impl Default for CallbackExecutor {
25 fn default() -> Self {
26 Self::new()
27 }
28}
29
30impl CallbackExecutor {
31 pub fn new() -> Self {
33 Self {
34 callbacks: Arc::new(Mutex::new(VecDeque::new())),
35 shutdown: Arc::new(AtomicBool::new(false)),
36 notify: Arc::new(tokio::sync::Notify::new()),
37 }
38 }
39
40 pub fn enqueue(&self, callback: Callback) {
44 if !self.is_shutdown() {
45 self.callbacks.lock().unwrap().push_back(callback);
46 self.notify.notify_one();
47 }
48 }
49
50 pub fn process_pending(&self) -> usize {
54 let callbacks: Vec<Callback> = {
55 let mut queue = self.callbacks.lock().unwrap();
56 queue.drain(..).collect()
57 };
58
59 let count = callbacks.len();
60 for callback in callbacks {
61 callback();
62 }
63
64 count
65 }
66
67 pub fn pending_count(&self) -> usize {
69 self.callbacks.lock().unwrap().len()
70 }
71
72 pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
75 self.notify.notified()
76 }
77
78 pub fn shutdown(&self) {
82 self.shutdown.store(true, Ordering::SeqCst);
83 self.notify.notify_one();
84 }
85
86 pub fn is_shutdown(&self) -> bool {
88 self.shutdown.load(Ordering::SeqCst)
89 }
90}
91
92#[cfg(test)]
93mod tests {
94 use super::*;
95 use std::sync::atomic::AtomicUsize;
96
97 #[test]
98 fn test_enqueue_and_process() {
99 let executor = CallbackExecutor::new();
100 let counter = Arc::new(AtomicUsize::new(0));
101
102 for _ in 0..5 {
104 let counter = counter.clone();
105 executor.enqueue(Box::new(move || {
106 counter.fetch_add(1, Ordering::SeqCst);
107 }));
108 }
109
110 assert_eq!(executor.pending_count(), 5);
111
112 let processed = executor.process_pending();
114
115 assert_eq!(processed, 5);
116 assert_eq!(counter.load(Ordering::SeqCst), 5);
117 assert_eq!(executor.pending_count(), 0);
118 }
119
120 #[test]
121 fn test_shutdown() {
122 let executor = CallbackExecutor::new();
123 let counter = Arc::new(AtomicUsize::new(0));
124
125 let counter1 = counter.clone();
127 executor.enqueue(Box::new(move || {
128 counter1.fetch_add(1, Ordering::SeqCst);
129 }));
130
131 executor.shutdown();
133 assert!(executor.is_shutdown());
134
135 let counter2 = counter.clone();
137 executor.enqueue(Box::new(move || {
138 counter2.fetch_add(1, Ordering::SeqCst);
139 }));
140
141 assert_eq!(executor.pending_count(), 1);
143
144 executor.process_pending();
146 assert_eq!(counter.load(Ordering::SeqCst), 1);
147 }
148
149 #[test]
150 fn test_clone() {
151 let executor = CallbackExecutor::new();
152 let executor2 = executor.clone();
153 let counter = Arc::new(AtomicUsize::new(0));
154
155 let counter1 = counter.clone();
157 executor.enqueue(Box::new(move || {
158 counter1.fetch_add(1, Ordering::SeqCst);
159 }));
160
161 let processed = executor2.process_pending();
163
164 assert_eq!(processed, 1);
165 assert_eq!(counter.load(Ordering::SeqCst), 1);
166 }
167}