use std::{collections::VecDeque, future::Future, pin::Pin, sync::atomic::AtomicU64};
use anyhow::Context;
use rayon::prelude::*;
use super::{
resource_manager::{LoanManager, ResourceManager},
schedule::{Borrow, Dependency, IsBatch, IsSchedule, IsSystem},
CanFetch, Request, Resource,
};
/// Process-wide iteration counter, starting at zero.
///
/// It is bumped through [`increment_current_iteration`] and read through
/// [`current_iteration`].
static SYSTEM_ITERATION: AtomicU64 = AtomicU64::new(0);

/// Returns the present value of the global iteration counter.
///
/// The counter is read with relaxed memory ordering.
#[inline]
pub fn current_iteration() -> u64 {
    use std::sync::atomic::Ordering;

    SYSTEM_ITERATION.load(Ordering::Relaxed)
}

/// Adds one to the global iteration counter.
///
/// Returns the value the counter held *before* the increment. The update uses
/// relaxed memory ordering.
#[inline]
pub(crate) fn increment_current_iteration() -> u64 {
    use std::sync::atomic::Ordering;

    SYSTEM_ITERATION.fetch_add(1, Ordering::Relaxed)
}
/// A pinned, boxed future that resolves to `anyhow::Result<()>`.
///
/// The future must be `Send + Sync + 'static`.
pub type AsyncSystemFuture =
Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>;
/// The verdict a system returns after a run.
///
/// `SyncBatch::run` keeps a system that returns [`ShouldContinue::Yes`] for the
/// next run and removes one that returns [`ShouldContinue::No`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShouldContinue {
    /// Keep the system.
    Yes,
    /// Remove the system.
    No,
}
/// Shorthand for a successful result carrying [`ShouldContinue::Yes`].
pub fn ok() -> anyhow::Result<ShouldContinue> {
    let verdict = ShouldContinue::Yes;
    Result::Ok(verdict)
}
/// Shorthand for a successful result carrying [`ShouldContinue::No`].
pub fn end() -> anyhow::Result<ShouldContinue> {
    let verdict = ShouldContinue::No;
    Result::Ok(verdict)
}
/// Wraps `err` in the `Err` variant of a system result.
pub fn err(err: anyhow::Error) -> anyhow::Result<ShouldContinue> {
    Result::Err(err)
}
/// A boxed, type-erased system body.
///
/// It takes a [`Resource`] and returns either a [`ShouldContinue`] verdict or
/// an error. The closure must be `Send + Sync + 'static`.
pub type SystemFunction =
Box<dyn FnMut(Resource) -> anyhow::Result<ShouldContinue> + Send + Sync + 'static>;
/// A synchronous system: a named, type-erased function together with the
/// metadata describing what it borrows and depends on.
pub struct SyncSystem {
/// Name of the system, as reported by `IsSystem::name`.
pub name: String,
/// The borrows declared by the system's input type (filled from
/// `T::borrows()` in [`SyncSystem::new`]).
pub borrows: Vec<Borrow>,
/// Dependencies supplied when the system was created.
pub dependencies: Vec<Dependency>,
/// Barrier index of the system; starts at `0` in [`SyncSystem::new`].
pub barrier: usize,
/// Builds the type-erased input for one run from a [`LoanManager`].
pub prepare: fn(&mut LoanManager<'_>) -> anyhow::Result<Resource>,
/// The system body; receives the input produced by `prepare`.
pub function: SystemFunction,
}
impl std::fmt::Debug for SyncSystem {
    /// Prints the system's name and borrows, plus a fixed placeholder for the
    /// function. The remaining fields are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SyncSystem");
        out.field("name", &self.name);
        out.field("borrows", &self.borrows);
        // The boxed closure cannot be formatted, so a constant stands in for it.
        out.field("function", &"FnMut(_)");
        out.finish()
    }
}
impl IsSystem for SyncSystem {
    /// The system's name.
    fn name(&self) -> &str {
        &self.name
    }

    /// The borrows recorded for this system.
    fn borrows(&self) -> &[Borrow] {
        self.borrows.as_slice()
    }

    /// The dependencies recorded for this system.
    fn dependencies(&self) -> &[Dependency] {
        self.dependencies.as_slice()
    }

    /// The system's current barrier index.
    fn barrier(&self) -> usize {
        self.barrier
    }

