#![forbid(unsafe_code)]
#![doc = include_str!("../README.md")]
#[cfg(feature = "netflow_common")]
pub mod netflow_common;
pub mod protocol;
pub mod scoped_parser;
pub mod static_versions;
mod tests;
pub mod variable_versions;
#[cfg(feature = "netflow_common")]
use crate::netflow_common::{NetflowCommon, NetflowCommonError, NetflowCommonFlowSet};
use static_versions::{
v5::{V5, V5Parser},
v7::{V7, V7Parser},
};
use variable_versions::ParserConfig;
use variable_versions::ipfix::{IPFix, IPFixParser};
use variable_versions::v9::{V9, V9Parser};
use nom_derive::{Nom, Parse};
use serde::Serialize;
use std::sync::Arc;
/// Counts the entries in `cache` that are still considered live.
///
/// With no TTL configuration every cached entry counts; otherwise only the
/// entries for which `is_expired` returns `false` are counted.
fn count_valid_templates<T>(
    cache: &lru::LruCache<u16, variable_versions::ttl::TemplateWithTtl<T>>,
    ttl_config: &Option<variable_versions::ttl::TtlConfig>,
) -> usize {
    // No TTL configured: nothing can expire, so the raw length is the answer.
    let Some(cfg) = ttl_config else {
        return cache.len();
    };
    cache
        .iter()
        .filter(|(_, entry)| !entry.is_expired(cfg))
        .count()
}
pub use scoped_parser::{
AutoScopedParser, DEFAULT_MAX_SOURCES, IpfixSourceKey, RouterScopedParser, ScopingInfo,
V9SourceKey, extract_scoping_info,
};
pub use variable_versions::template_events::{
TemplateEvent, TemplateHook, TemplateHookError, TemplateHooks, TemplateProtocol,
};
pub use variable_versions::enterprise_registry::{EnterpriseFieldDef, EnterpriseFieldRegistry};
pub use variable_versions::metrics::{CacheInfo, CacheMetrics, ParserCacheInfo};
pub use variable_versions::ttl::TtlConfig;
pub use variable_versions::{
Config, ConfigError, DEFAULT_MAX_RECORDS_PER_FLOWSET, NoTemplateInfo, PendingFlowsConfig,
};
pub use variable_versions::ipfix::lookup::IpfixField;
pub use variable_versions::ipfix::{Ipfix, IpfixFieldPair, IpfixFlowRecord, IpfixParser};
pub use variable_versions::field_value::{DataNumber, FieldDataType, FieldValue};
pub use variable_versions::v9::lookup::V9Field;
pub use variable_versions::v9::{V9FieldPair, V9FlowRecord};
/// A single parsed NetFlow packet, tagged by the protocol version it was
/// decoded as.
///
/// Marked `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum NetflowPacket {
/// A NetFlow version 5 packet.
V5(V5),
/// A NetFlow version 7 packet.
V7(V7),
/// A NetFlow version 9 packet.
V9(V9),
/// An IPFIX packet (version field 10 in the dispatch code of this file).
IPFix(IPFix),
}
impl NetflowPacket {
    /// Returns `true` when this packet is the [`NetflowPacket::V5`] variant.
    pub fn is_v5(&self) -> bool {
        matches!(self, Self::V5(_))
    }

    /// Returns `true` when this packet is the [`NetflowPacket::V7`] variant.
    pub fn is_v7(&self) -> bool {
        matches!(self, Self::V7(_))
    }

    /// Returns `true` when this packet is the [`NetflowPacket::V9`] variant.
    pub fn is_v9(&self) -> bool {
        matches!(self, Self::V9(_))
    }

    /// Returns `true` when this packet is the [`NetflowPacket::IPFix`] variant.
    pub fn is_ipfix(&self) -> bool {
        matches!(self, Self::IPFix(_))
    }

    /// Converts this packet into the version-independent [`NetflowCommon`]
    /// representation.
    ///
    /// # Errors
    ///
    /// Returns whatever [`NetflowCommonError`] the underlying `TryInto`
    /// conversion reports.
    #[cfg(feature = "netflow_common")]
    pub fn as_netflow_common(&self) -> Result<NetflowCommon, NetflowCommonError> {
        self.try_into()
    }
}
/// Outcome of parsing a byte buffer that may hold several packets.
///
/// `parse_bytes` fills `packets` with everything parsed before it stopped and
/// sets `error` when parsing stopped because of a failure, so both fields can
/// be populated at the same time.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize)]
#[must_use = "parsing results should not be discarded; check .packets and .error"]
pub struct ParseResult {
/// Packets that were parsed successfully, in input order.
pub packets: Vec<NetflowPacket>,
/// The error that ended parsing, or `None` when the whole buffer was consumed.
pub error: Option<NetflowError>,
}
impl ParseResult {
    /// Returns `true` when no error was recorded.
    pub fn is_ok(&self) -> bool {
        !self.is_err()
    }

