use std::collections::HashMap;
use crate::packet::SequenceNumber;
use super::{Arranging, ArrangingSystem};
/// Owns one [`OrderingStream`] per `u8` stream id.
///
/// Streams are not created up front; they are inserted on first request via
/// `ArrangingSystem::get_or_create_stream`.
pub struct OrderingSystem<T> {
    /// Streams created so far, keyed by stream id.
    streams: HashMap<u8, OrderingStream<T>>,
}

impl<T> OrderingSystem<T> {
    /// Creates a system with no streams, pre-allocating room for 32 of them.
    pub fn new() -> OrderingSystem<T> {
        OrderingSystem {
            streams: HashMap::with_capacity(32),
        }
    }
}

impl<T> Default for OrderingSystem<T> {
    /// Same as [`OrderingSystem::new`]; provided so the type works with
    /// `Default`-based construction (`..Default::default()`, `#[derive(Default)]`
    /// on containing types, `unwrap_or_default`, ...).
    fn default() -> Self {
        Self::new()
    }
}
impl<T> ArrangingSystem for OrderingSystem<T> {
    /// Streams handed out by this system are [`OrderingStream`]s.
    type Stream = OrderingStream<T>;

    /// Returns the number of streams that have been created so far.
    fn stream_count(&self) -> usize {
        self.streams.len()
    }

    /// Returns the stream stored under `stream_id`.
    ///
    /// If no stream exists for that id yet, a new one is built with
    /// `OrderingStream::new(stream_id)`, inserted, and returned.
    fn get_or_create_stream(&mut self, stream_id: u8) -> &mut Self::Stream {
        self.streams
            .entry(stream_id)
            .or_insert_with(|| OrderingStream::new(stream_id))
    }
}
/// One ordered stream of items.
///
/// Items that arrive ahead of the index the stream is waiting for are kept in
/// `storage` until they can be drained in order through [`OrderingStream::iter_mut`].
pub struct OrderingStream<T> {
    /// Id the stream was created with; only read by the test-only getter.
    _stream_id: u8,
    /// Buffered items, keyed by their index.
    storage: HashMap<u16, T>,
    /// Index of the next item the stream will release.
    expected_index: u16,
    /// Counter backing [`OrderingStream::new_item_identifier`].
    unique_item_identifier: u16,
}

impl<T> OrderingStream<T> {
    /// Creates a stream for `stream_id` with storage pre-allocated for 1024 items.
    pub fn new(stream_id: u8) -> Self {
        Self::with_capacity(1024, stream_id)
    }

    /// Creates a stream for `stream_id` whose storage is pre-allocated for
    /// `size` items. Both the expected index and the identifier counter start at 0.
    pub fn with_capacity(size: usize, stream_id: u8) -> Self {
        Self {
            _stream_id: stream_id,
            storage: HashMap::with_capacity(size),
            expected_index: 0,
            unique_item_identifier: 0,
        }
    }

    /// Returns the id this stream was created with (test builds only).
    #[cfg(test)]
    pub fn stream_id(&self) -> u8 {
        self._stream_id
    }

    /// Returns the index the stream is currently waiting for (test builds only).
    #[cfg(test)]
    pub fn expected_index(&self) -> u16 {
        self.expected_index
    }

    /// Returns the current identifier and advances the counter by one,
    /// wrapping back to 0 after `u16::MAX`.
    pub fn new_item_identifier(&mut self) -> SequenceNumber {
        let following = self.unique_item_identifier.wrapping_add(1);
        // Store the successor and hand back the value that was there before.
        std::mem::replace(&mut self.unique_item_identifier, following)
    }

    /// Returns an iterator that removes buffered items from this stream.
    ///
    /// The iterator borrows both the storage and the expected index, so
    /// every item it yields also advances this stream's expected index.
    pub fn iter_mut(&mut self) -> IterMut<'_, T> {
        let Self {
            storage,
            expected_index,
            ..
        } = self;
        IterMut {
            items: storage,
            expected_index,
        }
    }
}
/// Returns `true` when `incoming` lies at most 32768 steps ahead of `start`,
/// counting forward with `u16` wrap-around.
///
/// A distance of 0 (`incoming == start`) counts as inside the window.
fn is_u16_within_half_window_from_start(start: u16, incoming: u16) -> bool {
    // Half of the u16 range, rounded up: 32768.
    const HALF_WINDOW: u16 = u16::MAX / 2 + 1;

    let forward_distance = incoming.wrapping_sub(start);
    forward_distance <= HALF_WINDOW
}
impl<T> Arranging for OrderingStream<T> {
    type ArrangingItem = T;

