use std::{collections::HashMap, marker::PhantomData};
use crate::packet::SequenceNumber;
use super::{Arranging, ArrangingSystem};
pub struct SequencingSystem<T> {
streams: HashMap<u8, SequencingStream<T>>,
}
impl<T> SequencingSystem<T> {
pub fn new() -> SequencingSystem<T> {
SequencingSystem {
streams: HashMap::with_capacity(32),
}
}
}
impl<T> ArrangingSystem for SequencingSystem<T> {
type Stream = SequencingStream<T>;
fn stream_count(&self) -> usize {
self.streams.len()
}
fn get_or_create_stream(&mut self, stream_id: u8) -> &mut Self::Stream {
self.streams
.entry(stream_id)
.or_insert_with(|| SequencingStream::new(stream_id))
}
}
pub struct SequencingStream<T> {
_stream_id: u8,
top_index: u16,
phantom: PhantomData<T>,
unique_item_identifier: u16,
}
impl<T> SequencingStream<T> {
pub fn new(stream_id: u8) -> SequencingStream<T> {
SequencingStream {
_stream_id: stream_id,
top_index: 0,
phantom: PhantomData,
unique_item_identifier: 0,
}
}
#[cfg(test)]
pub fn stream_id(&self) -> u8 {
self._stream_id
}
pub fn new_item_identifier(&mut self) -> SequenceNumber {
let id = self.unique_item_identifier;
self.unique_item_identifier = self.unique_item_identifier.wrapping_add(1);
id
}
}
fn is_u16_within_half_window_from_start(start: u16, incoming: u16) -> bool {
incoming.wrapping_sub(start) <= u16::max_value() / 2 + 1
}
impl<T> Arranging for SequencingStream<T> {
type ArrangingItem = T;
fn arrange(
&mut self,
incoming_index: u16,
item: Self::ArrangingItem,
) -> Option<Self::ArrangingItem> {
if is_u16_within_half_window_from_start(self.top_index, incoming_index) {
self.top_index = incoming_index;
return Some(item);
}
None
}
}
#[cfg(test)]
mod tests {
use super::{Arranging, ArrangingSystem, SequencingSystem};
#[derive(Debug, PartialEq, Clone)]
struct Packet {
pub sequence: u16,
pub ordering_stream: u8,
}
impl Packet {
fn new(sequence: u16, ordering_stream: u8) -> Packet {
Packet {
sequence,
ordering_stream,
}
}
}
#[test]
fn create_stream() {
let mut system: SequencingSystem<Packet> = SequencingSystem::new();
let stream = system.get_or_create_stream(1);
assert_eq!(stream.stream_id(), 1);
}
#[test]
fn create_existing_stream() {
let mut system: SequencingSystem<Packet> = SequencingSystem::new();
system.get_or_create_stream(1);
let stream = system.get_or_create_stream(1);
assert_eq!(stream.stream_id(), 1);
}
macro_rules! assert_sequence {
( [$( $x:expr ),*], [$( $y:expr),*], $stream_id:expr) => {
{
let before = [$($x,)*];
let after = [$($y,)*];
let mut sequence_system = SequencingSystem::<Packet>::new();
let stream = sequence_system.get_or_create_stream(1);
let sequenced_packets: Vec<_> = std::array::IntoIter::new(before)
.filter_map(|seq| stream.arrange(seq, Packet::new(seq, $stream_id)) .map(|p| p.sequence))
.collect();
assert_eq!(after.to_vec(), sequenced_packets);
}
};
}
#[test]
fn can_sequence() {
assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1);
assert_sequence!([1, 5, 4, 3, 2], [1, 5], 1);
assert_sequence!([5, 3, 4, 2, 1], [5], 1);
assert_sequence!([4, 3, 2, 1, 5], [4, 5], 1);
assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 1);
assert_sequence!([5, 2, 1, 4, 3], [5], 1);
assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 1);
}
#[test]
fn sequence_on_multiple_streams() {
assert_sequence!([1, 3, 5, 4, 2], [1, 3, 5], 1);
assert_sequence!([1, 5, 4, 3, 2], [1, 5], 2);
assert_sequence!([5, 3, 4, 2, 1], [5], 3);
assert_sequence!([4, 3, 2, 1, 5], [4, 5], 4);
assert_sequence!([2, 1, 4, 3, 5], [2, 4, 5], 5);
assert_sequence!([5, 2, 1, 4, 3], [5], 6);
assert_sequence!([3, 2, 4, 1, 5], [3, 4, 5], 7);
}
}