use crate::{logtrace, CmdSender, ReactRuntime, ReactorID};
use std::sync::{
atomic::{self, AtomicBool},
Arc, Mutex,
};
/// Owns a group of worker threads, each running its own `ReactRuntime`,
/// together with the command sender of every runtime and a name -> UID registry.
pub struct ThreadedReactorMgr<UserCommand: 'static> {
/// One entry per worker thread; `new` stores them sorted by thread id, so the
/// vector index is the thread id.
senders: Vec<IDAndSender<UserCommand>>,
/// Join handles of the spawned worker threads; emptied by `wait`.
threads: Vec<std::thread::JoinHandle<()>>,
/// Stop flag shared with every worker loop; set by `stop`.
stopcmd: Arc<AtomicBool>,
/// Registry mapping reactor names to their `ReactorUID`, guarded by a mutex.
reactor_uid_map: Mutex<GlobalReactorUIDMap>, }
/// Key type of the reactor registry (`GlobalReactorUIDMap`).
type ReactorName = String;
/// Value stored in the reactor registry: a runtime id paired with a `ReactorID`.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct ReactorUID {
/// NOTE(review): presumably the worker/runtime index accepted by
/// `ThreadedReactorMgr::get_cmd_sender` — confirm against callers.
pub runtimeid: usize,
/// NOTE(review): presumably the reactor's id inside that runtime — confirm.
pub reactorid: ReactorID,
}
/// Pairs a worker thread id (field 0) with the command sender of the runtime
/// created on that thread (field 1).
struct IDAndSender<UserCommand>(usize, CmdSender<UserCommand>);
// `ThreadedReactorMgr::new` moves `IDAndSender` values from the worker threads
// to the constructing thread over an mpsc channel, which requires `Send`.
// SAFETY: NOTE(review): soundness relies on `CmdSender<UserCommand>` (and any
// `UserCommand` it carries) being safe to move across threads; that is not
// shown in this file — confirm against the `CmdSender` definition.
unsafe impl<UserCommand> Send for IDAndSender<UserCommand> {}
/// Ordered map from reactor name to the `ReactorUID` registered under it.
type GlobalReactorUIDMap = std::collections::BTreeMap<ReactorName, ReactorUID>;
impl<UserCommand: 'static> Drop for ThreadedReactorMgr<UserCommand> {
    /// Signals every worker to stop and joins them.
    ///
    /// When the manager is dropped normally this behaves like `stop()` followed
    /// by `wait()`, so a panic from a worker is still re-raised here.
    ///
    /// When the manager is dropped while the current thread is already
    /// unwinding (for example because `new` panicked after spawning workers),
    /// re-raising a worker panic would be a second panic and abort the process.
    /// In that case the workers are joined and their panic payloads discarded.
    fn drop(&mut self) {
        self.stop();
        if std::thread::panicking() {
            for worker in std::mem::take(&mut self.threads) {
                // Deliberately ignored: see the double-panic note above.
                let _ = worker.join();
            }
        } else {
            self.wait();
        }
    }
}
impl<UserCommand: 'static> ThreadedReactorMgr<UserCommand> {
    /// Creates a manager running `nthreads` worker threads.
    ///
    /// Each worker is named `ThreadedReactors-<id>`, builds its own
    /// `ReactRuntime`, sends that runtime's command sender back to the calling
    /// thread, and then alternates `process_events()` with `yield_now()` until
    /// the stop flag is set.
    ///
    /// # Panics
    ///
    /// Panics if a thread cannot be spawned, or if waiting for a worker's
    /// command sender times out (100 ms per receive).
    pub fn new(nthreads: usize) -> Arc<Self> {
        let mut me = Self {
            senders: Vec::new(),
            threads: Vec::new(),
            stopcmd: Arc::new(AtomicBool::new(false)),
            reactor_uid_map: Mutex::new(GlobalReactorUIDMap::new()),
        };
        // Workers report their (thread id, command sender) pair over this channel.
        let (tx, rx) = std::sync::mpsc::channel::<IDAndSender<UserCommand>>();
        for threadid in 0..nthreads {
            let (stopcmd, tx) = (Arc::clone(&me.stopcmd), tx.clone());
            let thread = std::thread::Builder::new()
                .name(format!("ThreadedReactors-{}", threadid))
                .spawn(move || {
                    logtrace!("Entered ThreadedReactors-{}", threadid);
                    let mut runtime = ReactRuntime::<UserCommand>::new();
                    tx.send(IDAndSender(threadid, runtime.get_cmd_sender().clone()))
                        .expect("Failed to send in thread");
                    // The sender is only needed for the one-off handshake above.
                    drop(tx);
                    logtrace!("Start polling events in ThreadedReactors-{}", threadid);
                    while !stopcmd.load(atomic::Ordering::Acquire) {
                        runtime.process_events();
                        std::thread::yield_now();
                    }
                    logtrace!("Exiting ThreadedReactors-{}", threadid);
                })
                .unwrap();
            me.threads.push(thread);
        }

        logtrace!("Waiting for thread initializations");
        // Workers may report in any order; collect first, then order by thread id.
        let mut received: Vec<IDAndSender<UserCommand>> = (0..nthreads)
            .map(|_| {
                rx.recv_timeout(std::time::Duration::from_millis(100))
                    .expect("failed to rect msg")
            })
            .collect();
        // Thread ids are unique, so an unstable sort yields the same order.
        received.sort_unstable_by_key(|e| e.0);
        for (i, e) in received.into_iter().enumerate() {
            // After sorting, position and thread id must coincide.
            debug_assert_eq!(i, e.0);
            me.senders.push(e);
        }
        logtrace!("Recved all thread initializations");
        Arc::new(me)
    }

    /// Sets the stop flag that every worker loop polls.
    ///
    /// Does not block; use `wait` to join the workers.
    pub fn stop(&self) {
        // Release pairs with the Acquire load in the worker loops.
        self.stopcmd.store(true, atomic::Ordering::Release);
    }

    /// Joins every worker thread, leaving the handle list empty.
    ///
    /// This only returns once the workers have exited, which they do after
    /// `stop` has been called.
    ///
    /// # Panics
    ///
    /// Panics if a worker thread panicked.
    pub fn wait(&mut self) {
        for t in std::mem::take(&mut self.threads) {
            t.join().unwrap();
        }
    }

    /// Returns the command sender of the runtime with index `runtimeid`, or
    /// `None` if no such runtime exists.
    pub fn get_cmd_sender(&self, runtimeid: usize) -> Option<&CmdSender<UserCommand>> {
        self.senders.get(runtimeid).map(|e| &e.1)
    }

    /// Looks up the `ReactorUID` registered under `key`.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn find_reactor_uid(&self, key: &str) -> Option<ReactorUID> {
        let mapguard = self.reactor_uid_map.lock().unwrap();
        mapguard.get(key).copied()
    }

    /// Registers `value` under `key`.
    ///
    /// # Errors
    ///
    /// Returns `Err("Duplicate ReactorName")` if `key` is already registered;
    /// in that case the existing entry is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn add_reactor_uid(&self, key: ReactorName, value: ReactorUID) -> Result<(), String> {
        use std::collections::btree_map::Entry;

        let mut mapguard = self.reactor_uid_map.lock().unwrap();
        // The entry API never overwrites, so a rejected duplicate cannot
        // silently replace the UID that is already registered.
        match mapguard.entry(key) {
            Entry::Vacant(slot) => {
                slot.insert(value);
                Ok(())
            }
            Entry::Occupied(_) => Err("Duplicate ReactorName".to_owned()),
        }
    }

    /// Removes the registration for `key`, returning the UID it held, if any.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn remove_reactor_name(&self, key: &str) -> Option<ReactorUID> {
        let mut mapguard = self.reactor_uid_map.lock().unwrap();
        mapguard.remove(key)
    }

    /// Returns the number of registered reactor names.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn count_reactors(&self) -> usize {
        let mapguard = self.reactor_uid_map.lock().unwrap();
        mapguard.len()
    }
}