use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use ahash::RandomState;
use flowscope::{
BufferedReassembler, BufferedReassemblerFactory, EndReason, FlowEvent, FlowExtractor, FlowSide,
FlowTracker, FlowTrackerConfig, Reassembler, ReassemblerFactory, SessionEvent, SessionParser,
SessionParserFactory, Timestamp,
};
use futures_core::Stream;
use crate::async_adapters::tokio_adapter::AsyncCapture;
use crate::dedup::Dedup;
use crate::error::Error;
use crate::traits::PacketSource;
pub struct SessionStream<S, E, F>
where
S: PacketSource + std::os::unix::io::AsRawFd,
E: FlowExtractor,
E::Key: Eq + std::hash::Hash + Clone + Send + 'static,
F: SessionParserFactory<E::Key>,
{
cap: AsyncCapture<S>,
tracker: FlowTracker<E, ()>,
parser_factory: F,
parsers: HashMap<E::Key, F::Parser, RandomState>,
reassembler_factory: BufferedReassemblerFactory,
reassemblers: HashMap<(E::Key, FlowSide), BufferedReassembler, RandomState>,
pending: VecDeque<SessionEvent<E::Key, <F::Parser as SessionParser>::Message>>,
sweep: tokio::time::Interval,
dedup: Option<Dedup>,
}
impl<S, E, F> SessionStream<S, E, F>
where
S: PacketSource + std::os::unix::io::AsRawFd,
E: FlowExtractor,
E::Key: Eq + std::hash::Hash + Clone + Send + 'static,
F: SessionParserFactory<E::Key>,
{
pub(crate) fn new_with_config(
cap: AsyncCapture<S>,
extractor: E,
parser_factory: F,
config: FlowTrackerConfig,
) -> Self {
let reassembler_factory = build_reassembler_factory(&config);
let tracker = FlowTracker::with_config(extractor, config);
let sweep = tokio::time::interval(tracker.config().sweep_interval);
Self {
cap,
tracker,
parser_factory,
parsers: HashMap::with_hasher(RandomState::new()),
reassembler_factory,
reassemblers: HashMap::with_hasher(RandomState::new()),
pending: VecDeque::new(),
sweep,
dedup: None,
}
}
pub(crate) fn new_with_config_and_dedup(
cap: AsyncCapture<S>,
extractor: E,
parser_factory: F,
config: FlowTrackerConfig,
dedup: Option<Dedup>,
) -> Self {
let mut s = Self::new_with_config(cap, extractor, parser_factory, config);
s.dedup = dedup;
s
}
pub fn with_config(mut self, config: FlowTrackerConfig) -> Self {
let new_interval = config.sweep_interval;
self.reassembler_factory = build_reassembler_factory(&config);
self.tracker.set_config(config);
self.sweep = tokio::time::interval(new_interval);
self
}
pub fn with_dedup(mut self, dedup: Dedup) -> Self {
self.dedup = Some(dedup);
self
}
pub fn dedup(&self) -> Option<&Dedup> {
self.dedup.as_ref()
}
pub fn dedup_mut(&mut self) -> Option<&mut Dedup> {
self.dedup.as_mut()
}
pub fn tracker(&self) -> &FlowTracker<E, ()> {
&self.tracker
}
}
impl<S, E, F> Stream for SessionStream<S, E, F>
where
S: PacketSource + std::os::unix::io::AsRawFd + Unpin,
E: FlowExtractor + Unpin,
E::Key: Eq + std::hash::Hash + Clone + Send + 'static + Unpin,
F: SessionParserFactory<E::Key> + Unpin,
F::Parser: Unpin,
<F::Parser as SessionParser>::Message: Unpin,
{
type Item = Result<SessionEvent<E::Key, <F::Parser as SessionParser>::Message>, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
if this.sweep.poll_tick(cx).is_ready() {
let now = current_timestamp();
let parsers = &mut this.parsers;
let parser_factory = &mut this.parser_factory;
let reassemblers = &mut this.reassemblers;
let pending = &mut this.pending;
for ev in this.tracker.sweep(now) {
process_session_event::<E::Key, F>(
ev,
parsers,
parser_factory,
reassemblers,
pending,
);
}
if !this.pending.is_empty() {
continue;
}
}
let mut guard = match this.cap.poll_read_ready_mut(cx) {
Poll::Ready(Ok(g)) => g,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error::Io(e)))),
Poll::Pending => return Poll::Pending,
};
let got_batch = {
let inner = guard.get_inner_mut();
if let Some(batch) = inner.next_batch() {
for pkt in &batch {
if let Some(d) = this.dedup.as_mut()
&& !d.keep(&pkt)
{
continue;
}
let view = pkt.view();
let parsers = &mut this.parsers;
let parser_factory = &mut this.parser_factory;
let reassemblers = &mut this.reassemblers;
let reassembler_factory = &mut this.reassembler_factory;
let pending = &mut this.pending;
let evts =
this.tracker
.track_with_payload(view, |key, side, seq, payload| {
if payload.is_empty() {
return;
}
reassemblers
.entry((key.clone(), side))
.or_insert_with(|| {
reassembler_factory.new_reassembler(key, side)
})
.segment(seq, payload);
});
for ev in evts {
process_session_event::<E::Key, F>(
ev,
parsers,
parser_factory,
reassemblers,
pending,
);
}
}
drop(batch);
true
} else {
false
}
};
if !got_batch {
guard.clear_ready();
}
}
}
}
fn build_reassembler_factory(config: &FlowTrackerConfig) -> BufferedReassemblerFactory {
let mut factory = BufferedReassemblerFactory::default();
if let Some(cap) = config.max_reassembler_buffer {
factory = factory.with_max_buffer(cap);
}
factory.with_overflow_policy(config.overflow_policy)
}
fn process_session_event<K, F>(
ev: FlowEvent<K>,
parsers: &mut HashMap<K, F::Parser, RandomState>,
parser_factory: &mut F,
reassemblers: &mut HashMap<(K, FlowSide), BufferedReassembler, RandomState>,
pending: &mut VecDeque<SessionEvent<K, <F::Parser as SessionParser>::Message>>,
) where
K: Eq + std::hash::Hash + Clone,
F: SessionParserFactory<K>,
{
match ev {
FlowEvent::Started { key, ts, .. } => {
pending.push_back(SessionEvent::Started { key, ts });
}
FlowEvent::Packet { key, side, ts, .. } => {
let drained = match reassemblers.get_mut(&(key.clone(), side)) {
Some(r) => r.take(),
None => return,
};
if drained.is_empty() {
return;
}
let parser = parsers
.entry(key.clone())
.or_insert_with(|| parser_factory.new_parser(&key));
let messages = match side {
FlowSide::Initiator => parser.feed_initiator(&drained),
FlowSide::Responder => parser.feed_responder(&drained),
};
for m in messages {
pending.push_back(SessionEvent::Application {
key: key.clone(),
side,
message: m,
ts,
});
}
}
FlowEvent::Ended {
key, reason, stats, ..
} => {
let graceful = matches!(reason, EndReason::Fin | EndReason::IdleTimeout);
for side in [FlowSide::Initiator, FlowSide::Responder] {
let r = reassemblers.remove(&(key.clone(), side));
if !graceful {
drop(r);
continue;
}
if let Some(mut r) = r {
let drained = r.take();
if !drained.is_empty() {
let parser = parsers
.entry(key.clone())
.or_insert_with(|| parser_factory.new_parser(&key));
let messages = match side {
FlowSide::Initiator => parser.feed_initiator(&drained),
FlowSide::Responder => parser.feed_responder(&drained),
};
for m in messages {
pending.push_back(SessionEvent::Application {
key: key.clone(),
side,
message: m,
ts: stats.last_seen,
});
}
}
}
}
if let Some(mut parser) = parsers.remove(&key) {
match reason {
EndReason::Fin | EndReason::IdleTimeout => {
for m in parser.fin_initiator() {
pending.push_back(SessionEvent::Application {
key: key.clone(),
side: FlowSide::Initiator,
message: m,
ts: stats.last_seen,
});
}
for m in parser.fin_responder() {
pending.push_back(SessionEvent::Application {
key: key.clone(),
side: FlowSide::Responder,
message: m,
ts: stats.last_seen,
});
}
}
EndReason::Rst | EndReason::Evicted | EndReason::BufferOverflow => {
parser.rst_initiator();
parser.rst_responder();
}
}
}
pending.push_back(SessionEvent::Closed { key, reason, stats });
}
FlowEvent::Anomaly { kind, ts, .. } => {
tracing::warn!(
target: "netring::flow",
?kind, ?ts,
"flow tracker anomaly (use FlowStream for structured handling)"
);
}
FlowEvent::Established { .. } | FlowEvent::StateChange { .. } => {}
}
}
fn current_timestamp() -> Timestamp {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or(Duration::ZERO);
Timestamp::new(now.as_secs() as u32, now.subsec_nanos())
}
#[cfg(test)]
mod tests {
use super::*;
use flowscope::{AnomalyKind, FlowStats, HistoryString, OverflowPolicy};
#[derive(Default, Clone)]
struct EchoParser;
impl SessionParser for EchoParser {
type Message = (FlowSide, Vec<u8>);
fn feed_initiator(&mut self, b: &[u8]) -> Vec<(FlowSide, Vec<u8>)> {
vec![(FlowSide::Initiator, b.to_vec())]
}
fn feed_responder(&mut self, b: &[u8]) -> Vec<(FlowSide, Vec<u8>)> {
vec![(FlowSide::Responder, b.to_vec())]
}
}
fn ts() -> Timestamp {
Timestamp::new(0, 0)
}
type TestState = (
HashMap<u32, EchoParser, RandomState>,
EchoParser,
HashMap<(u32, FlowSide), BufferedReassembler, RandomState>,
VecDeque<SessionEvent<u32, (FlowSide, Vec<u8>)>>,
);
fn empty_state() -> TestState {
(
HashMap::with_hasher(RandomState::new()),
EchoParser,
HashMap::with_hasher(RandomState::new()),
VecDeque::new(),
)
}
#[test]
fn started_event_pushes_session_started() {
let (mut parsers, mut factory, mut reassemblers, mut pending) = empty_state();
process_session_event::<u32, EchoParser>(
FlowEvent::Started {
key: 7,
side: FlowSide::Initiator,
ts: ts(),
l4: None,
},
&mut parsers,
&mut factory,
&mut reassemblers,
&mut pending,
);
assert!(matches!(
pending.pop_front(),
Some(SessionEvent::Started { key: 7, .. })
));
}
#[test]
fn packet_event_drains_reassembler_into_parser() {
let (mut parsers, mut factory, mut reassemblers, mut pending) = empty_state();
let mut r = BufferedReassembler::new();
r.segment(0, b"hello");
reassemblers.insert((7u32, FlowSide::Initiator), r);
process_session_event::<u32, EchoParser>(
FlowEvent::Packet {
key: 7,
side: FlowSide::Initiator,
len: 5,
ts: ts(),
},
&mut parsers,
&mut factory,
&mut reassemblers,
&mut pending,
);
match pending.pop_front() {
Some(SessionEvent::Application {
key, side, message, ..
}) => {
assert_eq!(key, 7);
assert_eq!(side, FlowSide::Initiator);
assert_eq!(message, (FlowSide::Initiator, b"hello".to_vec()));
}
other => panic!("expected Application, got {other:?}"),
}
assert!(
reassemblers
.get(&(7, FlowSide::Initiator))
.map(|r| r.buffered_len())
== Some(0)
);
}
#[test]
fn packet_event_with_no_reassembler_is_silent() {
let (mut parsers, mut factory, mut reassemblers, mut pending) = empty_state();
process_session_event::<u32, EchoParser>(
FlowEvent::Packet {
key: 7,
side: FlowSide::Initiator,
len: 0,
ts: ts(),
},
&mut parsers,
&mut factory,
&mut reassemblers,
&mut pending,
);
assert!(pending.is_empty());
}
#[test]
fn ended_fin_drains_reassembler_then_calls_fin() {
let (mut parsers, mut factory, mut reassemblers, mut pending) = empty_state();
let mut r = BufferedReassembler::new();
r.segment(0, b"residual");
reassemblers.insert((7u32, FlowSide::Initiator), r);
process_session_event::<u32, EchoParser>(
FlowEvent::Ended {
key: 7,
reason: EndReason::Fin,
stats: FlowStats::default(),
history: HistoryString::default(),
},
&mut parsers,
&mut factory,
&mut reassemblers,
&mut pending,
);
match pending.pop_front() {
Some(SessionEvent::Application { message, .. }) => {
assert_eq!(message, (FlowSide::Initiator, b"residual".to_vec()));
}
other => panic!("expected residual Application, got {other:?}"),
}
match pending.pop_front() {
Some(SessionEvent::Closed { reason, key, .. }) => {
assert_eq!(key, 7);
assert!(matches!(reason, EndReason::Fin));
}
other => panic!("expected Closed, got {other:?}"),
}
assert!(reassemblers.is_empty());
}
#[test]
fn ended_buffer_overflow_drops_reassembler_without_drain() {
let (mut parsers, mut factory, mut reassemblers, mut pending) = empty_state();
let mut r = BufferedReassembler::new();
r.segment(0, b"suspect-data-from-poisoned-flow");
reassemblers.insert((7u32, FlowSide::Initiator), r);
process_session_event::<u32, EchoParser>(
FlowEvent::Ended {
key: 7,
reason: EndReason::BufferOverflow,
stats: FlowStats::default(),
history: HistoryString::default(),
},
&mut parsers,
&mut factory,
&mut reassemblers,
&mut pending,
);
assert_eq!(pending.len(), 1);
match pending.pop_front() {
Some(SessionEvent::Closed { reason, key, .. }) => {
assert_eq!(key, 7);
assert!(matches!(reason, EndReason::BufferOverflow));
}
other => panic!("expected Closed, got {other:?}"),
}
assert!(reassemblers.is_empty());
}
#[test]
fn ended_rst_drops_reassembler_without_drain() {
let (mut parsers, mut factory, mut reassemblers, mut pending) = empty_state();
parsers.insert(7u32, EchoParser);
let mut r = BufferedReassembler::new();
r.segment(0, b"abc");
reassemblers.insert((7u32, FlowSide::Responder), r);
process_session_event::<u32, EchoParser>(
FlowEvent::Ended {
key: 7,
reason: EndReason::Rst,
stats: FlowStats::default(),
history: HistoryString::default(),
},
&mut parsers,
&mut factory,
&mut reassemblers,
&mut pending,
);
assert!(reassemblers.is_empty());
assert!(!parsers.contains_key(&7));
assert_eq!(pending.len(), 1);
match pending.pop_front() {
Some(SessionEvent::Closed { reason, .. }) => {
assert!(matches!(reason, EndReason::Rst));
}
other => panic!("expected Closed, got {other:?}"),
}
}
#[test]
fn anomaly_event_does_not_emit_session_event() {
let (mut parsers, mut factory, mut reassemblers, mut pending) = empty_state();
process_session_event::<u32, EchoParser>(
FlowEvent::Anomaly {
key: Some(42),
kind: AnomalyKind::OutOfOrderSegment {
side: FlowSide::Initiator,
count: 3,
},
ts: ts(),
},
&mut parsers,
&mut factory,
&mut reassemblers,
&mut pending,
);
assert!(pending.is_empty());
}
#[test]
fn build_factory_picks_up_cap_and_policy() {
let mut cfg = FlowTrackerConfig::default();
cfg.max_reassembler_buffer = Some(64);
cfg.overflow_policy = OverflowPolicy::DropFlow;
let mut factory = build_reassembler_factory(&cfg);
let mut r: BufferedReassembler = factory.new_reassembler(&7u32, FlowSide::Initiator);
r.segment(0, &[0u8; 128]);
assert!(r.is_poisoned());
}
#[test]
fn build_factory_unbounded_when_cap_unset() {
let cfg = FlowTrackerConfig::default();
let mut factory = build_reassembler_factory(&cfg);
let mut r: BufferedReassembler = factory.new_reassembler(&7u32, FlowSide::Initiator);
r.segment(0, &vec![0u8; 4096]);
assert!(!r.is_poisoned());
assert_eq!(r.buffered_len(), 4096);
}
}