use crate::event::FlowSide;
/// Sink for the payload bytes of a sequence-numbered byte stream.
///
/// Implementors receive every segment through [`Reassembler::segment`] and may
/// override the [`Reassembler::fin`] / [`Reassembler::rst`] hooks. The
/// `Send + 'static` supertraits mean a reassembler can be moved to another
/// thread and holds no non-`'static` borrows.
pub trait Reassembler: Send + 'static {
    /// Handles one segment whose first payload byte carries sequence number `seq`.
    fn segment(&mut self, seq: u32, payload: &[u8]);

    /// Hook named after the FIN flag; the provided implementation does nothing.
    ///
    /// NOTE(review): presumably invoked when this direction of the stream is
    /// closed in an orderly way — the caller is not visible in this file.
    fn fin(&mut self) {}

    /// Hook named after the RST flag; the provided implementation does nothing.
    ///
    /// NOTE(review): presumably invoked when the stream is reset — the caller
    /// is not visible in this file.
    fn rst(&mut self) {}
}
/// Produces [`Reassembler`] instances on request.
///
/// `K` is the key type handed to the factory alongside the [`FlowSide`]; the
/// factory itself must be `Send + 'static`.
pub trait ReassemblerFactory<K>: Send + 'static {
    /// Concrete reassembler type this factory hands out.
    type Reassembler: Reassembler;

    /// Creates a reassembler for the given `key` and `side`.
    ///
    /// Takes `&mut self`, so an implementation may keep and update its own
    /// state between calls.
    ///
    /// NOTE(review): presumably called once per flow direction — confirm
    /// against the caller, which is not visible in this file.
    fn new_reassembler(&mut self, key: &K, side: FlowSide) -> Self::Reassembler;
}
/// State of a simple in-memory reassembler: the bytes collected so far plus
/// the sequence bookkeeping needed to decide whether a segment is appended.
///
/// The derived `Default` is an empty buffer, no expected sequence number and a
/// drop counter of zero.
#[derive(Debug, Default)]
pub struct BufferedReassembler {
    /// Payload bytes accepted so far that have not yet been drained.
    buffer: Vec<u8>,
    /// Sequence number of the next byte the stream expects; `None` until the
    /// first non-empty segment has been accepted.
    expected_seq: Option<u32>,
    /// Number of non-empty segments that were discarded instead of appended.
    dropped_segments: u64,
}
impl BufferedReassembler {
    /// Creates a reassembler in its default state: empty buffer, no expected
    /// sequence number, zero dropped segments.
    pub fn new() -> Self {
        Self::default()
    }

    /// Hands out every byte buffered so far and leaves an empty buffer behind.
    ///
    /// Only the buffer is reset; the expected sequence number and the drop
    /// counter are left untouched, so the stream continues where it was.
    pub fn take(&mut self) -> Vec<u8> {
        let pending = &mut self.buffer;
        std::mem::take(pending)
    }

    /// Returns how many segments have been discarded so far.
    pub fn dropped_segments(&self) -> u64 {
        self.dropped_segments
    }