    /// Overwrites the system's barrier index.
    fn set_barrier(&mut self, barrier: usize) {
        self.barrier = barrier;
    }

    /// Builds the input for one run by calling the stored `prepare` function.
    fn prep(&self, loan_mngr: &mut LoanManager<'_>) -> anyhow::Result<Resource> {
        let prepare = self.prepare;
        prepare(loan_mngr)
    }

    /// Runs the stored system function on `data` and returns its result.
    fn run(&mut self, data: Resource) -> anyhow::Result<ShouldContinue> {
        (self.function)(data)
    }
}
impl SyncSystem {
pub fn new<T, F>(name: impl AsRef<str>, mut sys_fn: F, dependencies: Vec<Dependency>) -> Self
where
F: FnMut(T) -> anyhow::Result<ShouldContinue> + Send + Sync + 'static,
T: CanFetch + Send + Sync + 'static,
{
SyncSystem {
name: name.as_ref().to_string(),
borrows: T::borrows(),
dependencies,
barrier: 0,
prepare: |loan_mngr: &mut LoanManager| {
let rez: Resource = Resource::from(Box::new(T::construct(loan_mngr)?));
Ok(rez)
},
function: Box::new(move |b: Resource| {
let box_t: Box<T> = b.downcast().ok().context("cannot downcast")?;
let t: T = *box_t;
sys_fn(t)
}),
}
}
}
/// A batch of synchronous systems.
///
/// Field `0` holds the systems; field `1` is the batch's barrier index
/// (read by `get_barrier`, written by `set_barrier`).
#[derive(Debug, Default)]
pub struct SyncBatch(Vec<SyncSystem>, usize);
impl IsBatch for SyncBatch {
type System = SyncSystem;
type ExtraRunData<'a> = ();
fn systems(&self) -> &[Self::System] {
&self.0
}
fn systems_mut(&mut self) -> &mut [Self::System] {
&mut self.0
}
fn trim_systems(&mut self, should_remove: rustc_hash::FxHashSet<&str>) {
self.0.retain(|sys| !should_remove.contains(sys.name()))
}
fn add_system(&mut self, system: Self::System) {
self.0.push(system);
}
fn get_barrier(&self) -> usize {
self.1
}
fn set_barrier(&mut self, barrier: usize) {
self.1 = barrier;
}
fn take_systems(&mut self) -> Vec<Self::System> {
std::mem::replace(&mut self.0, vec![])
}
fn set_systems(&mut self, systems: Vec<Self::System>) {
self.0 = systems;
}
fn run(
&mut self,
parallelism: u32,
_: Self::ExtraRunData<'_>,
resource_manager: &mut ResourceManager,
) -> anyhow::Result<()> {
let mut loan_mngr = LoanManager(resource_manager);
let systems = self.take_systems();
let mut data = vec![];
for sys in systems.iter() {
data.push(sys.prep(&mut loan_mngr)?);
}
let (remaining_systems, errs): (Vec<_>, Vec<_>) = if parallelism > 1 {
let available_threads = rayon::current_num_threads();
if parallelism > available_threads as u32 {
log::warn!(
"the rayon threadpool does not contain enough threads! requested {}, have {}",
parallelism,
available_threads
);
}
(systems, data)
.into_par_iter()
.filter_map(|(mut system, data)| {
log::trace!("running par system '{}'", system.name());
let _ = increment_current_iteration();
match system.run(data) {
Ok(ShouldContinue::Yes) => Some(rayon::iter::Either::Left(system)),
Ok(ShouldContinue::No) => None,
Err(err) => Some(rayon::iter::Either::Right(err)),
}
})
.partition_map(|e| e)
} else {
let mut remaining_systems = vec![];
let mut errs = vec![];
systems
.into_iter()
.zip(data.into_iter())
.for_each(|(mut system, data)| {
log::trace!("running system '{}'", system.name());
let _ = increment_current_iteration();
match system.run(data) {
Ok(ShouldContinue::Yes) => {
remaining_systems.push(system);
}
Ok(ShouldContinue::No) => {}
Err(err) => {
errs.push(err);
}
}
});
(remaining_systems, errs)
};
self.set_systems(remaining_systems);
errs.into_iter()
.fold(Ok(()), |may_err, err| match may_err {
Ok(()) => Err(err),
Err(prev) => Err(prev.context(format!("and {}", err))),
})?;
Ok(())
}
}
/// A schedule made of [`SyncBatch`]es.
#[derive(Debug, Default)]
pub struct SyncSchedule {
// The batches of the schedule, in the order they were added.
batches: Vec<SyncBatch>,
// Value stored by `set_parallelism` and returned by `get_parallelism`.
num_threads: u32,
// Barrier counter; incremented by one on every `add_barrier` call.
current_barrier: usize,
}
impl IsSchedule for SyncSchedule {
    type System = SyncSystem;
    type Batch = SyncBatch;

