use std::{
borrow::Borrow,
sync::{mpsc, Arc, RwLock},
};
use crate::{
dispatch::{
dispatcher::{ThreadLocal, ThreadPoolWrapper},
stage::Stage,
},
world::World,
};
use std::borrow::BorrowMut;
/// Builds an [`AsyncDispatcher`] from its parts.
///
/// * `world` - the world handle that the stages and thread-local systems are
///   run against.
/// * `stages` - the stages that `dispatch` executes on the thread pool.
/// * `thread_local` - the systems that `wait` runs on the calling thread.
/// * `thread_pool` - the shared pool wrapper that `dispatch` spawns onto.
///
/// The returned dispatcher starts out holding `world` and `stages` locally;
/// nothing is dispatched by this call.
pub fn new_async<'a, R>(
    world: R,
    stages: Vec<Stage<'static>>,
    thread_local: ThreadLocal<'a>,
    thread_pool: Arc<RwLock<ThreadPoolWrapper>>,
) -> AsyncDispatcher<'a, R> {
    // No job is in flight yet, so the state is stored directly rather than
    // behind a channel receiver.
    let data = Data::Inner(Inner { stages, world });

    AsyncDispatcher {
        data,
        thread_local,
        thread_pool,
    }
}
/// A dispatcher that executes its stages on a thread pool and lets the caller
/// collect the result later.
///
/// `dispatch` moves the world and stages into a job spawned on the pool;
/// the other methods block (or, for `running`, poll) until that job has sent
/// them back.
pub struct AsyncDispatcher<'a, R> {
/// The world and stages, or the receiver they will come back on while a
/// dispatched job still owns them.
data: Data<R>,
/// Systems that are set up by `setup` and run by `wait` on the calling thread.
thread_local: ThreadLocal<'a>,
/// Shared, lock-protected pool wrapper that `dispatch` spawns its job onto.
thread_pool: Arc<RwLock<ThreadPoolWrapper>>,
}
impl<'a, R> AsyncDispatcher<'a, R>
where
    R: Borrow<World> + Send + Sync + 'static,
{
    /// Calls `setup` on every stage and then on every thread-local system,
    /// passing each of them the mutably borrowed world.
    ///
    /// Blocks until a previously dispatched job (if any) has finished.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if a dispatched job went away without
    /// handing the world and stages back.
    pub fn setup(&mut self)
    where
        R: BorrowMut<World>,
    {
        let inner = self.data.inner();
        let world: &mut World = inner.world.borrow_mut();

        for stage in inner.stages.iter_mut() {
            stage.setup(world);
        }

        for local_system in &mut self.thread_local {
            local_system.setup(world);
        }
    }

    /// Spawns a job on the thread pool that executes all stages in order
    /// against the world, then sends the world and stages back.
    ///
    /// Thread-local systems are not run here; see [`wait`](Self::wait).
    /// If an earlier dispatch is still in flight, this call first blocks
    /// until it has finished.
    ///
    /// # Panics
    ///
    /// Panics if the thread pool lock is poisoned, or if the `unwrap` on the
    /// pool wrapper fails (presumably when no thread pool is available —
    /// confirm against `ThreadPoolWrapper`). Also panics with
    /// `"Sender dropped"` if an earlier job never returned its state.
    pub fn dispatch(&mut self) {
        // Swap the local state for a receiver; the job owns `inner` from here.
        let (snd, mut inner) = self.data.sender();

        let pool_guard = self.thread_pool.read().unwrap();
        let pool = pool_guard.as_ref().unwrap();

        pool.spawn(move || {
            let world: &World = inner.world.borrow();
            for stage in inner.stages.iter_mut() {
                stage.execute(world);
            }

            // A send error only means the dispatcher (and its receiver) is
            // already gone, so there is nobody left to notify.
            let _ = snd.send(inner);
        });
    }

    /// Blocks until the dispatched job has finished, then runs every
    /// thread-local system on the calling thread.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if the job went away without handing
    /// the world and stages back.
    pub fn wait(&mut self) {
        let inner = self.data.inner();
        let world: &World = inner.world.borrow();

        for local_system in &mut self.thread_local {
            local_system.run_now(world);
        }
    }

    /// Blocks until the dispatched job has finished, without running the
    /// thread-local systems.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if the job went away without handing
    /// the world and stages back.
    pub fn wait_without_tl(&mut self) {
        // Only the blocking side effect is wanted; the reference is unused.
        let _ = self.data.inner();
    }

    /// Returns `true` while a dispatched job still owns the world and stages.
    ///
    /// This polls without blocking. If the job has just finished, its result
    /// is taken back as part of this call.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if the job went away without handing
    /// the world and stages back.
    pub fn running(&mut self) -> bool {
        match self.data.inner_noblock() {
            Some(_) => false,
            None => true,
        }
    }

    /// Deprecated alias of [`world`](Self::world).
    #[deprecated(since = "0.8.0", note = "renamed to `world`")]
    pub fn res(&mut self) -> &R {
        self.world()
    }

    /// Returns a shared reference to the world handle, blocking until any
    /// dispatched job has finished.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if the job went away without handing
    /// the world and stages back.
    pub fn world(&mut self) -> &R {
        let inner = self.data.inner();
        &inner.world
    }

    /// Deprecated alias of [`world_mut`](Self::world_mut).
    #[deprecated(since = "0.8.0", note = "renamed to `world_mut`")]
    pub fn mut_res(&mut self) -> &mut R {
        self.world_mut()
    }

    /// Returns a mutable reference to the world handle, blocking until any
    /// dispatched job has finished.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if the job went away without handing
    /// the world and stages back.
    pub fn world_mut(&mut self) -> &mut R {
        let inner = self.data.inner();
        &mut inner.world
    }
}
/// Tracks where the dispatcher's `Inner` state currently lives.
enum Data<R> {
/// The state is held locally and can be accessed directly.
Inner(Inner<R>),
/// The state was handed out by `sender`; it is expected back through this
/// channel receiver.
Rx(mpsc::Receiver<Inner<R>>),
}
impl<R> Data<R> {
    /// Returns the locally held state, blocking on the channel first if the
    /// state is currently handed out.
    ///
    /// After this returns, `self` is always the `Inner` variant.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if the sending side was dropped without
    /// sending the state back.
    fn inner(&mut self) -> &mut Inner<R> {
        if let Data::Rx(rx) = self {
            let returned = rx.recv().expect("Sender dropped");
            *self = Data::Inner(returned);
        }

        match self {
            Data::Inner(inner) => inner,
            // Any `Rx` was replaced by `Inner` just above.
            Data::Rx(_) => unreachable!(),
        }
    }

