#![cfg_attr(test, deny(warnings))]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
type Cb = Arc<Fn(Result<(), &'static str>) -> () + Send + Sync>;
pub type CbOption = Option<Cb>;
#[derive(Clone)]
pub struct Batcher<T: Clone> {
running: Arc<AtomicBool>,
pending_batch: Arc<Mutex<Vec<T>>>,
pending_callbacks: Arc<Mutex<Vec<Cb>>>,
callbacks: Arc<Mutex<Vec<Cb>>>,
run: Arc<Fn(Vec<T>, Batcher<T>) -> () + Send + Sync>,
}
impl<T: Clone> Batcher<T> {
pub fn new(run: Arc<Fn(Vec<T>, Batcher<T>) -> () + Send + Sync>) -> Self {
Batcher {
running: Arc::new(AtomicBool::new(false)),
pending_batch: Arc::new(Mutex::new(Vec::new())),
pending_callbacks: Arc::new(Mutex::new(Vec::new())),
callbacks: Arc::new(Mutex::new(Vec::new())),
run,
}
}
pub fn append(&self, val: Vec<T>, cb: CbOption) -> () {
if self.running.load(Ordering::Relaxed) {
if self.pending_batch.lock().unwrap().len() == 0 {
*self.pending_callbacks.lock().unwrap() = Vec::new();
}
self.pending_batch.lock().unwrap().extend(val);
if let Some(cb) = cb {
self.callbacks.lock().unwrap().push(cb);
}
} else {
if let Some(cb) = cb {
*self.callbacks.lock().unwrap() = vec![cb];
}
self.running.store(true, Ordering::Relaxed);
(self.run)(val, self.clone());
}
}
pub fn done(self, err: Result<(), &'static str>) -> () {
for cb in self.callbacks.lock().unwrap().iter() {
cb(err)
}
self.running.store(false, Ordering::Relaxed);
let mut pending_callbacks = self.pending_callbacks.lock().unwrap();
let mut pending_batch = self.pending_batch.lock().unwrap();
*self.callbacks.lock().unwrap() = pending_callbacks.drain(..).collect();
let nextbatch: Vec<T> = pending_batch.drain(..).collect();
if nextbatch.is_empty() && self.callbacks.lock().unwrap().is_empty() {
return;
}
self.running.store(true, Ordering::Relaxed);
(self.run)(nextbatch, self.clone());
}
}