#![allow(dead_code)]
use parking_lot::Mutex;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Weak};
pub extern crate once_cell;
pub use mula_proc_macro::mula;
/// Shares the result of one `work(input)` computation among every caller that
/// subscribes to an equal `input` while that computation is still running.
///
/// Instances are created with `Mula::new` (which hands back an `Arc`) and
/// queried through `Mula::subscribe_to`.
pub struct Mula<T, O, F>
where
    T: Eq + Hash + Clone + Send + 'static,
    O: Clone + Send + 'static,
    F: Fn(T) -> O + Send + Sync + 'static,
{
    /// Computations currently in flight, keyed by their input. An entry is
    /// inserted by the first subscriber for a key and removed by the worker
    /// thread once the result is ready.
    map: Mutex<HashMap<T, Bus<O>>>,
    /// The user-supplied function run once per in-flight key.
    work: F,
}
impl<T, O, F> Mula<T, O, F>
where
    T: 'static + Eq + Hash + Clone + Send,
    O: 'static + Clone + Send,
    F: 'static + Send + Sync + Fn(T) -> O,
{
    /// Wraps `work` in a new, empty `Mula` and returns it behind an `Arc` so
    /// it can be shared with the worker threads spawned by `subscribe_to`.
    pub fn new(work: F) -> Arc<Self> {
        Arc::new(Self {
            map: Mutex::new(HashMap::new()),
            work,
        })
    }

    /// Returns the value of `work(input)`, running `work` only once for all
    /// callers that subscribe to an equal `input` while it is in flight.
    ///
    /// The first subscriber for a key spawns a detached worker thread that
    /// runs `work`; this and every later subscriber for the same key then
    /// block until the worker broadcasts the result. Once the result has been
    /// delivered the key is forgotten, so a subsequent call runs `work` again.
    ///
    /// # Panics
    ///
    /// Panics if `work` panicked while computing the result for `input`.
    pub fn subscribe_to(mula: Arc<Self>, input: T) -> O {
        let thread_mula = Arc::clone(&mula);
        let mut map = mula.map.lock();
        let bus = map.entry(input.clone()).or_insert_with(|| {
            // The worker cannot touch the map until this lock is released, so
            // the receiver registered below is guaranteed to be seen by it.
            std::thread::spawn(move || Self::run_job(thread_mula, input));
            Bus::new()
        });
        let receiver = bus.add_receiver();
        // Release the lock before blocking; the worker needs it to publish.
        drop(map);
        receiver
            .recv()
            .expect("worker thread panicked before producing a result")
    }

    /// Worker-thread body: runs `work(input)`, then removes the key's bus from
    /// the map and broadcasts the result to every registered subscriber.
    ///
    /// If `work` panics the bus is still removed and dropped, so waiting
    /// subscribers are woken with an error instead of blocking forever and
    /// later subscribers start a fresh computation. The panic then continues
    /// unwinding this thread.
    fn run_job(mula: Arc<Self>, input: T) {
        let key = input.clone();
        let outcome =
            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| (mula.work)(input)));

        let mut map = mula.map.lock();
        let mut bus = map
            .remove(&key)
            .expect("the bus of an in-flight key is only removed by its own worker");
        match outcome {
            // Broadcast while still holding the lock, so no subscriber can
            // register on this bus after it has been taken out of the map.
            Ok(result) => bus.broadcast(result),
            Err(payload) => {
                // Dropping the bus drops its sender, which disconnects the
                // channel and makes the subscribers' blocked `recv` fail.
                drop(bus);
                drop(map);
                std::panic::resume_unwind(payload);
            }
        }
    }
}
/// Reference-counted handle to an `spmc` receiver, as returned by
/// `Bus::add_receiver`.
type Receiver<O> = Arc<spmc::Receiver<O>>;
/// One `spmc` channel plus bookkeeping of the receivers handed out for it, so
/// that a single value can be delivered once per live subscriber.
struct Bus<O>
where
    O: Clone + Send,
{
    /// Sending half of the channel; used by `broadcast`.
    sender: spmc::Sender<O>,
    /// Receiving half created together with `sender`; cloned for every
    /// subscriber in `add_receiver`.
    receiver: spmc::Receiver<O>,
    /// Weak handles to every receiver handed out, used by `broadcast` to
    /// count the subscribers that are still alive.
    weak_receivers: Vec<Weak<spmc::Receiver<O>>>,
}
impl<O: Clone + Send> Bus<O> {
fn new() -> Self {
let (sender, receiver) = spmc::channel();
Bus {
sender,
receiver,
weak_receivers: Vec::<Weak<spmc::Receiver<O>>>::new(),
}
}
fn add_receiver(&mut self) -> Receiver<O> {
let receiver = Arc::new(self.receiver.clone());
self.weak_receivers.push(Arc::downgrade(&receiver));
receiver
}
fn broadcast(&mut self, msg: O) {
let sender = &mut self.sender;
for receiver in &self.weak_receivers {
if receiver.strong_count() > 0 {
sender.send(msg.clone()).unwrap();
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Subscribes to three distinct keys from several threads at once and
    /// checks that the work function ran once per in-flight key while every
    /// subscriber still received its result.
    #[test]
    fn it_works() {
        let work_counter = Arc::new(AtomicUsize::new(0));
        let subscriber_counter = Arc::new(AtomicUsize::new(0));

        let counter = work_counter.clone();
        let mula = Mula::new(move |input: &str| {
            // Slow keys keep the job in flight long enough for the other
            // subscribers to join it.
            match input {
                "kov" => std::thread::sleep(std::time::Duration::from_secs(2)),
                "kovera" => {
                    let mut rng = rand::thread_rng();
                    std::thread::sleep(std::time::Duration::from_secs(rng.gen_range(0, 10)));
                },
                _ => (),
            }
            counter.fetch_add(1, Ordering::SeqCst);
            input.to_uppercase()
        });

        // Spawns a thread that subscribes to `input`, checks the answer and
        // records that it was served.
        let spawn_subscriber = |input: &'static str, expected: &'static str| {
            let m = mula.clone();
            let counter = subscriber_counter.clone();
            std::thread::spawn(move || {
                let upper = Mula::subscribe_to(m, input);
                assert_eq!(upper, expected.to_string());
                counter.fetch_add(1, Ordering::SeqCst);
            })
        };

        let handles: Vec<_> = [
            ("kov", "KOV"),
            ("kov", "KOV"),
            ("kovera", "KOVERA"),
            ("kovera", "KOVERA"),
            ("kovid", "KOVID"),
        ]
        .iter()
        .map(|&(input, expected)| spawn_subscriber(input, expected))
        .collect();

        // The main thread joins the in-flight "kov" job as a third subscriber.
        let result = Mula::subscribe_to(mula.clone(), "kov");
        assert_eq!(result, "KOV");
        subscriber_counter.fetch_add(1, Ordering::SeqCst);

        for handle in handles {
            handle.join().unwrap();
        }
        // One run each for "kov", "kovera" and "kovid".
        assert_eq!(work_counter.load(Ordering::SeqCst), 3);

        // Nothing is in flight any more, so this subscription runs the work again.
        let result = Mula::subscribe_to(mula, "kov");
        assert_eq!(result, "KOV");
        subscriber_counter.fetch_add(1, Ordering::SeqCst);
        assert_eq!(work_counter.load(Ordering::SeqCst), 4);
        assert_eq!(subscriber_counter.load(Ordering::SeqCst), 7);
    }
}