apecs 0.6.1

An asynchronous and parallel entity-component system
Documentation
use std::{collections::VecDeque, future::Future, pin::Pin, sync::atomic::AtomicU64};

use anyhow::Context;
use rayon::prelude::*;

use super::{
    resource_manager::{LoanManager, ResourceManager},
    schedule::{Borrow, Dependency, IsBatch, IsSchedule, IsSystem},
    CanFetch, Request, Resource,
};

/// Counter backing [`current_iteration`]; it starts at zero.
static SYSTEM_ITERATION: AtomicU64 = AtomicU64::new(0);

/// Read the current system iteration timestamp.
///
/// The returned value can be used to track changes in components over time
/// with [`Entry::has_changed_since`](crate::Entry::has_changed_since) and
/// similar functions.
///
/// The counter is read with relaxed memory ordering.
#[inline]
pub fn current_iteration() -> u64 {
    use std::sync::atomic::Ordering;

    SYSTEM_ITERATION.load(Ordering::Relaxed)
}

/// Advance the system iteration counter by one.
///
/// Returns the value the counter held *before* the increment. The update uses
/// relaxed memory ordering.
#[inline]
pub(crate) fn increment_current_iteration() -> u64 {
    use std::sync::atomic::Ordering;

    SYSTEM_ITERATION.fetch_add(1, Ordering::Relaxed)
}

/// A future representing an async system.
///
/// The future is boxed and pinned, must be `Send + Sync + 'static`, and
/// resolves to `anyhow::Result<()>`.
pub type AsyncSystemFuture =
    Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>;

/// Whether or not a system should continue execution.
///
/// Synchronous systems return this value wrapped in `anyhow::Result`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShouldContinue {
    /// The system should be run again.
    Yes,
    /// The system is finished and should not be run again.
    No,
}

/// Returns a synchronous system result meaning everything is ok and the system
/// should run again next frame.
///
/// This is shorthand for `Ok(ShouldContinue::Yes)`.
pub fn ok() -> anyhow::Result<ShouldContinue> {
    Ok(ShouldContinue::Yes)
}

/// Returns a synchronous system result meaning everything is ok, but the system
/// should not be run again.
///
/// This is shorthand for `Ok(ShouldContinue::No)`.
pub fn end() -> anyhow::Result<ShouldContinue> {
    Ok(ShouldContinue::No)
}

/// Returns a synchronous system result meaning an error occurred and the system
/// should not run again.
///
/// This simply wraps the given `err` in `Err`.
pub fn err(err: anyhow::Error) -> anyhow::Result<ShouldContinue> {
    Err(err)
}

/// The boxed closure that runs a synchronous system.
///
/// It is called with the system's prepared [`Resource`] and returns whether
/// the system should run again, or an error. The closure must be
/// `Send + Sync + 'static`.
pub type SystemFunction =
    Box<dyn FnMut(Resource) -> anyhow::Result<ShouldContinue> + Send + Sync + 'static>;

/// A synchronous system: a named function plus the scheduling data that is
/// exposed through its [`IsSystem`] implementation.
pub struct SyncSystem {
    /// The system's name, as returned by `IsSystem::name`.
    pub name: String,
    /// The borrows this system declares. `SyncSystem::new` fills this from
    /// `T::borrows()`.
    pub borrows: Vec<Borrow>,
    /// The dependencies passed to `SyncSystem::new`.
    // NOTE(review): presumably the systems this one must be ordered against —
    // confirm against the definition of `Dependency`.
    pub dependencies: Vec<Dependency>,
    /// The barrier value of this system. `SyncSystem::new` starts it at `0`;
    /// it is changed through `IsSystem::set_barrier`.
    pub barrier: usize,
    /// Builds the [`Resource`] for this system from a [`LoanManager`]. The
    /// result is what gets passed to `function` when the system runs.
    pub prepare: fn(&mut LoanManager<'_>) -> anyhow::Result<Resource>,
    /// The boxed closure that runs the system on its prepared [`Resource`].
    pub function: SystemFunction,
}

/// Hand-written `Debug` output: prints the name and borrows, and a fixed
/// placeholder string in place of the boxed system function.
impl std::fmt::Debug for SyncSystem {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SyncSystem");
        builder.field("name", &self.name);
        builder.field("borrows", &self.borrows);
        // The closure itself cannot be formatted, so a placeholder stands in.
        builder.field("function", &"FnMut(_)");
        builder.finish()
    }
}

