use super::{BackpressureState, Scheduler};
use crate::unified_pipeline::base::{ActiveSteps, PipelineStep};
/// Scheduler whose step orderings are computed once, in
/// `FixedPriorityScheduler::new`, and never adjusted afterwards.
///
/// Three orderings are stored; `Scheduler::get_priorities` picks one of them
/// per call from the supplied `BackpressureState` and filters it through
/// `active_steps`.
pub struct FixedPriorityScheduler {
    /// Index of the thread this scheduler was built for; returned by
    /// `Scheduler::thread_id`.
    thread_id: usize,
    /// Thread count given at construction; returned by
    /// `Scheduler::num_threads`.
    num_threads: usize,
    /// Ordering used when neither backpressure condition applies.
    priorities: [PipelineStep; 9],
    /// Ordering used while `BackpressureState::output_high` is set.
    backpressure_drain: [PipelineStep; 9],
    /// Ordering used while `input_low` is set and `read_done` is not.
    backpressure_fill: [PipelineStep; 9],
    /// Filter applied (via `filter_in_place`) to whichever ordering was picked.
    active_steps: ActiveSteps,
    /// Scratch buffer: overwritten with the picked ordering on every
    /// `get_priorities` call and then filtered in place.
    active_priorities: [PipelineStep; 9],
    /// Value returned by the most recent `filter_in_place` call; used as the
    /// length of the slice handed back to the caller.
    num_active: usize,
}
impl FixedPriorityScheduler {
    /// Builds the scheduler for thread `thread_id` out of `num_threads`.
    ///
    /// The normal-mode ordering depends on the thread's position; the first
    /// rule that matches wins:
    ///
    /// 1. thread `0`: the `Read`-first ordering;
    /// 2. the last thread (two or more threads): the `Write`-first ordering,
    ///    which is the exact reverse of the `Read`-first one;
    /// 3. thread `1` (three or more threads): `FindBoundaries` first;
    /// 4. the second-to-last thread (four or more threads): `Group` first;
    /// 5. every other thread: a base ordering whose first five entries are
    ///    rotated left by `(thread_id - 1) % 5`, with the last four fixed.
    ///
    /// The two backpressure orderings are the same for every thread: draining
    /// uses the `Write`-first ordering, filling uses the `Read`-first one.
    ///
    /// `active_steps` is stored as-is. The scratch buffer starts as nine
    /// `Read` entries with `num_active` set to 9.
    #[must_use]
    pub fn new(thread_id: usize, num_threads: usize, active_steps: ActiveSteps) -> Self {
        use PipelineStep::{
            Compress, Decode, Decompress, FindBoundaries, Group, Process, Read, Serialize, Write,
        };

        let read_first =
            [Read, Decompress, FindBoundaries, Decode, Group, Process, Serialize, Compress, Write];
        let write_first =
            [Write, Compress, Serialize, Process, Group, Decode, FindBoundaries, Decompress, Read];

        // Arm order matters: with few threads several roles can describe the
        // same index, and the earliest arm takes precedence.
        let priorities = match thread_id {
            0 => read_first,
            id if num_threads > 1 && id == num_threads - 1 => write_first,
            1 if num_threads > 2 => {
                [FindBoundaries, Decompress, Decode, Group, Process, Serialize, Compress, Write, Read]
            }
            id if num_threads > 3 && id == num_threads - 2 => {
                [Group, Decode, FindBoundaries, Process, Decompress, Serialize, Compress, Write, Read]
            }
            id => {
                let mut order = [
                    Decompress,
                    Decode,
                    Process,
                    Serialize,
                    Compress,
                    Group,
                    FindBoundaries,
                    Write,
                    Read,
                ];
                // `id` is at least 1 here (0 is handled by the first arm), so
                // the subtraction cannot underflow.
                order[..5].rotate_left((id - 1) % 5);
                order
            }
        };

        Self {
            thread_id,
            num_threads,
            priorities,
            backpressure_drain: write_first,
            backpressure_fill: read_first,
            active_steps,
            active_priorities: [Read; 9],
            num_active: 9,
        }
    }
}
impl Scheduler for FixedPriorityScheduler {
    /// Returns the ordering to use for the given backpressure state.
    ///
    /// Selection, in order of precedence:
    /// - `output_high` set: the drain ordering;
    /// - `input_low` set while `read_done` is not: the fill ordering;
    /// - anything else: this thread's normal ordering.
    ///
    /// The chosen ordering is copied into the scratch buffer and filtered
    /// through `active_steps`. The returned slice is the first `n` entries of
    /// that buffer, where `n` is what `filter_in_place` reported.
    fn get_priorities(&mut self, bp: BackpressureState) -> &[PipelineStep] {
        let template = match (bp.output_high, bp.input_low, bp.read_done) {
            (true, _, _) => self.backpressure_drain,
            (false, true, false) => self.backpressure_fill,
            _ => self.priorities,
        };

        // Start from a fresh copy each call so earlier filtering never leaks
        // into this result.
        self.active_priorities = template;
        let kept = self.active_steps.filter_in_place(&mut self.active_priorities);
        self.num_active = kept;
        &self.active_priorities[..kept]
    }

