use crate::spsc;
use crate::spsc::{Consumer, Producer};
use rand::distributions::{Distribution, Uniform};
use rand_distr::Exp;
use std::time::Instant;
use crate::engine::*;
use crate::worker::{ActorState, Advancer};
type Time = u64;
type Res = u64;
const Q_SIZE: usize = 128;
const T_MULT: Time = 1024 as Time;
const LOOKAHEAD: Time = 1 as Time * T_MULT;
type PHOLDEvent = ();
type FullEvent = crate::engine::Event<Time, PHOLDEvent>;
#[derive(Debug)]
struct Actor {
pub id: usize,
time_limit: Time,
unif: Uniform<usize>,
merger: Merger<Time, PHOLDEvent>,
out_queues: Vec<Producer<FullEvent>>,
out_times: Vec<Time>,
pub count: u64,
}
impl Actor {
fn new(
id: usize,
out_queues: Vec<Producer<FullEvent>>,
in_queues: Vec<Consumer<FullEvent>>,
time_limit: Time,
) -> Actor {
let mut _ix_to_id = Vec::new();
let mut out_times = Vec::new();
for (ix, q) in out_queues.iter().enumerate() {
_ix_to_id.push(ix);
out_times.push(0);
if ix == id {
for _ in 0..100 {
q.push(Event {
event_type: EventType::ModelEvent(()),
src: id,
time: LOOKAHEAD,
})
.unwrap();
}
} else {
q.push(Event {
event_type: EventType::Null,
src: id,
time: LOOKAHEAD,
})
.unwrap();
}
}
Actor {
id,
time_limit,
unif: Uniform::from(0..out_queues.len()),
merger: Merger::new(in_queues, id, _ix_to_id),
out_queues,
out_times,
count: 0,
}
}
}
impl Advancer<Time, Res> for Actor {
fn advance(&mut self) -> ActorState<Time, Res> {
while let Some(mut event) = self.merger.next() {
if event.time > self.time_limit {
println!("{} done", self.id);
for chan in &self.out_queues {
chan.push(Event {
event_type: EventType::Close,
src: self.id,
time: event.time,
})
.unwrap();
}
break;
}
let mut rng = rand::thread_rng();
let exp = Exp::new(1.0).unwrap();
match event.event_type {
EventType::Close => unreachable!(),
EventType::Null => unreachable!(),
EventType::Stalled => {
for (dst_ix, out_time) in self.out_times.iter_mut().enumerate() {
if *out_time < event.time {
self.out_queues[dst_ix]
.push(Event {
event_type: EventType::Null,
src: self.id,
time: event.time + LOOKAHEAD,
})
.unwrap();
*out_time = event.time;
}
}
return ActorState::Continue(event.time);
}
EventType::ModelEvent(_) => {
self.count += 1;
let dst_ix = self.unif.sample(&mut rng);
let cur_time = std::cmp::max(self.out_times[dst_ix], event.time);
let dst_time = cur_time + (exp.sample(&mut rng) as f64 * T_MULT as f64) as Time;
event.time = dst_time + LOOKAHEAD;
self.out_queues[dst_ix].push(event).unwrap();
self.out_times[dst_ix] = dst_time;
}
}
}
ActorState::Done(self.count)
}
}
pub fn transpose<T>(in_vector: Vec<Vec<T>>) -> Vec<Vec<T>> {
let mut result: Vec<Vec<T>> = Vec::new();
for _ in 0..in_vector[0].len() {
result.push(Vec::new());
}
for mut col in in_vector {
for (i, element) in col.drain(..).enumerate() {
result[i].push(element);
}
}
result
}
pub fn run(n_actors: usize, mut time_limit: Time, n_threads: usize) {
time_limit *= T_MULT;
println!("Setup...");
let mut out_queues = Vec::new();
let mut in_queues = Vec::new();
for _ in 0..n_actors {
let mut outs = Vec::new();
let mut ins = Vec::new();
for _ in 0..n_actors {
let (prod, cons) = spsc::new(Q_SIZE);
outs.push(prod);
ins.push(cons);
}
out_queues.push(outs);
in_queues.push(ins);
}
let mut in_queues = transpose(in_queues);
let mut actors = Vec::new();
for id in 0..n_actors {
let outs = out_queues.pop().unwrap();
let ins = in_queues.pop().unwrap();
let a = Actor::new(id, outs, ins, time_limit); actors.push(Box::new(a) as Box<dyn Advancer<Time, Res> + Send>);
}
println!("Run...");
let start = Instant::now();
let counts = crate::start(num_cpus::get() - 1, actors);
let duration = start.elapsed();
let sum_count = counts.iter().sum::<u64>();
let ns_per_count: f64 = if sum_count > 0 {
1000. * duration.as_nanos() as f64 / sum_count as f64
} else {
0.
};
println!(
"= {} in {:.3}s. {} actors, {} threads",
sum_count,
duration.as_secs_f32(),
n_actors,
n_threads,
);
println!(
" {:.3}M count/sec, {:.3}M /actors, {:.3}M /thread",
(1e6 / ns_per_count as f64),
(1e6 / (ns_per_count * n_actors as f64)),
(1e6 / (ns_per_count * n_threads as f64)),
);
println!(
" {:.1} ns/count, {:.1} ns/actor, {:.1} ns/thread",
ns_per_count / 1000. as f64,
ns_per_count * n_actors as f64 / 1000.,
ns_per_count * n_threads as f64 / 1000.,
);
println!("done");
}