use super::{BackpressureState, Direction, Scheduler};
use crate::unified_pipeline::base::{ActiveSteps, PipelineStep};
/// Per-thread pipeline scheduler that "chases" work along the step chain:
/// edge threads are pinned as specialists on exclusive steps, middle threads
/// gravitate toward the compute-heavy bottleneck steps.
pub struct OptimizedChaseScheduler {
    /// 0-based id of the thread this scheduler instance drives.
    thread_id: usize,
    // NOTE(review): this field is read by `Scheduler::num_threads` below, so
    // the allow looks stale — confirm before removing.
    #[allow(dead_code)]
    num_threads: usize,
    /// Step this thread most recently succeeded on (its "chase" anchor).
    current_step: PipelineStep,
    /// Direction the thread walks the step chain when its current step stalls.
    direction: Direction,
    /// Scratch buffer rebuilt by `build_priorities`; `get_priorities` returns
    /// a filtered prefix of it.
    priority_buffer: [PipelineStep; 9],
    /// Counter used to keep the thread "sticky" on a bottleneck step for a
    /// few failed attempts before it moves on.
    bottleneck_streak: u8,
    /// Remaining scheduling cycles this (non-specialist) thread avoids
    /// exclusive steps after losing a contention race.
    exclusive_backoff: u8,
    /// True for threads permanently assigned an exclusive step.
    is_exclusive_specialist: bool,
    /// The exclusive step this thread specializes in, if any.
    exclusive_specialty: Option<PipelineStep>,
    /// Set of steps currently enabled in the pipeline.
    active_steps: ActiveSteps,
}
impl OptimizedChaseScheduler {
    /// Steps that at most one thread should run at a time.
    const EXCLUSIVE_STEPS: [PipelineStep; 4] = [
        PipelineStep::Read,
        PipelineStep::FindBoundaries,
        PipelineStep::Group,
        PipelineStep::Write,
    ];
    /// Compute-heavy steps that threads should stay sticky on.
    const BOTTLENECK_STEPS: [PipelineStep; 2] = [PipelineStep::Compress, PipelineStep::Serialize];
    /// Consecutive failures tolerated on a bottleneck step before moving on.
    const BOTTLENECK_STICKY_THRESHOLD: u8 = 3;
    /// Cycles a non-specialist avoids exclusive steps after losing contention.
    const EXCLUSIVE_BACKOFF_CYCLES: u8 = 5;

    /// Creates a scheduler for `thread_id` of `num_threads`, assigning the
    /// thread a starting step and (possibly) an exclusive specialty.
    #[must_use]
    pub fn new(thread_id: usize, num_threads: usize, active_steps: ActiveSteps) -> Self {
        let (current_step, is_exclusive_specialist, exclusive_specialty) =
            Self::determine_role(thread_id, num_threads);
        Self {
            thread_id,
            num_threads,
            current_step,
            direction: Direction::Forward,
            priority_buffer: PipelineStep::all(),
            bottleneck_streak: 0,
            exclusive_backoff: 0,
            is_exclusive_specialist,
            exclusive_specialty,
            active_steps,
        }
    }

    /// Maps a thread id to `(starting_step, is_specialist, specialty)`:
    /// thread 0 -> Read, last -> Write, 1 -> FindBoundaries,
    /// second-to-last -> Group; remaining middle threads alternate between
    /// the two bottleneck steps and have no exclusive specialty.
    fn determine_role(
        thread_id: usize,
        num_threads: usize,
    ) -> (PipelineStep, bool, Option<PipelineStep>) {
        use PipelineStep::{Compress, FindBoundaries, Group, Read, Serialize, Write};
        if thread_id == 0 {
            (Read, true, Some(Read))
        } else if thread_id == num_threads - 1 && num_threads > 1 {
            (Write, true, Some(Write))
        } else if thread_id == 1 && num_threads > 2 {
            (FindBoundaries, true, Some(FindBoundaries))
        } else if thread_id == num_threads - 2 && num_threads > 3 {
            (Group, true, Some(Group))
        } else {
            // For valid ids (thread_id < num_threads) this branch is only
            // reachable with thread_id >= 2, so the subtraction is safe.
            let step = if (thread_id - 2) % 2 == 0 { Compress } else { Serialize };
            (step, false, None)
        }
    }

    /// Returns true if `step` is one of the single-owner exclusive steps.
    fn is_exclusive(step: PipelineStep) -> bool {
        Self::EXCLUSIVE_STEPS.contains(&step)
    }

    /// Returns true if `step` is one of the compute-bottleneck steps.
    fn is_bottleneck(step: PipelineStep) -> bool {
        Self::BOTTLENECK_STEPS.contains(&step)
    }

