use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
pub type Callback = Box<dyn FnOnce() + Send>;
#[derive(Clone)]
pub struct CallbackExecutor {
callbacks: Arc<Mutex<VecDeque<Callback>>>,
shutdown: Arc<AtomicBool>,
notify: Arc<tokio::sync::Notify>,
}
impl Default for CallbackExecutor {
fn default() -> Self {
Self::new()
}
}
impl CallbackExecutor {
pub fn new() -> Self {
Self {
callbacks: Arc::new(Mutex::new(VecDeque::new())),
shutdown: Arc::new(AtomicBool::new(false)),
notify: Arc::new(tokio::sync::Notify::new()),
}
}
pub fn enqueue(&self, callback: Callback) {
if !self.is_shutdown() {
self.callbacks.lock().unwrap().push_back(callback);
self.notify.notify_one();
}
}
pub fn process_pending(&self) -> usize {
let callbacks: Vec<Callback> = {
let mut queue = self.callbacks.lock().unwrap();
queue.drain(..).collect()
};
let count = callbacks.len();
for callback in callbacks {
callback();
}
count
}
pub fn pending_count(&self) -> usize {
self.callbacks.lock().unwrap().len()
}
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
self.notify.notified()
}
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::SeqCst);
self.notify.notify_one();
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::SeqCst)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicUsize;
#[test]
fn test_enqueue_and_process() {
let executor = CallbackExecutor::new();
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..5 {
let counter = counter.clone();
executor.enqueue(Box::new(move || {
counter.fetch_add(1, Ordering::SeqCst);
}));
}
assert_eq!(executor.pending_count(), 5);
let processed = executor.process_pending();
assert_eq!(processed, 5);
assert_eq!(counter.load(Ordering::SeqCst), 5);
assert_eq!(executor.pending_count(), 0);
}
#[test]
fn test_shutdown() {
let executor = CallbackExecutor::new();
let counter = Arc::new(AtomicUsize::new(0));
let counter1 = counter.clone();
executor.enqueue(Box::new(move || {
counter1.fetch_add(1, Ordering::SeqCst);
}));
executor.shutdown();
assert!(executor.is_shutdown());
let counter2 = counter.clone();
executor.enqueue(Box::new(move || {
counter2.fetch_add(1, Ordering::SeqCst);
}));
assert_eq!(executor.pending_count(), 1);
executor.process_pending();
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[test]
fn test_clone() {
let executor = CallbackExecutor::new();
let executor2 = executor.clone();
let counter = Arc::new(AtomicUsize::new(0));
let counter1 = counter.clone();
executor.enqueue(Box::new(move || {
counter1.fetch_add(1, Ordering::SeqCst);
}));
let processed = executor2.process_pending();
assert_eq!(processed, 1);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
}