impl IsSystem for SyncSystem {
    /// The name this system was created with.
    fn name(&self) -> &str {
        &self.name
    }

    /// The borrows declared by this system.
    fn borrows(&self) -> &[Borrow] {
        self.borrows.as_slice()
    }

    /// The dependencies this system was created with.
    fn dependencies(&self) -> &[Dependency] {
        self.dependencies.as_slice()
    }

    /// The system's current barrier value.
    fn barrier(&self) -> usize {
        self.barrier
    }

    /// Overwrite the system's barrier value.
    fn set_barrier(&mut self, barrier: usize) {
        self.barrier = barrier;
    }

    /// Build this system's [`Resource`] by calling the stored `prepare`
    /// function pointer with the given loan manager.
    fn prep(&self, loan_mngr: &mut LoanManager<'_>) -> anyhow::Result<Resource> {
        let prepare = self.prepare;
        prepare(loan_mngr)
    }

    /// Run the system by calling the stored system function on `data`.
    fn run(&mut self, data: Resource) -> anyhow::Result<ShouldContinue> {
        let system_fn = &mut self.function;
        system_fn(data)
    }
}

impl SyncSystem {
    /// Create a synchronous system from a name, a system function and a list
    /// of dependencies.
    ///
    /// `T` is the data the system function operates on. The system's borrows
    /// are taken from `T::borrows()`, its barrier starts at `0`, and its
    /// `prepare` function builds a `T` with `T::construct` and wraps it in a
    /// [`Resource`].
    ///
    /// When the system runs, the [`Resource`] is downcast back to `T` and
    /// passed to `sys_fn`. A failed downcast yields an error with the message
    /// "cannot downcast".
    pub fn new<T, F>(name: impl AsRef<str>, mut sys_fn: F, dependencies: Vec<Dependency>) -> Self
    where
        F: FnMut(T) -> anyhow::Result<ShouldContinue> + Send + Sync + 'static,
        T: CanFetch + Send + Sync + 'static,
    {
        let name = name.as_ref().to_owned();

        // Wrap the user's function so it accepts the `Resource` and recovers
        // the concrete `T` before calling through.
        let function = Box::new(
            move |resource: Resource| -> anyhow::Result<ShouldContinue> {
                let fetched: Box<T> = resource.downcast().ok().context("cannot downcast")?;
                sys_fn(*fetched)
            },
        );

        SyncSystem {
            name,
            borrows: T::borrows(),
            dependencies,
            barrier: 0,
            // This closure captures nothing, so it coerces to a plain `fn` pointer.
            prepare: |loan_mngr: &mut LoanManager| {
                let fetched = T::construct(loan_mngr)?;
                Ok(Resource::from(Box::new(fetched)))
            },
            function,
        }
    }
}

/// A batch of synchronous systems that are run together.
///
/// Field `0` holds the batch's systems and field `1` holds its barrier value.
#[derive(Debug, Default)]
pub struct SyncBatch(Vec<SyncSystem>, usize);

impl IsBatch for SyncBatch {
    type System = SyncSystem;
    type ExtraRunData<'a> = ();

    fn systems(&self) -> &[Self::System] {
        &self.0
    }

    fn systems_mut(&mut self) -> &mut [Self::System] {
        &mut self.0
    }

    fn trim_systems(&mut self, should_remove: rustc_hash::FxHashSet<&str>) {
        self.0.retain(|sys| !should_remove.contains(sys.name()))
    }

