use crate::spsc;
use std::cmp::Ordering;
use std::mem;
#[derive(Debug)]
pub enum EventType<U> {
ModelEvent(U),
Stalled,
Null,
Close,
}
#[derive(Debug)]
pub struct Event<T, U>
where
T: Ord + Copy + num::Zero,
{
pub time: T,
pub src: usize,
pub event_type: EventType<U>,
}
#[derive(Debug)]
pub struct Merger<T, U>
where
T: Ord + Copy + num::Zero,
{
id: usize,
in_queues: Vec<spsc::Consumer<Event<T, U>>>,
n_layers: usize,
paths: Vec<usize>,
winner_q: usize,
safe_time: T,
loser_e: Vec<Event<T, U>>,
ix_to_id: Vec<usize>,
}
fn ltr_walk(n_nodes: usize) -> Vec<usize> {
let n_layers = (n_nodes as f32).log2().ceil() as usize;
let mut visited: Vec<bool> = Vec::new();
for _ in 0..n_nodes + 1 {
visited.push(false);
}
let mut cur_index = 2_usize.pow((n_layers - 1) as u32);
let mut indices = Vec::new();
indices.push(cur_index);
visited[cur_index] = true;
let mut going_up = true;
for _ in 0..n_nodes - 1 {
if going_up {
while visited[cur_index] {
cur_index /= 2;
}
} else {
if cur_index * 2 + 1 >= n_nodes {
going_up = true;
continue;
}
cur_index = cur_index * 2 + 1;
while cur_index * 2 < n_nodes {
cur_index *= 2;
}
}
indices.push(cur_index);
visited[cur_index] = true;
going_up = !going_up;
}
if visited[0] {
indices.pop();
}
indices
}
impl<T, U> Merger<T, U>
where
U: std::fmt::Debug,
T: Ord + Copy + num::Zero,
{
pub fn new(
in_queues: Vec<spsc::Consumer<Event<T, U>>>,
id: usize,
ix_to_id: Vec<usize>,
) -> Merger<T, U> {
let mut loser_e = Vec::new();
let winner_q = 0;
loser_e.push(Event {
time: T::zero(),
event_type: EventType::Null,
src: 0,
});
for i in 1..in_queues.len() {
loser_e.push(Event {
time: T::zero(),
event_type: EventType::Null,
src: i,
});
}
for (loser, ix) in ltr_walk(in_queues.len()).iter().enumerate() {
loser_e[*ix] = Event {
time: T::zero(),
event_type: EventType::Null,
src: loser + 1,
};
}
let n_queues = in_queues.len();
let n_layers = (n_queues as f32).log2().ceil() as usize;
let largest_full_layer = 2_usize.pow((n_queues as f32).log2().floor() as u32);
let last_layer_max_i = ((n_queues + largest_full_layer - 1) % largest_full_layer + 1) * 2;
let offset = (last_layer_max_i + 1) / 2;
let mut paths = Vec::new();
for ix in 0..n_queues {
let v_ix = if ix > last_layer_max_i {
(ix - offset) * 2
} else {
ix
};
let mut index = 0;
assert!(n_layers > 0);
for level in (0..n_layers).rev() {
let base_offset = 2_usize.pow(level as u32);
index = base_offset + v_ix / 2_usize.pow((n_layers - level) as u32);
if index >= loser_e.len() {
continue;
}
break;
}
paths.push(index);
}
Merger {
id,
in_queues,
n_layers,
winner_q,
safe_time: T::zero(),
paths,
loser_e,
ix_to_id,
}
}
fn _try_pop(&mut self) -> Option<Event<T, U>> {
if !self.in_queues[self.winner_q].is_empty() {
self.next()
} else {
None
}
}
}
impl<T, U> Iterator for Merger<T, U>
where
U: std::fmt::Debug,
T: Ord + Copy + num::Zero,
{
type Item = Event<T, U>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let mut new_winner_e = match self.in_queues[self.winner_q].pop() {
Err(_) => {
Event {
time: self.safe_time,
src: self.winner_q,
event_type: EventType::Stalled,
}
}
Ok(event) => event,
};
new_winner_e.src = self.winner_q;
let mut index = self.paths[self.winner_q];
while index != 0 {
let cur_loser = &mut self.loser_e[index];
match cur_loser.time.cmp(&new_winner_e.time) {
Ordering::Less => mem::swap(&mut new_winner_e, cur_loser),
Ordering::Equal => {
if let EventType::Stalled = new_winner_e.event_type {
mem::swap(&mut new_winner_e, cur_loser);
}
}
_ => {}
}
index /= 2;
}
self.winner_q = new_winner_e.src;
self.safe_time = new_winner_e.time;
if let EventType::Null = new_winner_e.event_type {
continue;
}
if let EventType::Stalled = new_winner_e.event_type {
if !self.in_queues[new_winner_e.src].is_empty() {
continue;
}
}
return Some(new_winner_e);
}
}
}
#[cfg(test)]
mod test_merger {
use crate::engine::*;
use crate::spsc;
use std::{thread, time};
#[derive(Debug)]
enum EmptyModel {
None,
}
#[test]
fn test_ltr() {
let ix = ltr_walk(2);
let expected = vec![1];
assert_eq!(ix, expected);
let ix = ltr_walk(3);
let expected = vec![2, 1];
assert_eq!(ix, expected);
let ix = ltr_walk(4);
let expected = vec![2, 1, 3];
assert_eq!(ix, expected);
let ix = ltr_walk(5);
let expected = vec![4, 2, 1, 3];
assert_eq!(ix, expected);
let ix = ltr_walk(13);
let expected = vec![8, 4, 9, 2, 10, 5, 11, 1, 12, 6, 3, 7];
assert_eq!(ix, expected);
}
#[test]
fn test_merge_2() {
test_interleave(2, 5);
test_pushpop(2, 5);
test_ties(2, 5);
}
#[test]
fn test_merge_4() {
test_interleave(4, 10);
test_pushpop(4, 10);
test_ties(4, 10);
}
#[test]
fn test_merge_5() {
test_interleave(5, 10);
test_pushpop(5, 10);
test_ties(5, 10);
}
#[test]
fn test_merge_7() {
test_interleave(7, 10);
test_pushpop(7, 10);
test_ties(7, 10);
}
#[test]
fn test_merge_many_interleave() {
for n_queues in 3..20 {
println!("{} queues =======================", n_queues);
test_interleave(n_queues, n_queues + 5);
}
}
#[test]
fn test_merge_many_threads() {
for n_queues in 3..20 {
println!("{} queues =======================", n_queues);
test_pushpop(n_queues, n_queues + 5);
}
}
#[test]
fn test_merge_many_ties() {
for n_queues in 3..20 {
println!("{} queues =======================", n_queues);
test_ties(n_queues, n_queues + 5);
}
}
fn test_interleave(n_queues: usize, n_events: usize) {
println!("Interleaving");
let mut prod_qs = Vec::new();
let mut cons_qs = Vec::new();
for _ in 0..n_queues {
let (prod, cons) = spsc::new(128);
prod_qs.push(prod);
cons_qs.push(cons);
}
let mut merger = Merger::<u64, EmptyModel>::new(cons_qs, 0, vec![]);
println!("Pushing events");
for (src, prod) in prod_qs.iter().enumerate() {
for i in 1..n_events + 1 {
let e = Event {
time: (src * 1 + i) as u64,
src,
event_type: EventType::ModelEvent(EmptyModel::None),
};
println!(" {} <- {:?}", i, e);
prod.push(e).unwrap();
}
let e = Event {
time: 100000,
src,
event_type: EventType::Close,
};
prod.push(e).unwrap();
}
println!("\nPopping events");
let mut event_count = 0;
let mut cur_time = 0;
while let Some(event) = merger._try_pop() {
println!(" => {:?}", event);
assert!(
cur_time <= event.time,
"Time invariant violated. Previous event was @{}, current event @{}",
cur_time,
event.time
);
cur_time = event.time;
event_count += 1;
}
let expected_count = n_queues * n_events + 1;
assert_eq!(
event_count, expected_count,
"Expected {} events, saw {}",
expected_count, event_count
);
}
fn test_pushpop(n_queues: usize, n_events: usize) {
let mut prod_qs = Vec::new();
let mut cons_qs = Vec::new();
for _ in 0..n_queues {
let (prod, cons) = spsc::new(128);
prod_qs.push(prod);
cons_qs.push(cons);
}
let mut merger = Merger::<u64, EmptyModel>::new(cons_qs, 0, vec![]);
println!("\nMultithreaded pushing and popping events");
let handle = thread::spawn(move || {
let expected_count = n_queues * n_events + 1;
let mut event_count = 0;
let mut cur_time = 0;
while event_count < expected_count {
let event = merger.next().unwrap();
if let EventType::Stalled = event.event_type {
continue;
}
assert!(
cur_time <= event.time,
"Time invariant violated. Previous event was @{}, current event @{}",
cur_time,
event.time
);
cur_time = event.time;
event_count += 1;
}
if let Some(event) = merger._try_pop() {
assert!(
false,
"Merger should not have any more events, got {:?}",
event
);
}
assert_eq!(
event_count, expected_count,
"Expected {} events, saw {}",
expected_count, event_count
);
});
for (src, prod) in prod_qs.iter().enumerate().rev() {
for i in 1..n_events + 1 {
let e = Event {
time: (src * 1 + 4 * i) as u64,
src,
event_type: EventType::ModelEvent(EmptyModel::None),
};
println!(" {} <- {:?}", i, e);
prod.push(e).unwrap();
thread::sleep(time::Duration::from_micros(1));
}
let e = Event {
time: 100000,
src,
event_type: EventType::Close,
};
prod.push(e).unwrap();
}
handle.join().unwrap();
}
fn test_ties(n_queues: usize, n_events: usize) {
println!("\nTie checking");
let mut prod_qs = Vec::new();
let mut cons_qs = Vec::new();
for _ in 0..n_queues {
let (prod, cons) = spsc::new(128);
prod_qs.push(prod);
cons_qs.push(cons);
}
let mut merger = Merger::<u64, EmptyModel>::new(cons_qs, 0, vec![]);
let mut cur_time = 0;
let mut success = true;
for time in 0..n_events {
println!("\nPushing events @{}", time);
for (ix, prod) in prod_qs.iter().enumerate() {
let e = Event {
time: time as u64,
src: ix,
event_type: EventType::ModelEvent(EmptyModel::None),
};
println!(" {} <- {:?}", ix, e);
prod.push(e).unwrap();
}
println!("Popping events");
let mut event_count = 0;
while let Some(event) = merger.next() {
if let EventType::Stalled = event.event_type {
if event.time == time as u64 {
break;
}
}
println!(" => {:?}", event);
assert!(
cur_time <= event.time,
"Time invariant violated. Previous event was @{}, current event @{}",
cur_time,
event.time
);
cur_time = event.time;
if let EventType::Stalled = event.event_type {
} else {
event_count += 1;
}
}
success = success && event_count == n_queues;
println!(
"Expected {} events, saw {}",
n_queues,
event_count
);
}
assert!(success);
}
}