// stage 0.2.0
//
// An ergonomic, composable Actor Model, designed for painless concurrency.
// Documentation
use std::{
    sync::{Arc, Condvar, Mutex},
    time::Duration,
};

use tracing::{debug, error, info};

use crate::io::reactor::{IoReactor, IoReactorConfig};
pub use crate::system::{config::ActorSystemConfig, handle::SysHandle};
use crate::{
    actors::{
        reactor::{ActorReactor, ActorReactorConfig},
        work_pool::ActorWorkPool,
    },
    system::{
        reactor::{CoreReactor, CoreReactorConfig},
        thread_pool::{ShutdownResult, ThreadDeed, ThreadPool},
    },
    traits::{Loop, StageExtension, StageReactor},
};

mod config;
mod handle;
mod reactor;
mod thread_pool;

/// The central system that defines a node of Actors.
///
/// Created by [`ActorSystemBuilder::start`], which moves the builder's thread pool into it.
pub struct ActorSystem {
    /// Pool the reactor threads were spawned on; `await_shutdown` blocks on it.
    thread_pool: ThreadPool,
}

/// Builder for an [`ActorSystem`].
///
/// Created with [`ActorSystemBuilder::new`], optionally extended with
/// [`ActorSystemBuilder::add_reactor`] / [`ActorSystemBuilder::add_extension`], and turned into a
/// running system with [`ActorSystemBuilder::start`].
pub struct ActorSystemBuilder {
    /// Configuration cloned for every `ActorReactor` that `start` spawns.
    actor_config: ActorReactorConfig,
    /// Source of the worker handed to each `ActorReactor` (via `spawn_worker`).
    actor_workpool: ActorWorkPool,
    /// System configuration. `threads` is decremented every time a reactor is spawned, so it
    /// holds the number of threads still available.
    config: ActorSystemConfig,
    /// Shared shutdown flag; a clone is moved into every reactor thread, whose loop stops once
    /// the `bool` is `true`.
    shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
    /// Pool the reactors are spawned on; moved into the [`ActorSystem`] by `start`.
    thread_pool: ThreadPool,
}

impl ActorSystem {
    /// Returns a [`SysHandle`], the handle through which commands are sent to the internal
    /// CoreReactor. This is the primary way to control a running ActorSystem.
    pub fn get_handle(&self) -> SysHandle {
        SysHandle
    }

    /// Parks the calling thread until the ActorSystem has shut down, then returns the thread
    /// pool's [`ShutdownResult`].
    ///
    /// No timeout is passed to the thread pool (`None`).
    pub fn await_shutdown(&self) -> ShutdownResult {
        let Self { thread_pool } = self;

        info!("Blocking until system shutdown");
        thread_pool.await_shutdown(None)
    }
}

impl ActorSystemBuilder {
    /// Starts the process of building the ActorSystem.
    pub fn new(mut config: ActorSystemConfig) -> Self {
        let thread_prefix = match &config.name {
            Some(name) => name.clone(),
            None => String::from("Stage"),
        };
        let thread_pool = ThreadPool::new(thread_prefix);

        let (core_tx, core_rx) = crossbeam_channel::bounded(512);

        SysHandle::init(core_tx);

        let shutdown_flag: Arc<(Mutex<bool>, Condvar)> = Default::default();

        let core_config = CoreReactorConfig {
            inbound: core_rx,
            iter_timeout: Duration::from_millis(50), // TODO: Expose in config
            shutdown_flag: shutdown_flag.clone(),
        };

        let actor_workpool = ActorWorkPool;
        let initial_worker = actor_workpool.spawn_worker();

        // TODO: Expose all in config
        let actor_config = ActorReactorConfig {
            thread_wait_time: Duration::from_millis(50),
            time_slice: Duration::from_millis(1),
            warn_threshold: Duration::from_millis(20),
        };
        let actor_reactor_cfg = (0, actor_config.clone(), initial_worker);
        // TODO: Expose all in config
        let io_reactor_cfg =
            IoReactorConfig { event_buffer_capacity: 2048, poll_timeout: Duration::from_millis(1) };

        info!("Spawning initially required Reactors (CoreReactor and first ActorReactor)");
        spawn_reactor::<CoreReactor>(&thread_pool, core_config, shutdown_flag.clone());
        spawn_reactor::<ActorReactor>(&thread_pool, actor_reactor_cfg, shutdown_flag.clone());
        spawn_reactor::<IoReactor>(&thread_pool, io_reactor_cfg, shutdown_flag.clone());
        config.threads -= 3;

        Self { actor_config, actor_workpool, config, shutdown_flag, thread_pool }
    }