    /// Non-blocking counterpart of [`inner`](Self::inner).
    ///
    /// Returns `None` if the state is handed out and has not been sent back
    /// yet; otherwise stores the state locally (if it was not already) and
    /// returns it.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if the sending side was dropped without
    /// sending the state back.
    fn inner_noblock(&mut self) -> Option<&mut Inner<R>> {
        use std::sync::mpsc::TryRecvError;

        if let Data::Rx(rx) = self {
            let returned = match rx.try_recv() {
                // Still in flight: report "not available" without blocking.
                Err(TryRecvError::Empty) => return None,
                // Either the state arrived or the sender disconnected; the
                // latter is turned into a panic by `expect`.
                outcome => outcome.expect("Sender dropped"),
            };
            *self = Data::Inner(returned);
        }

        match self {
            Data::Inner(inner) => Some(inner),
            // Any `Rx` either returned early or was replaced just above.
            Data::Rx(_) => unreachable!(),
        }
    }

    /// Takes the state out of `self`, leaving a fresh receiver behind, and
    /// returns it together with the matching sender.
    ///
    /// Blocks until the state is held locally before taking it, so it also
    /// waits for a previously handed-out state to come back.
    ///
    /// # Panics
    ///
    /// Panics with `"Sender dropped"` if a previously handed-out state can no
    /// longer be received.
    fn sender(&mut self) -> (mpsc::Sender<Inner<R>>, Inner<R>) {
        use std::mem;

        // Guarantee that `self` is the `Inner` variant before swapping it out.
        self.inner();

        let (snd, rx) = mpsc::channel();
        match mem::replace(self, Data::Rx(rx)) {
            Data::Inner(inner) => (snd, inner),
            Data::Rx(_) => unreachable!(),
        }
    }
}
/// The state that travels as one unit between the dispatcher and the job
/// spawned by `AsyncDispatcher::dispatch`.
struct Inner<R> {
/// Stages that the dispatched job executes in order.
stages: Vec<Stage<'static>>,
/// The world handle that the stages are executed against.
world: R,
}