use std::{pin::Pin, sync::Arc};
use crate::{
actor::{Actor, ActorAction},
address::Address,
cfg_runtime,
envelope::EnvelopeProxy,
};
use futures::{lock::Mutex, StreamExt};
/// Item carried by a context's input channel.
///
/// `Msg` is the payload type; within this file the channel is instantiated
/// with `Signal<InputHandle<ACTOR>>`.
#[derive(Debug)]
pub(crate) enum Signal<Msg> {
/// A payload to be handled by the actor.
Message(Msg),
/// Request to begin the stop sequence (the run loop reacts by calling
/// `Actor::stopping`).
Stop,
}
/// Capacity of the input channel created by `Context::new`.
pub const DEFAULT_CAPACITY: usize = 128;
/// Boxed, type-erased envelope that can be handled by an actor of type `A`.
///
/// This is the payload sent through a context's channel, wrapped in `Signal`.
pub(crate) type InputHandle<A> = Box<dyn EnvelopeProxy<A> + Send + 'static>;
/// Execution environment for a single actor of type `ACTOR`.
///
/// Owns the receiving half of the actor's input channel and drives the actor
/// through `Context::run`.
pub struct Context<ACTOR> {
/// Receiving end of the bounded channel created in `with_capacity`.
receiver: async_channel::Receiver<Signal<InputHandle<ACTOR>>>,
/// Address built from the matching sender; cloned out by `address()`.
address: Address<ACTOR>,
/// Mutex shared with `address` and held locked for the whole duration of
/// `run`.
// NOTE(review): presumably lets address holders wait for the actor to
// finish by acquiring this lock — confirm against `Address`.
stop_handle: Arc<Mutex<()>>,
}
impl<ACTOR> std::fmt::Debug for Context<ACTOR> {
    /// Formats the context as the bare type name `Context`.
    ///
    /// No field is printed, which is why `ACTOR` needs no `Debug` bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Context")
    }
}
impl<ACTOR> Default for Context<ACTOR>
where
ACTOR: 'static + Send + Actor + Unpin,
{
fn default() -> Self {
Self::new()
}
}
impl<ACTOR> Context<ACTOR>
where
    ACTOR: Actor + Send + Unpin + 'static,
{
    /// Creates a context whose input channel has room for
    /// [`DEFAULT_CAPACITY`] pending signals.
    #[must_use]
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Creates a context backed by a bounded channel of the given `capacity`.
    ///
    /// The sending half of the channel and a shared handle to the stop mutex
    /// are handed to the [`Address`] stored in the context.
    ///
    /// # Panics
    ///
    /// `async_channel::bounded` is documented to panic when `capacity` is `0`.
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        let stop_lock = Arc::new(Mutex::new(()));
        let (tx, rx) = async_channel::bounded(capacity);

        Self {
            receiver: rx,
            address: Address::new(tx, Arc::clone(&stop_lock)),
            stop_handle: stop_lock,
        }
    }

    /// Returns a clone of the address associated with this context.
    #[must_use]
    pub fn address(&self) -> Address<ACTOR> {
        Clone::clone(&self.address)
    }

    /// Drives `actor` until it agrees to stop, consuming the context.
    ///
    /// Sequence of events:
    /// 1. the stop mutex is locked and stays locked until this future ends;
    /// 2. `actor.started()` is awaited;
    /// 3. signals are received one by one: a message envelope is handled with
    ///    pinned references to the actor and to this context, while a stop
    ///    signal (or the end of the stream) awaits `actor.stopping()` and
    ///    leaves the loop only if it answers [`ActorAction::Stop`];
    /// 4. `actor.stopped()` is called.
    pub async fn run(mut self, mut actor: ACTOR) {
        // NOTE(review): no `&mut &mut` is visible in this body; presumably a
        // leftover from an earlier macro-based loop — confirm before removing.
        #![allow(clippy::mut_mut)]

        // A separate `Arc` is locked so that the guard does not keep `self`
        // borrowed while the receiver is polled mutably below.
        let stop_lock = Arc::clone(&self.stop_handle);
        let _running_guard = stop_lock.lock().await;

        actor.started().await;

        loop {
            let signal = self.receiver.next().await;
            match signal {
                Some(Signal::Message(mut envelope)) => {
                    envelope
                        .handle(Pin::new(&mut actor), Pin::new(&self))
                        .await;
                }
                // NOTE(review): if the stream has ended (`None`) and the
                // actor keeps declining to stop, this arm is taken again on
                // every iteration — verify whether the channel can actually
                // be closed while the context is alive.
                Some(Signal::Stop) | None => {
                    if matches!(actor.stopping().await, ActorAction::Stop) {
                        break;
                    }
                }
            }
        }

        actor.stopped();
    }
}
cfg_runtime! {
    impl<ACTOR> Context<ACTOR>
    where
        ACTOR: 'static + Send + Actor + Unpin,
    {
        /// Spawns [`Context::run`] for `actor` through `crate::runtime::spawn`
        /// and returns the address associated with this context.
        ///
        /// The address is cloned before the context is moved into the
        /// spawned future.
        pub fn spawn(self, actor: ACTOR) -> Address<ACTOR> {
            let address = self.address();
            // The task handle is dropped at the end of this function, so it
            // is not boxed first: that would allocate only to free again.
            // NOTE(review): presumably the runtime keeps the task running
            // after its handle is dropped (true for tokio and async-std join
            // handles) — confirm for `crate::runtime`.
            let _handle = crate::runtime::spawn(self.run(actor));
            address
        }
    }
}