// dynpool 0.0.2
//
// A thread manager that is lightweight, flexible, and rescalable.
// Documentation
use std::sync::RwLock;
use std::time::{Duration, Instant};
use {Decision, Scale, System};

/// A subset of `System` that can perform work, but my not provide scale. Should
/// be implemented by the user or created by functions such as `func_worker`.
pub trait SystemWork: Sync + Send + 'static {
    /// Per-worker data. See `System::Data`.
    type Data;

    /// Create a new worker. See `System::init`.
    fn init(&self, index: usize) -> Self::Data;

    /// Do a unit of work, and return scheduling information. See `System::work`.
    fn work(&self, data: &mut Self::Data) -> Decision;

    /// Consume a closed worker. See `System::close`.
    fn close(&self, Self::Data) {}
}

/// Every `System` is also a `SystemWork`: each method forwards to the
/// `System` method of the same name, and `Data` is the system's own `Data`.
impl<T: System> SystemWork for T {
    type Data = <T as System>::Data;

    /// Forwards to `System::init`.
    fn init(&self, index: usize) -> Self::Data {
        <T as System>::init(self, index)
    }

    /// Forwards to `System::work`.
    fn work(&self, state: &mut Self::Data) -> Decision {
        <T as System>::work(self, state)
    }

    /// Forwards to `System::close`.
    fn close(&self, state: Self::Data) {
        <T as System>::close(self, state)
    }
}

/// Implementors of this trait are systems with a stored scale.
///
/// The scale is replaced through a shared reference (`&self`), so an
/// implementor needs some form of interior mutability to hold it.
pub trait SettableSystem: System {
    /// Change the number of active and inactive threads in the system, or shut
    /// the system down.
    ///
    /// `scale` is the new value to store.
    fn set_scale(&self, scale: Scale);
}

/// Create a system that has an internally stored pool scale.
pub fn with_scale(sys: impl SystemWork, scale: Scale) -> impl SettableSystem {
    pub struct Sys<W>(W, RwLock<Scale>);

    impl<W: SystemWork> System for Sys<W> {
        type Data = W::Data;

        fn init(&self, index: usize) -> W::Data {
            self.0.init(index)
        }

        fn work(&self, data: &mut W::Data) -> Decision {
            self.0.work(data)
        }

        fn close(&self, data: W::Data) {
            self.0.close(data)
        }

        fn scale(&self) -> Scale {
            *self.1.read().unwrap()
        }
    }

    impl<W: SystemWork> SettableSystem for Sys<W> {
        fn set_scale(&self, scale: Scale) {
            *self.1.write().unwrap() = scale;
        }
    }

    Sys(sys, RwLock::new(scale))
}

/// Create a system that has a set number of active workers.
///
/// Shorthand for `with_scale` with an initial scale of
/// `Scale::active(threads)`. The result is a `SettableSystem`, so the scale
/// can still be replaced later through `set_scale`.
///
/// ```
/// # extern crate dynpool;
/// # use dynpool::*;
/// # use std::time::{Duration, Instant};
/// # fn main() {
/// // Run 10 threads for 0.5 seconds.
/// let time = Instant::now() + Duration::from_millis(500);
/// let worker = simple_func_worker(|index| println!("Hello from {}", index));
/// let sys = shutdown_after(with_threads(worker, 10), time);
/// Pool::start_fg(sys);
/// # }
/// ```
pub fn with_threads(w: impl SystemWork, threads: usize) -> impl SettableSystem {
    let initial = Scale::active(threads);
    with_scale(w, initial)
}

