use crate::api::Message;
use crate::api::StreamId;
use crate::api::handover::HandoverOrderedStream;
use crate::api::handover::HandoverReadiness;
use crate::api::handover::HandoverUnorderedStream;
use crate::api::handover::SocketHandoverState;
use crate::packet::SkippedStream;
use crate::packet::data::Data;
use crate::rx::IntervalList;
use crate::rx::ReassemblyKey;
use crate::rx::reassembly_streams::ReassemblyStreams;
use crate::types::Fsn;
use crate::types::Mid;
use crate::types::StreamKey;
use crate::types::Tsn;
use std::collections::HashMap;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct InterleavedKey {
pub mid: Mid, pub fsn: Fsn, }
impl ReassemblyKey for InterleavedKey {
fn next(&self) -> Self {
InterleavedKey { mid: self.mid, fsn: self.fsn + 1 }
}
}
pub struct OrderedStream {
stream_id: StreamId,
intervals: IntervalList<InterleavedKey>,
next_mid: Mid,
}
impl OrderedStream {
fn new(stream_id: StreamId, next_mid: Mid) -> Self {
Self { stream_id, intervals: IntervalList::default(), next_mid }
}
fn try_assemble_next(&mut self, on_reassembled: &mut dyn FnMut(Message)) -> usize {
let mut assembled_bytes = 0;
while let Some(interval) =
self.intervals.pop_front_if_complete_and(|i| i.start.mid == self.next_mid)
{
let stream_id = self.stream_id;
let ppid = interval.ppid;
let payload = interval.collect_payload();
assembled_bytes += payload.len();
on_reassembled(Message::new(stream_id, ppid, payload));
self.next_mid += 1;
}
assembled_bytes
}
fn add(&mut self, data: Data, on_reassembled: &mut dyn FnMut(Message)) -> isize {
if data.mid < self.next_mid {
return 0;
}
if data.mid == self.next_mid && data.is_beginning && data.is_end {
on_reassembled(Message::new(self.stream_id, data.ppid, data.payload));
self.next_mid += 1;
let assembled = self.try_assemble_next(on_reassembled);
return -(assembled as isize);
}
let key = InterleavedKey { mid: data.mid, fsn: data.fsn };
let queued_bytes = data.payload.len() as isize;
self.intervals.add(key, data);
let assembled = self.try_assemble_next(on_reassembled);
queued_bytes - (assembled as isize)
}
}
pub struct UnorderedStream {
stream_id: StreamId,
intervals: IntervalList<InterleavedKey>,
}
impl UnorderedStream {
fn new(stream_id: StreamId) -> Self {
Self { stream_id, intervals: IntervalList::default() }
}
fn add(&mut self, data: Data, on_reassembled: &mut dyn FnMut(Message)) -> isize {
if data.is_beginning && data.is_end {
on_reassembled(Message::new(data.stream_key.id(), data.ppid, data.payload));
return 0;
}
let key = InterleavedKey { mid: data.mid, fsn: data.fsn };
let queued_bytes = data.payload.len() as isize;
let idx = self.intervals.add(key, data);
if let Some(interval) = self.intervals.pop_if_complete(idx) {
let stream_id = self.stream_id;
let ppid = interval.ppid;
let payload = interval.collect_payload();
let total_payload_len = payload.len();
on_reassembled(Message::new(stream_id, ppid, payload));
queued_bytes - (total_payload_len as isize)
} else {
queued_bytes
}
}
}
pub struct InterleavedReassemblyStreams {
ordered: HashMap<StreamId, OrderedStream>,
unordered: HashMap<StreamId, UnorderedStream>,
}
impl InterleavedReassemblyStreams {
pub fn new() -> Self {
Self { ordered: HashMap::new(), unordered: HashMap::new() }
}
}
impl ReassemblyStreams for InterleavedReassemblyStreams {
fn add(&mut self, _tsn: Tsn, data: Data, on_reassembled: &mut dyn FnMut(Message)) -> isize {
match data.stream_key {
StreamKey::Ordered(stream_id) => self
.ordered
.entry(stream_id)
.or_insert_with(|| OrderedStream::new(stream_id, Mid(0)))
.add(data, on_reassembled),
StreamKey::Unordered(stream_id) => self
.unordered
.entry(stream_id)
.or_insert_with(|| UnorderedStream::new(stream_id))
.add(data, on_reassembled),
}
}
fn handle_forward_tsn(
&mut self,
_new_cumulative_ack: Tsn,
skipped_streams: &[SkippedStream],
on_reassembled: &mut dyn FnMut(Message),
) -> usize {
let mut released_bytes = 0;
for skipped_stream in skipped_streams {
if let SkippedStream::IForwardTsn(stream_key, mid) = skipped_stream {
match stream_key {
StreamKey::Ordered(stream_id) => {
let stream = self
.ordered
.entry(*stream_id)
.or_insert_with(|| OrderedStream::new(*stream_id, Mid(0)));
released_bytes +=
stream.intervals.retain(|interval| interval.start.mid > *mid);
if stream.next_mid <= *mid {
stream.next_mid = *mid + 1;
}
released_bytes += stream.try_assemble_next(on_reassembled);
}
StreamKey::Unordered(stream_id) => {
let stream = self
.unordered
.entry(*stream_id)
.or_insert_with(|| UnorderedStream::new(*stream_id));
released_bytes +=
stream.intervals.retain(|interval| interval.start.mid > *mid);
}
}
}
}
released_bytes
}
fn reset_streams(&mut self, streams: &[StreamId]) {
if streams.is_empty() {
for stream in self.ordered.values_mut() {
stream.next_mid = Mid(0);
}
} else {
for stream_id in streams {
if let Some(stream) = self.ordered.get_mut(stream_id) {
stream.next_mid = Mid(0);
}
}
}
}
fn get_handover_readiness(&self) -> HandoverReadiness {
let has_ordered_chunks = self.ordered.values().any(|s| !s.intervals.is_empty());
let has_unordered_chunks = self.unordered.values().any(|s| !s.intervals.is_empty());
HandoverReadiness::STREAM_HAS_UNASSEMBLED_CHUNKS
& (has_ordered_chunks | has_unordered_chunks)
}
fn add_to_handover_state(&self, state: &mut SocketHandoverState) {
for (stream_id, stream) in &self.ordered {
state
.rx
.ordered_streams
.push(HandoverOrderedStream { id: stream_id.0, next_ssn: stream.next_mid.0 });
}
for stream_id in self.unordered.keys() {
state.rx.unordered_streams.push(HandoverUnorderedStream { id: stream_id.0 });
}
}
fn restore_from_state(&mut self, state: &SocketHandoverState) {
for stream in &state.rx.ordered_streams {
self.ordered.insert(
StreamId(stream.id),
OrderedStream::new(StreamId(stream.id), Mid(stream.next_ssn)),
);
}
for stream in &state.rx.unordered_streams {
self.unordered.insert(StreamId(stream.id), UnorderedStream::new(StreamId(stream.id)));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testing::data_sequencer::DataSequencer;
#[test]
fn add_unordered_message_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
assert_eq!(s.add(Tsn(1), seq.unordered("a", "B"), &mut |_| {}), 1);
assert_eq!(s.add(Tsn(2), seq.unordered("bcd", ""), &mut |_| {}), 3);
assert_eq!(s.add(Tsn(3), seq.unordered("ef", ""), &mut |_| {}), 2);
assert_eq!(s.add(Tsn(4), seq.unordered("g", "E"), &mut |_| {}), -6);
}
#[test]
fn add_unordered_message_out_of_order_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let c1 = seq.unordered("a", "B");
let c2 = seq.unordered("bcd", "");
let c3 = seq.unordered("ef", "");
let c4 = seq.unordered("g", "E");
assert_eq!(s.add(Tsn(1), c1, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(2), c2, &mut |m| messages.push(m)), 3);
assert_eq!(s.add(Tsn(4), c4, &mut |m| messages.push(m)), 1);
assert!(messages.is_empty());
assert_eq!(s.add(Tsn(3), c3, &mut |m| messages.push(m)), -5);
assert_eq!(messages.len(), 1);
}
#[test]
fn add_simple_ordered_message_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let c1 = seq.ordered("a", "B");
let c2 = seq.ordered("bcd", "");
let c3 = seq.ordered("ef", "");
let c4 = seq.ordered("g", "E");
assert_eq!(s.add(Tsn(1), c1, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(2), c2, &mut |m| messages.push(m)), 3);
assert_eq!(s.add(Tsn(3), c3, &mut |m| messages.push(m)), 2);
assert_eq!(s.add(Tsn(4), c4, &mut |m| messages.push(m)), -6);
assert_eq!(messages.len(), 1);
}
#[test]
fn add_more_complex_ordered_message_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let c11 = seq.ordered("a", "B");
let c12 = seq.ordered("bcd", "");
let c13 = seq.ordered("ef", "");
let c14 = seq.ordered("g", "E");
let c21 = seq.ordered("h", "BE");
let c31 = seq.ordered("ij", "B");
let c32 = seq.ordered("k", "E");
assert_eq!(s.add(Tsn(1), c11, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(3), c13, &mut |m| messages.push(m)), 2);
assert_eq!(s.add(Tsn(4), c14, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(5), c21, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(6), c31, &mut |m| messages.push(m)), 2);
assert_eq!(s.add(Tsn(7), c32, &mut |m| messages.push(m)), 1);
assert!(messages.is_empty());
assert_eq!(s.add(Tsn(2), c12, &mut |m| messages.push(m)), -8);
assert_eq!(messages.len(), 3);
}
#[test]
fn delete_unordered_message_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let c1 = seq.unordered("a", "B");
let c2 = seq.unordered("bcd", "");
let c3 = seq.unordered("ef", "");
assert_eq!(s.add(Tsn(1), c1, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(2), c2, &mut |m| messages.push(m)), 3);
assert_eq!(s.add(Tsn(3), c3, &mut |m| messages.push(m)), 2);
assert_eq!(
s.handle_forward_tsn(
Tsn(3),
&[SkippedStream::IForwardTsn(StreamKey::Unordered(StreamId(1)), Mid(0))],
&mut |m| messages.push(m)
),
6
);
}
#[test]
fn delete_simple_ordered_message_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let c1 = seq.ordered("a", "B");
let c2 = seq.ordered("bcd", "");
let c3 = seq.ordered("ef", "");
assert_eq!(s.add(Tsn(1), c1, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(2), c2, &mut |m| messages.push(m)), 3);
assert_eq!(s.add(Tsn(3), c3, &mut |m| messages.push(m)), 2);
assert_eq!(
s.handle_forward_tsn(
Tsn(3),
&[SkippedStream::IForwardTsn(StreamKey::Ordered(StreamId(1)), Mid(0))],
&mut |m| messages.push(m)
),
6
);
}
#[test]
fn delete_many_ordered_messages_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let c1 = seq.ordered("a", "B");
seq.ordered("bcd", ""); let c3 = seq.ordered("ef", "");
let c4 = seq.ordered("g", "E");
let c5 = seq.ordered("h", "BE");
let c6 = seq.ordered("ij", "B");
let c7 = seq.ordered("k", "E");
assert_eq!(s.add(Tsn(1), c1, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(3), c3, &mut |m| messages.push(m)), 2);
assert_eq!(s.add(Tsn(4), c4, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(5), c5, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(6), c6, &mut |m| messages.push(m)), 2);
assert_eq!(s.add(Tsn(7), c7, &mut |m| messages.push(m)), 1);
assert_eq!(
s.handle_forward_tsn(
Tsn(8),
&[SkippedStream::IForwardTsn(StreamKey::Ordered(StreamId(1)), Mid(2))],
&mut |m| messages.push(m)
),
8
);
}
#[test]
fn delete_ordered_message_delives_two_returns_correct_size() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let c1 = seq.ordered("a", "B");
seq.ordered("bcd", ""); let c3 = seq.ordered("ef", "");
let c4 = seq.ordered("g", "E");
let c5 = seq.ordered("h", "BE");
let c6 = seq.ordered("ij", "B");
let c7 = seq.ordered("k", "E");
assert_eq!(s.add(Tsn(1), c1, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(3), c3, &mut |m| messages.push(m)), 2);
assert_eq!(s.add(Tsn(4), c4, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(5), c5, &mut |m| messages.push(m)), 1);
assert_eq!(s.add(Tsn(6), c6, &mut |m| messages.push(m)), 2);
assert_eq!(s.add(Tsn(7), c7, &mut |m| messages.push(m)), 1);
assert_eq!(
s.handle_forward_tsn(
Tsn(8),
&[SkippedStream::IForwardTsn(StreamKey::Ordered(StreamId(1)), Mid(0))],
&mut |m| messages.push(m)
),
8
);
assert_eq!(messages.len(), 2);
}
#[test]
fn can_delete_first_ordered_message() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
seq.ordered("abc", "BE"); let c2 = seq.ordered("def", "BE");
assert_eq!(
s.handle_forward_tsn(
Tsn(1),
&[SkippedStream::IForwardTsn(StreamKey::Ordered(StreamId(1)), Mid(0))],
&mut |m| messages.push(m)
),
0
);
assert_eq!(s.add(Tsn(2), c2, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 1);
}
#[test]
fn can_reassemble_fast_path_unordered() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let data1 = seq.unordered("a", "BE");
let data2 = seq.unordered("b", "BE");
let data3 = seq.unordered("c", "BE");
let data4 = seq.unordered("d", "BE");
assert_eq!(s.add(Tsn(1), data1, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 1);
assert_eq!(s.add(Tsn(3), data3, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 2);
assert_eq!(s.add(Tsn(2), data2, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 3);
assert_eq!(s.add(Tsn(4), data4, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 4);
}
#[test]
fn can_reassemble_fast_path_ordered() {
let mut s = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
let mut messages = Vec::new();
let data1 = seq.ordered("a", "BE");
let data2 = seq.ordered("b", "BE");
let data3 = seq.ordered("c", "BE");
let data4 = seq.ordered("d", "BE");
assert_eq!(s.add(Tsn(1), data1, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 1);
assert_eq!(s.add(Tsn(3), data3, &mut |m| messages.push(m)), 1);
assert_eq!(messages.len(), 1);
assert_eq!(s.add(Tsn(2), data2, &mut |m| messages.push(m)), -1);
assert_eq!(messages.len(), 3);
assert_eq!(s.add(Tsn(4), data4, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 4);
}
#[test]
fn can_handover_ordered_streams() {
let mut streams1 = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
assert_eq!(streams1.add(Tsn(1), seq.ordered("a", "B"), &mut |_| {}), 1);
assert!(
streams1
.get_handover_readiness()
.contains(HandoverReadiness::STREAM_HAS_UNASSEMBLED_CHUNKS)
);
assert_eq!(streams1.add(Tsn(2), seq.ordered("bcd", "E"), &mut |_| {}), -1);
assert!(streams1.get_handover_readiness().is_ready());
let mut state = SocketHandoverState::default();
streams1.add_to_handover_state(&mut state);
let mut streams2 = InterleavedReassemblyStreams::new();
let mut messages = Vec::new();
streams2.restore_from_state(&state);
let data = seq.ordered("efgh", "BE");
assert_eq!(data.mid, Mid(1));
assert_eq!(streams2.add(Tsn(3), data, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload, b"efgh");
}
#[test]
fn can_handover_unordered_streams() {
let mut streams1 = InterleavedReassemblyStreams::new();
let mut seq = DataSequencer::new(StreamId(1));
assert_eq!(streams1.add(Tsn(1), seq.unordered("a", "B"), &mut |_| {}), 1);
assert!(
streams1
.get_handover_readiness()
.contains(HandoverReadiness::STREAM_HAS_UNASSEMBLED_CHUNKS)
);
assert_eq!(streams1.add(Tsn(2), seq.unordered("bcd", "E"), &mut |_| {}), -1);
assert!(streams1.get_handover_readiness().is_ready());
let mut state = SocketHandoverState::default();
streams1.add_to_handover_state(&mut state);
let mut streams2 = InterleavedReassemblyStreams::new();
let mut messages = Vec::new();
streams2.restore_from_state(&state);
let data = seq.unordered("efgh", "BE");
assert_eq!(streams2.add(Tsn(3), data, &mut |m| messages.push(m)), 0);
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].payload, b"efgh");
}
}