use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
pub struct PushBus<T> {
iters: Arc<Mutex<Vec<PushBusIter<T>>>>,
name: String,
}
impl<T: Clone> Clone for PushBus<T> {
fn clone(&self) -> Self {
Self {
iters: self.iters.clone(),
name: self.name.clone(),
}
}
}
impl<T> PushBus<T> {
pub fn close(&mut self) {
self.iters
.lock()
.unwrap()
.iter_mut()
.for_each(|i| i.running.store(false, std::sync::atomic::Ordering::Relaxed));
}
}
#[derive(Clone)]
pub struct PushBusIter<T> {
data: Arc<Mutex<VecDeque<Option<T>>>>,
running: Arc<AtomicBool>,
sleep: bool,
}
impl<T> PushBus<T> {
pub fn new(name: &str) -> Self {
Self {
iters: Default::default(),
name: name.to_string(),
}
}
}
const SLEEP_DURATION: Duration = Duration::from_millis(1);
impl<T> Iterator for PushBusIter<T> {
type Item = Option<T>;
fn next(&mut self) -> Option<Self::Item> {
if !self.running.load(std::sync::atomic::Ordering::Relaxed) {
return None;
}
let v = self.data.lock().unwrap().pop_front();
if v.is_some() {
self.sleep = false;
return v;
}
if self.sleep {
thread::sleep(SLEEP_DURATION);
}
self.sleep = true;
Some(None)
}
}
impl<T: Send + Sync + 'static + Clone> PushBus<T> {
pub fn iter(&self) -> Box<dyn Iterator<Item = Option<T>> + Send + Sync> {
let x = PushBusIter {
data: Arc::new(Mutex::new(VecDeque::new())),
running: Arc::new(AtomicBool::new(true)),
sleep: false,
};
self.iters.lock().unwrap().push(x.clone());
Box::new(x)
}
pub fn push(&self, item: Option<T>) {
let mut iters = self.iters.lock().unwrap();
iters.retain(|i| i.running.load(std::sync::atomic::Ordering::Relaxed));
iters.iter_mut().for_each(|i| {
let mut items = i.data.lock().unwrap();
let name = self.name.as_str();
let len = items.len();
if len > 10_000 {
eprintln!("{name} pushbus too deep: {len}");
}
items.push_back(item.clone())
});
}
}
impl<T> Drop for PushBusIter<T> {
fn drop(&mut self) {
self.running
.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use crate::pushbus::PushBus;
#[test]
fn test_clone() {
let pb1 = PushBus::new("test");
let pb2 = pb1.clone();
let mut i1 = pb1.iter();
let mut i2 = pb2.iter();
pb1.push(Some(1));
pb1.push(Some(2));
assert_eq!(Some(1), i1.next().unwrap());
assert_eq!(Some(1), i2.next().unwrap());
assert_eq!(Some(2), i1.next().unwrap());
assert_eq!(Some(2), i2.next().unwrap());
assert_eq!(None, i1.next().unwrap());
assert_eq!(None, i2.next().unwrap());
let mut i1 = pb1.iter();
let mut i2 = pb2.iter();
assert_eq!(None, i1.next().unwrap());
assert_eq!(None, i2.next().unwrap());
}
}