    /// Mutable access to the schedule's batch list.
    fn batches_mut(&mut self) -> &mut Vec<Self::Batch> {
        &mut self.batches
    }

    /// The schedule's batches as a slice.
    fn batches(&self) -> &[Self::Batch] {
        self.batches.as_slice()
    }

    /// Appends `batch` to the end of the schedule.
    fn add_batch(&mut self, batch: Self::Batch) {
        self.batches.push(batch);
    }

    /// Stores the requested parallelism.
    fn set_parallelism(&mut self, threads: u32) {
        self.num_threads = threads;
    }

    /// The parallelism last stored with `set_parallelism`.
    fn get_parallelism(&self) -> u32 {
        self.num_threads
    }

    /// The schedule's current barrier counter.
    fn current_barrier(&self) -> usize {
        self.current_barrier
    }

    /// Advances the barrier counter by one.
    fn add_barrier(&mut self) {
        self.current_barrier += 1;
    }
}
/// Lets a [`Request`] be scheduled like a system.
///
/// Every request reports the fixed name `"async"`, has no dependencies and
/// always reports barrier `0`.
impl IsSystem for Request {
// All requests share the same fixed name.
fn name(&self) -> &str {
"async"
}
// The borrows stored on the request.
fn borrows(&self) -> &[Borrow] {
&self.borrows
}
// Requests never declare dependencies.
fn dependencies(&self) -> &[Dependency] {
&[]
}
// Requests always report barrier 0.
fn barrier(&self) -> usize {
0
}
// Setting a barrier on a request is a no-op.
fn set_barrier(&mut self, _: usize) {}
// Builds the requested resource by calling the request's `construct` function.
fn prep(&self, loan_mngr: &mut LoanManager<'_>) -> anyhow::Result<Resource> {
(self.construct)(loan_mngr)
}
// Hands `data` to the request's `deploy_tx` and always reports
// `ShouldContinue::Yes`; whatever `send` returns is discarded.
// NOTE(review): if `deploy_tx` is an async channel sender, `send` would
// return a future that is dropped here without being polled, so nothing
// would be sent — confirm against the type of `Request::deploy_tx`.
// `AsyncBatch::run` uses `try_send` instead.
fn run(&mut self, data: Resource) -> anyhow::Result<ShouldContinue> {
let _ = self.deploy_tx.send(data);
ok()
}
}
/// A batch of async resource [`Request`]s.
///
/// Field `0` holds the requests of the batch.
#[derive(Debug, Default)]
pub struct AsyncBatch(Vec<Request>);
impl IsBatch for AsyncBatch {
    type System = Request;
    type ExtraRunData<'a> = &'a async_executor::Executor<'static>;

    /// The requests currently held by the batch.
    fn systems(&self) -> &[Self::System] {
        self.0.as_slice()
    }

    /// Mutable access to the requests currently held by the batch.
    fn systems_mut(&mut self) -> &mut [Self::System] {
        self.0.as_mut_slice()
    }

    /// Removes every request whose name is contained in `should_remove`.
    fn trim_systems(&mut self, should_remove: rustc_hash::FxHashSet<&str>) {
        self.0.retain(|s| !should_remove.contains(s.name()));
    }

    /// Appends `system` to the batch.
    fn add_system(&mut self, system: Self::System) {
        self.0.push(system);
    }

    /// Async batches always report barrier `0`.
    fn get_barrier(&self) -> usize {
        0
    }

    /// Setting a barrier on an async batch is a no-op.
    fn set_barrier(&mut self, _: usize) {}

    /// Moves all requests out of the batch, leaving it empty.
    fn take_systems(&mut self) -> Vec<Self::System> {
        std::mem::take(&mut self.0)
    }