/// Begin creating a system which calls the given function from each thread, passing the thread's index.
///
/// The per-worker data is the index handed to `init`. Each unit of work
/// calls `func` with that index and then returns `Decision::Again`.
///
/// ```
/// # extern crate dynpool;
/// # use dynpool::*;
/// # use std::time::{Duration, Instant};
/// # fn main() {
/// // Run 10 threads for 0.5 seconds.
/// let time = Instant::now() + Duration::from_millis(500);
/// let worker = simple_func_worker(|index| println!("Hello from {}", index));
/// let sys = shutdown_after(with_threads(worker, 10), time);
/// Pool::start_fg(sys);
/// # }
/// ```
pub fn simple_func_worker(func: impl Fn(usize) + Sync + Send + 'static) -> impl SystemWork {
    /// Holds the shared function that every worker calls.
    pub struct IndexCaller<F> {
        call: F,
    }

    impl<F> SystemWork for IndexCaller<F>
    where
        F: Fn(usize) + Sync + Send + 'static,
    {
        type Data = usize;

        fn init(&self, index: usize) -> usize {
            // The worker's only state is its own index.
            index
        }

        fn work(&self, index: &mut usize) -> Decision {
            (self.call)(*index);
            Decision::Again
        }
    }

    IndexCaller { call: func }
}

/// Begin creating a system which calls the given closure to create a work function for each thread.
///
/// `init` is called with the worker's index and returns that worker's
/// private work function. Each unit of work calls that function once and
/// returns the `Decision` it produced.
pub fn func_worker<F>(init: impl Fn(usize) -> F + Sync + Send + 'static) -> impl SystemWork
where
    F: FnMut() -> Decision,
{
    /// Holds the factory that builds one work function per worker.
    pub struct Factory<I> {
        make: I,
    }

    impl<I, G> SystemWork for Factory<I>
    where
        I: Fn(usize) -> G + Sync + Send + 'static,
        G: FnMut() -> Decision,
    {
        type Data = G;

        fn init(&self, index: usize) -> G {
            (self.make)(index)
        }

        fn work(&self, step: &mut G) -> Decision {
            step()
        }
    }

    Factory { make: init }
}

/// Begin creating a system which calls the given closure to create a boxed work function for each thread.
///
/// Works like `func_worker`, except that `init` returns a
/// `Box<dyn FnMut() -> Decision>`, so different workers may be given
/// closures of different concrete types.
pub fn dyn_func_worker(
    init: impl Fn(usize) -> Box<dyn FnMut() -> Decision> + Sync + Send + 'static,
) -> impl SystemWork {
    /// The boxed per-worker work function.
    type Step = Box<dyn FnMut() -> Decision>;

    /// Holds the factory that builds one boxed work function per worker.
    pub struct BoxedFactory<I> {
        make: I,
    }

    impl<I> SystemWork for BoxedFactory<I>
    where
        I: Fn(usize) -> Step + Sync + Send + 'static,
    {
        type Data = Step;

        fn init(&self, index: usize) -> Step {
            (self.make)(index)
        }

        fn work(&self, step: &mut Step) -> Decision {
            step()
        }
    }

    BoxedFactory { make: init }
}

/// Create a system wrapper which will shutdown after the provided time.
///
/// `init`, `work`, and `close` are forwarded to `sys` unchanged. `scale`
/// forwards to `sys` until the current time is strictly later than `after`;
/// from then on it returns `Scale::Shutdown` without consulting `sys`.
///
/// ```
/// # extern crate dynpool;
/// # use dynpool::*;
/// # use std::time::{Duration, Instant};
/// # fn main() {
/// // Run 10 threads for 0.5 seconds.
/// let time = Instant::now() + Duration::from_millis(500);
/// let worker = simple_func_worker(|index| println!("Hello from {}", index));
/// let sys = shutdown_after(with_threads(worker, 10), time);
/// Pool::start_fg(sys);
/// # }
/// ```
pub fn shutdown_after(sys: impl System, after: Instant) -> impl System {
    /// A system paired with the instant after which it reports shutdown.
    pub struct Deadline<S> {
        inner: S,
        after: Instant,
    }

    impl<S: System + Sync + Send + 'static> System for Deadline<S> {
        type Data = S::Data;

        fn init(&self, index: usize) -> Self::Data {
            <S as System>::init(&self.inner, index)
        }

        fn work(&self, state: &mut Self::Data) -> Decision {
            <S as System>::work(&self.inner, state)
        }

        fn close(&self, state: Self::Data) {
            <S as System>::close(&self.inner, state)
        }

        fn scale(&self) -> Scale {
            // Past the deadline the wrapped system is no longer asked.
            let expired = Instant::now() > self.after;
            if expired {
                return Scale::Shutdown;
            }
            <S as System>::scale(&self.inner)
        }
    }

    Deadline { inner: sys, after }
}