    /// Rebuilds `priority_buffer` so it holds all nine steps ordered by
    /// desirability for this thread: current step first (unless backing off),
    /// then its specialty, then (for non-specialists) the bottleneck steps,
    /// then steps radiating outward from the current index in the preferred
    /// direction, with anything skipped (backed-off exclusives) appended last.
    ///
    /// This runs on every scheduling decision, so it builds the list on a
    /// stack array instead of allocating a `Vec` per call.
    #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap, clippy::cast_sign_loss)]
    fn build_priorities(&mut self, _bp: BackpressureState) {
        /// Appends `step` to `out[..len]` unless it is already present.
        fn push_unique(out: &mut [PipelineStep; 9], len: &mut usize, step: PipelineStep) {
            if !out[..*len].contains(&step) {
                out[*len] = step;
                *len += 1;
            }
        }
        // Initial contents are irrelevant: the buffer is fully rewritten.
        let mut out = PipelineStep::all();
        let mut len = 0usize;
        let current_idx = self.current_step.index();
        // A non-specialist that is backing off an exclusive step demotes its
        // current step to the end of the list instead of leading with it.
        let skip_current = Self::is_exclusive(self.current_step)
            && !self.is_exclusive_specialist
            && self.exclusive_backoff > 0;
        if !skip_current {
            push_unique(&mut out, &mut len, self.current_step);
        }
        if let Some(specialty) = self.exclusive_specialty {
            if specialty != self.current_step {
                push_unique(&mut out, &mut len, specialty);
            }
        }
        if !self.is_exclusive_specialist {
            for &step in &Self::BOTTLENECK_STEPS {
                push_unique(&mut out, &mut len, step);
            }
        }
        // Walk outward from the current step, probing the preferred direction
        // first at each distance.
        let (first_dir, second_dir): (i32, i32) = match self.direction {
            Direction::Forward => (1, -1),
            Direction::Backward => (-1, 1),
        };
        for distance in 1..=8 {
            for dir in [first_dir, second_dir] {
                let idx = current_idx as i32 + dir * distance;
                if (0..9).contains(&idx) {
                    let step = PipelineStep::all()[idx as usize];
                    if self.should_include_step(step) {
                        push_unique(&mut out, &mut len, step);
                    }
                }
            }
        }
        // Append anything still missing so every step appears exactly once.
        for &step in &PipelineStep::all() {
            push_unique(&mut out, &mut len, step);
        }
        debug_assert_eq!(len, 9);
        self.priority_buffer = out;
    }

    /// Non-specialists skip exclusive steps while backing off; every other
    /// combination is eligible.
    fn should_include_step(&self, step: PipelineStep) -> bool {
        !Self::is_exclusive(step) || self.is_exclusive_specialist || self.exclusive_backoff == 0
    }
}
impl Scheduler for OptimizedChaseScheduler {
    /// Ticks the exclusive-step backoff, steers the chase direction from the
    /// backpressure snapshot, and returns the active-step-filtered priority
    /// list for this cycle.
    fn get_priorities(&mut self, bp: BackpressureState) -> &[PipelineStep] {
        self.exclusive_backoff = self.exclusive_backoff.saturating_sub(1);
        if bp.output_high {
            self.direction = Direction::Forward;
        } else if bp.input_low && !bp.read_done {
            self.direction = Direction::Backward;
        }
        self.build_priorities(bp);
        let count = self.active_steps.filter_in_place(&mut self.priority_buffer);
        &self.priority_buffer[..count]
    }

    /// Records the result of attempting `step`, updating the chase anchor,
    /// bottleneck stickiness, and exclusive backoff.
    fn record_outcome(&mut self, step: PipelineStep, success: bool, was_contention: bool) {
        if success {
            self.current_step = step;
            self.bottleneck_streak = if Self::is_bottleneck(step) {
                self.bottleneck_streak.saturating_add(1)
            } else {
                0
            };
            self.exclusive_backoff = 0;
            return;
        }
        // Losing an exclusive step to another thread puts a non-specialist
        // into backoff so it stops fighting over it.
        if was_contention && Self::is_exclusive(step) && !self.is_exclusive_specialist {
            self.exclusive_backoff = Self::EXCLUSIVE_BACKOFF_CYCLES;
        }
        // Stay parked on a bottleneck step for a few failures before moving.
        if Self::is_bottleneck(self.current_step)
            && self.bottleneck_streak < Self::BOTTLENECK_STICKY_THRESHOLD
        {
            self.bottleneck_streak = self.bottleneck_streak.saturating_add(1);
            return;
        }
        // Otherwise step along the chain in the current direction; the ends
        // wrap onto non-exclusive steps (Decompress / Compress) rather than
        // onto the exclusive endpoints.
        let idx = self.current_step.index();
        self.current_step = match self.direction {
            Direction::Forward if idx < 8 => PipelineStep::all()[idx + 1],
            Direction::Forward => PipelineStep::Decompress,
            Direction::Backward if idx > 1 => PipelineStep::all()[idx - 1],
            Direction::Backward => PipelineStep::Compress,
        };
        self.bottleneck_streak = 0;
    }

