#[cfg(feature = "reassembler")]
use crate::event::AnomalyKind;
use crate::event::{EndReason, FlowStats};
use crate::extractor::L4Proto;
pub const METRIC_FLOWS_CREATED: &str = "flowscope_flows_created_total";
pub const METRIC_FLOWS_ENDED: &str = "flowscope_flows_ended_total";
pub const METRIC_FLOWS_ACTIVE: &str = "flowscope_flows_active";
pub const METRIC_PACKETS_UNMATCHED: &str = "flowscope_packets_unmatched_total";
pub const METRIC_BYTES: &str = "flowscope_bytes_total";
pub const METRIC_FLOW_DURATION_SECONDS: &str = "flowscope_flow_duration_seconds";
pub const METRIC_FLOW_PACKETS: &str = "flowscope_flow_packets";
pub const METRIC_FLOW_BYTES: &str = "flowscope_flow_bytes";
pub const METRIC_ANOMALIES: &str = "flowscope_anomalies_total";
pub const METRIC_REASSEMBLY_DROPPED_OOO: &str = "flowscope_reassembly_dropped_ooo_total";
pub const METRIC_REASSEMBLY_BYTES_DROPPED_OVERSIZE: &str =
"flowscope_reassembly_bytes_dropped_oversize_total";
pub const METRIC_REASSEMBLER_HIGH_WATERMARK: &str = "flowscope_reassembler_high_watermark_bytes";
#[cfg(feature = "metrics")]
fn l4_label(l4: Option<L4Proto>) -> &'static str {
match l4 {
Some(L4Proto::Tcp) => "tcp",
Some(L4Proto::Udp) => "udp",
_ => "other",
}
}
#[cfg(feature = "metrics")]
fn reason_label(reason: EndReason) -> &'static str {
match reason {
EndReason::Fin => "fin",
EndReason::Rst => "rst",
EndReason::IdleTimeout => "idle",
EndReason::Evicted => "evicted",
EndReason::BufferOverflow => "buffer_overflow",
EndReason::ParseError => "parse_error",
}
}
#[cfg(all(feature = "metrics", feature = "reassembler"))]
fn anomaly_label(kind: &AnomalyKind) -> &'static str {
match kind {
AnomalyKind::BufferOverflow { .. } => "buffer_overflow",
AnomalyKind::OutOfOrderSegment { .. } => "ooo_segment",
AnomalyKind::FlowTableEvictionPressure { .. } => "flow_table_eviction",
AnomalyKind::SessionParseError { .. } => "parse_error",
}
}
#[cfg(feature = "metrics")]
pub(crate) fn record_flow_created(l4: Option<L4Proto>) {
metrics::counter!(METRIC_FLOWS_CREATED, "l4" => l4_label(l4)).increment(1);
metrics::gauge!(METRIC_FLOWS_ACTIVE).increment(1.0);
}
#[cfg(feature = "metrics")]
pub(crate) fn record_flow_ended(reason: EndReason, stats: &FlowStats) {
metrics::counter!(METRIC_FLOWS_ENDED, "reason" => reason_label(reason)).increment(1);
metrics::gauge!(METRIC_FLOWS_ACTIVE).decrement(1.0);
metrics::counter!(METRIC_BYTES, "side" => "initiator").increment(stats.bytes_initiator);
metrics::counter!(METRIC_BYTES, "side" => "responder").increment(stats.bytes_responder);
let duration = duration_seconds(stats);
metrics::histogram!(METRIC_FLOW_DURATION_SECONDS).record(duration);
metrics::histogram!(METRIC_FLOW_PACKETS)
.record((stats.packets_initiator + stats.packets_responder) as f64);
metrics::histogram!(METRIC_FLOW_BYTES)
.record((stats.bytes_initiator + stats.bytes_responder) as f64);
if stats.reassembly_dropped_ooo_initiator > 0 {
metrics::counter!(METRIC_REASSEMBLY_DROPPED_OOO, "side" => "initiator")
.increment(stats.reassembly_dropped_ooo_initiator);
}
if stats.reassembly_dropped_ooo_responder > 0 {
metrics::counter!(METRIC_REASSEMBLY_DROPPED_OOO, "side" => "responder")
.increment(stats.reassembly_dropped_ooo_responder);
}
if stats.reassembly_bytes_dropped_oversize_initiator > 0 {
metrics::counter!(METRIC_REASSEMBLY_BYTES_DROPPED_OVERSIZE, "side" => "initiator")
.increment(stats.reassembly_bytes_dropped_oversize_initiator);
}
if stats.reassembly_bytes_dropped_oversize_responder > 0 {
metrics::counter!(METRIC_REASSEMBLY_BYTES_DROPPED_OVERSIZE, "side" => "responder")
.increment(stats.reassembly_bytes_dropped_oversize_responder);
}
if stats.reassembler_high_watermark_initiator > 0 {
metrics::histogram!(METRIC_REASSEMBLER_HIGH_WATERMARK, "side" => "initiator")
.record(stats.reassembler_high_watermark_initiator as f64);
}
if stats.reassembler_high_watermark_responder > 0 {
metrics::histogram!(METRIC_REASSEMBLER_HIGH_WATERMARK, "side" => "responder")
.record(stats.reassembler_high_watermark_responder as f64);
}
}
#[cfg(feature = "metrics")]
pub(crate) fn record_packet_unmatched() {
metrics::counter!(METRIC_PACKETS_UNMATCHED).increment(1);
}
#[cfg(all(feature = "metrics", feature = "reassembler"))]
pub(crate) fn record_anomaly(kind: &AnomalyKind) {
metrics::counter!(METRIC_ANOMALIES, "kind" => anomaly_label(kind)).increment(1);
}
#[cfg(feature = "metrics")]
fn duration_seconds(stats: &FlowStats) -> f64 {
let start = stats.started.to_duration();
let end = stats.last_seen.to_duration();
end.saturating_sub(start).as_secs_f64()
}
#[cfg(not(feature = "metrics"))]
#[inline(always)]
pub(crate) fn record_flow_created(_l4: Option<L4Proto>) {}
#[cfg(not(feature = "metrics"))]
#[inline(always)]
pub(crate) fn record_flow_ended(_reason: EndReason, _stats: &FlowStats) {}
#[cfg(not(feature = "metrics"))]
#[inline(always)]
pub(crate) fn record_packet_unmatched() {}
#[cfg(all(not(feature = "metrics"), feature = "reassembler"))]
#[inline(always)]
pub(crate) fn record_anomaly(_kind: &AnomalyKind) {}
#[cfg(feature = "tracing")]
pub(crate) fn trace_flow_started(l4: Option<L4Proto>) {
tracing::info!(target: "flowscope.flow", ?l4, "flow started");
}
#[cfg(feature = "tracing")]
pub(crate) fn trace_flow_ended(reason: EndReason, stats: &FlowStats) {
tracing::info!(
target: "flowscope.flow",
?reason,
packets = stats.packets_initiator + stats.packets_responder,
bytes = stats.bytes_initiator + stats.bytes_responder,
"flow ended"
);
}
#[cfg(all(feature = "tracing", feature = "reassembler"))]
pub(crate) fn trace_anomaly(kind: &AnomalyKind) {
tracing::warn!(target: "flowscope.anomaly", ?kind, "anomaly");
}
#[cfg(not(feature = "tracing"))]
#[inline(always)]
pub(crate) fn trace_flow_started(_l4: Option<L4Proto>) {}
#[cfg(not(feature = "tracing"))]
#[inline(always)]
pub(crate) fn trace_flow_ended(_reason: EndReason, _stats: &FlowStats) {}
#[cfg(all(not(feature = "tracing"), feature = "reassembler"))]
#[inline(always)]
pub(crate) fn trace_anomaly(_kind: &AnomalyKind) {}
#[cfg(all(feature = "tracing-messages", feature = "session"))]
pub(crate) fn trace_session_message<M: std::fmt::Debug>(side: crate::event::FlowSide, msg: &M) {
tracing::trace!(
target: "flowscope.message",
?side,
message = ?msg,
"session message"
);
}
#[cfg(all(not(feature = "tracing-messages"), feature = "session"))]
#[inline(always)]
pub(crate) fn trace_session_message<M>(_side: crate::event::FlowSide, _msg: &M) {}