use crate::*;
use futures::{
sink::SinkExt,
stream::{BoxStream, StreamExt},
};
use std::sync::Arc;
/// Capacity passed to `ready_chunks` in `GhostActorBuilder::spawn`: the
/// maximum number of ready inject closures the actor loop takes from its
/// multiplexed stream in a single batch.
const MPLEX_CHUNK_SIZE: usize = 4096;
/// Factory used to attach event receivers to (or create new channels for)
/// an actor whose handler type is `H`.
pub struct GhostActorChannelFactory<H: GhostControlHandler> {
// Shared queue of streams waiting to be picked up by the actor loop.
inject: InjectLock<H>,
// Sender used to wake the actor loop after a new stream has been queued.
interupt_send: futures::channel::mpsc::Sender<()>,
// Control handle passed to every `GhostSender` created by this factory.
control: Arc<crate::actor_builder::GhostActorControl>,
}
impl<H: GhostControlHandler> GhostActorChannelFactory<H> {
pub(crate) fn new(
control: Arc<crate::actor_builder::GhostActorControl>,
interupt_send: futures::channel::mpsc::Sender<()>,
) -> (Self, InjectLock<H>) {
let inject = InjectLock::new();
(
Self {
inject: inject.clone(),
interupt_send,
control,
},
inject,
)
}
pub fn attach_receiver<E, R>(&self, receiver: R) -> GhostFuture<()>
where
E: GhostEvent + GhostDispatch<H>,
H: GhostControlHandler + GhostHandler<E>,
R: GhostChannelReceiver<E>,
{
let stream: BoxStream<'static, GhostActorInject<H>> =
Box::pin(receiver.map(|event| {
let inject: GhostActorInject<H> = Box::new(move |handler| {
handler.ghost_actor_dispatch(event);
});
inject
}));
let push_fut = self.inject.push(stream);
let mut i_send = self.interupt_send.clone();
must_future::MustBoxFuture::new(async move {
push_fut.await?;
let _ = i_send.send(()).await;
Ok(())
})
}
pub fn create_channel<E>(&self) -> GhostFuture<GhostSender<E>>
where
E: GhostEvent + GhostDispatch<H>,
H: GhostControlHandler + GhostHandler<E>,
{
let (ghost_sender, receiver) =
<GhostSender<E>>::new(self.control.clone());
let attach_fut = self.attach_receiver(receiver);
must_future::MustBoxFuture::new(async move {
attach_fut.await?;
Ok(ghost_sender)
})
}
}
impl<H: GhostControlHandler> Clone for GhostActorChannelFactory<H> {
fn clone(&self) -> Self {
Self {
inject: self.inject.clone(),
interupt_send: self.interupt_send.clone(),
control: self.control.clone(),
}
}
}
/// Builder that wires up the channels of an actor with handler type `H`
/// before the actor loop is started with `spawn`.
pub struct GhostActorBuilder<H: GhostControlHandler> {
// Shared control handle (interrupt sender + lifecycle state).
control: Arc<GhostActorControl>,
// Factory exposed through `channel_factory()` for attaching receivers.
channel_factory: GhostActorChannelFactory<H>,
// Queue of streams the actor loop drains and adds to its multiplexer.
inject: InjectLock<H>,
// Receiving end of the interrupt channel; consumed by `spawn`.
interupt_recv: futures::channel::mpsc::Receiver<()>,
}
impl<H: GhostControlHandler> Default for GhostActorBuilder<H> {
fn default() -> Self {
Self::new()
}
}
impl<H: GhostControlHandler> GhostActorBuilder<H> {
    /// Create a builder with a fresh interrupt channel (capacity 10), a new
    /// control object and a channel factory sharing both.
    pub fn new() -> Self {
        let (interupt_send, interupt_recv) = futures::channel::mpsc::channel::<()>(10);
        let control = Arc::new(GhostActorControl::new(interupt_send.clone()));
        let (channel_factory, inject) =
            GhostActorChannelFactory::new(Arc::clone(&control), interupt_send);
        Self {
            control,
            channel_factory,
            inject,
            interupt_recv,
        }
    }

