use super::chase_bottleneck::ChaseBottleneckScheduler;
use super::fixed_priority::FixedPriorityScheduler;
use super::{BackpressureState, Scheduler};
use crate::unified_pipeline::base::{ActiveSteps, PipelineStep};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Phase {
Startup,
SteadyState,
Drain,
}
pub struct TwoPhaseScheduler {
thread_id: usize,
num_threads: usize,
fixed: FixedPriorityScheduler,
chase: ChaseBottleneckScheduler,
phase: Phase,
items_processed: u64,
startup_threshold: u64,
active_steps: ActiveSteps,
}
impl TwoPhaseScheduler {
const DEFAULT_STARTUP_THRESHOLD: u64 = 1000;
#[must_use]
pub fn new(thread_id: usize, num_threads: usize, active_steps: ActiveSteps) -> Self {
Self {
thread_id,
num_threads,
fixed: FixedPriorityScheduler::new(thread_id, num_threads, active_steps.clone()),
chase: ChaseBottleneckScheduler::new(thread_id, num_threads, active_steps.clone()),
phase: Phase::Startup,
items_processed: 0,
startup_threshold: Self::DEFAULT_STARTUP_THRESHOLD,
active_steps,
}
}
}
impl Scheduler for TwoPhaseScheduler {
fn get_priorities(&mut self, backpressure: BackpressureState) -> &[PipelineStep] {
match self.phase {
Phase::Startup => {
if self.items_processed >= self.startup_threshold {
self.phase = Phase::SteadyState;
}
}
Phase::SteadyState => {
if backpressure.read_done {
self.phase = Phase::Drain;
}
}
Phase::Drain => {
}
}
match self.phase {
Phase::Startup | Phase::Drain => self.chase.get_priorities(backpressure),
Phase::SteadyState => self.fixed.get_priorities(backpressure),
}
}
fn record_outcome(&mut self, step: PipelineStep, success: bool, was_contention: bool) {
if success {
self.items_processed += 1;
}
match self.phase {
Phase::Startup | Phase::Drain => {
self.chase.record_outcome(step, success, was_contention);
}
Phase::SteadyState => {
self.fixed.record_outcome(step, success, was_contention);
}
}
}
fn thread_id(&self) -> usize {
self.thread_id
}
fn num_threads(&self) -> usize {
self.num_threads
}
fn active_steps(&self) -> &ActiveSteps {
&self.active_steps
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_starts_in_startup_phase() {
let scheduler = TwoPhaseScheduler::new(0, 8, ActiveSteps::all());
assert_eq!(scheduler.phase, Phase::Startup);
}
#[test]
fn test_transition_to_steady_state() {
let mut scheduler = TwoPhaseScheduler::new(0, 8, ActiveSteps::all());
let bp = BackpressureState::default();
for _ in 0..1000 {
scheduler.record_outcome(PipelineStep::Process, true, false);
}
scheduler.get_priorities(bp);
assert_eq!(scheduler.phase, Phase::SteadyState);
}
#[test]
fn test_transition_to_drain() {
let mut scheduler = TwoPhaseScheduler::new(0, 8, ActiveSteps::all());
scheduler.phase = Phase::SteadyState;
let bp = BackpressureState {
output_high: false,
input_low: false,
read_done: true,
memory_high: false,
memory_drained: true,
};
scheduler.get_priorities(bp);
assert_eq!(scheduler.phase, Phase::Drain);
}
}