use crate::api::StreamId;
use std::cmp::Ordering;
/// Policy that determines how much virtual time a stream is charged per chunk.
#[derive(Debug, PartialEq)]
enum SchedulingParameters {
    /// Every chunk costs one unit of virtual time, regardless of its size.
    RoundRobin,
    /// A chunk costs virtual time proportional to its size and inversely
    /// proportional to `weight`, so a larger weight makes each byte cheaper.
    WeightedFairQueuing { weight: u16 },
}
/// Bookkeeping for one stream that currently has bytes queued for sending.
///
/// Ordering and equality are both keyed on `(virtual_finish_time, stream_id)`.
/// The smallest entry is the stream whose next chunk finishes earliest in
/// virtual time; `stream_id` breaks ties so the order is deterministic.
#[derive(Debug)]
struct ActiveStreamInfo {
    /// Identifier of the stream this entry describes.
    stream_id: StreamId,
    /// Scheduling policy used to price this stream's chunks.
    parameters: SchedulingParameters,
    /// Virtual time from which the cost of the next chunk is measured.
    virtual_start_time: u64,
    /// Virtual time at which the next scheduled chunk would finish.
    virtual_finish_time: u64,
    /// Bytes still queued on the stream.
    bytes_remaining: usize,
}

impl Ord for ActiveStreamInfo {
    /// Orders by virtual finish time first, then by stream id.
    fn cmp(&self, other: &Self) -> Ordering {
        self.virtual_finish_time
            .cmp(&other.virtual_finish_time)
            .then_with(|| self.stream_id.cmp(&other.stream_id))
    }
}

impl PartialOrd for ActiveStreamInfo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Equality is implemented by hand instead of derived. `Ord` requires that
// `a == b` holds exactly when `a.cmp(&b)` is `Ordering::Equal`; a derived
// field-by-field comparison would disagree with `cmp`, which ignores
// `parameters`, `virtual_start_time` and `bytes_remaining`.
impl PartialEq for ActiveStreamInfo {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for ActiveStreamInfo {}
impl ActiveStreamInfo {
    /// Virtual-time units charged per byte to a weighted stream of weight one.
    const VIRTUAL_TIME_PER_BYTE: u128 = 1_000_000;

    /// Creates an entry for `stream_id` whose virtual clock starts at
    /// `system_virtual_time`.
    ///
    /// The entry starts as round-robin with no queued bytes and a finish time
    /// of zero.
    fn new(stream_id: StreamId, system_virtual_time: u64) -> Self {
        Self {
            stream_id,
            parameters: SchedulingParameters::RoundRobin,
            virtual_start_time: system_virtual_time,
            virtual_finish_time: 0,
            bytes_remaining: 0,
        }
    }

    /// Returns the virtual time this stream reaches after sending `bytes`.
    ///
    /// Round-robin streams advance by exactly one unit. Weighted streams
    /// advance by `bytes * 1_000_000 / weight`, with a weight of zero treated
    /// as one. The result saturates at `u64::MAX` instead of overflowing.
    fn calculate_vt(&self, bytes: usize) -> u64 {
        let increment = match self.parameters {
            SchedulingParameters::WeightedFairQueuing { weight } => {
                // Guard against division by zero for a zero weight.
                let weight = u128::from(weight.max(1));
                // Compute in u128 so the scaling multiplication cannot
                // overflow, then clamp back into the u64 range.
                let scaled = bytes as u128 * Self::VIRTUAL_TIME_PER_BYTE / weight;
                scaled.min(u128::from(u64::MAX)) as u64
            }
            SchedulingParameters::RoundRobin => 1,
        };
        // Saturate rather than panic (debug) or wrap (release): a wrapped
        // value would sort before every other stream.
        self.virtual_start_time.saturating_add(increment)
    }

    /// Sets the finish time to the virtual time reached after sending a chunk
    /// of `payload_size` bytes.
    fn schedule_next_chunk(&mut self, payload_size: usize) {
        self.virtual_finish_time = self.calculate_vt(payload_size);
    }

