use petgraph::prelude::NodeIndex;
use std::collections::VecDeque;
/// Capacity requested when a per-depth queue is constructed in `Scheduler::resize`.
const INITIAL_CAPACITY: usize = 256;
/// Error returned by `Scheduler::schedule` when the requested depth is lower than the
/// depth the scheduler is currently draining.
#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
#[error("cannot schedule a node above the current processing depth")]
pub struct SchedulerError;
/// Queue of graph nodes bucketed by depth and handed out in increasing depth order,
/// first-in-first-out within a single depth.
#[derive(Debug)]
pub struct Scheduler {
// One FIFO queue per depth level; the vector index is the depth.
multi_queue: Vec<VecDeque<NodeIndex>>,
// Depth whose queue `pop` is currently draining; set back to 0 when `pop` returns `None`.
curr_depth: usize,
// Number of nodes that have been scheduled but not yet popped.
pending_events: usize,
}
impl Scheduler {
    /// Creates an empty scheduler with no depth levels.
    ///
    /// No depth can be scheduled until [`Scheduler::resize`] or
    /// [`Scheduler::enable_depth`] has created the corresponding queue.
    pub(crate) fn new() -> Self {
        Self {
            multi_queue: Vec::new(),
            curr_depth: 0,
            pending_events: 0,
        }
    }

    /// Returns `true` while at least one scheduled node has not been popped yet.
    pub(crate) const fn has_pending_event(&self) -> bool {
        self.pending_events > 0
    }

    /// Sets the number of depth levels to exactly `max_depth`, so that the valid
    /// depths afterwards are `0..max_depth`.
    ///
    /// Growing appends empty queues, each created with `INITIAL_CAPACITY` reserved.
    /// Shrinking discards the deepest queues together with any nodes still waiting in
    /// them; those nodes are no longer counted as pending.
    pub(crate) fn resize(&mut self, max_depth: u32) {
        let new_len = max_depth as usize;

        // `get` only yields a slice when `new_len <= len`, i.e. when nothing grows.
        // Nodes in the truncated tail will never be popped, so stop counting them;
        // otherwise `has_pending_event` would stay `true` forever.
        if let Some(removed) = self.multi_queue.get(new_len..) {
            let dropped: usize = removed.iter().map(VecDeque::len).sum();
            self.pending_events -= dropped;
        }

        // Build every new queue through the closure: `Vec::resize` would fill the
        // slots with clones of one deque, and a clone of an empty `VecDeque` is not
        // guaranteed to keep the reserved capacity. The closure is also skipped
        // entirely when no new slot is needed.
        self.multi_queue
            .resize_with(new_len, || VecDeque::with_capacity(INITIAL_CAPACITY));
    }

    /// Makes sure a queue exists for `depth`, growing the scheduler if necessary.
    ///
    /// Never shrinks the scheduler. `depth` must be below `u32::MAX`, because the
    /// required length `depth + 1` overflows otherwise.
    pub(crate) fn enable_depth(&mut self, depth: u32) {
        let required_len = depth + 1;
        if self.multi_queue.len() < required_len as usize {
            self.resize(required_len);
        }
    }

    /// Appends `node_index` to the queue of the given `depth`.
    ///
    /// # Errors
    ///
    /// Returns [`SchedulerError`] when `depth` is lower than the depth currently
    /// being drained by [`Scheduler::pop`]; the node is not queued in that case.
    ///
    /// # Panics
    ///
    /// Panics if no queue exists for `depth` (see [`Scheduler::enable_depth`]).
    #[inline(always)]
    pub(crate) fn schedule(
        &mut self,
        node_index: NodeIndex,
        depth: u32,
    ) -> Result<(), SchedulerError> {
        if (depth as usize) < self.curr_depth {
            return Err(SchedulerError);
        }
        self.multi_queue[depth as usize].push_back(node_index);
        self.pending_events += 1;
        Ok(())
    }