/// Wrap a system so that `System::scale` is called no more than once per period
/// of time, and is otherwise cached. This should be used if scale calculation
/// is expensive.
///
/// `rate` is the length of that period: a stored scale is reused while the
/// time elapsed since it was stored is less than `rate`. `init`, `work`, and
/// `close` are forwarded to `sys` unchanged.
///
/// The cache lives behind an `RwLock`. The write lock is held while a new
/// scale is being computed, so concurrent callers wait for that value
/// instead of each calling the wrapped `scale`.
///
/// # Panics
///
/// `scale` panics if the internal lock has been poisoned.
///
/// ```
/// # extern crate dynpool;
/// # use dynpool::*;
/// # use std::thread::sleep_ms;
/// # use std::time::{Duration, Instant};
/// # fn main() {
/// # let time = Instant::now() + Duration::from_millis(1000);
/// struct Sys;
///
/// impl System for Sys {
///     type Data = ();
///
///     fn init(&self, _: usize) {}
///
///     fn work(&self, _: &mut ()) -> Decision { Decision::Again }
///
///     fn scale(&self) -> Scale {
///         sleep_ms(100); // scale function is slow
///         return Scale::active(10);
///     }
/// }
///
/// let sys = cache_scale(Sys, Duration::from_millis(500));
/// Pool::start_fg(shutdown_after(sys, time));
/// # }
/// ```
pub fn cache_scale(sys: impl System, rate: Duration) -> impl System {
    /// A system together with its most recently computed scale.
    pub struct ScaleCache<S> {
        inner: S,
        rate: Duration,
        // The last scale and the instant it was stored; `None` until the
        // first call to `scale`.
        last: RwLock<Option<(Instant, Scale)>>,
    }

    impl<S> ScaleCache<S> {
        /// Returns the stored scale if it is younger than `rate`.
        fn unexpired(&self, entry: &Option<(Instant, Scale)>) -> Option<Scale> {
            match *entry {
                Some((stamp, value)) if stamp.elapsed() < self.rate => Some(value),
                _ => None,
            }
        }
    }

    impl<S: System + Sync + Send + 'static> System for ScaleCache<S> {
        type Data = S::Data;

        fn init(&self, index: usize) -> Self::Data {
            <S as System>::init(&self.inner, index)
        }

        fn work(&self, state: &mut Self::Data) -> Decision {
            <S as System>::work(&self.inner, state)
        }

        fn close(&self, state: Self::Data) {
            <S as System>::close(&self.inner, state)
        }

        fn scale(&self) -> Scale {
            // Fast path: a shared read lock is enough while the entry is fresh.
            let hit = {
                let guard = self.last.read().unwrap();
                self.unexpired(&guard)
            };
            if let Some(value) = hit {
                return value;
            }

            // Slow path: take the write lock, then look again in case another
            // caller refreshed the entry while this one was waiting.
            let mut guard = self.last.write().unwrap();
            if let Some(value) = self.unexpired(&guard) {
                return value;
            }

            // Compute and store a new scale; the write lock stays held until
            // the function returns.
            let fresh = <S as System>::scale(&self.inner);
            *guard = Some((Instant::now(), fresh));
            fresh
        }
    }

    ScaleCache {
        inner: sys,
        rate,
        last: RwLock::new(None),
    }
}