    /// Accounts for `bytes` having been sent and returns the stream's new
    /// virtual start time.
    ///
    /// `bytes_remaining` is reduced (saturating at zero). If bytes are left,
    /// the next chunk, capped at `max_payload_bytes`, is scheduled; otherwise
    /// the finish time is left untouched.
    fn consume_bytes(&mut self, bytes: usize, max_payload_bytes: usize) -> u64 {
        // Price the sent bytes against the old start time before moving it.
        let new_current_vt = self.calculate_vt(bytes);
        self.bytes_remaining = self.bytes_remaining.saturating_sub(bytes);
        self.virtual_start_time = new_current_vt;
        if self.bytes_remaining > 0 {
            self.schedule_next_chunk(self.bytes_remaining.min(max_payload_bytes));
        }
        new_current_vt
    }
}
/// Picks which active stream sends next, using per-stream virtual finish
/// times.
pub struct StreamScheduler {
    /// Cap on the chunk size assumed when a stream's next chunk is scheduled.
    max_payload_bytes: usize,
    /// Stream that `peek` returns in preference to the virtual-time order.
    /// `accept` leaves it set while a round-robin stream still has bytes.
    current_stream: Option<StreamId>,
    /// Virtual time handed to newly tracked streams as their start time.
    system_virtual_time: u64,
    /// Streams that currently have bytes queued, in no particular order.
    active_streams: Vec<ActiveStreamInfo>,
}
impl StreamScheduler {
    /// Creates a scheduler that tracks no streams.
    ///
    /// `max_payload_bytes` caps the chunk size assumed when a stream's next
    /// virtual finish time is computed.
    pub fn new(max_payload_bytes: usize) -> Self {
        Self {
            active_streams: Vec::new(),
            current_stream: None,
            system_virtual_time: 0,
            max_payload_bytes,
        }
    }

    /// Declares that `stream_id` now has `bytes_remaining` bytes queued.
    ///
    /// Zero bytes stops tracking the stream. Otherwise the stream is tracked
    /// (starting at the current system virtual time if it is new), its policy
    /// is set from `priority` (`Some(weight)` selects weighted fair queuing,
    /// `None` selects round-robin), and its next chunk is rescheduled.
    pub fn set_bytes_remaining(
        &mut self,
        stream_id: StreamId,
        bytes_remaining: usize,
        priority: Option<u16>,
    ) {
        if bytes_remaining == 0 {
            self.remove_stream(stream_id);
            return;
        }

        let parameters = match priority {
            Some(weight) => SchedulingParameters::WeightedFairQueuing { weight },
            None => SchedulingParameters::RoundRobin,
        };
        let next_chunk = bytes_remaining.min(self.max_payload_bytes);

        let existing = self.active_streams.iter().position(|s| s.stream_id == stream_id);
        let index = match existing {
            Some(index) => index,
            None => {
                let fresh = ActiveStreamInfo::new(stream_id, self.system_virtual_time);
                self.active_streams.push(fresh);
                self.active_streams.len() - 1
            }
        };

        let entry = &mut self.active_streams[index];
        entry.parameters = parameters;
        entry.bytes_remaining = bytes_remaining;
        entry.schedule_next_chunk(next_chunk);
    }

    /// Returns the stream that should send next and how many bytes it may
    /// send, without modifying the scheduler.
    ///
    /// If a current stream is set and still tracked, it is chosen. Otherwise
    /// the smallest tracked entry is chosen. The byte count is the stream's
    /// remaining bytes capped at `max_size`. Returns `None` when no stream is
    /// tracked.
    pub fn peek(&self, max_size: usize) -> Option<(StreamId, usize)> {
        let pinned = self
            .current_stream
            .and_then(|wanted| self.active_streams.iter().find(|s| s.stream_id == wanted));
        let chosen = match pinned {
            Some(entry) => entry,
            None => self.active_streams.iter().min()?,
        };
        Some((chosen.stream_id, chosen.bytes_remaining.min(max_size)))
    }