    /// Returns the number of bytes currently held in the buffer.
    pub fn buffered_len(&self) -> usize {
        self.buffer.len()
    }
}
impl Reassembler for BufferedReassembler {
    /// Appends the not-yet-seen bytes of `payload` to the buffer.
    ///
    /// `seq` is the sequence number of `payload[0]`; all sequence arithmetic
    /// is performed modulo 2^32.
    ///
    /// * An empty payload is ignored and changes no state.
    /// * The first non-empty segment is accepted whatever its `seq`.
    /// * A segment that starts exactly at the expected sequence number is
    ///   appended whole.
    /// * A segment that starts before the expected sequence number but reaches
    ///   past it (for example a coalesced retransmission) contributes only its
    ///   new tail. Discarding it whole would lose those bytes and leave the
    ///   expected sequence number stuck, so every later segment would be
    ///   dropped as well.
    /// * Any other segment — one that starts beyond the expected sequence
    ///   number (a gap) or lies entirely inside data already accepted — is
    ///   discarded and counted in the dropped-segment counter.
    fn segment(&mut self, seq: u32, payload: &[u8]) {
        // Distances of at least half the sequence space are interpreted as
        // "seq is ahead of expected" rather than "seq is far behind".
        const HALF_SEQ_SPACE: u32 = 1 << 31;

        if payload.is_empty() {
            return;
        }

        let fresh = match self.expected_seq {
            None => payload,
            Some(expected) => {
                // How far the start of this segment lies behind the next
                // expected byte (0 means perfectly in order).
                let behind = expected.wrapping_sub(seq);
                // u32 -> usize is a widening conversion on 32/64-bit targets.
                let already_seen = behind as usize;
                if behind >= HALF_SEQ_SPACE || already_seen >= payload.len() {
                    self.dropped_segments += 1;
                    return;
                }
                &payload[already_seen..]
            }
        };

        // Truncating the length to u32 is intentional: it equals the length
        // modulo 2^32, which is exactly what wrapping sequence math needs.
        self.expected_seq = Some(seq.wrapping_add(payload.len() as u32));
        self.buffer.extend_from_slice(fresh);
    }
}
/// Stateless, zero-sized factory type.
///
/// It carries no fields, so it can be written as a plain
/// `BufferedReassemblerFactory` value or obtained through `Default`.
#[derive(Debug, Default)]
pub struct BufferedReassemblerFactory;
impl<K> ReassemblerFactory<K> for BufferedReassemblerFactory
where
    K: Send + 'static,
{
    type Reassembler = BufferedReassembler;

    /// Returns a brand-new [`BufferedReassembler`] on every call.
    ///
    /// Both the key and the flow side are ignored, so every reassembler starts
    /// from the same empty state and shares nothing with the others.
    fn new_reassembler(&mut self, _key: &K, _side: FlowSide) -> Self::Reassembler {
        BufferedReassembler::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Back-to-back segments are appended in arrival order and nothing is dropped.
    #[test]
    fn in_order_concatenates() {
        let mut reasm = BufferedReassembler::new();
        reasm.segment(100, b"abc");
        reasm.segment(103, b"def");
        reasm.segment(106, b"gh");

        assert_eq!(reasm.take(), b"abcdefgh");
        assert_eq!(reasm.dropped_segments(), 0);
    }

    /// A segment that starts beyond the expected sequence number is discarded
    /// and counted.
    #[test]
    fn ooo_dropped() {
        let mut reasm = BufferedReassembler::new();
        reasm.segment(100, b"hello");
        // Expected next is 105, so starting at 110 leaves a gap.
        reasm.segment(110, b"world");

        assert_eq!(reasm.take(), b"hello");
        assert_eq!(reasm.dropped_segments(), 1);
    }

    /// `take` empties the buffer but keeps the sequence tracking, so the next
    /// in-order segment is still accepted.
    #[test]
    fn take_resets_buffer_only() {
        let mut reasm = BufferedReassembler::new();
        reasm.segment(0, b"abc");

        let drained = reasm.take();
        assert_eq!(drained, b"abc");
        assert_eq!(reasm.buffered_len(), 0);

        reasm.segment(3, b"def");
        assert_eq!(reasm.take(), b"def");
        assert_eq!(reasm.dropped_segments(), 0);
    }

    /// An empty payload neither initialises the expected sequence number nor
    /// counts as a drop.
    #[test]
    fn empty_payload_ignored() {
        let mut reasm = BufferedReassembler::new();
        reasm.segment(0, b"");

        assert_eq!(reasm.expected_seq, None);
        assert_eq!(reasm.dropped_segments(), 0);
    }

    /// Every call to the factory yields an independent reassembler.
    #[test]
    fn factory_creates_fresh_reassembler() {
        let mut factory = BufferedReassemblerFactory;
        let mut initiator: BufferedReassembler =
            factory.new_reassembler(&42u32, FlowSide::Initiator);
        let mut responder: BufferedReassembler =
            factory.new_reassembler(&42u32, FlowSide::Responder);

        initiator.segment(0, b"x");
        responder.segment(0, b"y");

        assert_eq!(initiator.take(), b"x");
        assert_eq!(responder.take(), b"y");
    }

    /// The provided `fin` / `rst` implementations exist and can be called.
    #[test]
    fn fin_rst_default_noops_compile() {
        let mut reasm = BufferedReassembler::new();
        reasm.fin();
        reasm.rst();
    }
}