use std::collections::HashMap;
use std::hash::Hash;
use ahash::RandomState;
use crate::Timestamp;
use crate::driver::FlowDriver;
use crate::event::{AnomalyKind, EndReason, FlowEvent, FlowSide};
use crate::extractor::FlowExtractor;
use crate::reassembler::{Reassembler, ReassemblerFactory};
use crate::session::{DatagramParser, SessionEvent};
use crate::tracker::{FlowTracker, FlowTrackerConfig};
use crate::view::PacketView;
const POISON_REASON_MAX_BYTES: usize = 256;
fn truncate_reason(s: &str) -> String {
let mut owned = String::from(s);
if owned.len() > POISON_REASON_MAX_BYTES {
let cap = (0..=POISON_REASON_MAX_BYTES)
.rev()
.find(|i| owned.is_char_boundary(*i))
.unwrap_or(0);
owned.truncate(cap);
}
owned
}
#[derive(Debug, Default)]
struct NoopReassembler;
impl Reassembler for NoopReassembler {
fn segment(&mut self, _seq: u32, _payload: &[u8]) {}
}
#[derive(Debug, Default)]
struct NoopReassemblerFactory;
impl<K: Send + 'static> ReassemblerFactory<K> for NoopReassemblerFactory {
type Reassembler = NoopReassembler;
fn new_reassembler(&mut self, _key: &K, _side: FlowSide) -> NoopReassembler {
NoopReassembler
}
}
pub struct FlowDatagramDriver<E, P, S = ()>
where
E: FlowExtractor,
E::Key: Hash + Eq + Clone + Send + 'static,
P: DatagramParser + Default + Clone + Send + 'static,
S: Send + 'static,
{
driver: FlowDriver<E, NoopReassemblerFactory, S>,
parser_factory: P,
parsers: HashMap<E::Key, P, RandomState>,
}
impl<E, P, S> FlowDatagramDriver<E, P, S>
where
E: FlowExtractor,
E::Key: Hash + Eq + Clone + Send + 'static,
P: DatagramParser + Default + Clone + Send + 'static,
S: Default + Send + 'static,
{
pub fn new(extractor: E) -> Self {
Self::with_config(extractor, FlowTrackerConfig::default())
}
pub fn with_config(extractor: E, config: FlowTrackerConfig) -> Self {
Self {
driver: FlowDriver::with_config(extractor, NoopReassemblerFactory, config),
parser_factory: P::default(),
parsers: HashMap::with_hasher(RandomState::new()),
}
}
}
impl<E, P, S> FlowDatagramDriver<E, P, S>
where
E: FlowExtractor,
E::Key: Hash + Eq + Clone + Send + 'static,
P: DatagramParser + Default + Clone + Send + 'static,
S: Send + 'static,
{
pub fn with_emit_anomalies(mut self, enable: bool) -> Self {
self.driver = self.driver.with_emit_anomalies(enable);
self
}
pub fn with_idle_timeout_fn<G>(mut self, f: G) -> Self
where
G: Fn(&E::Key, Option<crate::L4Proto>) -> Option<std::time::Duration> + Send + 'static,
{
self.driver = self.driver.with_idle_timeout_fn(f);
self
}
pub fn with_dedup(mut self, dedup: crate::dedup::Dedup) -> Self {
self.driver = self.driver.with_dedup(dedup);
self
}
pub fn with_monotonic_timestamps(mut self, enable: bool) -> Self {
self.driver = self.driver.with_monotonic_timestamps(enable);
self
}
pub fn track(&mut self, view: PacketView<'_>) -> Vec<SessionEvent<E::Key, P::Message>> {
let udp_payload: Option<Vec<u8>> = extract_udp_payload(view).map(|s| s.to_vec());
let mut flow_events = self.driver.track_pending(view);
let out = self.translate_events(&flow_events, udp_payload.as_deref());
self.driver.finalize(flow_events.as_mut_slice());
out
}
pub fn sweep(&mut self, now: Timestamp) -> Vec<SessionEvent<E::Key, P::Message>> {
let mut flow_events = self.driver.sweep_pending(now);
let out = self.translate_events(&flow_events, None);
self.driver.finalize(flow_events.as_mut_slice());
out
}
pub fn tracker(&self) -> &FlowTracker<E, S> {
self.driver.tracker()
}
pub fn tracker_mut(&mut self) -> &mut FlowTracker<E, S> {
self.driver.tracker_mut()
}
pub fn snapshot_flow_stats(&self) -> impl Iterator<Item = (E::Key, crate::FlowStats)> + '_ {
self.driver.snapshot_flow_stats()
}
pub fn dedup(&self) -> Option<&crate::dedup::Dedup> {
self.driver.dedup()
}
fn translate_events(
&mut self,
flow_events: &[FlowEvent<E::Key>],
udp_payload: Option<&[u8]>,
) -> Vec<SessionEvent<E::Key, P::Message>> {
let mut out: Vec<SessionEvent<E::Key, P::Message>> = Vec::new();
for ev in flow_events {
match ev {
FlowEvent::Started { key, ts, .. } => {
self.parsers
.entry(key.clone())
.or_insert_with(|| self.parser_factory.clone());
out.push(SessionEvent::Started {
key: key.clone(),
ts: *ts,
});
}
FlowEvent::Packet { key, side, ts, .. } => {
let Some(payload) = udp_payload else {
continue;
};
let Some(parser) = self.parsers.get_mut(key) else {
continue;
};
let messages = parser.parse(payload, *side);
for m in messages {
crate::obs::trace_session_message(*side, &m);
out.push(SessionEvent::Application {
key: key.clone(),
side: *side,
message: m,
ts: *ts,
});
}
if parser.is_poisoned() {
let reason = parser.poison_reason().map(truncate_reason);
self.synthesise_parser_poison(key, *side, reason, *ts, &mut out);
}
}
FlowEvent::Ended {
key, reason, stats, ..
} => {
self.parsers.remove(key);
out.push(SessionEvent::Closed {
key: key.clone(),
reason: *reason,
stats: stats.clone(),
});
}
FlowEvent::Anomaly { key, kind, ts } => {
out.push(SessionEvent::Anomaly {
key: key.clone(),
kind: kind.clone(),
ts: *ts,
});
}
FlowEvent::Established { .. } | FlowEvent::StateChange { .. } => {
}
}
}
out
}
fn synthesise_parser_poison(
&mut self,
key: &E::Key,
side: FlowSide,
reason: Option<String>,
ts: Timestamp,
out: &mut Vec<SessionEvent<E::Key, P::Message>>,
) {
if self.driver.emits_anomalies() {
out.push(SessionEvent::Anomaly {
key: Some(key.clone()),
kind: AnomalyKind::SessionParseError { side, reason },
ts,
});
}
let stats = self
.driver
.tracker()
.snapshot_stats(key)
.unwrap_or_default();
crate::obs::record_flow_ended(EndReason::ParseError, &stats);
crate::obs::trace_flow_ended(EndReason::ParseError, &stats);
out.push(SessionEvent::Closed {
key: key.clone(),
reason: EndReason::ParseError,
stats,
});
self.parsers.remove(key);
self.driver.tracker_mut().forget(key);
}
}
fn extract_udp_payload(view: PacketView<'_>) -> Option<&[u8]> {
let sp = etherparse::SlicedPacket::from_ethernet(view.frame).ok()?;
match sp.transport? {
etherparse::TransportSlice::Udp(udp) => Some(udp.payload()),
_ => None,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::extract::{FiveTuple, parse::test_frames::*};
fn view(frame: &[u8], sec: u32) -> PacketView<'_> {
PacketView::new(frame, Timestamp::new(sec, 0))
}
#[derive(Default, Clone)]
struct EchoUdp;
impl DatagramParser for EchoUdp {
type Message = (FlowSide, Vec<u8>);
fn parse(&mut self, payload: &[u8], side: FlowSide) -> Vec<Self::Message> {
vec![(side, payload.to_vec())]
}
}
#[test]
fn started_and_application_for_udp_packet() {
let mut d = FlowDatagramDriver::<_, EchoUdp>::new(FiveTuple::bidirectional());
let f = ipv4_udp([10, 0, 0, 1], [10, 0, 0, 2], 1, 53, b"query");
let events = d.track(view(&f, 0));
assert!(
events
.iter()
.any(|e| matches!(e, SessionEvent::Started { .. }))
);
let app = events.iter().find_map(|e| match e {
SessionEvent::Application {
message: (s, b), ..
} => Some((*s, b.clone())),
_ => None,
});
assert_eq!(app, Some((FlowSide::Initiator, b"query".to_vec())));
}
#[test]
fn closed_event_on_idle_timeout() {
let cfg = FlowTrackerConfig {
idle_timeout_udp: std::time::Duration::from_secs(1),
..FlowTrackerConfig::default()
};
let mut d = FlowDatagramDriver::<_, EchoUdp>::with_config(FiveTuple::bidirectional(), cfg);
let f = ipv4_udp([10, 0, 0, 1], [10, 0, 0, 2], 1, 53, b"q");
d.track(view(&f, 0));
let ended = d.sweep(Timestamp::new(10, 0));
assert!(
ended
.iter()
.any(|e| matches!(e, SessionEvent::Closed { .. }))
);
}
#[test]
fn tcp_packets_do_not_fire_application_events() {
let mut d = FlowDatagramDriver::<_, EchoUdp>::new(FiveTuple::bidirectional());
let syn = ipv4_tcp(
[0; 6],
[0; 6],
[10, 0, 0, 1],
[10, 0, 0, 2],
1234,
80,
0,
0,
0x02,
b"",
);
let events = d.track(view(&syn, 0));
assert!(
events
.iter()
.any(|e| matches!(e, SessionEvent::Started { .. }))
);
assert!(
!events
.iter()
.any(|e| matches!(e, SessionEvent::Application { .. })),
"TCP packet produced an Application event in the UDP driver"
);
}
#[derive(Default, Clone)]
struct PoisonAfterBytes {
seen: usize,
poisoned: bool,
}
impl DatagramParser for PoisonAfterBytes {
type Message = ();
fn parse(&mut self, payload: &[u8], _side: FlowSide) -> Vec<()> {
self.seen += payload.len();
if self.seen > 5 {
self.poisoned = true;
}
Vec::new()
}
fn is_poisoned(&self) -> bool {
self.poisoned
}
fn poison_reason(&self) -> Option<&str> {
if self.poisoned {
Some("too many bytes")
} else {
None
}
}
}
#[test]
fn datagram_parser_poison_synthesises_parse_error_closed() {
let mut d = FlowDatagramDriver::<_, PoisonAfterBytes>::new(FiveTuple::bidirectional());
let f = ipv4_udp([10, 0, 0, 1], [10, 0, 0, 2], 1, 53, b"0123456789");
let events = d.track(view(&f, 0));
let closed = events.iter().find_map(|e| match e {
SessionEvent::Closed { reason, .. } => Some(*reason),
_ => None,
});
assert_eq!(closed, Some(EndReason::ParseError));
}
}