stage 0.2.0

An ergonomic, composable Actor Model, designed for painless concurrency
Documentation
use std::{
    borrow::Cow,
    collections::HashMap,
    sync::{Arc, Condvar, Mutex},
    time::{Duration, SystemTime},
};

use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError};
use futures::channel::oneshot::Sender as SendOnce;
use tracing::info;

use crate::{
    actors::{
        stream::ActorStream, work_pool::ActorWorkPool, Actor, ActorCtx, ActorRef, BuildActorOp,
    },
    system::{handle::CoreError, SysHandle},
    traits::{Loop, StageReactor},
    utils::sys_msgs::ActorStart,
};

/// Driver for all internal operations. This is responsible for taking system requests from Actors
/// and executing them. Such requests include but are not limited to spawning Actors, shutting down
/// the system, and registering Monitors.
pub struct CoreReactor {
    actor_handles: Vec<Actor>,
    actor_incrementer: u16,
    actor_workpool: ActorWorkPool,
    inbound: Receiver<SysOp>,
    iter_timeout: Duration,
    name_lookup: HashMap<Cow<'static, str>, Actor>,
    shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
}

/// Payload for requesting operations of the [`CoreReactor`].
pub enum SysOp {
    NewActor { data: BuildActorOp, mailbox: SendOnce<Result<Actor, CoreError>> },
    StopSystem,
    MonitorActor { actor: ActorRef, monitor: Actor },
    StopActor { actor: ActorRef },
}

impl StageReactor for CoreReactor {
    type Config = CoreReactorConfig;
    const THREAD_NAME: &'static str = "CoreReactor";

    fn new(config: Self::Config) -> Self {
        let Self::Config { inbound, iter_timeout, shutdown_flag } = config;

        Self {
            inbound,
            iter_timeout,
            shutdown_flag,
            actor_handles: vec![],
            actor_incrementer: 0,
            actor_workpool: ActorWorkPool,
            name_lookup: Default::default(),
        }
    }

    fn iter(&mut self) -> Loop {
        match self.inbound.recv_timeout(self.iter_timeout) {
            Ok(op) => self.process(op),
            Err(e) => match e {
                RecvTimeoutError::Timeout => Loop::Continue,
                _ => Loop::Break,
            },
        }
    }
}

impl CoreReactor {
    fn process(&mut self, op: SysOp) -> Loop {
        match op {
            SysOp::NewActor { data, mailbox } => self.new_actor(data, mailbox),
            SysOp::StopSystem => self.stop_system(),
            SysOp::MonitorActor { actor, monitor } => self.monitor_actor(actor, monitor),
            SysOp::StopActor { actor } => {
                self.actor_workpool.stop_actor(actor);
                Loop::Continue
            }
        }
    }

    fn new_actor(
        &mut self, data: BuildActorOp, mailbox: SendOnce<Result<Actor, CoreError>>,
    ) -> Loop {
        info!("Spawning new Actor");
        let _ = mailbox.send(self.inner_new_actor(data));

        Loop::Continue
    }

    fn gen_actor_id(&mut self) -> u64 {
        let now = SystemTime::now().duration_since(*crate::STAGE_EPOCH).unwrap().as_millis() as u64;
        let id = (now << 20) | self.actor_incrementer as u64;
        self.actor_incrementer = self.actor_incrementer.overflowing_add(1).0;
        id
    }

    fn monitor_actor(&mut self, actor: ActorRef, monitor: Actor) -> Loop {
        self.actor_workpool.monitor(monitor, actor);

        Loop::Continue
    }

    #[allow(clippy::mutex_atomic)]
    fn stop_system(&mut self) -> Loop {
        let mut g = self.shutdown_flag.0.lock().unwrap();
        *g = true;
        self.shutdown_flag.1.notify_all();

        Loop::Break
    }

    fn inner_new_actor(&mut self, data: BuildActorOp) -> Result<Actor, CoreError> {
        let BuildActorOp { name, wrapper } = data;

        let mut named = false;

        if let Some(name) = &name {
            named = true;
            if self.name_lookup.contains_key(name) {
                return Err(CoreError::ActorNameTaken);
            }
        }

        let actor_ref = ActorRef { id: self.gen_actor_id(), name: name.clone() };
        let (msg_tx, msg_rx) = unbounded();

        let handle = Actor { mailbox: msg_tx, reference: actor_ref, work_pool: ActorWorkPool };

        let ctx = ActorCtx { this: handle.clone(), sys: SysHandle };

        let stream = Box::pin(ActorStream::new(ctx, wrapper, msg_rx));
        self.actor_workpool.insert(stream);
        let _ = handle.send(ActorStart);

        if named {
            self.name_lookup.insert(name.unwrap(), handle.clone());
        }

        self.actor_handles.push(handle.clone());
        Ok(handle)
    }
}

pub struct CoreReactorConfig {
    pub(crate) inbound: Receiver<SysOp>,
    pub(crate) iter_timeout: Duration,
    pub(crate) shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
}