use std::{collections::VecDeque, marker::PhantomData, sync::mpsc::{Sender, Receiver, channel}, thread};
use std::fmt::Debug;
use hashbrown::HashMap;
#[derive(Debug, Clone)]
pub enum QID { INIT, STEP(usize), DONE }
pub struct QMsg<Q> { qid:QID, q: Q }
#[derive(Debug)]
pub struct RMsg<R> { wid: WID, qid:QID, r:Option<R> }
#[derive(Debug,PartialEq,Eq,Hash,Clone,Copy)]
pub struct WID { n:usize }
pub trait Worker<Q,R>:Send+Sync+Default where R:Debug {
fn new(_wid:WID)->Self { Self::default() }
fn work_loop(&mut self, wid:WID, rx:&Receiver<Option<QMsg<Q>>>, tx:&Sender<RMsg<R>>) {
macro_rules! work_phase {
[$qid:expr, $x:expr] => {
let (qid, r) = ($qid, $x);
if tx.send(RMsg{ wid, qid, r }).is_err() { self.on_work_send_err($qid) }}}
work_phase![QID::INIT, self.work_init(wid)];
let mut stream = rx.iter();
while let Some(Some(QMsg{qid, q})) = stream.next() {
if let QID::STEP(_) = qid { work_phase![qid.clone(), self.work_step(&qid, q)]; }
else { panic!("Worker {:?} got unexpected qid instead of STEP: {:?}", wid, qid)}}
work_phase![QID::DONE, self.work_done()]; }
fn on_work_send_err(&mut self, qid:QID) {
println!("failed to send response for qid:{:?}", qid); }
fn work_step(&mut self, _qid:&QID, _q:Q)->Option<R> { None }
fn work_init(&mut self, _wid:WID)->Option<R> { None }
fn work_done(&mut self)->Option<R> { None }}
#[derive(Debug)]
pub enum SwarmCmd<Q:Debug,V:Debug> {
Pass,
Halt,
Send(Q),
Batch(Vec<(WID, Q)>),
Panic(String),
Return(V),
Kill(WID)}
pub struct Swarm<Q,R,W> where W:Default+Worker<Q,R>, Q:Debug, R:Debug {
nq: usize,
me: Sender<RMsg<R>>,
rx: Receiver<RMsg<R>>,
whs: HashMap<WID, Sender<Option<QMsg<Q>>>>,
nw: usize,
_w: PhantomData<W>,
qq: VecDeque<(QID, Q)>,
threads: Vec<thread::JoinHandle<()>> }
impl<Q,R,W> Swarm<Q,R,W> where Q:'static+Send+Debug, R:'static+Send+Debug, W:Default+Worker<Q, R> {
pub fn new(num_workers:usize)->Self {
let (me, rx) = channel();
let n = if num_workers==0 { num_cpus::get() } else { num_workers };
let mut this = Self { nq: 0, me, rx, whs:HashMap::new(), nw:0, qq:VecDeque::new(), _w:PhantomData, threads:vec![]};
for _ in 0..n { this.spawn(); }
this }
fn spawn(&mut self)->WID {
let wid = WID{ n: self.nw }; self.nw+=1;
let me2 = self.me.clone();
let (wtx, wrx) = channel();
self.threads.push(thread::spawn(move || { W::new(wid).work_loop(wid, &wrx, &me2) }));
self.whs.insert(wid, wtx);
wid }
pub fn add(&mut self, q:Q)->&Self {
let qid:QID = QID::STEP(self.nq);
self.qq.push_back((qid, q));
self.nq+=1;
self}
pub fn get_worker(&mut self, wid:WID)->&Sender<Option<QMsg<Q>>> {
self.whs.get(&wid).expect(format!("requested non-exestant worker {:?}", wid).as_str()) }
pub fn kill(&mut self, w:WID) {
if let Some(h) = self.whs.remove(&w) {
if h.send(None).is_err() { panic!("couldn't kill worker") }}
else { panic!("worker was already gone") }}
pub fn send(&mut self, wid:WID, q:Q) {
let qid = QID::STEP(self.nq); self.nq+=1;
if self.get_worker(wid).send(Some(QMsg{ qid, q })).is_err() {
panic!("couldn't send message to worker {:?}", wid) }}
pub fn run<F,V>(&mut self, mut on_msg:F)->Option<V> where V:Debug, F:FnMut(WID, &QID, Option<R>)->SwarmCmd<Q,V> {
let mut res = None;
loop {
let RMsg { wid, qid, r } = self.rx.recv().expect("failed to read RMsg from queue!");
let cmd = on_msg(wid, &qid, r);
match cmd {
SwarmCmd::Pass => {},
SwarmCmd::Halt => break,
SwarmCmd::Kill(w) => { self.kill(w); if self.whs.is_empty() { break }},
SwarmCmd::Send(q) => self.send(wid, q),
SwarmCmd::Batch(wqs) => for (wid, q) in wqs { self.send(wid, q) },
SwarmCmd::Panic(msg) => panic!("{}", msg),
SwarmCmd::Return(v) => { res = Some(v); break } }}
while let Some(&w) = self.whs.keys().take(1).next() { self.kill(w); }
while !self.threads.is_empty() { self.threads.pop().unwrap().join().unwrap() }
res}}