//! Per-flow, per-direction payload reassembly: the [`Reassembler`] trait plus a
//! buffered in-order implementation with an optional size cap.
use crate::event::{FlowSide, OverflowPolicy};
/// Receives the payload stream of one direction of a flow, segment by
/// segment, and decides how to buffer it. Only `segment` is mandatory; the
/// remaining methods default to no-ops or zeroed counters.
pub trait Reassembler: Send + 'static {
    /// Handle a payload starting at sequence number `seq`.
    fn segment(&mut self, seq: u32, payload: &[u8]);
    /// Called when the flow's FIN is observed. Defaults to a no-op.
    fn fin(&mut self) {}
    /// Called when the flow is reset. Defaults to a no-op.
    fn rst(&mut self) {}
    /// Count of segments discarded (e.g. out-of-order arrivals).
    fn dropped_segments(&self) -> u64 {
        0
    }
    /// Count of payload bytes discarded because a buffer cap was exceeded.
    fn bytes_dropped_oversize(&self) -> u64 {
        0
    }
    /// Whether the flow has been abandoned after an overflow.
    fn is_poisoned(&self) -> bool {
        false
    }
    /// Largest buffered size observed so far, in bytes.
    fn high_watermark(&self) -> u64 {
        0
    }
}
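// Illustrative sketch only (the `ByteCounter` type is not part of this crate):
// a minimal `Reassembler` that just tallies delivered payload bytes, showing
// that `segment` is the sole required method and every counter falls back to
// the trait's defaults above.
#[allow(dead_code)]
struct ByteCounter {
    bytes_seen: u64,
}
impl Reassembler for ByteCounter {
    fn segment(&mut self, _seq: u32, payload: &[u8]) {
        // Count every byte handed to us; no buffering, no ordering checks.
        self.bytes_seen += payload.len() as u64;
    }
}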
/// Creates a fresh [`Reassembler`] for each flow key and direction.
pub trait ReassemblerFactory<K>: Send + 'static {
    type Reassembler: Reassembler;
    /// Build the reassembler for the flow identified by `key` on `side`.
    fn new_reassembler(&mut self, key: &K, side: FlowSide) -> Self::Reassembler;
}
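// Hypothetical example, not part of the crate: a factory can inspect `side`
// (or `key`) to hand out differently configured reassemblers per direction.
// The `SideAwareFactory` name and the cap values are illustrative only.
#[allow(dead_code)]
struct SideAwareFactory;
impl<K: Send + 'static> ReassemblerFactory<K> for SideAwareFactory {
    type Reassembler = BufferedReassembler;
    fn new_reassembler(&mut self, _key: &K, side: FlowSide) -> BufferedReassembler {
        // Give responder-side data (e.g. server responses) a larger cap.
        let cap = match side {
            FlowSide::Initiator => 64 * 1024,
            _ => 256 * 1024,
        };
        BufferedReassembler::new().with_max_buffer(cap)
    }
}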
/// Strictly in-order reassembler: contiguous payloads are concatenated into a
/// single buffer and anything that does not start at the next expected
/// sequence number is dropped. An optional byte cap bounds the buffer; on
/// overflow the configured [`OverflowPolicy`] decides whether to slide the
/// window or poison the flow.
#[derive(Debug, Default)]
pub struct BufferedReassembler {
    buffer: Vec<u8>,
    expected_seq: Option<u32>,
    dropped_segments: u64,
    bytes_dropped_oversize: u64,
    max_buffer: Option<usize>,
    overflow_policy: OverflowPolicy,
    poisoned: bool,
    high_watermark: u64,
}
impl BufferedReassembler {
    /// Create an unbounded reassembler with the default overflow policy.
    pub fn new() -> Self {
        Self::default()
    }
    /// Limit the buffer to `max_bytes`; overflow is handled per the policy.
    pub fn with_max_buffer(mut self, max_bytes: usize) -> Self {
        self.max_buffer = Some(max_bytes);
        self
    }
    /// Choose how an overflow of the configured cap is handled.
    pub fn with_overflow_policy(mut self, policy: OverflowPolicy) -> Self {
        self.overflow_policy = policy;
        self
    }
    /// Drain and return everything buffered so far; counters are kept.
    pub fn take(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.buffer)
    }
    /// Segments discarded because they were not the next expected one.
    pub fn dropped_segments(&self) -> u64 {
        self.dropped_segments
    }
    /// Bytes discarded due to buffer-cap overflow.
    pub fn bytes_dropped_oversize(&self) -> u64 {
        self.bytes_dropped_oversize
    }
    /// Bytes currently buffered.
    pub fn buffered_len(&self) -> usize {
        self.buffer.len()
    }
    /// True once an overflow under `OverflowPolicy::DropFlow` abandoned the flow.
    pub fn is_poisoned(&self) -> bool {
        self.poisoned
    }
    /// Peak buffered size observed, in bytes.
    pub fn high_watermark(&self) -> u64 {
        self.high_watermark
    }
    /// Append `payload`, enforcing the optional cap according to the policy.
    fn append_with_cap(&mut self, payload: &[u8]) {
        // No cap configured: the buffer grows without bound.
        let Some(cap) = self.max_buffer else {
            self.buffer.extend_from_slice(payload);
            self.update_watermark();
            return;
        };
        if self.poisoned {
            return;
        }
        let projected = self.buffer.len() + payload.len();
        if projected <= cap {
            self.buffer.extend_from_slice(payload);
            self.update_watermark();
            return;
        }
        match self.overflow_policy {
            OverflowPolicy::DropFlow => {
                // Abandon the flow: count only the offending payload, discard
                // whatever was buffered, and refuse all further appends.
                self.bytes_dropped_oversize += payload.len() as u64;
                self.buffer.clear();
                self.poisoned = true;
            }
            OverflowPolicy::SlidingWindow => {
                // Keep the newest `cap` bytes, dropping the oldest ones.
                let to_drop = projected - cap;
                if to_drop >= self.buffer.len() {
                    // The new payload alone reaches (or exceeds) the cap.
                    self.bytes_dropped_oversize += self.buffer.len() as u64;
                    self.buffer.clear();
                    if payload.len() > cap {
                        let extra = payload.len() - cap;
                        self.bytes_dropped_oversize += extra as u64;
                        self.buffer.extend_from_slice(&payload[extra..]);
                    } else {
                        self.buffer.extend_from_slice(payload);
                    }
                } else {
                    // Evict just enough of the oldest bytes to fit the payload.
                    self.bytes_dropped_oversize += to_drop as u64;
                    self.buffer.drain(..to_drop);
                    self.buffer.extend_from_slice(payload);
                }
                self.update_watermark();
            }
        }
    }
    /// Record a new peak if the buffer grew past the previous maximum.
    #[inline]
    fn update_watermark(&mut self) {
let len = self.buffer.len() as u64;
if len > self.high_watermark {
self.high_watermark = len;
}
}
}
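// Usage sketch (not exercised anywhere in the crate): build a capped
// reassembler, feed contiguous segments, and drain the contiguous bytes. The
// payloads and the 1 KiB cap are arbitrary illustration values.
#[allow(dead_code)]
fn buffered_reassembler_usage_sketch() -> Vec<u8> {
    let mut r = BufferedReassembler::new()
        .with_max_buffer(1024)
        .with_overflow_policy(OverflowPolicy::SlidingWindow);
    r.segment(0, b"GET /");
    r.segment(5, b" HTTP/1.1\r\n");
    // `take` drains everything buffered so far; drop counters are preserved.
    r.take()
}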
impl Reassembler for BufferedReassembler {
    fn segment(&mut self, seq: u32, payload: &[u8]) {
        if payload.is_empty() {
            return;
        }
        if self.poisoned {
            return;
        }
        match self.expected_seq {
            // First segment of the flow establishes the expected sequence.
            None => {
                self.expected_seq = Some(seq.wrapping_add(payload.len() as u32));
                self.append_with_cap(payload);
            }
            // Exactly the next expected segment: extend the contiguous run.
            Some(exp) if seq == exp => {
                self.expected_seq = Some(seq.wrapping_add(payload.len() as u32));
                self.append_with_cap(payload);
            }
            // Anything else (out-of-order or retransmitted) is dropped.
            Some(_) => {
                self.dropped_segments += 1;
            }
        }
    }
    // The trait accessors forward to the inherent getters above; inherent
    // methods take precedence in `Self::` path resolution, so these calls do
    // not recurse.
    fn dropped_segments(&self) -> u64 {
        Self::dropped_segments(self)
    }
fn bytes_dropped_oversize(&self) -> u64 {
Self::bytes_dropped_oversize(self)
}
fn is_poisoned(&self) -> bool {
Self::is_poisoned(self)
}
fn high_watermark(&self) -> u64 {
Self::high_watermark(self)
}
}
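// Because `Reassembler` is object-safe, callers that do not know the concrete
// type can hold it behind a trait object. A small sketch; the function name
// and the 64 KiB cap are illustrative only.
#[allow(dead_code)]
fn boxed_reassembler() -> Box<dyn Reassembler> {
    Box::new(BufferedReassembler::new().with_max_buffer(64 * 1024))
}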
/// [`ReassemblerFactory`] that hands out identically configured
/// [`BufferedReassembler`]s for every flow and direction.
#[derive(Debug, Default)]
pub struct BufferedReassemblerFactory {
    max_buffer: Option<usize>,
    overflow_policy: OverflowPolicy,
}
impl BufferedReassemblerFactory {
    /// Cap every produced reassembler at `max_bytes`.
    pub fn with_max_buffer(mut self, max_bytes: usize) -> Self {
        self.max_buffer = Some(max_bytes);
        self
    }
    /// Overflow policy applied to every produced reassembler.
    pub fn with_overflow_policy(mut self, policy: OverflowPolicy) -> Self {
        self.overflow_policy = policy;
        self
    }
}
impl<K: Send + 'static> ReassemblerFactory<K> for BufferedReassemblerFactory {
    type Reassembler = BufferedReassembler;
    fn new_reassembler(&mut self, _key: &K, _side: FlowSide) -> BufferedReassembler {
        let mut r = BufferedReassembler::new();
        // The overflow policy only matters once a cap is set, so both are
        // configured together.
        if let Some(cap) = self.max_buffer {
            r = r
                .with_max_buffer(cap)
                .with_overflow_policy(self.overflow_policy);
        }
        r
    }
}
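// Sketch of a caller that is generic over any `ReassemblerFactory`: one
// reassembler per direction of a flow. The helper name is illustrative and
// not part of the crate's API.
#[allow(dead_code)]
fn per_flow_pair<K, F: ReassemblerFactory<K>>(
    factory: &mut F,
    key: &K,
) -> (F::Reassembler, F::Reassembler) {
    (
        factory.new_reassembler(key, FlowSide::Initiator),
        factory.new_reassembler(key, FlowSide::Responder),
    )
}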
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn in_order_concatenates() {
let mut r = BufferedReassembler::new();
r.segment(100, b"abc");
r.segment(103, b"def");
r.segment(106, b"gh");
assert_eq!(r.take(), b"abcdefgh");
assert_eq!(r.dropped_segments(), 0);
}
#[test]
fn ooo_dropped() {
let mut r = BufferedReassembler::new();
        r.segment(100, b"hello");
        r.segment(110, b"world");
        assert_eq!(r.take(), b"hello");
assert_eq!(r.dropped_segments(), 1);
}
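    // `segment` advances `expected_seq` with `wrapping_add`, so a flow whose
    // sequence numbers wrap past `u32::MAX` should keep concatenating; the
    // payload values here are illustrative.
    #[test]
    fn sequence_wraparound_stays_in_order() {
        let mut r = BufferedReassembler::new();
        r.segment(u32::MAX - 1, b"ab"); // next expected seq wraps around to 0
        r.segment(0, b"cd");
        assert_eq!(r.take(), b"abcd");
        assert_eq!(r.dropped_segments(), 0);
    }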
#[test]
fn take_resets_buffer_only() {
let mut r = BufferedReassembler::new();
r.segment(0, b"abc"); let drained = r.take();
assert_eq!(drained, b"abc");
assert_eq!(r.buffered_len(), 0);
r.segment(3, b"def");
assert_eq!(r.take(), b"def");
assert_eq!(r.dropped_segments(), 0);
}
#[test]
fn empty_payload_ignored() {
let mut r = BufferedReassembler::new();
r.segment(0, b"");
assert_eq!(r.expected_seq, None);
assert_eq!(r.dropped_segments(), 0);
}
#[test]
fn factory_creates_fresh_reassembler() {
let mut f = BufferedReassemblerFactory::default();
let mut r1: BufferedReassembler = f.new_reassembler(&42u32, FlowSide::Initiator);
let mut r2: BufferedReassembler = f.new_reassembler(&42u32, FlowSide::Responder);
r1.segment(0, b"x");
r2.segment(0, b"y");
assert_eq!(r1.take(), b"x");
assert_eq!(r2.take(), b"y");
}
#[test]
fn fin_rst_default_noops_compile() {
let mut r = BufferedReassembler::new();
r.fin();
r.rst();
}
#[test]
fn cap_unbounded_by_default() {
let mut r = BufferedReassembler::new();
r.segment(0, &[0u8; 10_000]);
assert_eq!(r.buffered_len(), 10_000);
assert_eq!(r.bytes_dropped_oversize(), 0);
assert!(!r.is_poisoned());
}
#[test]
fn cap_drops_oldest_on_overflow_sliding_window() {
let mut r = BufferedReassembler::new().with_max_buffer(100);
r.segment(0, &[b'a'; 80]);
r.segment(80, &[b'b'; 80]);
assert_eq!(r.buffered_len(), 100);
assert_eq!(r.bytes_dropped_oversize(), 60);
let drained = r.take();
assert_eq!(&drained[..20], &[b'a'; 20][..]);
assert_eq!(&drained[20..], &[b'b'; 80][..]);
}
#[test]
fn cap_payload_bigger_than_cap_keeps_tail() {
let mut r = BufferedReassembler::new().with_max_buffer(50);
let payload: Vec<u8> = (0u8..100).collect();
r.segment(0, &payload);
assert_eq!(r.buffered_len(), 50);
assert_eq!(r.bytes_dropped_oversize(), 50);
assert_eq!(r.take(), (50u8..100).collect::<Vec<u8>>());
}
#[test]
fn cap_skips_ooo_segments_without_changing_overflow_counter() {
let mut r = BufferedReassembler::new().with_max_buffer(100);
r.segment(0, &[b'a'; 80]);
        r.segment(200, &[b'b'; 80]);
        assert_eq!(r.dropped_segments(), 1);
assert_eq!(r.bytes_dropped_oversize(), 0);
assert_eq!(r.buffered_len(), 80);
}
#[test]
fn cap_take_resets_buffer_but_not_counters() {
let mut r = BufferedReassembler::new().with_max_buffer(100);
r.segment(0, &[b'a'; 80]);
        r.segment(80, &[b'b'; 80]);
        let _ = r.take();
        r.segment(160, &[b'c'; 80]);
        assert_eq!(r.buffered_len(), 80);
assert_eq!(r.bytes_dropped_oversize(), 60);
assert_eq!(r.dropped_segments(), 0);
}
#[test]
fn cap_poisons_on_overflow_drop_flow() {
let mut r = BufferedReassembler::new()
.with_max_buffer(100)
.with_overflow_policy(OverflowPolicy::DropFlow);
r.segment(0, &[b'a'; 80]);
assert!(!r.is_poisoned());
        r.segment(80, &[b'b'; 80]);
        assert!(r.is_poisoned());
assert_eq!(r.bytes_dropped_oversize(), 80);
assert_eq!(r.buffered_len(), 0);
r.segment(160, &[b'c'; 10]);
assert_eq!(r.buffered_len(), 0);
assert_eq!(r.bytes_dropped_oversize(), 80);
}
#[test]
fn cap_drop_flow_does_not_poison_under_cap() {
let mut r = BufferedReassembler::new()
.with_max_buffer(100)
.with_overflow_policy(OverflowPolicy::DropFlow);
r.segment(0, &[b'a'; 50]);
        r.segment(50, &[b'b'; 50]);
        assert!(!r.is_poisoned());
assert_eq!(r.buffered_len(), 100);
assert_eq!(r.bytes_dropped_oversize(), 0);
}
#[test]
fn factory_propagates_cap_and_policy() {
let mut f = BufferedReassemblerFactory::default()
.with_max_buffer(64)
.with_overflow_policy(OverflowPolicy::DropFlow);
let mut r: BufferedReassembler = f.new_reassembler(&0u32, FlowSide::Initiator);
r.segment(0, &[0u8; 100]);
assert!(r.is_poisoned());
}
#[test]
fn factory_default_unbounded() {
let mut f = BufferedReassemblerFactory::default();
let mut r: BufferedReassembler = f.new_reassembler(&0u32, FlowSide::Initiator);
r.segment(0, &[0u8; 10_000]);
assert_eq!(r.buffered_len(), 10_000);
assert!(!r.is_poisoned());
}
#[test]
fn high_watermark_tracks_peak_buffer_unbounded() {
let mut r = BufferedReassembler::new();
r.segment(0, &[b'a'; 50]);
assert_eq!(r.high_watermark(), 50);
        let _ = r.take();
        assert_eq!(r.high_watermark(), 50);
r.segment(50, &[b'b'; 20]);
assert_eq!(r.high_watermark(), 50, "buffer is now 20 < 50; unchanged");
let _ = r.take();
r.segment(70, &[b'c'; 100]);
assert_eq!(r.high_watermark(), 100);
}
#[test]
fn high_watermark_reflects_post_rotation_under_sliding_window() {
let mut r = BufferedReassembler::new().with_max_buffer(100);
r.segment(0, &[b'a'; 80]);
assert_eq!(r.high_watermark(), 80);
r.segment(80, &[b'b'; 80]);
assert_eq!(r.high_watermark(), 100);
}
#[test]
fn high_watermark_stays_at_pre_poison_peak_drop_flow() {
let mut r = BufferedReassembler::new()
.with_max_buffer(100)
.with_overflow_policy(OverflowPolicy::DropFlow);
r.segment(0, &[b'a'; 80]);
assert_eq!(r.high_watermark(), 80);
        r.segment(80, &[b'b'; 80]);
        assert!(r.is_poisoned());
assert_eq!(r.high_watermark(), 80);
r.segment(160, &[b'c'; 10]);
assert_eq!(r.high_watermark(), 80);
}
}