    /// Borrow the channel factory used to attach receivers to this actor.
    pub fn channel_factory(&self) -> &GhostActorChannelFactory<H> {
        &self.channel_factory
    }

    /// Consume the builder and return the future that runs the actor loop.
    ///
    /// Each iteration of the loop:
    /// 1. stops if the state is `Shutdown`;
    /// 2. drains newly queued streams from the inject lock and adds them to
    ///    the multiplexer;
    /// 3. stops if the state is `Shutdown`;
    /// 4. waits for the next batch of ready inject closures (at most
    ///    `MPLEX_CHUNK_SIZE`) and runs each against `handler`, stopping if
    ///    the multiplexed stream has ended;
    /// 5. stops if the state is no longer `Active`.
    ///
    /// After the loop the state is set to `Shutdown`, shutdown waiters are
    /// notified, and `handler.handle_ghost_actor_shutdown()` is awaited.
    /// An error from draining the inject lock is returned immediately,
    /// skipping those final steps.
    pub fn spawn(self, mut handler: H) -> GhostFuture<()> {
        let Self {
            control,
            inject,
            interupt_recv,
            ..
        } = self;

        // Interrupts become no-op injects: their only job is to wake the loop.
        let wake_ups: BoxStream<'static, GhostActorInject<H>> = Box::pin(
            interupt_recv.map(|_| Box::new(|_: &mut H| {}) as GhostActorInject<H>),
        );

        let mut multiplexer =
            futures::stream::SelectAll::<BoxStream<'static, GhostActorInject<H>>>::new();
        multiplexer.push(wake_ups);
        let chunked = futures::stream::StreamExt::ready_chunks(multiplexer, MPLEX_CHUNK_SIZE);

        must_future::MustBoxFuture::new(async move {
            let mut chunked = chunked;
            loop {
                if control.state.get() == GhostActorStateType::Shutdown {
                    break;
                }

                let pending = inject.drain().await?;
                if !pending.is_empty() {
                    // Unwrap the chunking adapter to reach the multiplexer,
                    // add the new streams, then wrap it again.
                    let mut multiplexer = chunked.into_inner();
                    for stream in pending {
                        multiplexer.push(stream);
                    }
                    chunked = futures::stream::StreamExt::ready_chunks(
                        multiplexer,
                        MPLEX_CHUNK_SIZE,
                    );
                }

                if control.state.get() == GhostActorStateType::Shutdown {
                    break;
                }

                match chunked.next().await {
                    Some(batch) => {
                        for run in batch {
                            run(&mut handler);
                        }
                    }
                    None => break,
                }

                if control.state.get() != GhostActorStateType::Active {
                    break;
                }
            }

            control.state.set_shutdown().await;
            handler.handle_ghost_actor_shutdown().await;
            Ok(())
        })
    }
}
/// Control handle for an actor: lets holders request shutdown and query
/// whether the actor is still active.
#[derive(Clone)]
pub(crate) struct GhostActorControl {
// Sender used to wake the actor loop so it re-checks the state.
interupt_send: futures::channel::mpsc::Sender<()>,
// Shared lifecycle state plus the list of shutdown notifiers.
state: Arc<GhostActorState>,
}
impl GhostActorControl {
    /// Create a control handle with a fresh `Active` state.
    ///
    /// `interupt_send` is used to wake the actor loop whenever a shutdown is
    /// requested.
    pub(crate) fn new(
        interupt_send: futures::channel::mpsc::Sender<()>,
    ) -> Self {
        Self {
            interupt_send,
            state: Arc::new(GhostActorState::new()),
        }
    }