    /// Returns `true` when an error was recorded (packets may still be present).
    pub fn is_err(&self) -> bool {
        matches!(self.error, Some(_))
    }
}
/// Minimal header parsed first from every packet so the parser can decide
/// which version-specific parser to dispatch to.
#[derive(Nom)]
struct GenericNetflowHeader {
/// Version number read from the first 16-bit field of the packet.
version: u16,
}
/// Stateful parser that dispatches packets to the V5, V7, V9 or IPFIX parser
/// based on the version in the packet header.
#[derive(Debug)]
pub struct NetflowParser {
/// Parser (and its template caches) used for version 9 packets.
pub(crate) v9_parser: V9Parser,
/// Parser (and its template caches) used for version 10 (IPFIX) packets.
pub(crate) ipfix_parser: IPFixParser,
/// Table indexed by version number; `true` means the version is parsed.
pub(crate) allowed_versions: [bool; 11],
/// Upper bound on the number of bytes copied into an
/// `UnsupportedVersion` error sample.
pub(crate) max_error_sample_size: usize,
/// Hooks notified about template events observed while parsing.
template_hooks: TemplateHooks,
}
/// Builder that collects configuration for a [`NetflowParser`].
#[derive(Clone)]
pub struct NetflowParserBuilder {
/// Configuration handed to the V9 parser on `build`.
v9_config: Config,
/// Configuration handed to the IPFIX parser on `build`.
ipfix_config: Config,
/// Table indexed by version number; `true` means the version is parsed.
allowed_versions: [bool; 11],
/// The raw list passed to `with_allowed_versions`, kept so `validate` can
/// reject empty lists and unsupported version numbers. `None` when the
/// default set is in use.
requested_versions: Option<Vec<u16>>,
/// Maximum number of bytes to include in error samples.
max_error_sample_size: usize,
/// Hooks registered through `on_template_event`.
template_hooks: TemplateHooks,
}
/// Builds the version lookup table from a list of version numbers.
///
/// Only 5, 7, 9 and 10 are recognised; any other value in `versions` is
/// ignored. The returned array is indexed by version number.
fn versions_to_array(versions: &[u16]) -> [bool; 11] {
    let mut allowed = [false; 11];
    versions
        .iter()
        .copied()
        .filter(|&v| matches!(v, 5 | 7 | 9 | 10))
        .for_each(|v| allowed[usize::from(v)] = true);
    allowed
}
/// Manual `Debug` implementation: the hooks are summarised as a count rather
/// than printed individually.
impl std::fmt::Debug for NetflowParserBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let hooks_summary = format!("{} hooks", self.template_hooks.len());
        let mut out = f.debug_struct("NetflowParserBuilder");
        out.field("v9_config", &self.v9_config);
        out.field("ipfix_config", &self.ipfix_config);
        out.field("allowed_versions", &self.allowed_versions);
        out.field("max_error_sample_size", &self.max_error_sample_size);
        out.field("template_hooks", &hooks_summary);
        out.finish()
    }
}
/// Default builder: both configs are created with `Config::new(1000, None)`,
/// versions 5/7/9/10 are all allowed, error samples are capped at 256 bytes
/// and no hooks are registered.
impl Default for NetflowParserBuilder {
    fn default() -> Self {
        let v9_config = Config::new(1000, None);
        let ipfix_config = Config::new(1000, None);
        let allowed_versions = versions_to_array(&[5, 7, 9, 10]);
        Self {
            v9_config,
            ipfix_config,
            allowed_versions,
            requested_versions: None,
            max_error_sample_size: 256,
            template_hooks: TemplateHooks::new(),
        }
    }
}
impl NetflowParserBuilder {
    /// Sets the template cache size for both the V9 and the IPFIX parser.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_cache_size(self, size: usize) -> Self {
        self.with_v9_cache_size(size).with_ipfix_cache_size(size)
    }

    /// Sets the template cache size for the V9 parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_v9_cache_size(mut self, size: usize) -> Self {
        self.v9_config.max_template_cache_size = size;
        self
    }

    /// Sets the template cache size for the IPFIX parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_ipfix_cache_size(mut self, size: usize) -> Self {
        self.ipfix_config.max_template_cache_size = size;
        self
    }

    /// Sets the maximum field count for both the V9 and the IPFIX parser.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_max_field_count(self, count: usize) -> Self {
        self.with_v9_max_field_count(count)
            .with_ipfix_max_field_count(count)
    }

    /// Sets the maximum field count for the V9 parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_v9_max_field_count(mut self, count: usize) -> Self {
        self.v9_config.max_field_count = count;
        self
    }

    /// Sets the maximum field count for the IPFIX parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_ipfix_max_field_count(mut self, count: usize) -> Self {
        self.ipfix_config.max_field_count = count;
        self
    }

    /// Sets the maximum total template size for both parsers.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_max_template_total_size(self, size: usize) -> Self {
        self.with_v9_max_template_total_size(size)
            .with_ipfix_max_template_total_size(size)
    }

    /// Sets the maximum total template size for the V9 parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_v9_max_template_total_size(mut self, size: usize) -> Self {
        self.v9_config.max_template_total_size = size;
        self
    }

    /// Sets the maximum total template size for the IPFIX parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_ipfix_max_template_total_size(mut self, size: usize) -> Self {
        self.ipfix_config.max_template_total_size = size;
        self
    }

    /// Applies the same TTL configuration to both parsers.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_ttl(self, ttl: variable_versions::ttl::TtlConfig) -> Self {
        self.with_ipfix_ttl(ttl.clone()).with_v9_ttl(ttl)
    }

    /// Sets the TTL configuration for the V9 parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_v9_ttl(mut self, ttl: variable_versions::ttl::TtlConfig) -> Self {
        self.v9_config.ttl_config = Some(ttl);
        self
    }

    /// Sets the TTL configuration for the IPFIX parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_ipfix_ttl(mut self, ttl: variable_versions::ttl::TtlConfig) -> Self {
        self.ipfix_config.ttl_config = Some(ttl);
        self
    }

    /// Restricts parsing to the given version numbers.
    ///
    /// The raw list is also remembered so that [`Self::validate`] can reject
    /// an empty list or a version other than 5, 7, 9 or 10.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_allowed_versions(mut self, versions: &[u16]) -> Self {
        self.requested_versions = Some(versions.to_vec());
        self.allowed_versions = versions_to_array(versions);
        self
    }

    /// Sets the error sample size on the builder and on both parser configs.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_max_error_sample_size(mut self, size: usize) -> Self {
        self.v9_config.max_error_sample_size = size;
        self.ipfix_config.max_error_sample_size = size;
        self.max_error_sample_size = size;
        self
    }

    /// Sets the maximum number of records per flowset for both parsers.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_max_records_per_flowset(self, count: usize) -> Self {
        self.with_v9_max_records_per_flowset(count)
            .with_ipfix_max_records_per_flowset(count)
    }

    /// Sets the maximum number of records per flowset for the V9 parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_v9_max_records_per_flowset(mut self, count: usize) -> Self {
        self.v9_config.max_records_per_flowset = count;
        self
    }

    /// Sets the maximum number of records per flowset for the IPFIX parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_ipfix_max_records_per_flowset(mut self, count: usize) -> Self {
        self.ipfix_config.max_records_per_flowset = count;
        self
    }

    /// Registers one enterprise field definition in the IPFIX registry.
    ///
    /// The registry lives behind an `Arc`; `Arc::make_mut` gives this builder
    /// a mutable registry, cloning it first if it is currently shared.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn register_enterprise_field(mut self, def: EnterpriseFieldDef) -> Self {
        Arc::make_mut(&mut self.ipfix_config.enterprise_registry).register(def);
        self
    }

    /// Registers every enterprise field definition yielded by `defs`, in order.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn register_enterprise_fields(
        self,
        defs: impl IntoIterator<Item = EnterpriseFieldDef>,
    ) -> Self {
        defs.into_iter().fold(self, Self::register_enterprise_field)
    }

    /// Applies the same pending-flows configuration to both parsers.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_pending_flows(self, config: PendingFlowsConfig) -> Self {
        self.with_ipfix_pending_flows(config.clone())
            .with_v9_pending_flows(config)
    }

    /// Sets the pending-flows configuration for the V9 parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_v9_pending_flows(mut self, config: PendingFlowsConfig) -> Self {
        self.v9_config.pending_flows_config = Some(config);
        self
    }

    /// Sets the pending-flows configuration for the IPFIX parser only.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_ipfix_pending_flows(mut self, config: PendingFlowsConfig) -> Self {
        self.ipfix_config.pending_flows_config = Some(config);
        self
    }

    /// Returns the builder unchanged.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn single_source(self) -> Self {
        self
    }

    /// Hands this builder to [`AutoScopedParser::try_with_builder`].
    ///
    /// # Errors
    ///
    /// Returns the [`ConfigError`] reported by `try_with_builder`.
    pub fn try_multi_source(self) -> Result<AutoScopedParser, ConfigError> {
        AutoScopedParser::try_with_builder(self)
    }

    /// Registers a hook that is invoked with each [`TemplateEvent`].
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn on_template_event<F>(mut self, hook: F) -> Self
    where
        F: Fn(&TemplateEvent) -> Result<(), TemplateHookError> + Send + Sync + 'static,
    {
        self.template_hooks.register(hook);
        self
    }

    /// Checks the configuration without consuming the builder.
    ///
    /// # Errors
    ///
    /// Returns the error from either parser's `validate_config`,
    /// [`ConfigError::EmptyAllowedVersions`] when an explicitly requested
    /// version list is empty, or [`ConfigError::InvalidAllowedVersion`] for
    /// the first requested version that is not 5, 7, 9 or 10.
    pub fn validate(&self) -> Result<(), ConfigError> {
        V9Parser::validate_config(&self.v9_config)?;
        IPFixParser::validate_config(&self.ipfix_config)?;

        // Nothing more to check when the default version set is in use.
        let Some(versions) = self.requested_versions.as_deref() else {
            return Ok(());
        };
        if versions.is_empty() {
            return Err(ConfigError::EmptyAllowedVersions);
        }
        match versions
            .iter()
            .copied()
            .find(|&v| !matches!(v, 5 | 7 | 9 | 10))
        {
            Some(bad) => Err(ConfigError::InvalidAllowedVersion(bad)),
            None => Ok(()),
        }
    }

    /// Validates the configuration and constructs the [`NetflowParser`].
    ///
    /// # Errors
    ///
    /// Returns any error from [`Self::validate`] or from either parser's
    /// `try_new`.
    pub fn build(self) -> Result<NetflowParser, ConfigError> {
        self.validate()?;
        let Self {
            v9_config,
            ipfix_config,
            allowed_versions,
            max_error_sample_size,
            template_hooks,
            ..
        } = self;
        Ok(NetflowParser {
            v9_parser: V9Parser::try_new(v9_config)?,
            ipfix_parser: IPFixParser::try_new(ipfix_config)?,
            allowed_versions,
            max_error_sample_size,
            template_hooks,
        })
    }
}
/// Crate-internal result of parsing a single packet from the front of a
/// buffer.
#[derive(Debug, Clone)]
pub(crate) enum ParsedNetflow<'a> {
/// A packet was parsed; `remaining` is the unconsumed tail of the input.
Success {
packet: NetflowPacket,
remaining: &'a [u8],
},
/// Parsing failed with the given error.
Error {
error: NetflowError,
},
/// The header named a supported version (5/7/9/10) that is not enabled in
/// the parser's allowed-versions table.
UnallowedVersion {
version: u16,
},
}
/// Errors reported while parsing NetFlow data.
///
/// Marked `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
#[non_exhaustive]
#[derive(Debug, PartialEq, Clone, Serialize)]
pub enum NetflowError {
/// The input was too short. In this file it is produced when the generic
/// header cannot be parsed; `available` is then the length of the input.
Incomplete {
available: usize,
context: String,
},
/// The header carried a version other than 5, 7, 9 or 10. `sample` holds
/// the leading bytes that follow the header, capped by the parser's
/// `max_error_sample_size`.
UnsupportedVersion {
version: u16,
offset: usize,
sample: Vec<u8>,
},
/// The version is supported but was excluded by the allowed-versions
/// configuration.
FilteredVersion {
version: u16,
},
/// A parse failure with positional context. Not constructed in this file;
/// presumably produced by the version-specific parsers.
ParseError {
offset: usize,
context: String,
kind: String,
remaining: Vec<u8>,
},
/// A partial parse failure described by `message`. Not constructed in this
/// file; presumably produced by the version-specific parsers.
Partial {
message: String,
},
}
impl std::fmt::Display for NetflowError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NetflowError::Incomplete { available, context } => {
write!(
f,
"Incomplete data: {} (only {} bytes available)",
context, available
)
}
NetflowError::UnsupportedVersion {
version, offset, ..
} => {
write!(
f,
"Unsupported NetFlow version {} at offset {}",
version, offset
)
}
NetflowError::FilteredVersion { version } => {
write!(
f,
"NetFlow version {} filtered out by allowed_versions configuration",
version
)
}
NetflowError::ParseError {
offset,
context,
kind,
..
} => {
write!(
f,
"Parse error at offset {}: {} ({})",
offset, context, kind
)
}
NetflowError::Partial { message } => {
write!(f, "Partial parse error: {}", message)
}
}
}
}
// Marker impl: relies on the `Debug` and `Display` implementations; no
// underlying source error is exposed.
impl std::error::Error for NetflowError {}
/// Iterator that parses packets one at a time from a byte buffer.
///
/// Created by `NetflowParser::iter_packets`. It stops after the first error.
pub struct NetflowPacketIterator<'a> {
/// Parser used (and mutated) for every packet.
parser: &'a mut NetflowParser,
/// Bytes not yet consumed; emptied once an error has been yielded.
remaining: &'a [u8],
/// Set once an error has been yielded; the iterator then returns `None`.
errored: bool,
}
/// Manual `Debug` implementation that prints the number of unconsumed bytes
/// and the error flag, and omits the borrowed parser.
impl std::fmt::Debug for NetflowPacketIterator<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let bytes_left = self.remaining.len();
        let mut out = f.debug_struct("NetflowPacketIterator");
        out.field("remaining_bytes", &bytes_left);
        out.field("errored", &self.errored);
        out.finish()
    }
}
impl<'a> NetflowPacketIterator<'a> {
    /// Returns the bytes that have not been consumed yet.
    ///
    /// This is empty once an error has been yielded.
    pub fn remaining(&self) -> &'a [u8] {
        self.remaining
    }

    /// Returns `true` when iteration has finished, either because an error
    /// was yielded or because no input is left.
    pub fn is_complete(&self) -> bool {
        self.errored || self.remaining.is_empty()
    }
}
impl<'a> Iterator for NetflowPacketIterator<'a> {
type Item = Result<NetflowPacket, NetflowError>;
fn next(&mut self) -> Option<Self::Item> {
if self.errored || self.remaining.is_empty() {
return None;
}
match self.parser.parse_packet_by_version(self.remaining) {
ParsedNetflow::Success {
packet,
remaining: new_remaining,
} => {
self.remaining = new_remaining;
Some(Ok(packet))
}
ParsedNetflow::UnallowedVersion { version } => {
self.errored = true;
self.remaining = &[];
Some(Err(NetflowError::FilteredVersion { version }))
}
ParsedNetflow::Error { error } => {
self.errored = true;
self.remaining = &[];
Some(Err(error))
}
}
}
}
/// Default parser: default V9 and IPFIX parsers, versions 5/7/9/10 all
/// allowed, error samples capped at 256 bytes and no hooks registered.
impl Default for NetflowParser {
    fn default() -> Self {
        let v9_parser = V9Parser::default();
        let ipfix_parser = IPFixParser::default();
        let allowed_versions = versions_to_array(&[5, 7, 9, 10]);
        Self {
            v9_parser,
            ipfix_parser,
            allowed_versions,
            max_error_sample_size: 256,
            template_hooks: TemplateHooks::new(),
        }
    }
}
impl NetflowParser {
/// Returns a [`NetflowParserBuilder`] initialised with its default settings.
pub fn builder() -> NetflowParserBuilder {
NetflowParserBuilder::default()
}
/// Returns the allowed-versions table; index `v` is `true` when packets of
/// version `v` are parsed.
pub fn allowed_versions(&self) -> &[bool; 11] {
&self.allowed_versions
}
/// Returns `true` when `version` is enabled in the allowed-versions table.
///
/// Versions beyond the end of the table are never allowed.
pub fn is_version_allowed(&self, version: u16) -> bool {
    self.allowed_versions
        .get(usize::from(version))
        .copied()
        .unwrap_or(false)
}
/// Returns the maximum number of bytes copied into an error sample.
pub fn max_error_sample_size(&self) -> usize {
self.max_error_sample_size
}
/// Returns a shared reference to the V9 parser.
pub fn v9_parser(&self) -> &V9Parser {
&self.v9_parser
}
/// Returns a mutable reference to the V9 parser.
pub fn v9_parser_mut(&mut self) -> &mut V9Parser {
&mut self.v9_parser
}
/// Returns a shared reference to the IPFIX parser.
pub fn ipfix_parser(&self) -> &IPFixParser {
&self.ipfix_parser
}
/// Returns a mutable reference to the IPFIX parser.
pub fn ipfix_parser_mut(&mut self) -> &mut IPFixParser {
&mut self.ipfix_parser
}
/// Returns a snapshot describing the V9 template caches.
///
/// `current_size` is the number of non-expired entries across the template
/// and options-template caches.
pub fn v9_cache_info(&self) -> CacheInfo {
    let parser = &self.v9_parser;
    let ttl = &parser.ttl_config;
    let live_templates = count_valid_templates(&parser.templates, ttl);
    let live_options = count_valid_templates(&parser.options_templates, ttl);
    CacheInfo {
        current_size: live_templates + live_options,
        max_size_per_cache: parser.max_template_cache_size,
        num_caches: 2,
        ttl_config: parser.ttl_config.clone(),
        metrics: parser.metrics.snapshot(),
        pending_flow_count: parser.pending_flow_count(),
    }
}
/// Returns a snapshot describing the IPFIX template caches.
///
/// `current_size` is the number of non-expired entries across all four
/// caches held by the IPFIX parser.
pub fn ipfix_cache_info(&self) -> CacheInfo {
    let parser = &self.ipfix_parser;
    let ttl = &parser.ttl_config;
    let live_ipfix = count_valid_templates(&parser.templates, ttl);
    let live_v9 = count_valid_templates(&parser.v9_templates, ttl);
    let live_ipfix_options = count_valid_templates(&parser.ipfix_options_templates, ttl);
    let live_v9_options = count_valid_templates(&parser.v9_options_templates, ttl);
    CacheInfo {
        current_size: live_ipfix + live_v9 + live_ipfix_options + live_v9_options,
        max_size_per_cache: parser.max_template_cache_size,
        num_caches: 4,
        ttl_config: parser.ttl_config.clone(),
        metrics: parser.metrics.snapshot(),
        pending_flow_count: parser.pending_flow_count(),
    }
}
/// Returns the sorted, de-duplicated IDs present in the V9 template and
/// options-template caches.
///
/// No TTL filtering is applied here, so expired entries still held in the
/// caches are included.
pub fn v9_template_ids(&self) -> Vec<u16> {
    let parser = &self.v9_parser;
    let mut ids: Vec<u16> = parser
        .templates
        .iter()
        .map(|(&id, _)| id)
        .chain(parser.options_templates.iter().map(|(&id, _)| id))
        .collect();
    ids.sort_unstable();
    ids.dedup();
    ids
}
/// Returns the sorted, de-duplicated IDs present in the four IPFIX parser
/// caches.
///
/// No TTL filtering is applied here, so expired entries still held in the
/// caches are included.
pub fn ipfix_template_ids(&self) -> Vec<u16> {
    let parser = &self.ipfix_parser;
    let mut ids: Vec<u16> = parser
        .templates
        .iter()
        .map(|(&id, _)| id)
        .chain(parser.v9_templates.iter().map(|(&id, _)| id))
        .chain(parser.ipfix_options_templates.iter().map(|(&id, _)| id))
        .chain(parser.v9_options_templates.iter().map(|(&id, _)| id))
        .collect();
    ids.sort_unstable();
    ids.dedup();
    ids
}
pub fn has_v9_template(&self, template_id: u16) -> bool {
self.v9_parser
.templates
.peek(&template_id)
.is_some_and(|t| {
self.v9_parser
.ttl_config
.as_ref()
.is_none_or(|cfg| !t.is_expired(cfg))
})
|| self
.v9_parser
.options_templates
.peek(&template_id)
.is_some_and(|t| {
self.v9_parser
.ttl_config
.as_ref()
.is_none_or(|cfg| !t.is_expired(cfg))
})
}
pub fn has_ipfix_template(&self, template_id: u16) -> bool {
let ttl = &self.ipfix_parser.ttl_config;
self.ipfix_parser
.templates
.peek(&template_id)
.is_some_and(|t| ttl.as_ref().is_none_or(|cfg| !t.is_expired(cfg)))
|| self
.ipfix_parser
.v9_templates
.peek(&template_id)
.is_some_and(|t| ttl.as_ref().is_none_or(|cfg| !t.is_expired(cfg)))
|| self
.ipfix_parser
.ipfix_options_templates
.peek(&template_id)
.is_some_and(|t| ttl.as_ref().is_none_or(|cfg| !t.is_expired(cfg)))
|| self
.ipfix_parser
.v9_options_templates
.peek(&template_id)
.is_some_and(|t| ttl.as_ref().is_none_or(|cfg| !t.is_expired(cfg)))
}
/// Empties both V9 template caches and drops the V9 pending flows.
pub fn clear_v9_templates(&mut self) {
    let parser = &mut self.v9_parser;
    parser.templates.clear();
    parser.options_templates.clear();
    parser.clear_pending_flows();
}
/// Empties all four IPFIX parser caches and drops the IPFIX pending flows.
pub fn clear_ipfix_templates(&mut self) {
    let parser = &mut self.ipfix_parser;
    parser.templates.clear();
    parser.v9_templates.clear();
    parser.ipfix_options_templates.clear();
    parser.v9_options_templates.clear();
    parser.clear_pending_flows();
}
/// Drops the V9 parser's pending flows; the template caches are left alone.
pub fn clear_v9_pending_flows(&mut self) {
self.v9_parser.clear_pending_flows();
}
/// Drops the IPFIX parser's pending flows; the template caches are left alone.
pub fn clear_ipfix_pending_flows(&mut self) {
self.ipfix_parser.clear_pending_flows();
}
/// Passes `event` to the registered template hooks.
#[inline]
pub fn trigger_template_event(&mut self, event: TemplateEvent) {
self.template_hooks.trigger(&event);
}
/// Returns the hook error count reported by the template hooks.
#[inline]
pub fn hook_error_count(&self) -> u64 {
self.template_hooks.hook_error_count()
}
/// Parses every packet contained in `packet`.
///
/// Packets are parsed back to back until the input is exhausted or a
/// failure occurs. Packets parsed before a failure are still returned, with
/// the failure in `error`. An empty input yields no packets and no error.
#[inline]
pub fn parse_bytes(&mut self, packet: &[u8]) -> ParseResult {
    let mut packets = Vec::new();
    let mut error = None;
    // The iterator yields nothing after its first `Err`, so the loop stops
    // by itself once an error has been recorded.
    for item in self.iter_packets(packet) {
        match item {
            Ok(parsed) => packets.push(parsed),
            Err(e) => error = Some(e),
        }
    }
    ParseResult { packets, error }
}
/// Returns an iterator that parses the packets in `packet` one at a time
/// using this parser.
///
/// The iterator borrows the parser mutably for as long as it lives.
#[inline]
pub fn iter_packets<'a>(&'a mut self, packet: &'a [u8]) -> NetflowPacketIterator<'a> {
NetflowPacketIterator {
parser: self,
remaining: packet,
errored: false,
}
}
/// Parses one packet from the front of `packet`, choosing the parser from
/// the version in the generic header.
///
/// * An allowed version is handed to the V5/V7/V9/IPFIX parser.
/// * A supported but disabled version yields `UnallowedVersion`.
/// * Any other version yields an `UnsupportedVersion` error whose sample is
///   taken from the bytes after the header, capped by
///   `max_error_sample_size`.
/// * A header that cannot be parsed yields an `Incomplete` error.
///
/// When hooks are registered, template events for a successfully parsed
/// packet are fired first, followed by events derived from the change in
/// the V9 and then the IPFIX metrics.
#[inline]
fn parse_packet_by_version<'a>(&mut self, packet: &'a [u8]) -> ParsedNetflow<'a> {
    // Snapshot the metrics only when hooks exist, so the hook-free path
    // skips the copies.
    let snapshots = if self.template_hooks.is_empty() {
        None
    } else {
        Some((self.v9_parser.metrics, self.ipfix_parser.metrics))
    };

    let outcome = match GenericNetflowHeader::parse(packet) {
        Err(e) => ParsedNetflow::Error {
            error: NetflowError::Incomplete {
                available: packet.len(),
                context: format!("NetFlow header: {}", e),
            },
        },
        Ok((body, header)) => {
            let version = header.version;
            if self.is_version_allowed(version) {
                match version {
                    5 => V5Parser::parse(body),
                    7 => V7Parser::parse(body),
                    9 => self.v9_parser.parse(body),
                    10 => self.ipfix_parser.parse(body),
                    _ => unreachable!("allowed_versions guard only permits 5/7/9/10"),
                }
            } else if matches!(version, 5 | 7 | 9 | 10) {
                ParsedNetflow::UnallowedVersion { version }
            } else {
                let sample_len = body.len().min(self.max_error_sample_size);
                ParsedNetflow::Error {
                    error: NetflowError::UnsupportedVersion {
                        version,
                        offset: 0,
                        sample: body[..sample_len].to_vec(),
                    },
                }
            }
        }
    };

    if let Some((v9_before, ipfix_before)) = snapshots {
        if let ParsedNetflow::Success { packet: parsed, .. } = &outcome {
            self.fire_template_events(parsed);
        }
        let v9_after = self.v9_parser.metrics;
        self.fire_metric_delta_events(&v9_before, &v9_after, TemplateProtocol::V9);
        let ipfix_after = self.ipfix_parser.metrics;
        self.fire_metric_delta_events(&ipfix_before, &ipfix_after, TemplateProtocol::Ipfix);
    }
    outcome
}
/// Fires `Collision`, `Evicted` and `Expired` events for the counter
/// increases between two metric snapshots.
///
/// The metrics carry counts only, so every event has `template_id: None`.
/// At most 64 events are fired per event type for a single call, and all
/// events of one type are fired before the next type starts.
fn fire_metric_delta_events(
    &mut self,
    before: &variable_versions::metrics::CacheMetricsInner,
    after: &variable_versions::metrics::CacheMetricsInner,
    protocol: TemplateProtocol,
) {
    const MAX_EVENTS_PER_TYPE: u64 = 64;

    /// Increase from `old` to `new`, clamped to the per-type event cap.
    fn capped_increase(old: u64, new: u64) -> u64 {
        new.saturating_sub(old).min(MAX_EVENTS_PER_TYPE)
    }

    let pending = [
        (
            capped_increase(before.collisions, after.collisions),
            TemplateEvent::Collision {
                template_id: None,
                protocol,
            },
        ),
        (
            capped_increase(before.evictions, after.evictions),
            TemplateEvent::Evicted {
                template_id: None,
                protocol,
            },
        ),
        (
            capped_increase(before.expired, after.expired),
            TemplateEvent::Expired {
                template_id: None,
                protocol,
            },
        ),
    ];

    for (count, event) in &pending {
        for _ in 0..*count {
            self.template_hooks.trigger(event);
        }
    }
}
fn fire_template_events(&mut self, packet: &NetflowPacket) {
match packet {
NetflowPacket::V9(v9) => {
for fs in &v9.flowsets {
match &fs.body {
variable_versions::v9::FlowSetBody::Template(templates) => {
for t in &templates.templates {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::V9,
});
}
}
variable_versions::v9::FlowSetBody::OptionsTemplate(templates) => {
for t in &templates.templates {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::V9,
});
}
}
variable_versions::v9::FlowSetBody::NoTemplate(info) => {
self.template_hooks
.trigger(&TemplateEvent::MissingTemplate {
template_id: Some(info.template_id),
protocol: TemplateProtocol::V9,
});
}
variable_versions::v9::FlowSetBody::Data(_)
| variable_versions::v9::FlowSetBody::OptionsData(_)
| variable_versions::v9::FlowSetBody::Empty => {}
}
}
}
NetflowPacket::IPFix(ipfix) => {
for fs in &ipfix.flowsets {
match &fs.body {
variable_versions::ipfix::FlowSetBody::Template(t) => {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
variable_versions::ipfix::FlowSetBody::Templates(ts) => {
for t in ts {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
}
variable_versions::ipfix::FlowSetBody::V9Template(t) => {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
variable_versions::ipfix::FlowSetBody::V9Templates(ts) => {
for t in ts {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
}
variable_versions::ipfix::FlowSetBody::OptionsTemplate(t) => {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
variable_versions::ipfix::FlowSetBody::OptionsTemplates(ts) => {
for t in ts {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
}
variable_versions::ipfix::FlowSetBody::V9OptionsTemplate(t) => {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
variable_versions::ipfix::FlowSetBody::V9OptionsTemplates(ts) => {
for t in ts {
self.template_hooks.trigger(&TemplateEvent::Learned {
template_id: Some(t.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
}
variable_versions::ipfix::FlowSetBody::NoTemplate(info) => {
self.template_hooks
.trigger(&TemplateEvent::MissingTemplate {
template_id: Some(info.template_id),
protocol: TemplateProtocol::Ipfix,
});
}
variable_versions::ipfix::FlowSetBody::Data(_)
| variable_versions::ipfix::FlowSetBody::OptionsData(_)
| variable_versions::ipfix::FlowSetBody::V9Data(_)
| variable_versions::ipfix::FlowSetBody::V9OptionsData(_)
| variable_versions::ipfix::FlowSetBody::Empty => {}
}
}
}
NetflowPacket::V5(_) | NetflowPacket::V7(_) => {}
}
}
/// Parses `packet` and flattens every parsed packet into
/// [`NetflowCommonFlowSet`]s.
///
/// Returns the collected flowsets together with the parse error, if any.
/// A packet whose conversion to [`NetflowCommon`] fails is not reported: it
/// contributes only the flowsets of `NetflowCommon::default()`.
#[cfg(feature = "netflow_common")]
#[inline]
pub fn parse_bytes_as_netflow_common_flowsets(
    &mut self,
    packet: &[u8],
) -> (Vec<NetflowCommonFlowSet>, Option<NetflowError>) {
    let ParseResult { packets, error } = self.parse_bytes(packet);
    let flowsets = packets
        .iter()
        .flat_map(|parsed| parsed.as_netflow_common().unwrap_or_default().flowsets)
        .collect();
    (flowsets, error)
}
}