use std::sync::RwLock;
use std::time::{Duration, Instant};
use {Decision, Scale, System};
pub trait SystemWork: Sync + Send + 'static {
type Data;
fn init(&self, index: usize) -> Self::Data;
fn work(&self, data: &mut Self::Data) -> Decision;
fn close(&self, Self::Data) {}
}
/// Makes every `System` usable wherever a `SystemWork` is expected.
///
/// Each hook forwards to the `System` method of the same name. The calls are
/// fully qualified because both traits declare `init`, `work` and `close`.
impl<S: System> SystemWork for S {
    type Data = <S as System>::Data;

    fn init(&self, index: usize) -> Self::Data {
        <S as System>::init(self, index)
    }

    fn work(&self, state: &mut Self::Data) -> Decision {
        <S as System>::work(self, state)
    }

    fn close(&self, state: Self::Data) {
        <S as System>::close(self, state)
    }
}
/// A `System` whose scale can be replaced through a shared reference.
///
/// `with_scale` and `with_threads` in this file return values implementing
/// this trait.
pub trait SettableSystem
where
    Self: System,
{
    /// Replaces the system's scale with `scale`.
    ///
    /// In the implementation provided by `with_scale`, the new value is what
    /// subsequent calls to `System::scale` return.
    fn set_scale(&self, scale: Scale);
}
pub fn with_scale(sys: impl SystemWork, scale: Scale) -> impl SettableSystem {
pub struct Sys<W>(W, RwLock<Scale>);
impl<W: SystemWork> System for Sys<W> {
type Data = W::Data;
fn init(&self, index: usize) -> W::Data {
self.0.init(index)
}
fn work(&self, data: &mut W::Data) -> Decision {
self.0.work(data)
}
fn close(&self, data: W::Data) {
self.0.close(data)
}
fn scale(&self) -> Scale {
*self.1.read().unwrap()
}
}
impl<W: SystemWork> SettableSystem for Sys<W> {
fn set_scale(&self, scale: Scale) {
*self.1.write().unwrap() = scale;
}
}
Sys(sys, RwLock::new(scale))
}
/// Shorthand for `with_scale(w, Scale::active(threads))`.
///
/// The returned system starts with the scale built from `threads` and can be
/// changed later through `SettableSystem::set_scale`.
pub fn with_threads(w: impl SystemWork, threads: usize) -> impl SettableSystem {
    let initial = Scale::active(threads);
    with_scale(w, initial)
}
pub fn simple_func_worker(func: impl Fn(usize) + Sync + Send + 'static) -> impl SystemWork {
pub struct Sys<F>(F);
impl<F: Fn(usize) + Sync + Send + 'static> SystemWork for Sys<F> {
type Data = usize;
fn init(&self, index: usize) -> usize {
index
}
fn work(&self, &mut index: &mut usize) -> Decision {
(self.0)(index);
Decision::Again
}
}
Sys(func)
}
/// Builds a `SystemWork` from a factory of stateful step closures.
///
/// `init` is called with the index given to `SystemWork::init` and must
/// return an `FnMut() -> Decision`. That closure is the per-worker data, and
/// each `work` call invokes it once and returns its `Decision`. `close`
/// keeps the trait's default behaviour.
pub fn func_worker<F>(init: impl Fn(usize) -> F + Sync + Send + 'static) -> impl SystemWork
where
    F: FnMut() -> Decision,
{
    /// Holds the factory that produces one step closure per `init` call.
    pub struct Factory<I>(I);

    impl<I, G> SystemWork for Factory<I>
    where
        I: Fn(usize) -> G + Sync + Send + 'static,
        G: FnMut() -> Decision,
    {
        type Data = G;

        fn init(&self, index: usize) -> G {
            let make = &self.0;
            make(index)
        }

        fn work(&self, step: &mut G) -> Decision {
            (*step)()
        }
    }

    Factory(init)
}
/// Like `func_worker`, but the step closure is boxed as
/// `Box<dyn FnMut() -> Decision>`.
///
/// `init` is called with the index given to `SystemWork::init`. The boxed
/// closure it returns becomes the per-worker data, and each `work` call
/// invokes it once and returns its `Decision`. `close` keeps the trait's
/// default behaviour.
pub fn dyn_func_worker(
    init: impl Fn(usize) -> Box<dyn FnMut() -> Decision> + Sync + Send + 'static,
) -> impl SystemWork {
    /// A boxed, type-erased step closure.
    type Step = Box<dyn FnMut() -> Decision>;

    /// Holds the factory that produces one boxed step per `init` call.
    pub struct BoxedFactory<I>(I);

    impl<I> SystemWork for BoxedFactory<I>
    where
        I: Fn(usize) -> Step + Sync + Send + 'static,
    {
        type Data = Step;

        fn init(&self, index: usize) -> Step {
            let make = &self.0;
            make(index)
        }

        fn work(&self, step: &mut Step) -> Decision {
            (*step)()
        }
    }

    BoxedFactory(init)
}
pub fn shutdown_after(sys: impl System, after: Instant) -> impl System {
pub struct Shutdown<C> {
after: Instant,
inner: C,
}
impl<C: System + Sync + Send + 'static> System for Shutdown<C> {
type Data = C::Data;
fn init(&self, index: usize) -> Self::Data {
self.inner.init(index)
}
fn work(&self, data: &mut Self::Data) -> Decision {
self.inner.work(data)
}
fn close(&self, data: Self::Data) {
self.inner.close(data)
}
fn scale(&self) -> Scale {
if Instant::now() > self.after {
Scale::Shutdown
} else {
self.inner.scale()
}
}
}
Shutdown { after, inner: sys }
}
pub fn cache_scale(sys: impl System, rate: Duration) -> impl System {
pub struct Cached<C> {
scale: RwLock<Option<(Instant, Scale)>>,
rate: Duration,
inner: C,
}
impl<C: System + Sync + Send + 'static> System for Cached<C> {
type Data = C::Data;
fn init(&self, index: usize) -> Self::Data {
self.inner.init(index)
}
fn work(&self, data: &mut Self::Data) -> Decision {
self.inner.work(data)
}
fn close(&self, data: Self::Data) {
self.inner.close(data)
}
fn scale(&self) -> Scale {
match &*self.scale.read().unwrap() {
Some((at, scale)) if at.elapsed() < self.rate => return *scale,
_ => (),
}
match &mut *self.scale.write().unwrap() {
Some((at, scale)) if at.elapsed() < self.rate => return *scale,
write => {
let scale = self.inner.scale();
*write = Some((Instant::now(), scale));
scale
}
}
}
}
Cached {
scale: RwLock::new(None),
rate,
inner: sys,
}
}