    /// Feeds one item, tagged with `incoming_offset`, into the stream.
    ///
    /// * If `incoming_offset` is the index the stream is waiting for, the
    ///   expected index advances by one (wrapping) and the item is returned.
    /// * Otherwise, if the offset is inside the forward half window of the
    ///   expected index, the item is buffered under that offset (replacing
    ///   anything already buffered there) and `None` is returned.
    /// * Otherwise the item is dropped and `None` is returned.
    fn arrange(
        &mut self,
        incoming_offset: u16,
        item: Self::ArrangingItem,
    ) -> Option<Self::ArrangingItem> {
        let awaited = self.expected_index;

        // In-order arrival: release immediately.
        if incoming_offset == awaited {
            self.expected_index = awaited.wrapping_add(1);
            return Some(item);
        }

        // Ahead of what we are waiting for: keep it for later.
        if is_u16_within_half_window_from_start(awaited, incoming_offset) {
            self.storage.insert(incoming_offset, item);
        }

        None
    }
}
/// Iterator that takes items out of a map in consecutive index order.
///
/// Despite the name, it yields owned `T` values: each yielded item is removed
/// from `items`, and the borrowed `expected_index` is advanced past it.
pub struct IterMut<'a, T> {
    /// Map the items are removed from.
    items: &'a mut HashMap<u16, T>,
    /// Index of the next item to yield; updated in place as iteration proceeds.
    expected_index: &'a mut u16,
}

impl<T> Iterator for IterMut<'_, T> {
    type Item = T;

    /// Removes and returns the item stored under the current expected index,
    /// then advances that index by one (wrapping at `u16::MAX`).
    ///
    /// Returns `None`, leaving the index untouched, when no item is stored
    /// under the current expected index.
    fn next(&mut self) -> Option<Self::Item> {
        let wanted = *self.expected_index;
        let released = self.items.remove(&wanted)?;
        *self.expected_index = wanted.wrapping_add(1);
        Some(released)
    }
}
#[cfg(test)]
mod tests {
    use super::{is_u16_within_half_window_from_start, Arranging, ArrangingSystem, OrderingSystem};

    /// Minimal stand-in for a packet: a sequence number plus the stream it belongs to.
    #[derive(Debug, PartialEq, Clone)]
    struct Packet {
        pub sequence: u16,
        pub ordering_stream: u8,
    }

    impl Packet {
        fn new(sequence: u16, ordering_stream: u8) -> Packet {
            Packet {
                sequence,
                ordering_stream,
            }
        }
    }

    /// A freshly created stream expects index 0 and remembers its id.
    #[test]
    fn create_stream() {
        let mut system: OrderingSystem<Packet> = OrderingSystem::new();
        let stream = system.get_or_create_stream(1);

        assert_eq!(stream.expected_index(), 0);
        assert_eq!(stream.stream_id(), 1);
    }

    /// Requesting an id twice returns a stream with that same id.
    #[test]
    fn create_existing_stream() {
        let mut system: OrderingSystem<Packet> = OrderingSystem::new();

        system.get_or_create_stream(1);
        let stream = system.get_or_create_stream(1);

        assert_eq!(stream.stream_id(), 1);
    }

    /// An item whose index lies past the u16 wrap point is buffered and is
    /// still released once the expected index wraps around to it.
    #[test]
    fn packet_wraps_around_offset() {
        let mut system: OrderingSystem<()> = OrderingSystem::new();
        let stream = system.get_or_create_stream(1);

        for idx in 0..=65500 {
            assert![stream.arrange(idx, ()).is_some()];
        }

        // Expected index is now 65501; 123 is ahead of it (after wrap) and gets buffered.
        assert![stream.arrange(123, ()).is_none()];

        for idx in 65501..=65535u16 {
            assert![stream.arrange(idx, ()).is_some()];
        }
        assert![stream.arrange(0, ()).is_some()];
        for idx in 1..123 {
            assert![stream.arrange(idx, ()).is_some()];
        }

        // The buffered item 123 is now next in line.
        assert![stream.iter_mut().next().is_some()];
    }

    /// An item that arrives early around the middle of the u16 range is
    /// buffered and released after the item preceding it has been arranged.
    #[test]
    fn exactly_half_u16_packet_is_stored() {
        let mut system: OrderingSystem<u16> = OrderingSystem::new();
        let stream = system.get_or_create_stream(1);

        for idx in 0..=32766 {
            assert![stream.arrange(idx, idx).is_some()];
        }

        assert![stream.arrange(32768, 32768).is_none()];
        assert![stream.arrange(32767, 32767).is_some()];

        assert_eq![Some(32768), stream.iter_mut().next()];
        assert_eq![None, stream.iter_mut().next()];
    }

