use crate::api::PpId;
use crate::packet::data::Data;
use std::collections::VecDeque;
pub mod data_tracker;
pub mod interleaved_reassembly_streams;
pub mod reassembly_queue;
pub mod reassembly_streams;
pub mod traditional_reassembly_streams;
/// A position in a fragment sequence that knows its immediate successor.
///
/// [`IntervalList`] relies on the `Ord` bound to keep its intervals sorted
/// and on [`ReassemblyKey::next`] to decide whether a fragment is directly
/// adjacent to an existing interval.
pub trait ReassemblyKey: Copy + Ord {
    /// Returns the key that directly follows `self`.
    fn next(&self) -> Self;
}
/// A run of consecutive fragments collected by [`IntervalList::add`].
#[derive(Debug, PartialEq)]
pub struct Interval<K: ReassemblyKey> {
    /// Key of the first fragment in the run.
    pub start: K,
    /// Key of the last fragment in the run (equal to `start` for a single
    /// fragment).
    pub end: K,
    /// Whether the fragment at `start` was flagged as a beginning.
    pub has_beginning: bool,
    /// Whether the fragment at `end` was flagged as an end.
    pub has_end: bool,
    /// Payload protocol identifier. Taken from the fragment that created the
    /// interval and overwritten when a beginning fragment is prepended.
    pub ppid: PpId,
    /// Fragment payloads, one entry per fragment, ordered from `start` to
    /// `end`.
    pub payload: VecDeque<Vec<u8>>,
}
impl<K: ReassemblyKey> Interval<K> {
    /// Consumes the interval and returns its payload as one contiguous buffer.
    ///
    /// An interval holding exactly one fragment hands that fragment's buffer
    /// back as is, without copying. Otherwise the fragments are concatenated
    /// in order into a buffer sized up front for their combined length; an
    /// interval without fragments yields an empty vector.
    pub fn collect_payload(self) -> Vec<u8> {
        let mut fragments = self.payload;
        match fragments.len() {
            // The length was just checked, so the pop cannot fail.
            1 => fragments.pop_front().unwrap(),
            _ => {
                let combined_len: usize = fragments.iter().map(Vec::len).sum();
                fragments
                    .into_iter()
                    .fold(Vec::with_capacity(combined_len), |mut combined, fragment| {
                        combined.extend(fragment);
                        combined
                    })
            }
        }
    }
}
/// A collection of [`Interval`]s kept sorted by their `start` key.
///
/// Fragments are inserted with [`IntervalList::add`], which finds the
/// insertion point by binary search on `start` and merges a fragment into a
/// neighbouring interval when it is directly adjacent to it.
#[derive(Debug)]
pub struct IntervalList<K: ReassemblyKey> {
    /// Intervals in ascending order of `start`.
    intervals: Vec<Interval<K>>,
}
impl<K: ReassemblyKey> Default for IntervalList<K> {
    /// Creates a list that holds no intervals.
    fn default() -> Self {
        let intervals = Vec::new();
        Self { intervals }
    }
}
impl<K: ReassemblyKey> IntervalList<K> {
    /// Returns `true` when the list holds no intervals.
    pub fn is_empty(&self) -> bool {
        self.intervals.is_empty()
    }