    /// Request a graceful shutdown and resolve once the actor has shut down.
    ///
    /// The state is moved to `PendingShutdown` at call time, unless it is
    /// already `Shutdown`. When the returned future is awaited it:
    /// 1. registers a shutdown notifier,
    /// 2. returns right away if the state is already `Shutdown`,
    /// 3. otherwise wakes the actor loop and waits for the notification.
    ///
    /// The notifier must be registered *before* the loop is woken and the
    /// state re-checked: the loop notifies only the receivers registered at
    /// the moment it shuts down, so a late registration would never be
    /// signalled and this future would wait forever.
    pub(crate) fn ghost_actor_shutdown(&self) -> GhostFuture<()> {
        let register_fut = self.state.push_shutdown_receiver();
        // Never report a finished actor as merely "pending".
        if self.state.get() != GhostActorStateType::Shutdown {
            self.state.set_pending_shutdown();
        }
        let state = self.state.clone();
        let mut i_send = self.interupt_send.clone();
        must_future::MustBoxFuture::new(async move {
            let shutdown_recv = register_fut.await;
            // If the state turns `Shutdown` after this check, the notifier
            // drain that follows will find the sender registered above.
            if state.get() != GhostActorStateType::Shutdown {
                // Best effort: the loop may already be gone.
                let _ = i_send.send(()).await;
                let _ = shutdown_recv.await;
            }
            Ok(())
        })
    }

    /// Request an immediate shutdown.
    ///
    /// The state is set to `Shutdown` at call time. The returned future
    /// wakes the actor loop and then notifies all registered shutdown
    /// receivers; errors from either step are ignored.
    pub(crate) fn ghost_actor_shutdown_immediate(&self) -> GhostFuture<()> {
        let mut i_send = self.interupt_send.clone();
        let shutdown_fut = self.state.set_shutdown();
        must_future::MustBoxFuture::new(async move {
            let _ = i_send.send(()).await;
            let _ = shutdown_fut.await;
            Ok(())
        })
    }

    /// `true` while the state is `Active`, i.e. no shutdown was requested.
    pub(crate) fn ghost_actor_is_active(&self) -> bool {
        self.state.get() == GhostActorStateType::Active
    }
}
/// A boxed, sendable, run-once closure that is given mutable access to the
/// handler `H`; the actor loop executes these one after another.
pub(crate) type GhostActorInject<H> = Box<dyn FnOnce(&mut H) + 'static + Send>;
/// Shared queue of inject streams waiting to be handed to the actor loop.
/// Clones refer to the same underlying queue.
pub(crate) struct InjectLock<H: GhostControlHandler>(
// Async mutex guarding the list of queued streams.
Arc<futures::lock::Mutex<Vec<BoxStream<'static, GhostActorInject<H>>>>>,
);
impl<H: GhostControlHandler> Clone for InjectLock<H> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<H: GhostControlHandler> InjectLock<H> {
    /// Create an empty queue.
    pub fn new() -> Self {
        let queue = futures::lock::Mutex::new(Vec::new());
        Self(Arc::new(queue))
    }

    /// Queue stream `i`.
    ///
    /// The stream is appended only when the returned future is awaited.
    pub fn push(
        &self,
        i: BoxStream<'static, GhostActorInject<H>>,
    ) -> GhostFuture<()> {
        let queue = Arc::clone(&self.0);
        must_future::MustBoxFuture::new(async move {
            queue.lock().await.push(i);
            Ok(())
        })
    }

    /// Remove and return every queued stream, leaving the queue empty.
    ///
    /// Resolves to an empty `Vec` when nothing was queued.
    pub fn drain(
        &self,
    ) -> GhostFuture<Vec<BoxStream<'static, GhostActorInject<H>>>> {
        let queue = Arc::clone(&self.0);
        must_future::MustBoxFuture::new(async move {
            let mut guard = queue.lock().await;
            Ok(std::mem::take(&mut *guard))
        })
    }
}
/// Lifecycle state of an actor, representable as a single byte so it can be
/// kept in an `AtomicU8`.
#[repr(u8)]
#[derive(Debug, PartialEq)]
pub(crate) enum GhostActorStateType {
    /// The actor loop keeps running.
    Active = 0x00,
    /// Shutdown was requested; the loop stops after its current batch.
    PendingShutdown = 0xfe,
    /// The actor is shut down; the loop stops at its next state check.
    Shutdown = 0xff,
}

