use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use ahash::RandomState;
use flowscope::{
EndReason, FlowEvent, FlowExtractor, FlowSide, FlowTracker, FlowTrackerConfig, SessionEvent,
SessionParser, SessionParserFactory, Timestamp,
};
use futures_core::Stream;
use crate::async_adapters::tokio_adapter::AsyncCapture;
use crate::error::Error;
use crate::traits::PacketSource;
pub struct SessionStream<S, E, F>
where
S: PacketSource + std::os::unix::io::AsRawFd,
E: FlowExtractor,
E::Key: Eq + std::hash::Hash + Clone + Send + 'static,
F: SessionParserFactory<E::Key>,
{
cap: AsyncCapture<S>,
tracker: FlowTracker<E, ()>,
factory: F,
parsers: HashMap<E::Key, F::Parser, RandomState>,
pending: VecDeque<SessionEvent<E::Key, <F::Parser as SessionParser>::Message>>,
sweep: tokio::time::Interval,
}
impl<S, E, F> SessionStream<S, E, F>
where
S: PacketSource + std::os::unix::io::AsRawFd,
E: FlowExtractor,
E::Key: Eq + std::hash::Hash + Clone + Send + 'static,
F: SessionParserFactory<E::Key>,
{
pub(crate) fn new_with_config(
cap: AsyncCapture<S>,
extractor: E,
factory: F,
config: FlowTrackerConfig,
) -> Self {
let tracker = FlowTracker::with_config(extractor, config);
let sweep = tokio::time::interval(tracker.config().sweep_interval);
Self {
cap,
tracker,
factory,
parsers: HashMap::with_hasher(RandomState::new()),
pending: VecDeque::new(),
sweep,
}
}
pub fn with_config(mut self, config: FlowTrackerConfig) -> Self {
let new_interval = config.sweep_interval;
self.tracker.set_config(config);
self.sweep = tokio::time::interval(new_interval);
self
}
pub fn tracker(&self) -> &FlowTracker<E, ()> {
&self.tracker
}
}
impl<S, E, F> Stream for SessionStream<S, E, F>
where
S: PacketSource + std::os::unix::io::AsRawFd + Unpin,
E: FlowExtractor + Unpin,
E::Key: Eq + std::hash::Hash + Clone + Send + 'static + Unpin,
F: SessionParserFactory<E::Key> + Unpin,
F::Parser: Unpin,
<F::Parser as SessionParser>::Message: Unpin,
{
type Item = Result<SessionEvent<E::Key, <F::Parser as SessionParser>::Message>, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if let Some(ev) = this.pending.pop_front() {
return Poll::Ready(Some(Ok(ev)));
}
if this.sweep.poll_tick(cx).is_ready() {
let now = current_timestamp();
for ev in this.tracker.sweep(now) {
convert_event(ev, &mut this.parsers, &mut this.pending);
}
if !this.pending.is_empty() {
continue;
}
}
let mut guard = match this.cap.poll_read_ready_mut(cx) {
Poll::Ready(Ok(g)) => g,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error::Io(e)))),
Poll::Pending => return Poll::Pending,
};
let got_batch = {
let inner = guard.get_inner_mut();
if let Some(batch) = inner.next_batch() {
for pkt in &batch {
let view = pkt.view();
let view_ts = view.timestamp;
let parsers = &mut this.parsers;
let factory = &mut this.factory;
let pending = &mut this.pending;
let evts =
this.tracker
.track_with_payload(view, |key, side, _seq, payload| {
if payload.is_empty() {
return;
}
let parser = parsers
.entry(key.clone())
.or_insert_with(|| factory.new_parser(key));
let messages = match side {
FlowSide::Initiator => parser.feed_initiator(payload),
FlowSide::Responder => parser.feed_responder(payload),
};
for message in messages {
pending.push_back(SessionEvent::Application {
key: key.clone(),
side,
message,
ts: view_ts,
});
}
});
for ev in evts {
convert_event(ev, parsers, pending);
}
}
drop(batch);
true
} else {
false
}
};
if !got_batch {
guard.clear_ready();
}
}
}
}
fn convert_event<K, P>(
ev: FlowEvent<K>,
parsers: &mut HashMap<K, P, RandomState>,
pending: &mut VecDeque<SessionEvent<K, P::Message>>,
) where
K: Eq + std::hash::Hash + Clone,
P: SessionParser,
{
match ev {
FlowEvent::Started { key, ts, .. } => {
pending.push_back(SessionEvent::Started { key, ts });
}
FlowEvent::Ended {
key, reason, stats, ..
} => {
if let Some(mut parser) = parsers.remove(&key) {
match reason {
EndReason::Fin | EndReason::IdleTimeout => {
for m in parser.fin_initiator() {
pending.push_back(SessionEvent::Application {
key: key.clone(),
side: FlowSide::Initiator,
message: m,
ts: stats.last_seen,
});
}
for m in parser.fin_responder() {
pending.push_back(SessionEvent::Application {
key: key.clone(),
side: FlowSide::Responder,
message: m,
ts: stats.last_seen,
});
}
}
EndReason::Rst | EndReason::Evicted | EndReason::BufferOverflow => {
parser.rst_initiator();
parser.rst_responder();
}
}
}
pending.push_back(SessionEvent::Closed { key, reason, stats });
}
FlowEvent::Anomaly { kind, ts, .. } => {
tracing::warn!(
target: "netring::flow",
?kind, ?ts,
"flow tracker anomaly (use FlowStream for structured handling)"
);
}
_ => {}
}
}
fn current_timestamp() -> Timestamp {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or(Duration::ZERO);
Timestamp::new(now.as_secs() as u32, now.subsec_nanos())
}
#[cfg(test)]
mod tests {
use super::*;
use flowscope::{AnomalyKind, FlowStats};
#[derive(Default)]
struct DummyParser;
impl SessionParser for DummyParser {
type Message = ();
fn feed_initiator(&mut self, _: &[u8]) -> Vec<()> {
Vec::new()
}
fn feed_responder(&mut self, _: &[u8]) -> Vec<()> {
Vec::new()
}
}
fn ts() -> Timestamp {
Timestamp::new(0, 0)
}
#[test]
fn buffer_overflow_close_emits_closed_event() {
let mut parsers: HashMap<u32, DummyParser, RandomState> =
HashMap::with_hasher(RandomState::new());
parsers.insert(7, DummyParser);
let mut pending: VecDeque<SessionEvent<u32, ()>> = VecDeque::new();
convert_event::<u32, DummyParser>(
FlowEvent::Ended {
key: 7,
reason: EndReason::BufferOverflow,
stats: FlowStats::default(),
history: Default::default(),
},
&mut parsers,
&mut pending,
);
assert_eq!(pending.len(), 1);
match pending.pop_front().unwrap() {
SessionEvent::Closed { reason, key, .. } => {
assert!(matches!(reason, EndReason::BufferOverflow));
assert_eq!(key, 7);
}
other => panic!("expected Closed, got {other:?}"),
}
assert!(!parsers.contains_key(&7));
}
#[test]
fn anomaly_event_does_not_emit_session_event() {
let mut parsers: HashMap<u32, DummyParser, RandomState> =
HashMap::with_hasher(RandomState::new());
let mut pending: VecDeque<SessionEvent<u32, ()>> = VecDeque::new();
convert_event::<u32, DummyParser>(
FlowEvent::Anomaly {
key: Some(42),
kind: AnomalyKind::OutOfOrderSegment {
side: FlowSide::Initiator,
count: 3,
},
ts: ts(),
},
&mut parsers,
&mut pending,
);
assert!(pending.is_empty());
}
}