// dynpool 0.0.2
//
// A thread manager that is lightweight, flexible, and rescalable.
// Documentation
//! A messy but reasonably elegant and reliable invariant checker.

extern crate dynpool;
extern crate rand;

// use std::io::prelude::*;
// use std::fs::File;
use std::sync::RwLock;
use std::collections::{HashMap};
use std::time::{Instant, Duration};
use rand::{prelude::*, thread_rng, distributions::{Poisson, Uniform}};
use dynpool::{Decision, Scale, System, Pool, InternalEvent, PoolPanicedError};

/// The kind of invariant violation observed for a single thread index.
///
/// `State::check` derives the kind from the per-index count kept in
/// `State::counts`; a count of exactly one is the only value that is not an
/// exception.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum ExceptionType {
    /// The count for the index is zero.
    Hole,
    /// The count for the index is greater than one.
    Over,
    /// The count for the index is negative.
    Under,
}

/// A recorded invariant violation together with the moment it was observed.
#[derive(Copy, Clone, Debug)]
struct Exception {
    /// What kind of violation this is.
    ty: ExceptionType,
    /// Timestamp taken by the `State::check` pass that recorded the violation.
    at: Instant,
}

impl Exception {
    /// Compares a freshly observed exception against the one already on record.
    ///
    /// * If the kinds differ, the record is overwritten with `new` (which
    ///   restarts the clock) and `false` is returned.
    /// * If the kinds match, the record is left untouched and the result is
    ///   `true` only when `new` was observed strictly more than 10 ms after the
    ///   recorded one.
    fn check(&mut self, new: Exception) -> bool {
        if self.ty != new.ty {
            *self = new;
            return false;
        }

        let persisted_for = new.at.duration_since(self.at);
        persisted_for > Duration::from_millis(10)
    }
}

/// Bookkeeping used to validate the pool's thread-spawn/shutdown events.
struct State {
    /// Exceptions currently outstanding, keyed by thread index.
    except: HashMap<usize, Exception>,
    /// Per-index running total of the deltas passed to `add_count`
    /// (`+1` for a spawn event, `-1` for a shutdown event).
    counts: Vec<i32>,
    // Disabled: optional CSV log of resolved exceptions.
    // exlog: File,
}

impl State {
    /// Adds `val` to the count stored for `index`, then drops trailing empty slots.
    ///
    /// `counts` is grown with zeros as needed so that `index` is addressable.
    /// Afterwards every trailing zero entry is removed, together with any
    /// exception recorded for those positions, so a zero that remains in
    /// `counts` always sits below some non-zero entry.
    fn add_count(&mut self, index: usize, val: i32) {
        if index >= self.counts.len() {
            self.counts.resize(index + 1, 0);
        }
        self.counts[index] += val;

        // Length of the prefix that ends at the last non-zero count.
        let keep = self
            .counts
            .iter()
            .rposition(|&count| count != 0)
            .map_or(0, |last| last + 1);

        // Exceptions for slots that are about to disappear are no longer meaningful.
        for stale in keep..self.counts.len() {
            self.except.remove(&stale);
        }
        self.counts.truncate(keep);
    }

    /// Hook called from `Sys::init`; re-validates the whole state.
    /// The index is currently unused.
    fn init(&mut self, _index: usize) {
        self.check();
    }

    /// Hook called from `Sys::close`; re-validates the whole state.
    /// The index is currently unused.
    fn close(&mut self, _index: usize) {
        self.check();
    }

    /// Validates every tracked index against the "exactly one" invariant.
    ///
    /// For each index the count is classified as `Under` (negative), `Hole`
    /// (zero), `Over` (greater than one) or healthy (exactly one). A healthy
    /// index has any outstanding exception cleared. An unhealthy index is
    /// either recorded for the first time or compared against the existing
    /// record via `Exception::check`.
    ///
    /// # Panics
    ///
    /// Panics (after printing the current counts) when `Exception::check`
    /// reports that the same kind of exception has persisted for too long at
    /// an index.
    fn check(&mut self) {
        use std::collections::hash_map::Entry;

        let now = Instant::now();

        for (index, &count) in self.counts.iter().enumerate() {
            let ty = match count {
                1 => {
                    // Healthy again: forget whatever was outstanding here.
                    // (This is the point where a resolved exception could be logged.)
                    self.except.remove(&index);
                    continue;
                }
                0 => ExceptionType::Hole,
                c if c < 0 => ExceptionType::Under,
                _ => ExceptionType::Over,
            };
            let observed = Exception { ty, at: now };

            match self.except.entry(index) {
                Entry::Occupied(mut slot) => {
                    if slot.get_mut().check(observed) {
                        println!("STATE: {:?}", &self.counts);
                        panic!("exception {:?} at {}", slot.get().ty, index);
                    }
                }
                Entry::Vacant(slot) => {
                    slot.insert(observed);
                }
            }
        }
    }
}

/// The `System` implementation handed to the pool by `main`.
struct Sys {
    /// Shared bookkeeping; every access in this file takes the write lock.
    state: RwLock<State>,
    /// When the run began.
    start: Instant,
    /// How long the run lasts; once `start.elapsed()` exceeds this, `scale`
    /// returns `Shutdown`.
    length: Duration,
}


impl System for Sys {
    type Data = usize;

    fn init(&self, index: usize) -> usize {
        self.state.write().unwrap().init(index);
        index
    }

    fn close(&self, index: usize) {
        self.state.write().unwrap().close(index);
    }

    fn work(&self, &mut _index: &mut usize) -> Decision {
        use self::Decision::*;

        match Uniform::from(0..100).sample(&mut thread_rng()) {
            0..=60 => Again,
            61..=98 => Incomplete,
            99 => Restart,
            _ => unreachable!(),
        }
    }

    fn scale(&self) -> Scale {
        use self::Scale::*;

        if self.start.elapsed() > self.length {
            return Shutdown;
        }

        let mut rng = thread_rng();

        let active = Poisson::new(4.).sample(&mut rng) as usize;
        match Uniform::from(0..10).sample(&mut rng) {
            0..=4 => Active(active),
            5..=7 => Mixed { active, max_inactive: Uniform::from(0..15).sample(&mut rng) },
            8..=9 => NoTerm { active },
            _ => unreachable!(),
        }
    }

    fn note_event(&self, event: InternalEvent) {
        use self::InternalEvent::*;

        let (index, diff) = match event {
            ThreadSpawn { index } => (index, 1),
            ThreadShutdown { index } => (index, -1),
            WorkerLoopSkip { .. } => return,
            _ => {
                // println!("{:?}", event);
                return
            },
        };

        let mut state = self.state.write().unwrap();
        state.add_count(index, diff);
        // println!("{:?} :: {:?}", event, state.counts);
    }
}


/// Starts a background pool driven by `Sys`, with a run length of 60
/// seconds, and returns the result of joining it.
fn main() -> Result<(), PoolPanicedError> {
    let bookkeeping = State {
        except: HashMap::new(),
        counts: Vec::new(),
    };

    let system = Sys {
        state: RwLock::new(bookkeeping),
        start: Instant::now(),
        length: Duration::from_secs(60),
    };

    Pool::start_bg(system).join()
}