    /// Pins the exact boundaries of the forward half-window check.
    #[test]
    fn u16_forward_half() {
        assert![!is_u16_within_half_window_from_start(0, 65535)];
        assert![!is_u16_within_half_window_from_start(0, 32769)];

        assert![is_u16_within_half_window_from_start(0, 32768)];
        assert![is_u16_within_half_window_from_start(0, 32767)];

        assert![is_u16_within_half_window_from_start(32767, 65535)];
        assert![!is_u16_within_half_window_from_start(32766, 65535)];
        assert![is_u16_within_half_window_from_start(32768, 65535)];

        assert![is_u16_within_half_window_from_start(32769, 0)];
    }

    /// Buffered items only come out of the iterator once the gap before them is filled.
    #[test]
    fn can_iterate() {
        let mut system: OrderingSystem<Packet> = OrderingSystem::new();

        system.get_or_create_stream(1);
        let stream = system.get_or_create_stream(1);

        let stub_packet0 = Packet::new(0, 1);
        let stub_packet1 = Packet::new(1, 1);
        let stub_packet2 = Packet::new(2, 1);
        let stub_packet3 = Packet::new(3, 1);
        let stub_packet4 = Packet::new(4, 1);

        {
            assert_eq!(
                stream.arrange(0, stub_packet0.clone()).unwrap(),
                stub_packet0
            );

            assert![stream.arrange(3, stub_packet3.clone()).is_none()];
            assert![stream.arrange(4, stub_packet4.clone()).is_none()];
            assert![stream.arrange(2, stub_packet2.clone()).is_none()];
        }
        {
            // Item 1 is still missing, so nothing can be released yet.
            let mut iterator = stream.iter_mut();
            assert_eq!(iterator.next(), None);
        }
        {
            assert_eq!(
                stream.arrange(1, stub_packet1.clone()).unwrap(),
                stub_packet1
            );
        }
        {
            // With the gap filled, the buffered items come out in order.
            let mut iterator = stream.iter_mut();

            assert_eq!(iterator.next().unwrap(), stub_packet2);
            assert_eq!(iterator.next().unwrap(), stub_packet3);
            assert_eq!(iterator.next().unwrap(), stub_packet4);
        }
    }

    /// Arranges packets with the sequence numbers of the first list, in that
    /// order, on the stream `$stream_id` of a fresh system, and asserts that
    /// they are released in the order given by the second list.
    macro_rules! assert_order {
        ( [$( $x:expr ),*] , [$( $y:expr),*] , $stream_id:expr) => {
            {
                // Arrival order.
                let before = [$($x,)*];

                // Expected release order.
                let after = [$($y,)*];

                // Evaluate the stream id once and use it for both the stream
                // lookup and the packets, so the requested stream is the one tested.
                let stream_id: u8 = $stream_id;

                let mut ordering_system = OrderingSystem::<Packet>::new();
                let stream = ordering_system.get_or_create_stream(stream_id);

                let mut ordered_packets: Vec<u16> = Vec::new();
                for &seq in before.iter() {
                    if let Some(packet) = stream.arrange(seq, Packet::new(seq, stream_id)) {
                        // The packet was released right away; anything buffered
                        // directly behind it can be released as well.
                        ordered_packets.push(packet.sequence);
                        ordered_packets.extend(stream.iter_mut().map(|p| p.sequence));
                    }
                }

                assert_eq!(after.to_vec(), ordered_packets);
            }
        };
    }

    #[test]
    fn expect_right_order() {
        assert_order!([0, 2, 4, 3, 1], [0, 1, 2, 3, 4], 1);
        assert_order!([0, 4, 3, 2, 1], [0, 1, 2, 3, 4], 1);
        assert_order!([4, 2, 3, 1, 0], [0, 1, 2, 3, 4], 1);
        assert_order!([3, 2, 1, 0, 4], [0, 1, 2, 3, 4], 1);
        assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 1);
        assert_order!([4, 1, 0, 3, 2], [0, 1, 2, 3, 4], 1);
        assert_order!([2, 1, 3, 0, 4], [0, 1, 2, 3, 4], 1);
        assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 1);
    }

    #[test]
    fn order_on_multiple_streams() {
        assert_order!([0, 2, 4, 3, 1], [0, 1, 2, 3, 4], 1);
        assert_order!([0, 4, 3, 2, 1], [0, 1, 2, 3, 4], 2);
        assert_order!([4, 2, 3, 1, 0], [0, 1, 2, 3, 4], 3);
        assert_order!([3, 2, 1, 0, 4], [0, 1, 2, 3, 4], 4);
        assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 5);
        assert_order!([4, 1, 0, 3, 2], [0, 1, 2, 3, 4], 6);
        assert_order!([2, 1, 3, 0, 4], [0, 1, 2, 3, 4], 7);
        assert_order!([1, 0, 3, 2, 4], [0, 1, 2, 3, 4], 8);
    }
}