extern crate dynpool;
extern crate rand;
use std::sync::RwLock;
use std::collections::{HashMap};
use std::time::{Instant, Duration};
use rand::{prelude::*, thread_rng, distributions::{Poisson, Uniform}};
use dynpool::{Decision, Scale, System, Pool, InternalEvent, PoolPanicedError};
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
enum ExceptionType {
Hole,
Over,
Under,
}
#[derive(Copy, Clone, Debug)]
struct Exception {
ty: ExceptionType,
at: Instant,
}
impl Exception {
fn check(&mut self, new: Exception) -> bool {
if self.ty == new.ty {
new.at.duration_since(self.at) > Duration::from_millis(10)
} else {
*self = new;
false
}
}
}
struct State {
except: HashMap<usize, Exception>,
counts: Vec<i32>,
}
impl State {
fn add_count(&mut self, index: usize, val: i32) {
let len = self.counts.len().max(index + 1);
self.counts.resize(len, 0);
self.counts[index] += val;
if len >= index {
let mut tolen = 0;
for (i, &v) in self.counts.iter().enumerate() {
if v != 0 { tolen = i + 1}
}
for i in tolen..self.counts.len() {
self.except.remove(&i);
}
self.counts.truncate(tolen);
}
}
fn init(&mut self, _index: usize) {
self.check();
}
fn close(&mut self, _index: usize) {
self.check();
}
fn check(&mut self) {
use std::collections::hash_map::Entry::*;
let now = Instant::now();
for (index, &c) in self.counts.iter().enumerate() {
let ex = match c {
_ if c < 0 => Some(ExceptionType::Under),
0 => Some(ExceptionType::Hole),
1 => None,
_ => Some(ExceptionType::Over),
}.map(|x| Exception {
at: now,
ty: x,
});
match ex {
Some(ex) => match self.except.entry(index) {
Occupied(mut ent) => if ent.get_mut().check(ex) {
println!("STATE: {:?}", &self.counts);
panic!("exception {:?} at {}", ent.get().ty, index);
},
Vacant(ent) => { ent.insert(ex); },
},
None => {
if let Some(_e) = self.except.remove(&index) {
}
},
}
}
}
}
struct Sys {
state: RwLock<State>,
start: Instant,
length: Duration,
}
impl System for Sys {
type Data = usize;
fn init(&self, index: usize) -> usize {
self.state.write().unwrap().init(index);
index
}
fn close(&self, index: usize) {
self.state.write().unwrap().close(index);
}
fn work(&self, &mut _index: &mut usize) -> Decision {
use self::Decision::*;
match Uniform::from(0..100).sample(&mut thread_rng()) {
0..=60 => Again,
61..=98 => Incomplete,
99 => Restart,
_ => unreachable!(),
}
}
fn scale(&self) -> Scale {
use self::Scale::*;
if self.start.elapsed() > self.length {
return Shutdown;
}
let mut rng = thread_rng();
let active = Poisson::new(4.).sample(&mut rng) as usize;
match Uniform::from(0..10).sample(&mut rng) {
0..=4 => Active(active),
5..=7 => Mixed { active, max_inactive: Uniform::from(0..15).sample(&mut rng) },
8..=9 => NoTerm { active },
_ => unreachable!(),
}
}
fn note_event(&self, event: InternalEvent) {
use self::InternalEvent::*;
let (index, diff) = match event {
ThreadSpawn { index } => (index, 1),
ThreadShutdown { index } => (index, -1),
WorkerLoopSkip { .. } => return,
_ => {
return
},
};
let mut state = self.state.write().unwrap();
state.add_count(index, diff);
}
}
fn main() -> Result<(), PoolPanicedError> {
Pool::start_bg(Sys {
state: RwLock::new(State {
except: HashMap::new(),
counts: Vec::new(),
}),
start: Instant::now(),
length: Duration::from_secs(60),
}).join()
}