    /// Inserts the fragment `data` at position `key` and returns the index of
    /// the interval that now contains it.
    ///
    /// The fragment is appended to the preceding interval when that interval
    /// ends right before `key`, has no end fragment yet, and `data` is not a
    /// beginning. It is prepended to the following interval when that
    /// interval starts right after `key`, has no beginning fragment yet, and
    /// `data` is not an end. When both hold, the two neighbours are fused
    /// around the fragment. Otherwise a new single-fragment interval is
    /// inserted at the sorted position.
    ///
    /// Keys already covered by an existing interval are not detected here.
    /// NOTE(review): presumably callers filter duplicates first; confirm.
    pub fn add(&mut self, key: K, data: Data) -> usize {
        // Index of the first interval whose start is not below `key`.
        let pos = self.intervals.partition_point(|iv| iv.start < key);

        let joins_prev = match self.intervals[..pos].last() {
            Some(prev) => prev.end.next() == key && !prev.has_end && !data.is_beginning,
            None => false,
        };
        let joins_next = match self.intervals.get(pos) {
            Some(next) => key.next() == next.start && !next.has_beginning && !data.is_end,
            None => false,
        };

        match (joins_prev, joins_next) {
            // The fragment closes the gap between two intervals: fold the
            // following interval into the preceding one.
            (true, true) => {
                let mut next = self.intervals.remove(pos);
                let prev = &mut self.intervals[pos - 1];
                prev.end = next.end;
                prev.has_end = next.has_end;
                prev.payload.push_back(data.payload);
                prev.payload.append(&mut next.payload);
                pos - 1
            }
            // Grow the preceding interval by one fragment at its tail.
            (true, false) => {
                let prev = &mut self.intervals[pos - 1];
                prev.end = key;
                prev.has_end = data.is_end;
                prev.payload.push_back(data.payload);
                pos - 1
            }
            // Grow the following interval by one fragment at its head.
            (false, true) => {
                let next = &mut self.intervals[pos];
                next.start = key;
                next.has_beginning = data.is_beginning;
                if data.is_beginning {
                    // A beginning fragment replaces the interval's ppid.
                    next.ppid = data.ppid;
                }
                next.payload.push_front(data.payload);
                pos
            }
            // No adjacent interval can take the fragment: start a new one.
            (false, false) => {
                let fresh = Interval {
                    start: key,
                    end: key,
                    has_beginning: data.is_beginning,
                    has_end: data.is_end,
                    ppid: data.ppid,
                    payload: VecDeque::from([data.payload]),
                };
                self.intervals.insert(pos, fresh);
                pos
            }
        }
    }

    /// Removes and returns the interval at `idx` if it has both its beginning
    /// and its end.
    ///
    /// Returns `None`, leaving the list untouched, when `idx` is out of range
    /// or the interval is still incomplete.
    pub fn pop_if_complete(&mut self, idx: usize) -> Option<Interval<K>> {
        let complete = matches!(
            self.intervals.get(idx),
            Some(iv) if iv.has_beginning && iv.has_end
        );
        if !complete {
            return None;
        }
        Some(self.intervals.remove(idx))
    }

    /// Removes and returns the first interval if it has both its beginning
    /// and its end and `predicate` accepts it.
    ///
    /// `predicate` is only invoked for a complete first interval. Returns
    /// `None` when the list is empty or any of the checks fails.
    pub fn pop_front_if_complete_and<F>(&mut self, predicate: F) -> Option<Interval<K>>
    where
        F: FnOnce(&Interval<K>) -> bool,
    {
        let head = self.intervals.first()?;
        if !(head.has_beginning && head.has_end && predicate(head)) {
            return None;
        }
        Some(self.intervals.remove(0))
    }