    fn thread_id(&self) -> usize {
        self.thread_id
    }

    fn num_threads(&self) -> usize {
        self.num_threads
    }

    fn active_steps(&self) -> &ActiveSteps {
        &self.active_steps
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_thread_0_is_reader_specialist() {
        let sched = OptimizedChaseScheduler::new(0, 8, ActiveSteps::all());
        assert_eq!(sched.current_step, PipelineStep::Read);
        assert!(sched.is_exclusive_specialist);
        assert_eq!(sched.exclusive_specialty, Some(PipelineStep::Read));
    }

    #[test]
    fn test_thread_7_is_writer_specialist() {
        let sched = OptimizedChaseScheduler::new(7, 8, ActiveSteps::all());
        assert_eq!(sched.current_step, PipelineStep::Write);
        assert!(sched.is_exclusive_specialist);
        assert_eq!(sched.exclusive_specialty, Some(PipelineStep::Write));
    }

    #[test]
    fn test_thread_1_is_boundary_specialist() {
        let sched = OptimizedChaseScheduler::new(1, 8, ActiveSteps::all());
        assert_eq!(sched.current_step, PipelineStep::FindBoundaries);
        assert!(sched.is_exclusive_specialist);
    }

    #[test]
    fn test_thread_6_is_group_specialist() {
        let sched = OptimizedChaseScheduler::new(6, 8, ActiveSteps::all());
        assert_eq!(sched.current_step, PipelineStep::Group);
        assert!(sched.is_exclusive_specialist);
    }

    #[test]
    fn test_middle_threads_start_on_bottleneck() {
        let sched = OptimizedChaseScheduler::new(2, 8, ActiveSteps::all());
        assert!(matches!(
            sched.current_step,
            PipelineStep::Compress | PipelineStep::Serialize
        ));
        assert!(!sched.is_exclusive_specialist);
    }

    #[test]
    fn test_bottleneck_steps_in_priorities() {
        let mut sched = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        let priorities = sched.get_priorities(BackpressureState::default());
        let compress_pos = priorities
            .iter()
            .position(|&s| s == PipelineStep::Compress)
            .expect("compress position should be Some");
        let serialize_pos = priorities
            .iter()
            .position(|&s| s == PipelineStep::Serialize)
            .expect("serialize position should be Some");
        assert!(compress_pos < 4);
        assert!(serialize_pos < 4);
    }

    #[test]
    fn test_exclusive_backoff_after_contention() {
        let mut sched = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        sched.current_step = PipelineStep::Group;
        sched.record_outcome(PipelineStep::Group, false, true);
        assert_eq!(
            sched.exclusive_backoff,
            OptimizedChaseScheduler::EXCLUSIVE_BACKOFF_CYCLES
        );
    }

    #[test]
    fn test_specialist_no_backoff() {
        let mut sched = OptimizedChaseScheduler::new(0, 8, ActiveSteps::all());
        sched.record_outcome(PipelineStep::Read, false, true);
        assert_eq!(sched.exclusive_backoff, 0);
    }

    #[test]
    fn test_bottleneck_stickiness() {
        let mut sched = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        sched.current_step = PipelineStep::Compress;
        // The thread stays parked for the first three failures...
        for _ in 0..3 {
            sched.record_outcome(PipelineStep::Compress, false, false);
            assert_eq!(sched.current_step, PipelineStep::Compress);
        }
        // ...and moves on after the fourth.
        sched.record_outcome(PipelineStep::Compress, false, false);
        assert_ne!(sched.current_step, PipelineStep::Compress);
    }

    #[test]
    fn test_wrap_to_parallel_steps() {
        let mut sched = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        sched.current_step = PipelineStep::Write;
        sched.direction = Direction::Forward;
        sched.bottleneck_streak = 10;
        sched.record_outcome(PipelineStep::Write, false, false);
        assert_eq!(sched.current_step, PipelineStep::Decompress);
    }

    #[test]
    fn test_priorities_returns_all_steps() {
        let mut sched = OptimizedChaseScheduler::new(3, 8, ActiveSteps::all());
        let priorities = sched.get_priorities(BackpressureState::default());
        assert_eq!(priorities.len(), 9);
    }
}