    /// See [`ActorSystem::get_handle`].
    pub fn get_handle(&self) -> SysHandle {
        SysHandle
    }

    /// Add a [`StageReactor`] to the System. See trait for more details.
    pub fn add_reactor<T: StageReactor>(&mut self, config: T::Config) -> Result<(), OutOfThreads> {
        if self.config.threads == 0 {
            error!("Too many Reactors! Review the Reactors/Extensions you're adding, or increase the thread limit");
            return Err(OutOfThreads);
        }

        debug!("Spawning reactor of name \"{}\"", T::THREAD_NAME);
        spawn_reactor::<T>(&self.thread_pool, config, self.shutdown_flag.clone());
        self.config.threads -= 1;

        Ok(())
    }

    /// Add a [`StageExtension`] to the System. See trait for more details.
    pub fn add_extension<T: StageExtension>(&mut self) -> Result<(), T::Error> {
        T::extend(self)
    }

    /// Builds the ActorSystem and uses the rest of the threads available as ActorReactors
    pub fn start(mut self) -> ActorSystem {
        debug!("Building the ActorSystem. Using the remaining thread count for ActorReactors");
        for i in 0..self.config.threads {
            let config = (i + 1, self.actor_config.clone(), self.actor_workpool.spawn_worker());
            self.add_reactor::<ActorReactor>(config).unwrap();
        }

        ActorSystem { thread_pool: self.thread_pool }
    }
}

/// Error returned when a Reactor cannot be added because the configured thread limit of the
/// ActorSystem has already been used up.
#[derive(Clone, Debug)]
pub struct OutOfThreads;

impl std::fmt::Display for OutOfThreads {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("no threads left within the ActorSystem's thread limit to spawn another Reactor")
    }
}

// Implementing `Error` lets callers propagate this with `?` into `Box<dyn Error>` and similar.
impl std::error::Error for OutOfThreads {}

/// Spawns a thread named `T::THREAD_NAME` on `pool` that builds a `T` from `config` and drives
/// it by calling `iter` repeatedly.
///
/// The loop stops as soon as either the shared shutdown flag reads `true` (checked before every
/// iteration) or the reactor itself returns [`Loop::Break`]. Returns whatever `pool.spawn`
/// returns for the new thread.
// NOTE(review): the lint is presumably silenced because the `Mutex<bool>` is paired with a
// `Condvar` and so cannot simply become an atomic — confirm.
#[allow(clippy::mutex_atomic)]
fn spawn_reactor<T: StageReactor>(
    pool: &ThreadPool, config: T::Config, shutdown_flag: Arc<(Mutex<bool>, Condvar)>,
) -> Arc<ThreadDeed> {
    let thread_name = T::THREAD_NAME.into();

    let run_reactor = move || {
        let mut reactor = T::new(config);
        loop {
            // The guard is released at the end of this statement, so the lock is never held
            // while the reactor iterates.
            let shutdown_requested = *shutdown_flag.0.lock().unwrap();
            if shutdown_requested {
                break;
            }

            if let Loop::Break = reactor.iter() {
                debug!("Acknowledging {}'s shutdown", T::THREAD_NAME);
                break;
            }
        }
    };

    pool.spawn(thread_name, run_reactor)
}