    fn add_system(&mut self, system: Self::System) {
        self.0.push(system);
    }

    fn get_barrier(&self) -> usize {
        self.1
    }

    fn set_barrier(&mut self, barrier: usize) {
        self.1 = barrier;
    }

    fn take_systems(&mut self) -> Vec<Self::System> {
        std::mem::replace(&mut self.0, vec![])
    }

    fn set_systems(&mut self, systems: Vec<Self::System>) {
        self.0 = systems;
    }

    fn run(
        &mut self,
        parallelism: u32,
        _: Self::ExtraRunData<'_>,
        resource_manager: &mut ResourceManager,
    ) -> anyhow::Result<()> {
        let mut loan_mngr = LoanManager(resource_manager);
        let systems = self.take_systems();
        let mut data = vec![];
        for sys in systems.iter() {
            data.push(sys.prep(&mut loan_mngr)?);
        }
        let (remaining_systems, errs): (Vec<_>, Vec<_>) = if parallelism > 1 {
            let available_threads = rayon::current_num_threads();
            if parallelism > available_threads as u32 {
                log::warn!(
                    "the rayon threadpool does not contain enough threads! requested {}, have {}",
                    parallelism,
                    available_threads
                );
            }
            (systems, data)
                .into_par_iter()
                .filter_map(|(mut system, data)| {
                    log::trace!("running par system '{}'", system.name());
                    let _ = increment_current_iteration();
                    match system.run(data) {
                        Ok(ShouldContinue::Yes) => Some(rayon::iter::Either::Left(system)),
                        Ok(ShouldContinue::No) => None,
                        Err(err) => Some(rayon::iter::Either::Right(err)),
                    }
                })
                .partition_map(|e| e)
        } else {
            let mut remaining_systems = vec![];
            let mut errs = vec![];
            systems
                .into_iter()
                .zip(data.into_iter())
                .for_each(|(mut system, data)| {
                    log::trace!("running system '{}'", system.name());
                    let _ = increment_current_iteration();
                    match system.run(data) {
                        Ok(ShouldContinue::Yes) => {
                            remaining_systems.push(system);
                        }
                        Ok(ShouldContinue::No) => {}
                        Err(err) => {
                            errs.push(err);
                        }
                    }
                });
            (remaining_systems, errs)
        };

        self.set_systems(remaining_systems);

        errs.into_iter()
            .fold(Ok(()), |may_err, err| match may_err {
                Ok(()) => Err(err),
                Err(prev) => Err(prev.context(format!("and {}", err))),
            })?;

        Ok(())
    }
}

/// A schedule of synchronous systems, stored as a list of [`SyncBatch`]es.
#[derive(Debug, Default)]
pub struct SyncSchedule {
    /// The batches that make up this schedule.
    batches: Vec<SyncBatch>,
    /// The parallelism value stored by `set_parallelism`.
    // NOTE(review): presumably the number of threads a batch may use when it
    // runs — confirm against `IsSchedule`.
    num_threads: u32,
    /// The current barrier counter; `add_barrier` increments it by one.
    current_barrier: usize,
}

impl IsSchedule for SyncSchedule {
    type System = SyncSystem;
    type Batch = SyncBatch;

    /// Mutable access to the schedule's list of batches.
    fn batches_mut(&mut self) -> &mut Vec<Self::Batch> {
        &mut self.batches
    }

    /// The schedule's batches.
    fn batches(&self) -> &[Self::Batch] {
        &self.batches
    }

    /// Append a batch to the end of the schedule.
    fn add_batch(&mut self, batch: Self::Batch) {
        self.batches.push(batch);
    }

    /// Store the parallelism value for this schedule.
    fn set_parallelism(&mut self, threads: u32) {
        self.num_threads = threads;
    }

    /// The stored parallelism value.
    fn get_parallelism(&self) -> u32 {
        self.num_threads
    }

