use parking_lot::Mutex;
use async_channel as mpmc;
use tokio::{sync::broadcast, task::JoinSet};
use crate::{dispatcher, worker::Worker};
type Ref<T1> = std::sync::Arc<T1>;
pub struct Executor<S1, E1> {
state: broadcast::Receiver<S1>,
event: mpmc::Receiver<E1>,
inner: JoinSet<()>,
count: usize,
wal: Vec<dispatcher::Wal<E1>>,
}
impl<S1: Clone, E1: Clone> Executor<S1, E1>
where
S1: Send + Sync + 'static,
E1: Send + Sync + 'static,
{
pub fn new(state: broadcast::Receiver<S1>, event: mpmc::Receiver<E1>, count: usize) -> Self {
let (inner, mut wal) = (JoinSet::new(), Vec::new());
for _ in 0..count {
let value = Ref::new(Mutex::new(None));
wal.push(value);
}
Self {
state,
event,
inner,
count,
wal,
}
}
async fn execute<W1: 'static>(&mut self, wi: usize, worker: Ref<W1>, state: Ref<S1>)
where
W1: Worker<S1, E1>,
{
let (wal, receiver) = (self.wal[wi].clone(), self.event.clone());
let worker = move |event| {
let (worker, state) = (worker.clone(), state.clone());
async move { worker.execute(state, event).await }
};
self.inner.spawn(dispatcher::dispatch(wal, receiver, worker));
}
async fn respawn<W1: 'static>(&mut self, worker: Ref<W1>, state: Ref<S1>)
where
W1: Worker<S1, E1>,
{
self.inner.shutdown().await;
for wi in 0..self.count {
let (worker, state) = (worker.clone(), state.clone());
self.execute(wi, worker, state).await;
}
}
pub async fn receive<W1: 'static>(mut self, worker: W1)
where
W1: Worker<S1, E1>,
{
let worker = Ref::new(worker);
while let Ok(state) = self.state.recv().await {
let (worker, state) = (worker.clone(), Ref::new(state));
self.respawn(worker, state).await;
}
}
}