    /// Records that `bytes` were sent on `stream_id`.
    ///
    /// The system virtual time becomes the stream's new virtual start time.
    /// A stream with no bytes left is removed. A round-robin stream with
    /// bytes left stays the current stream; a weighted stream does not.
    ///
    /// # Panics
    ///
    /// Panics if `stream_id` is not currently tracked.
    pub fn accept(&mut self, stream_id: StreamId, bytes: usize) {
        let payload_cap = self.max_payload_bytes;
        let entry = self
            .active_streams
            .iter_mut()
            .find(|s| s.stream_id == stream_id)
            .expect("accept called on untracked stream_id");

        let new_virtual_time = entry.consume_bytes(bytes, payload_cap);
        let drained = entry.bytes_remaining == 0;
        let weighted =
            matches!(entry.parameters, SchedulingParameters::WeightedFairQueuing { .. });

        // Mark the stream current first so that removing it below also
        // clears the marker.
        self.current_stream = Some(stream_id);
        self.system_virtual_time = new_virtual_time;
        if drained {
            self.remove_stream(stream_id);
        } else if weighted {
            self.current_stream = None;
        }
    }

    /// Stops tracking `stream_id`, clears it as the current stream if it was
    /// one, and resets the system virtual time once no streams remain.
    fn remove_stream(&mut self, stream_id: StreamId) {
        let index = self.active_streams.iter().position(|s| s.stream_id == stream_id);
        if let Some(index) = index {
            self.active_streams.swap_remove(index);
        }
        if self.current_stream == Some(stream_id) {
            self.current_stream = None;
        }
        if self.active_streams.is_empty() {
            self.system_virtual_time = 0;
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Size limit, in bytes, that the tests pass to `StreamScheduler::new` and to
// `produce` unless a test needs a smaller value.
const MTU: usize = 1280;
/// Per-stream settings consumed by `send_packets`.
struct TestStreamConfig {
    /// Value forwarded as the `priority` argument of `set_bytes_remaining`.
    priority: Option<u16>,
    /// Number of bytes queued on the stream every time it is (re)filled.
    packet_size: usize,
}

impl TestStreamConfig {
    /// Bundles a priority with a packet size.
    fn new(priority: Option<u16>, packet_size: usize) -> Self {
        TestStreamConfig {
            priority,
            packet_size,
        }
    }
}
/// Drives `q` for `packet_count` packets and returns how many packets each
/// configured stream produced.
///
/// Stream `i` uses `stream_configs[i]` and the id `StreamId(i)`. Every stream
/// is filled once up front, and the stream that just produced a packet is
/// refilled with its configured packet size.
fn send_packets(
    q: &mut StreamScheduler,
    stream_configs: &[TestStreamConfig],
    packet_count: usize,
) -> Vec<usize> {
    for (position, settings) in stream_configs.iter().enumerate() {
        q.set_bytes_remaining(StreamId(position as u16), settings.packet_size, settings.priority);
    }

    let mut tally = vec![0usize; stream_configs.len()];
    for _ in 0..packet_count {
        let (stream_id, _) = produce(q, MTU).unwrap();
        // The stream id doubles as the index into the configuration slice.
        let position = stream_id.0 as usize;
        tally[position] += 1;
        let settings = &stream_configs[position];
        q.set_bytes_remaining(stream_id, settings.packet_size, settings.priority);
    }
    tally
}
/// Peeks the next `(stream, byte count)` pair and immediately accepts it,
/// returning the pair, or `None` when the scheduler has nothing to send.
fn produce(s: &mut StreamScheduler, max_size: usize) -> Option<(StreamId, usize)> {
    let (stream_id, bytes) = s.peek(max_size)?;
    s.accept(stream_id, bytes);
    Some((stream_id, bytes))
}
// A freshly created scheduler has nothing to produce.
#[test]
fn has_no_active_streams() {
    let mut scheduler = StreamScheduler::new(MTU);
    assert!(produce(&mut scheduler, MTU).is_none());
}
// A single stream with queued bytes is produced in full.
#[test]
fn can_produce_from_single_stream() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), 10, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 10)));
}
// Three streams without priority are served in the order 1, 2, 3, repeatedly.
#[test]
fn will_round_robin_between_streams() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), 10, None);
    scheduler.set_bytes_remaining(StreamId(2), 10, None);
    scheduler.set_bytes_remaining(StreamId(3), 10, None);

    // Two full rounds, refilling each stream right after it has produced.
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 10)));
    scheduler.set_bytes_remaining(StreamId(1), 10, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 10)));
    scheduler.set_bytes_remaining(StreamId(2), 10, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), 10)));
    scheduler.set_bytes_remaining(StreamId(3), 10, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 10)));
    scheduler.set_bytes_remaining(StreamId(1), 10, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 10)));
    scheduler.set_bytes_remaining(StreamId(2), 10, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), 10)));
    scheduler.set_bytes_remaining(StreamId(3), 10, None);

    // Final round without refills drains all three streams.
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 10)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 10)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), 10)));
    assert!(produce(&mut scheduler, MTU).is_none());
}
// A stream without priority that holds more than one MTU keeps producing
// until it is drained before the scheduler moves to the other stream.
#[test]
fn will_round_robin_only_when_finished_producing_chunk() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    scheduler.set_bytes_remaining(StreamId(2), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    // Stream 1 now holds three MTUs worth of data.
    scheduler.set_bytes_remaining(StreamId(1), 3 * MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), MTU)));
    scheduler.set_bytes_remaining(StreamId(2), MTU, None);
    // All three pieces from stream 1 come out back to back.
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), MTU)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    assert!(produce(&mut scheduler, MTU).is_none());
}
// Setting a stream's remaining bytes to zero removes it from scheduling.
#[test]
fn streams_can_be_made_inactive() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    // Withdraw the refill before anything is produced from it.
    scheduler.set_bytes_remaining(StreamId(1), 0, None);
    assert!(produce(&mut scheduler, MTU).is_none());
}
// A stream that was set to zero bytes can be filled again and produce.
#[test]
fn single_stream_can_be_resumed() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    scheduler.set_bytes_remaining(StreamId(1), 0, None);
    assert!(produce(&mut scheduler, MTU).is_none());
    // Refill the stream after it was emptied.
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    assert!(produce(&mut scheduler, MTU).is_none());
}
// While stream 1 is set to zero bytes only stream 2 is served; once stream 1
// is refilled the two alternate again.
#[test]
fn will_round_robin_with_paused_stream() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    scheduler.set_bytes_remaining(StreamId(2), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), MTU)));
    scheduler.set_bytes_remaining(StreamId(2), MTU, None);
    // Pause stream 1.
    scheduler.set_bytes_remaining(StreamId(1), 0, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), MTU)));
    scheduler.set_bytes_remaining(StreamId(2), MTU, None);
    // Resume stream 1.
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), MTU)));
    scheduler.set_bytes_remaining(StreamId(1), MTU, None);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), MTU)));
}
// Two identically configured streams split ten packets evenly.
// NOTE(review): both streams pass `Some(1)`, which `set_bytes_remaining` maps
// to weighted fair queuing rather than round-robin. Confirm whether `None`
// was intended, given the test name.
#[test]
fn will_distribute_round_robin_packets_evenly_two_streams() {
    let mut scheduler = StreamScheduler::new(MTU);
    let counts = send_packets(
        &mut scheduler,
        &[TestStreamConfig::new(Some(1), 10), TestStreamConfig::new(Some(1), 10)],
        10,
    );
    assert_eq!(counts, &[5, 5]);
}
// Two weighted streams with equal weight and equal message size alternate.
#[test]
fn will_do_fair_queuing_with_same_size_same_priority() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    scheduler.set_bytes_remaining(StreamId(2), 30, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 30)));
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 30)));
    scheduler.set_bytes_remaining(StreamId(2), 30, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 30)));
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 30)));
    scheduler.set_bytes_remaining(StreamId(2), 30, Some(2));
    // No more refills: each stream drains its last message.
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 30)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 30)));
    assert_eq!(produce(&mut scheduler, MTU), None);
}
// Equal-weight streams where each produce call takes only 30 bytes although
// the scheduler was created with a 60-byte payload limit.
#[test]
fn will_do_fair_queuing_with_less_produced_than_available() {
    let mut scheduler = StreamScheduler::new(60);
    scheduler.set_bytes_remaining(StreamId(1), 60, Some(2));
    scheduler.set_bytes_remaining(StreamId(2), 60, Some(2));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(1), 30)));
    scheduler.set_bytes_remaining(StreamId(1), 60, Some(2));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(2), 30)));
    scheduler.set_bytes_remaining(StreamId(2), 60, Some(2));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(1), 30)));
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(1), 30)));
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(2), 30)));
    scheduler.set_bytes_remaining(StreamId(2), 60, Some(2));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(1), 30)));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(2), 30)));
    scheduler.set_bytes_remaining(StreamId(2), 30, Some(2));
    assert_eq!(produce(&mut scheduler, 30), Some((StreamId(2), 30)));
    assert_eq!(produce(&mut scheduler, 30), None);
}
// Equal-weight streams with different message sizes (30 and 70 bytes): the
// stream with the smaller messages is served more often.
#[test]
fn will_do_fair_queuing_with_same_priority() {
    let mut scheduler = StreamScheduler::new(MTU);
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    scheduler.set_bytes_remaining(StreamId(2), 70, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 30)));
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 30)));
    scheduler.set_bytes_remaining(StreamId(1), 30, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 70)));
    scheduler.set_bytes_remaining(StreamId(2), 70, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 30)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 70)));
    scheduler.set_bytes_remaining(StreamId(2), 70, Some(2));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 70)));
    assert_eq!(produce(&mut scheduler, MTU), None);
}
// Three streams with equal message size and weights 125, 200 and 500: the
// stream with the largest weight is served first and most often.
#[test]
fn will_do_weighted_fair_queuing_same_size_different_priority() {
    let mut scheduler = StreamScheduler::new(MTU);
    const SIZE: usize = 4;
    const PRIORITY_1: Option<u16> = Some(125);
    const PRIORITY_2: Option<u16> = Some(200);
    const PRIORITY_3: Option<u16> = Some(500);
    scheduler.set_bytes_remaining(StreamId(1), SIZE, PRIORITY_1);
    scheduler.set_bytes_remaining(StreamId(2), SIZE, PRIORITY_2);
    scheduler.set_bytes_remaining(StreamId(3), SIZE, PRIORITY_3);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), SIZE)));
    scheduler.set_bytes_remaining(StreamId(3), SIZE, PRIORITY_3);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), SIZE)));
    scheduler.set_bytes_remaining(StreamId(3), SIZE, PRIORITY_3);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), SIZE)));
    scheduler.set_bytes_remaining(StreamId(2), SIZE, PRIORITY_2);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), SIZE)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), SIZE)));
    scheduler.set_bytes_remaining(StreamId(1), SIZE, PRIORITY_1);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), SIZE)));
    scheduler.set_bytes_remaining(StreamId(2), SIZE, PRIORITY_2);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), SIZE)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), SIZE)));
    scheduler.set_bytes_remaining(StreamId(1), SIZE, PRIORITY_1);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), SIZE)));
    assert_eq!(produce(&mut scheduler, MTU), None);
}
// Three streams with weights 125, 200 and 500 whose message sizes vary
// between refills (20, 50 and 70 bytes).
#[test]
fn will_do_weighted_fair_queuing_different_size_and_priority() {
    let mut scheduler = StreamScheduler::new(MTU);
    const PRIORITY_1: Option<u16> = Some(125);
    const PRIORITY_2: Option<u16> = Some(200);
    const PRIORITY_3: Option<u16> = Some(500);
    scheduler.set_bytes_remaining(StreamId(1), 50, PRIORITY_1);
    scheduler.set_bytes_remaining(StreamId(2), 50, PRIORITY_2);
    scheduler.set_bytes_remaining(StreamId(3), 20, PRIORITY_3);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), 20)));
    scheduler.set_bytes_remaining(StreamId(3), 50, PRIORITY_3);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), 50)));
    scheduler.set_bytes_remaining(StreamId(3), 70, PRIORITY_3);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 50)));
    scheduler.set_bytes_remaining(StreamId(2), 70, PRIORITY_2);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(3), 70)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 50)));
    scheduler.set_bytes_remaining(StreamId(1), 20, PRIORITY_1);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 20)));
    scheduler.set_bytes_remaining(StreamId(1), 70, PRIORITY_1);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 70)));
    scheduler.set_bytes_remaining(StreamId(2), 20, PRIORITY_2);
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 20)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 70)));
    assert_eq!(produce(&mut scheduler, MTU), None);
}
// With equal packet sizes and weights 100 and 200, fifteen packets are split
// 5 to 10.
#[test]
fn will_distribute_wfq_packets_in_two_streams_by_priority() {
    let mut scheduler = StreamScheduler::new(MTU);
    let counts = send_packets(
        &mut scheduler,
        &[TestStreamConfig::new(Some(100), 10), TestStreamConfig::new(Some(200), 10)],
        15,
    );
    assert_eq!(counts, &[5, 10]);
}
// With equal packet sizes and weights 100, 200, 300 and 400, fifty packets
// are split 5 : 10 : 15 : 20.
#[test]
fn will_distribute_wfq_packets_in_four_streams_by_priority() {
    let mut scheduler = StreamScheduler::new(MTU);
    let counts = send_packets(
        &mut scheduler,
        &[
            TestStreamConfig::new(Some(100), 10),
            TestStreamConfig::new(Some(200), 10),
            TestStreamConfig::new(Some(300), 10),
            TestStreamConfig::new(Some(400), 10),
        ],
        50,
    );
    assert_eq!(counts, &[5, 10, 15, 20]);
}
// Weights 100 and 400 with packet sizes 8 and 4: the expected 10 and 80
// packets amount to 80 and 320 bytes, the same 1 : 4 ratio as the weights.
#[test]
fn will_distribute_from_two_streams_fairly() {
    let mut scheduler = StreamScheduler::new(MTU);
    let counts = send_packets(
        &mut scheduler,
        &[TestStreamConfig::new(Some(100), 8), TestStreamConfig::new(Some(400), 4)],
        90,
    );
    assert_eq!(counts, &[10, 80]);
}
// Weights 100, 200, 200 and 400 with packet sizes 10, 10, 20 and 30: the
// expected packet counts amount to 150, 300, 300 and 600 bytes, the same
// 1 : 2 : 2 : 4 ratio as the weights.
#[test]
fn will_distribute_from_four_streams_fairly() {
    let mut scheduler = StreamScheduler::new(MTU);
    let counts = send_packets(
        &mut scheduler,
        &[
            TestStreamConfig::new(Some(100), 10),
            TestStreamConfig::new(Some(200), 10),
            TestStreamConfig::new(Some(200), 20),
            TestStreamConfig::new(Some(400), 30),
        ],
        80,
    );
    assert_eq!(counts, &[15, 30, 15, 20]);
}
// Scheduler created with a 100-byte payload limit and two equal-weight
// streams; stream 2 is refilled with a smaller 50-byte message and is served
// again before stream 1's pending 100 bytes.
#[test]
fn send_large_message_with_small_mtu() {
    let mut scheduler = StreamScheduler::new(100);
    scheduler.set_bytes_remaining(StreamId(1), 100, Some(1));
    scheduler.set_bytes_remaining(StreamId(2), 100, Some(1));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 100)));
    scheduler.set_bytes_remaining(StreamId(1), 100, Some(1));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 100)));
    scheduler.set_bytes_remaining(StreamId(2), 50, Some(1));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 50)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 100)));
    assert_eq!(produce(&mut scheduler, MTU), None);
}
// Scheduler created with a 200-byte payload limit and two equal-weight
// streams: the 150-byte message is sent before the 200-byte one.
#[test]
fn send_large_message_with_large_mtu() {
    let mut scheduler = StreamScheduler::new(200);
    scheduler.set_bytes_remaining(StreamId(1), 200, Some(1));
    scheduler.set_bytes_remaining(StreamId(2), 150, Some(1));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(2), 150)));
    assert_eq!(produce(&mut scheduler, MTU), Some((StreamId(1), 200)));
    assert_eq!(produce(&mut scheduler, MTU), None);
}
}