    /// Removes and returns the next node: the oldest one at the shallowest depth that
    /// still has nodes queued.
    ///
    /// Returns `None` when nothing is pending. Only that `None` result moves the
    /// current depth back to 0, so shallower depths stay rejected by
    /// [`Scheduler::schedule`] until the scheduler has been popped to exhaustion.
    #[inline(always)]
    pub(crate) fn pop(&mut self) -> Option<NodeIndex> {
        while self.curr_depth < self.multi_queue.len() && self.has_pending_event() {
            if let Some(item) = self.multi_queue[self.curr_depth].pop_front() {
                debug_assert!(
                    self.pending_events > 0,
                    "pending_events underflow in scheduler pop"
                );
                self.pending_events -= 1;
                return Some(item);
            }
            // Current level is drained; move on to the next deeper one.
            self.curr_depth += 1;
        }

        // Nothing left to hand out: start the next round from the shallowest level.
        self.curr_depth = 0;
        debug_assert!(
            self.pending_events == 0,
            "pending_events not reset after exhausting queue"
        );
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a scheduler that already has `levels` depth queues.
    fn scheduler_with_levels(levels: u32) -> Scheduler {
        let mut scheduler = Scheduler::new();
        scheduler.resize(levels);
        scheduler
    }

    /// Schedules `node` at `depth`, failing the test immediately if it is rejected
    /// instead of letting the failure surface later as a confusing pop mismatch.
    #[track_caller]
    fn must_schedule(scheduler: &mut Scheduler, node: u32, depth: u32) {
        scheduler
            .schedule(NodeIndex::from(node), depth)
            .expect("scheduling at or below the current depth must succeed");
    }

    /// Pops the next node and checks both which node it is and which depth the
    /// scheduler is draining afterwards.
    #[track_caller]
    fn assert_pop(scheduler: &mut Scheduler, node: u32, depth: usize) {
        assert_eq!(scheduler.pop(), Some(NodeIndex::from(node)));
        assert_eq!(scheduler.curr_depth, depth);
    }

    #[test]
    fn test_scheduler_basic() {
        let mut scheduler = scheduler_with_levels(5);

        // (node, depth) pairs, deliberately out of depth order.
        let plan: [(u32, u32); 8] = [
            (0, 0),
            (1, 1),
            (2, 4),
            (3, 2),
            (4, 3),
            (5, 1),
            (6, 2),
            (7, 4),
        ];
        for (node, depth) in plan {
            must_schedule(&mut scheduler, node, depth);
        }
        assert!(scheduler.has_pending_event());

        // Nodes come out by increasing depth, FIFO within one depth.
        assert_pop(&mut scheduler, 0, 0);
        assert_pop(&mut scheduler, 1, 1);
        assert_pop(&mut scheduler, 5, 1);
        assert_pop(&mut scheduler, 3, 2);
        assert_pop(&mut scheduler, 6, 2);
        assert_pop(&mut scheduler, 4, 3);
        assert_pop(&mut scheduler, 2, 4);
        assert_pop(&mut scheduler, 7, 4);

        assert_eq!(scheduler.pop(), None);
        assert!(!scheduler.has_pending_event());
    }

    #[test]
    fn test_scheduler_needs_to_exhaust() {
        let mut scheduler = scheduler_with_levels(3);
        must_schedule(&mut scheduler, 0, 0);
        must_schedule(&mut scheduler, 2, 2);

        assert_pop(&mut scheduler, 0, 0);
        // Depth 1 is empty and gets skipped.
        assert_pop(&mut scheduler, 2, 2);

        assert_eq!(scheduler.pop(), None);
    }

    #[test]
    fn test_scheduler_rejects_shallower_depth_until_exhausted() {
        let mut scheduler = scheduler_with_levels(3);
        must_schedule(&mut scheduler, 0, 2);
        assert_pop(&mut scheduler, 0, 2);

        // The scheduler is now draining depth 2, so depth 1 is refused...
        assert_eq!(
            scheduler.schedule(NodeIndex::from(1), 1),
            Err(SchedulerError)
        );
        assert!(!scheduler.has_pending_event());

        // ...while the current depth is still accepted.
        must_schedule(&mut scheduler, 2, 2);
        assert_pop(&mut scheduler, 2, 2);

        // Exhausting the scheduler rewinds it to depth 0.
        assert_eq!(scheduler.pop(), None);
        assert_eq!(scheduler.curr_depth, 0);
        must_schedule(&mut scheduler, 3, 0);
        assert_pop(&mut scheduler, 3, 0);
    }

    #[test]
    fn test_scheduler_enable_depth_only_grows() {
        let mut scheduler = Scheduler::new();

        scheduler.enable_depth(2);
        assert_eq!(scheduler.multi_queue.len(), 3);

        // A shallower request must not shrink the scheduler.
        scheduler.enable_depth(1);
        assert_eq!(scheduler.multi_queue.len(), 3);

        must_schedule(&mut scheduler, 0, 2);
        assert_pop(&mut scheduler, 0, 2);
    }
}