impl From<u8> for GhostActorStateType {
    /// Decode a raw state byte.
    ///
    /// # Panics
    ///
    /// Panics with "corrupt GhostActorStateType" if `u` is not the
    /// discriminant of one of the variants.
    fn from(u: u8) -> Self {
        // Derive the patterns from the enum so the two cannot drift apart.
        const ACTIVE: u8 = GhostActorStateType::Active as u8;
        const PENDING_SHUTDOWN: u8 = GhostActorStateType::PendingShutdown as u8;
        const SHUTDOWN: u8 = GhostActorStateType::Shutdown as u8;

        match u {
            ACTIVE => Self::Active,
            PENDING_SHUTDOWN => Self::PendingShutdown,
            SHUTDOWN => Self::Shutdown,
            _ => panic!("corrupt GhostActorStateType"),
        }
    }
}
/// Lifecycle state of an actor together with the notifiers to fire once the
/// actor has shut down.
pub(crate) struct GhostActorState(
// Current state, stored as the `u8` value of a `GhostActorStateType`.
std::sync::atomic::AtomicU8,
// One-shot senders that `set_shutdown` fires when its future is awaited.
Arc<futures::lock::Mutex<Vec<futures::channel::oneshot::Sender<()>>>>,
);
impl GhostActorState {
    /// Create a state that starts out `Active` with no shutdown notifiers.
    pub fn new() -> Self {
        Self(
            std::sync::atomic::AtomicU8::new(GhostActorStateType::Active as u8),
            Arc::new(futures::lock::Mutex::new(Vec::new())),
        )
    }

    /// Register a shutdown notifier and resolve to its receiving end.
    ///
    /// The sender is registered only when the returned future is awaited.
    /// The receiver is signalled by the next awaited `set_shutdown` future
    /// that runs after the registration.
    pub fn push_shutdown_receiver(
        &self,
    ) -> must_future::MustBoxFuture<
        'static,
        futures::channel::oneshot::Receiver<()>,
    > {
        let lock = self.1.clone();
        must_future::MustBoxFuture::new(async move {
            let mut g = lock.lock().await;
            let (s, r) = futures::channel::oneshot::channel();
            g.push(s);
            r
        })
    }

    /// Move the state from `Active` to `PendingShutdown`.
    ///
    /// Any other current state is left untouched. In particular an actor
    /// that is already `Shutdown` is never downgraded back to
    /// `PendingShutdown`, which an unconditional store would do.
    pub fn set_pending_shutdown(&self) {
        use std::sync::atomic::Ordering::SeqCst;
        // An `Err` result only means the state was not `Active`; that is
        // exactly the case in which nothing must change.
        let _ = self.0.compare_exchange(
            GhostActorStateType::Active as u8,
            GhostActorStateType::PendingShutdown as u8,
            SeqCst,
            SeqCst,
        );
    }

    /// Mark the state as `Shutdown` and return a future that notifies every
    /// registered shutdown receiver.
    ///
    /// The state change happens at call time; the notifications are sent
    /// only when the returned future is awaited. Receivers that were
    /// already dropped are skipped silently.
    pub fn set_shutdown(&self) -> must_future::MustBoxFuture<'static, ()> {
        self.0.store(
            GhostActorStateType::Shutdown as u8,
            std::sync::atomic::Ordering::SeqCst,
        );
        let lock = self.1.clone();
        must_future::MustBoxFuture::new(async move {
            let mut g = lock.lock().await;
            for i in g.drain(..) {
                let _ = i.send(());
            }
        })
    }

    /// Read the current state.
    pub fn get(&self) -> GhostActorStateType {
        self.0.load(std::sync::atomic::Ordering::SeqCst).into()
    }
}