    /// The schedule's current barrier counter.
    fn current_barrier(&self) -> usize {
        self.current_barrier
    }

    /// Increment the schedule's barrier counter by one.
    fn add_barrier(&mut self) {
        self.current_barrier += 1;
    }
}

/// In terms of system resource scheduling a request is a system.
impl IsSystem for Request {
    /// Every request reports the same fixed name, `"async"`.
    fn name(&self) -> &str {
        "async"
    }

    /// The borrows declared by this request.
    fn borrows(&self) -> &[Borrow] {
        &self.borrows
    }

    /// Requests have no dependencies.
    fn dependencies(&self) -> &[Dependency] {
        &[]
    }

    /// Requests do not track a barrier; this is always `0`.
    fn barrier(&self) -> usize {
        0
    }

    /// No-op: requests do not track a barrier.
    fn set_barrier(&mut self, _: usize) {}

    /// Build the requested [`Resource`] by calling the request's `construct`
    /// function with the given loan manager.
    fn prep(&self, loan_mngr: &mut LoanManager<'_>) -> anyhow::Result<Resource> {
        (self.construct)(loan_mngr)
    }

    /// Hand the prepared resource to the requesting async system.
    ///
    /// A failed send is ignored and the result is always [`ok`].
    fn run(&mut self, data: Resource) -> anyhow::Result<ShouldContinue> {
        // it's perfectly normal for an async system to drop the resource
        // receiver, eg cases where an async system had two asyncs being `race`d
        // and one of them won (and the one that lost had just called facade.visit and
        // was awaiting resources)
        //
        // `try_send` is used rather than `send` so the resource is delivered
        // immediately, matching how `AsyncBatch::run` deploys resources.
        // NOTE(review): `deploy_tx` is used with `is_closed`/`try_send`
        // elsewhere in this file, which suggests an async channel whose `send`
        // returns a lazy future. Discarding that future would drop `data`
        // without sending it. Confirm against the definition of `Request`.
        let _ = self.deploy_tx.try_send(data);
        ok()
    }
}

/// A batch of resource [`Request`]s made by async systems.
///
/// Field `0` holds the pending requests.
#[derive(Debug, Default)]
pub struct AsyncBatch(Vec<Request>);

impl IsBatch for AsyncBatch {
    type System = Request;
    type ExtraRunData<'a> = &'a async_executor::Executor<'static>;

    /// The requests currently in this batch.
    fn systems(&self) -> &[Self::System] {
        self.0.as_slice()
    }

    /// Mutable access to the requests currently in this batch.
    fn systems_mut(&mut self) -> &mut [Self::System] {
        self.0.as_mut_slice()
    }

    /// Remove every request whose name is contained in `should_remove`.
    fn trim_systems(&mut self, should_remove: rustc_hash::FxHashSet<&str>) {
        self.0.retain(|s| !should_remove.contains(s.name()));
    }

    /// Append a request to the end of this batch.
    fn add_system(&mut self, system: Self::System) {
        self.0.push(system);
    }

    /// Async batches do not track a barrier; this is always `0`.
    fn get_barrier(&self) -> usize {
        0
    }

    /// No-op: async batches do not track a barrier.
    fn set_barrier(&mut self, _: usize) {}

    /// Move all requests out of this batch, leaving it empty.
    fn take_systems(&mut self) -> Vec<Self::System> {
        std::mem::take(&mut self.0)
    }

    /// Replace this batch's requests with `systems`.
    fn set_systems(&mut self, systems: Vec<Self::System>) {
        self.0 = systems;
    }