    /// Replaces the batch's requests with `systems`.
    fn set_systems(&mut self, systems: Vec<Self::System>) {
        self.0 = systems;
    }

    /// Fulfils every request in the batch, then ticks the executor.
    ///
    /// Steps, in order:
    /// 1. Take the requests out of the batch (they are not put back).
    /// 2. Prepare the resource for each request.
    /// 3. Send each resource over the request's `deploy_tx`, unless that
    ///    channel reports itself closed.
    /// 4. Tick `extra` until `try_tick` returns `false`, incrementing the
    ///    global iteration counter once per successful tick. With
    ///    `parallelism > 1` this is done from `parallelism` rayon tasks.
    /// 5. Ask the resource manager to unify its resources.
    ///
    /// # Errors
    ///
    /// Returns the error of the first failing `prep`, or the error returned by
    /// `try_unify_resources`.
    ///
    /// # Panics
    ///
    /// * If `try_send` on a channel that was not reported closed fails.
    /// * If resources are still on loan after the executor has been ticked.
    fn run(
        &mut self,
        parallelism: u32,
        extra: &async_executor::Executor<'static>,
        resource_manager: &mut ResourceManager,
    ) -> anyhow::Result<()> {
        let mut loan_mngr = LoanManager(resource_manager);
        let systems = self.take_systems();
        let mut data = VecDeque::with_capacity(systems.len());
        for sys in systems.iter() {
            data.push_back(sys.prep(&mut loan_mngr)?);
        }
        drop(loan_mngr);

        // The requests are consumed by the send loop below and the batch itself
        // was emptied by `take_systems`, so remember the names now for the
        // diagnostic at the end of this function.
        let system_names: Vec<String> = systems
            .iter()
            .map(|sys| sys.name().to_string())
            .collect();

        // `systems` and `data` were filled in lock-step, so zipping pairs each
        // request with the resource prepared for it.
        for (system, data) in systems.into_iter().zip(data) {
            if !system.deploy_tx.is_closed() {
                log::trace!(
                    "sending resource '{}' to async '{}'",
                    data.type_name().unwrap_or("unknown"),
                    system.name()
                );
                system.deploy_tx.try_send(data).unwrap();
            } else {
                log::trace!(
                    "cancelling send of resource '{}' to async '{}'",
                    data.type_name().unwrap_or("unknown"),
                    system.name()
                );
            }
        }

        /// Ticks `executor` until it has no more work ready, counting each tick.
        fn tick(executor: &async_executor::Executor<'static>) {
            while executor.try_tick() {
                let _ = increment_current_iteration();
            }
        }

        if parallelism > 1 {
            (0..parallelism).into_par_iter().for_each(|_| tick(extra));
        } else {
            tick(extra);
        }

        let resources_still_loaned = resource_manager.try_unify_resources("async batch")?;
        if resources_still_loaned {
            panic!(
                "an async system is holding onto resources over an await point! systems:{:#?}",
                system_names
            );
        }
        Ok(())
    }
}
/// A schedule made of [`AsyncBatch`]es.
#[derive(Debug, Default)]
pub struct AsyncSchedule {
// The batches of the schedule, in the order they were added.
batches: Vec<AsyncBatch>,
// Value stored by `set_parallelism` and returned by `get_parallelism`.
num_threads: u32,
}
impl IsSchedule for AsyncSchedule {
    type System = Request;
    type Batch = AsyncBatch;

    /// Mutable access to the schedule's batch list.
    fn batches_mut(&mut self) -> &mut Vec<Self::Batch> {
        &mut self.batches
    }

    /// The schedule's batches as a slice.
    fn batches(&self) -> &[Self::Batch] {
        &self.batches
    }

    /// Appends `batch` to the end of the schedule.
    fn add_batch(&mut self, batch: Self::Batch) {
        self.batches.push(batch);
    }

    /// Stores the requested parallelism.
    fn set_parallelism(&mut self, threads: u32) {
        self.num_threads = threads;
    }

    /// The parallelism last stored with `set_parallelism`.
    fn get_parallelism(&self) -> u32 {
        self.num_threads
    }

    /// Async schedules do not track barriers; this is always `0`.
    fn current_barrier(&self) -> usize {
        0
    }

    /// Adding a barrier to an async schedule is a no-op.
    fn add_barrier(&mut self) {}
}