    /// Keeps only the intervals for which `f` returns `true` and returns the
    /// total number of payload bytes held by the intervals that were dropped.
    pub fn retain<F>(&mut self, mut f: F) -> usize
    where
        F: FnMut(&Interval<K>) -> bool,
    {
        let mut dropped_bytes = 0;
        self.intervals.retain(|iv| {
            let keep = f(iv);
            if !keep {
                dropped_bytes += iv.payload.iter().map(|part| part.len()).sum::<usize>();
            }
            keep
        });
        dropped_bytes
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::StreamId;
    use crate::testing::data_sequencer::DataSequencer;
    use crate::types::Tsn;

    /// Reassembly key used by these tests: a bare TSN.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct TestKey(Tsn);

    impl ReassemblyKey for TestKey {
        fn next(&self) -> Self {
            let TestKey(tsn) = *self;
            TestKey(tsn + 1)
        }
    }

    /// Shorthand for a [`TestKey`] built from a raw TSN value.
    fn key(tsn: u32) -> TestKey {
        TestKey(Tsn(tsn))
    }

    #[test]
    fn add_independent_intervals() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));

        assert_eq!(list.add(key(3), seq.ordered("p0", "BE")), 0);
        assert_eq!(list.intervals.len(), 1);

        assert_eq!(list.add(key(5), seq.ordered("p1", "BE")), 1);
        assert_eq!(list.intervals.len(), 2);

        // A smaller key lands in front of the existing intervals.
        assert_eq!(list.add(key(1), seq.ordered("p2", "BE")), 0);
        assert_eq!(list.intervals.len(), 3);

        let starts: Vec<TestKey> = list.intervals.iter().map(|iv| iv.start).collect();
        assert_eq!(starts, [key(1), key(3), key(5)]);
    }

    #[test]
    fn add_merge_left() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));
        list.add(key(10), seq.ordered("p0", "B"));

        let merged_at = list.add(key(11), seq.ordered("p1", ""));

        assert_eq!(merged_at, 0);
        assert_eq!(list.intervals.len(), 1);
        let merged = &list.intervals[0];
        assert_eq!(merged.start, key(10));
        assert_eq!(merged.end, key(11));
        assert_eq!(merged.payload.len(), 2);
    }

    #[test]
    fn add_merge_right() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));
        list.add(key(11), seq.ordered("p1", "E"));

        let merged_at = list.add(key(10), seq.ordered("p0", "B"));

        assert_eq!(merged_at, 0);
        assert_eq!(list.intervals.len(), 1);
        let merged = &list.intervals[0];
        assert_eq!(merged.start, key(10));
        assert_eq!(merged.end, key(11));
        assert_eq!(merged.payload, &[b"p0", b"p1"]);
        assert_eq!(merged.ppid, PpId(53));
    }

    #[test]
    fn add_doesnt_merge_right_on_begin_end_border() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));

        assert_eq!(list.add(key(11), seq.unordered("p1", "E")), 0);
        assert_eq!(list.add(key(12), seq.unordered("p2", "B")), 1);
        assert_eq!(list.intervals.len(), 2);

        // The end fragment stays on its own...
        let first = &list.intervals[0];
        assert_eq!(first.start, key(11));
        assert_eq!(first.end, key(11));
        assert_eq!(first.payload, &[b"p1"]);
        assert!(!first.has_beginning);
        assert!(first.has_end);

        // ...and so does the beginning fragment that follows it.
        let second = &list.intervals[1];
        assert_eq!(second.start, key(12));
        assert_eq!(second.end, key(12));
        assert_eq!(second.payload, &[b"p2"]);
        assert!(second.has_beginning);
        assert!(!second.has_end);
    }

    #[test]
    fn add_merge_both_filling_gap() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));
        list.add(key(10), seq.ordered("p0", "B"));
        list.add(key(12), seq.ordered("p2", "E"));
        assert_eq!(list.intervals.len(), 2);

        let merged_at = list.add(key(11), seq.ordered("p1", ""));

        assert_eq!(merged_at, 0);
        assert_eq!(list.intervals.len(), 1);
        let merged = &list.intervals[0];
        assert_eq!(merged.start, key(10));
        assert_eq!(merged.end, key(12));
        assert!(merged.has_beginning);
        assert!(merged.has_end);
        assert_eq!(merged.payload, &[b"p0", b"p1", b"p2"]);
    }

    #[test]
    fn add_wrapping_interval() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));
        list.add(key(u32::MAX), seq.ordered("p0", "B"));

        let merged_at = list.add(key(0), seq.ordered("p1", "E"));

        assert_eq!(merged_at, 0);
        assert_eq!(list.intervals.len(), 1);
        let merged = &list.intervals[0];
        assert_eq!(merged.start, key(u32::MAX));
        assert_eq!(merged.end, key(0));
        assert!(merged.has_end);
        assert!(merged.has_beginning);
        assert_eq!(merged.payload.len(), 2);
        assert_eq!(merged.payload, &[b"p0", b"p1"]);
    }

    #[test]
    fn pop_if_complete_extracts_assembled() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));
        list.add(key(10), seq.ordered("p0", "BE"));
        list.add(key(12), seq.ordered("p1", "B"));
        assert_eq!(list.intervals.len(), 2);

        // The second interval has no end yet, so nothing is removed.
        assert!(list.pop_if_complete(1).is_none());
        assert_eq!(list.intervals.len(), 2);

        let popped = list.pop_if_complete(0).expect("Should pop");
        assert_eq!(popped.start, key(10));
        assert_eq!(list.intervals.len(), 1);
        assert_eq!(list.intervals[0].start, key(12));
    }

    #[test]
    fn retain_removes_matching_intervals() {
        let mut list = IntervalList::default();
        let mut seq = DataSequencer::new(StreamId(0));
        list.add(key(10), seq.ordered("p0", "BE"));
        list.add(key(20), seq.ordered("p1", "BE"));
        list.add(key(30), seq.ordered("p2", "BE"));

        let removed_bytes = list.retain(|iv| iv.start <= key(15));

        assert_eq!(list.intervals.len(), 1);
        assert_eq!(list.intervals[0].start, key(10));
        // Two dropped intervals with two payload bytes each.
        assert_eq!(removed_bytes, 4);
    }
}