    /// Serve every request in the batch, then tick the executor.
    ///
    /// The resources of all requests are prepared first, in order. Each
    /// resource is then sent over its request's `deploy_tx` channel, unless
    /// that channel is already closed, in which case the send is skipped.
    /// Afterwards the executor `extra` is ticked until it has no more work,
    /// incrementing the system iteration counter after each tick. With
    /// `parallelism > 1` the ticking is spread over `parallelism` rayon tasks.
    ///
    /// Requests are consumed by a run: the batch is empty afterwards.
    ///
    /// # Errors
    ///
    /// Returns an error if preparing a resource fails or if
    /// `try_unify_resources` fails.
    ///
    /// # Panics
    ///
    /// Panics if `try_send` fails on a channel that reported itself open, or
    /// if resources are still loaned out after the executor has been ticked.
    fn run(
        &mut self,
        parallelism: u32,
        extra: &async_executor::Executor<'static>,
        resource_manager: &mut ResourceManager,
    ) -> anyhow::Result<()> {
        let mut loan_mngr = LoanManager(resource_manager);
        let systems = self.take_systems();
        let mut data = VecDeque::with_capacity(systems.len());
        for sys in systems.iter() {
            data.push_back(sys.prep(&mut loan_mngr)?);
        }
        drop(loan_mngr);

        // `data` holds exactly one resource per request, in the same order.
        for (system, data) in systems.into_iter().zip(data) {
            // send the resources off, if need be
            if system.deploy_tx.is_closed() {
                log::trace!(
                    "cancelling send of resource '{}' to async '{}'",
                    data.type_name().unwrap_or("unknown"),
                    system.name()
                );
                continue;
            }

            log::trace!(
                "sending resource '{}' to async '{}'",
                data.type_name().unwrap_or("unknown"),
                system.name()
            );
            // UNWRAP: safe because we checked above that the channel is still open
            system.deploy_tx.try_send(data).unwrap();
        }

        /// Tick the executor until it reports no more work, bumping the
        /// system iteration counter after every successful tick.
        fn tick(executor: &async_executor::Executor<'static>) {
            while executor.try_tick() {
                let _ = increment_current_iteration();
            }
        }
        // tick the executor
        if parallelism > 1 {
            (0..parallelism)
                .into_par_iter()
                .for_each(|_| tick(extra));
        } else {
            tick(extra);
        }

        let resources_still_loaned = resource_manager.try_unify_resources("async batch")?;
        if resources_still_loaned {
            // NOTE(review): the batch's requests were taken out above and are
            // never put back, so `self.systems()` is empty here and the list
            // printed below carries no names.
            panic!(
                "an async system is holding onto resources over an await point! systems:{:#?}",
                self.systems()
                    .iter()
                    .map(|sys| sys.name())
                    .collect::<Vec<_>>()
            );
        }

        Ok(())
    }
}

/// A schedule of async resource requests, stored as a list of
/// [`AsyncBatch`]es.
#[derive(Debug, Default)]
pub struct AsyncSchedule {
    /// The batches that make up this schedule.
    batches: Vec<AsyncBatch>,
    /// The parallelism value stored by `set_parallelism`.
    // NOTE(review): presumably the number of threads used to tick the
    // executor when a batch runs — confirm against `IsSchedule`.
    num_threads: u32,
}

impl IsSchedule for AsyncSchedule {
    type System = Request;
    type Batch = AsyncBatch;

    /// Mutable access to the schedule's list of batches.
    fn batches_mut(&mut self) -> &mut Vec<Self::Batch> {
        &mut self.batches
    }

    /// The schedule's batches.
    fn batches(&self) -> &[Self::Batch] {
        self.batches.as_slice()
    }

    /// Append a batch to the end of the schedule.
    fn add_batch(&mut self, batch: Self::Batch) {
        self.batches.push(batch);
    }

    /// Store the parallelism value for this schedule.
    fn set_parallelism(&mut self, threads: u32) {
        self.num_threads = threads;
    }

    /// The stored parallelism value.
    fn get_parallelism(&self) -> u32 {
        self.num_threads
    }

    /// Async schedules do not track barriers; this is always `0`.
    fn current_barrier(&self) -> usize {
        0
    }

    /// No-op: async schedules do not track barriers.
    fn add_barrier(&mut self) {}
}