use crate::FrozenActor;
use atomic_counter::{AtomicCounter, RelaxedCounter};
use parking_lot::Mutex;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::Arc;
pub type LockedTaskHeap<T, R> = Arc<Mutex<VecDeque<FrozenActor<T, R>>>>;
#[derive(Debug)]
pub enum ActorState<T, R>
where
T: Ord + Copy + num::Zero,
{
Continue(T),
Done(R),
}
pub trait Advancer<T, R>: Debug
where
T: Ord + Copy + num::Zero,
{
fn advance(&mut self) -> ActorState<T, R>;
}
pub fn run<T: Ord + Copy + Debug + num::Zero, R: Send>(
id: usize,
counter: Arc<RelaxedCounter>,
n_tasks: usize,
task_heap: Vec<LockedTaskHeap<T, R>>,
) -> Vec<R> {
println!("{} start", id);
let mut counts = Vec::new();
let mut rng = thread_rng();
let mut task = task_heap.choose(&mut rng).unwrap().lock().pop_front();
loop {
if let Some(mut frozen_actor) = task {
match frozen_actor.actor.advance() {
ActorState::Continue(time) => {
frozen_actor.time = time;
let mut heap = task_heap.choose(&mut rng).unwrap().lock();
heap.push_back(frozen_actor);
task = heap.pop_front();
}
ActorState::Done(count) => {
counts.push(count);
counter.inc();
task = task_heap.choose(&mut rng).unwrap().lock().pop_front();
}
}
} else if counter.get() == n_tasks {
println!("{} finished", id);
return counts;
} else {
task = task_heap.choose(&mut rng).unwrap().lock().pop_front();
}
}
}
#[cfg(test)]
mod test {
use crate::worker::*;
#[derive(Debug)]
struct DummyAdvance {
id: usize,
count: u64,
limit: u64,
}
impl DummyAdvance {
fn new(id: usize, limit: u64) -> DummyAdvance {
DummyAdvance {
id,
count: 0,
limit,
}
}
fn _count(&self) -> u64 {
self.count
}
}
impl Advancer<u64, ()> for DummyAdvance {
fn advance(&mut self) -> ActorState<u64, ()> {
self.count += 1;
println!("{}: {}", self.id, self.count);
if self.count < self.limit {
ActorState::Continue(self.count)
} else {
ActorState::Done(())
}
}
}
#[test]
fn test_advance() {
let dummy = &mut DummyAdvance::new(0, 3);
if let ActorState::Continue(_) = dummy.advance() {
} else {
assert!(false);
}
if let ActorState::Continue(_) = dummy.advance() {
} else {
assert!(false);
}
if let ActorState::Done(_) = dummy.advance() {
} else {
assert!(false);
}
}
}