    /// No-op: the orderings are fixed, so outcomes are not recorded.
    fn record_outcome(&mut self, _step: PipelineStep, _success: bool, _was_contention: bool) {}

    /// Thread index supplied at construction.
    fn thread_id(&self) -> usize {
        self.thread_id
    }

    /// Thread count supplied at construction.
    fn num_threads(&self) -> usize {
        self.num_threads
    }

    /// The step filter supplied at construction.
    fn active_steps(&self) -> &ActiveSteps {
        &self.active_steps
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a scheduler with every step active and returns the step it
    /// ranks first under `bp`.
    fn top_step(thread_id: usize, num_threads: usize, bp: BackpressureState) -> PipelineStep {
        let mut scheduler =
            FixedPriorityScheduler::new(thread_id, num_threads, ActiveSteps::all());
        scheduler.get_priorities(bp)[0]
    }

    /// Backpressure state with the given flags; the memory flags are held at
    /// the values every test here uses.
    fn pressure(output_high: bool, input_low: bool, read_done: bool) -> BackpressureState {
        BackpressureState {
            output_high,
            input_low,
            read_done,
            memory_high: false,
            memory_drained: true,
        }
    }

    #[test]
    fn test_reader_priorities() {
        assert_eq!(top_step(0, 8, BackpressureState::default()), PipelineStep::Read);
    }

    #[test]
    fn test_writer_priorities() {
        assert_eq!(top_step(7, 8, BackpressureState::default()), PipelineStep::Write);
    }

    #[test]
    fn test_boundary_specialist_early() {
        assert_eq!(
            top_step(1, 8, BackpressureState::default()),
            PipelineStep::FindBoundaries
        );
    }

    #[test]
    fn test_boundary_specialist_late() {
        assert_eq!(top_step(6, 8, BackpressureState::default()), PipelineStep::Group);
    }

    #[test]
    fn test_backpressure_drain() {
        // High output pressure overrides the thread's own ordering.
        assert_eq!(top_step(2, 8, pressure(true, false, false)), PipelineStep::Write);
    }

    #[test]
    fn test_backpressure_fill() {
        // Low input with reading still in progress switches to the fill ordering.
        assert_eq!(top_step(2, 8, pressure(false, true, false)), PipelineStep::Read);
    }

    #[test]
    fn test_backpressure_fill_not_when_read_done() {
        // Once reading has finished, low input no longer selects the fill ordering.
        assert_ne!(top_step(2, 8, pressure(false, true, true)), PipelineStep::Read);
    }

    #[test]
    fn test_two_threads() {
        let bp = BackpressureState::default();
        assert_eq!(top_step(0, 2, bp), PipelineStep::Read);
        assert_eq!(top_step(1, 2, bp), PipelineStep::Write);
    }
}