use netflow_parser::variable_versions::ParserConfig;
use netflow_parser::variable_versions::v9::FlowSetBody as V9FlowSetBody;
use netflow_parser::{NetflowPacket, NetflowParser, PendingFlowsConfig};
use std::time::Duration;
/// Minimal NetFlow v9 packet (version bytes 0x0009) carrying a single
/// template flowset: template id 0x0100 (256) with one field of length 4.
/// NOTE(review): byte offsets follow the NetFlow v9 wire format (RFC 3954) —
/// confirm against the spec before editing individual bytes.
fn v9_template_packet() -> Vec<u8> {
vec![
0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0C, 0x01, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x04, ]
}
/// NetFlow v9 packet carrying a single data flowset for template id 0x0100
/// (256) — bytes 20..22 are the flowset id. With no template cached, parsing
/// this yields a NoTemplate flowset.
fn v9_data_packet() -> Vec<u8> {
vec![
0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x42, ]
}
/// Minimal IPFIX message (version bytes 0x000A, length 0x1C) carrying a
/// template set (set id 2) defining template 0x0100 (256) with one 4-byte
/// field. NOTE(review): layout per the IPFIX wire format (RFC 7011).
fn ipfix_template_packet() -> Vec<u8> {
vec![
0x00, 0x0A, 0x00, 0x1C, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x0C, 0x01, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x04, ]
}
/// IPFIX message (length 0x18) carrying a single data set for template id
/// 0x0100 (256) — bytes 16..18 are the set id. With no template cached,
/// parsing this yields a NoTemplate flowset.
fn ipfix_data_packet() -> Vec<u8> {
vec![
0x00, 0x0A, 0x00, 0x18, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x42, ]
}
#[test]
fn test_pending_flows_disabled_by_default() {
    // A default parser has no pending-flow cache: a data packet with an
    // unknown template must surface as a NoTemplate flowset and leave the
    // cache count and metrics at zero.
    let mut parser = NetflowParser::default();
    let parsed = parser.parse_bytes(&v9_data_packet());
    let v9 = match parsed.packets.first() {
        Some(NetflowPacket::V9(v9)) => v9,
        _ => panic!("Expected a V9 packet, got {:?}", parsed),
    };
    let saw_no_template = v9
        .flowsets
        .iter()
        .any(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)));
    assert!(
        saw_no_template,
        "V9 packet should contain a NoTemplate flowset for the missing template"
    );
    let info = parser.v9_cache_info();
    assert_eq!(
        info.pending_flow_count, 0,
        "No pending flows when feature is disabled"
    );
    assert_eq!(
        info.metrics.pending_cached, 0,
        "pending_cached metric should be zero when feature is disabled"
    );
}
#[test]
fn test_ipfix_pending_flow_replay() {
    // IPFIX data arriving before its template is cached as pending, then
    // replayed (and the cache drained) when the template shows up.
    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    let data_result = parser.parse_bytes(&ipfix_data_packet());
    assert!(data_result.is_ok(), "Data packet should parse (NoTemplate)");
    let info = parser.ipfix_cache_info();
    assert_eq!(info.pending_flow_count, 1, "Should cache one pending flow");
    assert_eq!(
        info.metrics.pending_cached, 1,
        "Should record pending_cached metric"
    );

    let template_result = parser.parse_bytes(&ipfix_template_packet());
    assert!(template_result.is_ok(), "Template packet should parse");
    let replayed = template_result.packets.iter().any(|pkt| match pkt {
        NetflowPacket::IPFix(ipfix) => ipfix.flowsets.iter().any(|fs| {
            matches!(
                &fs.body,
                netflow_parser::variable_versions::ipfix::FlowSetBody::Data(_)
            )
        }),
        _ => false,
    });
    assert!(
        replayed,
        "Template packet output should include replayed data flowset"
    );

    let info = parser.ipfix_cache_info();
    assert_eq!(
        info.pending_flow_count, 0,
        "Pending flows should be drained after replay"
    );
    assert_eq!(
        info.metrics.pending_replayed, 1,
        "Should record pending_replayed metric"
    );
}
#[test]
fn test_v9_pending_flow_replay() {
    // A V9 data packet seen before its template is cached; the later template
    // packet must replay it as a Data flowset and drain the cache.
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    let data_result = parser.parse_bytes(&v9_data_packet());
    assert!(data_result.is_ok(), "Data packet should parse (NoTemplate)");
    assert_eq!(
        parser.v9_cache_info().pending_flow_count,
        1,
        "Should cache one pending flow"
    );

    let template_result = parser.parse_bytes(&v9_template_packet());
    assert!(template_result.is_ok(), "Template packet should parse");
    let replayed = template_result.packets.iter().any(|pkt| match pkt {
        NetflowPacket::V9(v9) => v9
            .flowsets
            .iter()
            .any(|fs| matches!(&fs.body, V9FlowSetBody::Data(_))),
        _ => false,
    });
    assert!(
        replayed,
        "Template packet output should include replayed data flowset"
    );

    let info = parser.v9_cache_info();
    assert_eq!(
        info.pending_flow_count, 0,
        "Pending flows should be drained after replay"
    );
    assert_eq!(
        info.metrics.pending_replayed, 1,
        "Should record pending_replayed metric"
    );
}
#[test]
fn test_pending_flow_ttl_expiration() {
// With a zero TTL every cached entry is already expired by the time the
// template arrives, so it must be dropped rather than replayed.
let mut parser = NetflowParser::builder()
.with_v9_pending_flows(PendingFlowsConfig::with_ttl(256, Duration::ZERO))
.build()
.expect("Failed to build parser");
// Data with no matching template goes into the pending cache.
let _ = parser.parse_bytes(&v9_data_packet());
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
// Template arrival processes the (now expired) pending entry.
let result = parser.parse_bytes(&v9_template_packet());
assert!(result.is_ok());
let stats = parser.v9_cache_info();
assert_eq!(
stats.metrics.pending_replayed, 0,
"Expired flow should not be replayed"
);
assert_eq!(
stats.metrics.pending_dropped, 1,
"Expired flow should be dropped"
);
}
#[test]
fn test_pending_flow_lru_eviction() {
    // Capacity of 2: caching flows for three distinct template ids must
    // evict the least-recently-used entry (3 cached total, 1 dropped).
    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::new(2))
        .build()
        .expect("Failed to build parser");

    // Template 256, then rewrite the set-id bytes to target 257 and 258.
    let _ = parser.parse_bytes(&ipfix_data_packet());

    let mut for_257 = ipfix_data_packet();
    for_257[16] = 0x01;
    for_257[17] = 0x01;
    let _ = parser.parse_bytes(&for_257);

    let mut for_258 = ipfix_data_packet();
    for_258[16] = 0x01;
    for_258[17] = 0x02;
    let _ = parser.parse_bytes(&for_258);

    let info = parser.ipfix_cache_info();
    assert_eq!(info.pending_flow_count, 2);
    assert_eq!(info.metrics.pending_cached, 3);
    assert_eq!(info.metrics.pending_dropped, 1);
}
#[test]
fn test_builder_propagation() {
    // with_pending_flows applies the config to both protocol sub-parsers.
    let built = NetflowParser::builder()
        .with_pending_flows(PendingFlowsConfig::new(128))
        .build()
        .expect("Failed to build parser");
    let v9_enabled = built.v9_parser().pending_flows_enabled();
    let ipfix_enabled = built.ipfix_parser().pending_flows_enabled();
    assert!(v9_enabled, "V9 parser should have pending cache");
    assert!(ipfix_enabled, "IPFIX parser should have pending cache");
}
#[test]
fn test_v9_only_pending_flows() {
    // Enabling pending flows for V9 only must leave IPFIX untouched.
    let built = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::new(128))
        .build()
        .expect("Failed to build parser");
    let v9_enabled = built.v9_parser().pending_flows_enabled();
    let ipfix_enabled = built.ipfix_parser().pending_flows_enabled();
    assert!(v9_enabled, "V9 parser should have pending cache");
    assert!(!ipfix_enabled, "IPFIX parser should NOT have pending cache");
}
#[test]
fn test_ipfix_only_pending_flows() {
    // Enabling pending flows for IPFIX only must leave V9 untouched.
    let built = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::new(128))
        .build()
        .expect("Failed to build parser");
    let v9_enabled = built.v9_parser().pending_flows_enabled();
    let ipfix_enabled = built.ipfix_parser().pending_flows_enabled();
    assert!(!v9_enabled, "V9 parser should NOT have pending cache");
    assert!(ipfix_enabled, "IPFIX parser should have pending cache");
}
#[test]
fn test_v9_no_template_continues_parsing() {
    // A NoTemplate flowset early in a packet must not abort parsing: the
    // template flowset that follows it is still parsed and cached.
    let mut parser = NetflowParser::default();
    // 20-byte header (2 records) + data flowset for unknown template 256
    // + template flowset defining template 257.
    let packet = vec![
        0x00, 0x09, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
        0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x42,
        0x00, 0x00, 0x00, 0x0C, 0x01, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x04,
    ];
    let result = parser.parse_bytes(&packet);
    assert!(result.is_ok(), "Packet should parse successfully");

    let v9_packets: Vec<_> = result
        .packets
        .iter()
        .filter_map(|p| match p {
            NetflowPacket::V9(v9) => Some(v9),
            _ => None,
        })
        .collect();
    assert_eq!(v9_packets.len(), 1, "Should have one V9 packet");
    let v9 = v9_packets[0];
    assert_eq!(v9.flowsets.len(), 2, "Should have 2 flowsets");

    let has_no_template = v9
        .flowsets
        .iter()
        .any(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)));
    let has_template = v9
        .flowsets
        .iter()
        .any(|fs| matches!(&fs.body, V9FlowSetBody::Template(_)));
    assert!(has_no_template, "Should have NoTemplate flowset");
    assert!(
        has_template,
        "Should have Template flowset (parsing continued)"
    );
    assert!(
        parser.has_v9_template(257),
        "Template 257 should be cached despite earlier NoTemplate"
    );
}
#[test]
fn test_clear_pending_flows() {
// Clearing one protocol's pending cache must not touch the other's.
let mut parser = NetflowParser::builder()
.with_pending_flows(PendingFlowsConfig::default())
.build()
.expect("Failed to build parser");
// Seed one pending entry on each side.
let _ = parser.parse_bytes(&v9_data_packet());
let _ = parser.parse_bytes(&ipfix_data_packet());
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);
// Clear V9 only — IPFIX entry must survive.
parser.clear_v9_pending_flows();
assert_eq!(parser.v9_cache_info().pending_flow_count, 0);
assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);
// Then clear IPFIX as well.
parser.clear_ipfix_pending_flows();
assert_eq!(parser.ipfix_cache_info().pending_flow_count, 0);
}
/// NetFlow v9 template packet defining template 0x0100 (256) with three
/// fields (two 4-byte fields and one 1-byte field); flowset length 0x14.
/// NOTE(review): offsets per NetFlow v9 wire format (RFC 3954).
fn v9_multifield_template_packet() -> Vec<u8> {
vec![
0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x14, 0x01, 0x00, 0x00, 0x03, 0x00, 0x01, 0x00, 0x04, 0x00, 0x02, 0x00, 0x04, 0x00, 0x04, 0x00, 0x01, ]
}
/// NetFlow v9 data packet for the multi-field template 0x0100 (256): one
/// record (flowset length 0x10) matching the 4+4+1-byte field layout of
/// `v9_multifield_template_packet`, plus padding.
fn v9_multifield_data_packet() -> Vec<u8> {
vec![
0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x03, 0xE8, 0x00, 0x00, 0x00, 0x32, 0x06, 0x00, 0x00, 0x00, ]
}
/// NetFlow v9 data packet for template 0x0100 (256) carrying two records in
/// one flowset (length 0x18) — used to verify that every record in a pending
/// flowset is replayed.
fn v9_multifield_two_records_packet() -> Vec<u8> {
vec![
0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x18, 0x00, 0x00, 0x07, 0xD0, 0x00, 0x00, 0x00, 0x64, 0x11, 0x00, 0x00, 0x01, 0xF4, 0x00, 0x00, 0x00, 0x0A, 0x01, 0x00, 0x00, ]
}
/// IPFIX message with a template set (set id 2, length 0x14) defining
/// template 0x0100 (256) with three fields (4, 4 and 1 bytes).
/// NOTE(review): layout per the IPFIX wire format (RFC 7011).
fn ipfix_multifield_template_packet() -> Vec<u8> {
vec![
0x00, 0x0A, 0x00, 0x24, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x02, 0x00, 0x14, 0x01, 0x00, 0x00, 0x03, 0x00, 0x01, 0x00, 0x04, 0x00, 0x02, 0x00, 0x04, 0x00, 0x04, 0x00, 0x01, ]
}
/// IPFIX data set for the multi-field template 0x0100 (256): one record
/// (set length 0x10) matching the 4+4+1-byte field layout, plus padding.
fn ipfix_multifield_data_packet() -> Vec<u8> {
vec![
0x00, 0x0A, 0x00, 0x20, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x03, 0xE8, 0x00, 0x00, 0x00, 0x32, 0x06, 0x00, 0x00, 0x00, ]
}
/// IPFIX data set for template 0x0100 (256) carrying two records in one set
/// (length 0x18) — used to verify that every record in a pending set is
/// replayed.
fn ipfix_multifield_two_records_packet() -> Vec<u8> {
vec![
0x00, 0x0A, 0x00, 0x28, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x18, 0x00, 0x00, 0x07, 0xD0, 0x00, 0x00, 0x00, 0x64, 0x11,
0x00, 0x00, 0x01, 0xF4, 0x00, 0x00, 0x00, 0x0A, 0x01, 0x00, 0x00, ]
}
/// NetFlow v9 options-template packet (flowset id 0x0001) defining options
/// template 0x0102 (258). NOTE(review): scope/option field breakdown inferred
/// from the v9 options-template wire format — confirm against RFC 3954.
fn v9_options_template_packet() -> Vec<u8> {
vec![
0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x18, 0x01, 0x02, 0x00, 0x04, 0x00, 0x08, 0x00, 0x01, 0x00, 0x04, 0x00, 0x03, 0x00, 0x04, 0x00, 0x04, 0x00, 0x04, 0x00, 0x00, ]
}
/// NetFlow v9 data packet targeting options template 0x0102 (258) — with the
/// options template cached, this should parse as an OptionsData flowset.
fn v9_options_data_packet() -> Vec<u8> {
vec![
0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x10, 0x0A, 0x0A, 0x0A, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x20, 0x00,
]
}
/// IPFIX message with an options-template set (set id 3, length 0x16)
/// defining options template 0x0102 (258). NOTE(review): layout per the
/// IPFIX options-template wire format (RFC 7011).
fn ipfix_options_template_packet() -> Vec<u8> {
vec![
0x00, 0x0A, 0x00, 0x26, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x03, 0x00, 0x16, 0x01, 0x02, 0x00, 0x03, 0x00, 0x01, 0x00, 0x01, 0x00, 0x04, 0x00, 0x03, 0x00, 0x04, 0x00, 0x04, 0x00, 0x04, ]
}
/// IPFIX data set targeting options template 0x0102 (258) — with the options
/// template cached, this should parse as an OptionsData flowset.
fn ipfix_options_data_packet() -> Vec<u8> {
vec![
0x00, 0x0A, 0x00, 0x20, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x02, 0x00, 0x10, 0x0A, 0x0A, 0x0A, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x20, 0x00,
]
}
#[test]
fn test_v9_multifield_pending_replay() {
    // Multi-field data cached before its template must replay as a Data
    // flowset once the template arrives.
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    assert!(parser.parse_bytes(&v9_multifield_data_packet()).is_ok());
    assert_eq!(parser.v9_cache_info().pending_flow_count, 1);

    let replay = parser.parse_bytes(&v9_multifield_template_packet());
    assert!(replay.is_ok());
    let replayed = replay.packets.iter().any(|pkt| match pkt {
        NetflowPacket::V9(v9) => v9
            .flowsets
            .iter()
            .any(|fs| matches!(&fs.body, V9FlowSetBody::Data(_))),
        _ => false,
    });
    assert!(replayed, "Replayed multi-field data should appear");

    let info = parser.v9_cache_info();
    assert_eq!(info.pending_flow_count, 0);
    assert_eq!(info.metrics.pending_replayed, 1);
}
#[test]
fn test_ipfix_multifield_pending_replay() {
    // Multi-field IPFIX data cached before its template must replay as a
    // Data flowset once the template arrives.
    use netflow_parser::variable_versions::ipfix::FlowSetBody as IpfixFlowSetBody;

    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    assert!(parser.parse_bytes(&ipfix_multifield_data_packet()).is_ok());
    assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);

    let replay = parser.parse_bytes(&ipfix_multifield_template_packet());
    assert!(replay.is_ok());
    let replayed = replay.packets.iter().any(|pkt| match pkt {
        NetflowPacket::IPFix(ipfix) => ipfix
            .flowsets
            .iter()
            .any(|fs| matches!(&fs.body, IpfixFlowSetBody::Data(_))),
        _ => false,
    });
    assert!(replayed, "Replayed multi-field data should appear");

    let info = parser.ipfix_cache_info();
    assert_eq!(info.pending_flow_count, 0);
    assert_eq!(info.metrics.pending_replayed, 1);
}
#[test]
fn test_v9_multiple_records_pending_replay() {
    // A pending flowset holding two records must replay both of them.
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    assert!(parser.parse_bytes(&v9_multifield_two_records_packet()).is_ok());
    assert_eq!(parser.v9_cache_info().pending_flow_count, 1);

    let replay = parser.parse_bytes(&v9_multifield_template_packet());
    assert!(replay.is_ok());

    // Count every record across all replayed Data flowsets.
    let mut record_count = 0usize;
    for pkt in &replay.packets {
        if let NetflowPacket::V9(v9) = pkt {
            for fs in &v9.flowsets {
                if let V9FlowSetBody::Data(data) = &fs.body {
                    record_count += data.fields.len();
                }
            }
        }
    }
    assert_eq!(
        record_count, 2,
        "Should replay 2 records from the pending flowset"
    );
    assert_eq!(parser.v9_cache_info().metrics.pending_replayed, 1);
}
#[test]
fn test_ipfix_multiple_records_pending_replay() {
    // A pending IPFIX set holding two records must replay both of them.
    use netflow_parser::variable_versions::ipfix::FlowSetBody as IpfixFlowSetBody;

    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    assert!(parser
        .parse_bytes(&ipfix_multifield_two_records_packet())
        .is_ok());
    assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);

    let replay = parser.parse_bytes(&ipfix_multifield_template_packet());
    assert!(replay.is_ok());

    // Count every record across all replayed Data flowsets.
    let mut record_count = 0usize;
    for pkt in &replay.packets {
        if let NetflowPacket::IPFix(ipfix) = pkt {
            for fs in &ipfix.flowsets {
                if let IpfixFlowSetBody::Data(data) = &fs.body {
                    record_count += data.fields.len();
                }
            }
        }
    }
    assert_eq!(
        record_count, 2,
        "Should replay 2 records from the pending flowset"
    );
    assert_eq!(parser.ipfix_cache_info().metrics.pending_replayed, 1);
}
#[test]
fn test_v9_options_pending_replay() {
    // Data matching an options template must replay as OptionsData, not Data.
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    assert!(parser.parse_bytes(&v9_options_data_packet()).is_ok());
    assert_eq!(parser.v9_cache_info().pending_flow_count, 1);

    let replay = parser.parse_bytes(&v9_options_template_packet());
    assert!(replay.is_ok());
    let replayed_as_options = replay.packets.iter().any(|pkt| match pkt {
        NetflowPacket::V9(v9) => v9
            .flowsets
            .iter()
            .any(|fs| matches!(&fs.body, V9FlowSetBody::OptionsData(_))),
        _ => false,
    });
    assert!(replayed_as_options, "Should replay as OptionsData flowset");

    let info = parser.v9_cache_info();
    assert_eq!(info.pending_flow_count, 0);
    assert_eq!(info.metrics.pending_replayed, 1);
}
#[test]
fn test_ipfix_options_pending_replay() {
    // IPFIX data matching an options template must replay as OptionsData.
    use netflow_parser::variable_versions::ipfix::FlowSetBody as IpfixFlowSetBody;

    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");

    assert!(parser.parse_bytes(&ipfix_options_data_packet()).is_ok());
    assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);

    let replay = parser.parse_bytes(&ipfix_options_template_packet());
    assert!(replay.is_ok());
    let replayed_as_options = replay.packets.iter().any(|pkt| match pkt {
        NetflowPacket::IPFix(ipfix) => ipfix
            .flowsets
            .iter()
            .any(|fs| matches!(&fs.body, IpfixFlowSetBody::OptionsData(_))),
        _ => false,
    });
    assert!(replayed_as_options, "Should replay as OptionsData flowset");

    let info = parser.ipfix_cache_info();
    assert_eq!(info.pending_flow_count, 0);
    assert_eq!(info.metrics.pending_replayed, 1);
}
#[test]
fn test_v9_multiple_pending_same_template() {
    // Two data packets for the same unknown template are cached separately,
    // and the template arrival replays each as its own Data flowset.
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");
    let _ = parser.parse_bytes(&v9_multifield_data_packet());
    let _ = parser.parse_bytes(&v9_multifield_two_records_packet());
    assert_eq!(
        parser.v9_cache_info().pending_flow_count,
        2,
        "Two entries cached under template 256"
    );
    assert_eq!(parser.v9_cache_info().metrics.pending_cached, 2);

    let replay = parser.parse_bytes(&v9_multifield_template_packet());
    assert!(replay.is_ok());
    let mut data_flowset_count = 0usize;
    for pkt in &replay.packets {
        if let NetflowPacket::V9(v9) = pkt {
            data_flowset_count += v9
                .flowsets
                .iter()
                .filter(|fs| matches!(&fs.body, V9FlowSetBody::Data(_)))
                .count();
        }
    }
    assert_eq!(
        data_flowset_count, 2,
        "Both pending entries should be replayed as separate Data flowsets"
    );
    let info = parser.v9_cache_info();
    assert_eq!(info.pending_flow_count, 0);
    assert_eq!(info.metrics.pending_replayed, 2);
}
#[test]
fn test_ipfix_multiple_pending_same_template() {
    // Two IPFIX data packets for the same unknown template are cached
    // separately and both replay as distinct Data flowsets.
    use netflow_parser::variable_versions::ipfix::FlowSetBody as IpfixFlowSetBody;

    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");
    let _ = parser.parse_bytes(&ipfix_multifield_data_packet());
    let _ = parser.parse_bytes(&ipfix_multifield_two_records_packet());
    assert_eq!(parser.ipfix_cache_info().pending_flow_count, 2);

    let replay = parser.parse_bytes(&ipfix_multifield_template_packet());
    assert!(replay.is_ok());
    let mut data_flowset_count = 0usize;
    for pkt in &replay.packets {
        if let NetflowPacket::IPFix(ipfix) = pkt {
            data_flowset_count += ipfix
                .flowsets
                .iter()
                .filter(|fs| matches!(&fs.body, IpfixFlowSetBody::Data(_)))
                .count();
        }
    }
    assert_eq!(
        data_flowset_count, 2,
        "Both pending entries should be replayed as separate Data flowsets"
    );
    let info = parser.ipfix_cache_info();
    assert_eq!(info.pending_flow_count, 0);
    assert_eq!(info.metrics.pending_replayed, 2);
}
#[test]
fn test_v9_pending_replay_with_same_packet_data() {
    // A packet carrying both the missing template and fresh data for it must
    // yield the in-packet Data flowset plus the replayed pending one.
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");
    let _ = parser.parse_bytes(&v9_data_packet());
    assert_eq!(parser.v9_cache_info().pending_flow_count, 1);

    // 20-byte header (2 records) + template 256 flowset + data flowset.
    let combined_packet = vec![
        0x00, 0x09, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
        0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0C, 0x01, 0x00, 0x00, 0x01,
        0x00, 0x01, 0x00, 0x04, 0x01, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x99,
    ];
    let result = parser.parse_bytes(&combined_packet);
    assert!(
        result.error.is_none(),
        "Combined packet should parse without error"
    );
    let v9 = if let [NetflowPacket::V9(v9)] = result.packets.as_slice() {
        v9
    } else {
        panic!(
            "Expected exactly one V9 packet, got {}",
            result.packets.len()
        )
    };
    let data_count = v9
        .flowsets
        .iter()
        .filter(|fs| matches!(&fs.body, V9FlowSetBody::Data(_)))
        .count();
    assert_eq!(
        data_count, 2,
        "Should have exactly 1 in-packet Data + 1 replayed Data flowset"
    );
    let info = parser.v9_cache_info();
    assert_eq!(info.pending_flow_count, 0, "Pending should be drained");
    assert_eq!(info.metrics.pending_replayed, 1);
}
#[test]
fn test_v9_out_of_order_template_arrival() {
    // Data packets for three distinct templates arrive first; the templates
    // then show up in a different order, each replaying exactly its own flow.
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("Failed to build parser");
    // Data packet targeting the given template id (big-endian hi/lo bytes).
    let data_for = |tmpl_id_hi: u8, tmpl_id_lo: u8| -> Vec<u8> {
        vec![
            0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00,
            0x00, 0x02, 0x00, 0x00, 0x00, 0x01, tmpl_id_hi, tmpl_id_lo, 0x00, 0x08, 0x00, 0x00,
            0x00, 0x42,
        ]
    };
    // Template packet defining the given template id with one 4-byte field.
    let template_for = |tmpl_id_hi: u8, tmpl_id_lo: u8| -> Vec<u8> {
        vec![
            0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
            0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0C, tmpl_id_hi, tmpl_id_lo,
            0x00, 0x01, 0x00, 0x01, 0x00, 0x04,
        ]
    };

    // Queue up pending flows for templates 256, 257 and 258.
    for lo in [0x00u8, 0x01, 0x02] {
        let _ = parser.parse_bytes(&data_for(0x01, lo));
    }
    assert_eq!(parser.v9_cache_info().pending_flow_count, 3);

    // Deliver templates out of order: 258 first, then 256, then 257.
    for (i, (lo, want_pending, want_replayed)) in [(0x02u8, 2, 1), (0x00, 1, 2), (0x01, 0, 3)]
        .iter()
        .copied()
        .enumerate()
    {
        let replay = parser.parse_bytes(&template_for(0x01, lo));
        assert!(replay.is_ok());
        assert_eq!(parser.v9_cache_info().pending_flow_count, want_pending);
        assert_eq!(
            parser.v9_cache_info().metrics.pending_replayed,
            want_replayed
        );
        let has_data = replay.packets.iter().any(|pkt| match pkt {
            NetflowPacket::V9(v9) => v9
                .flowsets
                .iter()
                .any(|fs| matches!(&fs.body, V9FlowSetBody::Data(_))),
            _ => false,
        });
        assert!(
            has_data,
            "Template arrival {} should include replayed data",
            i + 1
        );
    }
}
#[test]
fn test_max_entries_per_template_v9() {
// Per-template cap of 2: of four identical data packets, two are cached
// and two are dropped; the template then replays both cached entries.
let mut config = PendingFlowsConfig::new(256);
config.max_entries_per_template = 2;
let mut parser = NetflowParser::builder()
.with_v9_pending_flows(config)
.build()
.expect("Failed to build parser");
for _ in 0..4 {
let _ = parser.parse_bytes(&v9_data_packet());
}
let stats = parser.v9_cache_info();
assert_eq!(stats.pending_flow_count, 2);
assert_eq!(stats.metrics.pending_cached, 2);
assert_eq!(stats.metrics.pending_dropped, 2);
// Template arrival drains and replays everything that was cached.
let result = parser.parse_bytes(&v9_template_packet());
assert!(result.is_ok());
let stats = parser.v9_cache_info();
assert_eq!(stats.pending_flow_count, 0);
assert_eq!(stats.metrics.pending_replayed, 2);
}
#[test]
fn test_max_entries_per_template_ipfix() {
// Per-template cap of 3: of five identical data packets, three are cached
// and two are dropped; the template then replays the three cached entries.
let mut config = PendingFlowsConfig::new(256);
config.max_entries_per_template = 3;
let mut parser = NetflowParser::builder()
.with_ipfix_pending_flows(config)
.build()
.expect("Failed to build parser");
for _ in 0..5 {
let _ = parser.parse_bytes(&ipfix_data_packet());
}
let stats = parser.ipfix_cache_info();
assert_eq!(stats.pending_flow_count, 3);
assert_eq!(stats.metrics.pending_cached, 3);
assert_eq!(stats.metrics.pending_dropped, 2);
// Template arrival drains and replays everything that was cached.
let result = parser.parse_bytes(&ipfix_template_packet());
assert!(result.is_ok());
let stats = parser.ipfix_cache_info();
assert_eq!(stats.pending_flow_count, 0);
assert_eq!(stats.metrics.pending_replayed, 3);
}
/// Build a NetFlow v9 packet whose single data flowset (id 0x0100) carries
/// `payload_size` filler bytes of 0xAA. The flowset length field covers the
/// 4-byte flowset header plus the payload.
fn v9_data_packet_with_size(payload_size: usize) -> Vec<u8> {
    // Fixed 20-byte v9 packet header.
    let mut packet: Vec<u8> = vec![
        0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00,
        0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
    ];
    let flowset_length = (payload_size + 4) as u16;
    packet.extend_from_slice(&u16::to_be_bytes(0x0100));
    packet.extend_from_slice(&flowset_length.to_be_bytes());
    packet.extend(std::iter::repeat(0xAA).take(payload_size));
    packet
}
#[test]
fn test_max_entry_size_bytes_v9() {
// Entries larger than max_entry_size_bytes must not be cached: the small
// packet is cached, the 16-byte-payload one is counted as dropped.
let mut config = PendingFlowsConfig::new(256);
config.max_entry_size_bytes = 8;
let mut parser = NetflowParser::builder()
.with_v9_pending_flows(config)
.build()
.expect("Failed to build parser");
// Small entry (within the 8-byte limit) is cached.
let _ = parser.parse_bytes(&v9_data_packet()); assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 0);
// Oversized entry (16-byte payload) is rejected and counted as dropped.
let _ = parser.parse_bytes(&v9_data_packet_with_size(16));
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 1);
}
/// Build an IPFIX message whose single data set (id 0x0100) carries
/// `payload_size` filler bytes of 0xBB. The set length covers the 4-byte set
/// header plus the payload; the message length adds the 16-byte IPFIX header.
fn ipfix_data_packet_with_size(payload_size: usize) -> Vec<u8> {
    let set_length = (payload_size + 4) as u16;
    let message_length = set_length + 16;
    let mut packet: Vec<u8> = vec![0x00, 0x0A];
    packet.extend_from_slice(&message_length.to_be_bytes());
    // Remaining 12 header bytes (presumably export time, sequence number and
    // observation domain id — matches the other IPFIX fixtures here).
    packet.extend_from_slice(&[
        0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01,
    ]);
    packet.extend_from_slice(&u16::to_be_bytes(0x0100));
    packet.extend_from_slice(&set_length.to_be_bytes());
    packet.extend(std::iter::repeat(0xBB).take(payload_size));
    packet
}
#[test]
fn test_max_entry_size_bytes_ipfix() {
// IPFIX analogue of the size cap: small entry cached, oversized dropped.
let mut config = PendingFlowsConfig::new(256);
config.max_entry_size_bytes = 8;
let mut parser = NetflowParser::builder()
.with_ipfix_pending_flows(config)
.build()
.expect("Failed to build parser");
// Small entry (within the 8-byte limit) is cached.
let _ = parser.parse_bytes(&ipfix_data_packet());
assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);
assert_eq!(parser.ipfix_cache_info().metrics.pending_cached, 1);
// Oversized entry (16-byte payload) is rejected and counted as dropped.
let _ = parser.parse_bytes(&ipfix_data_packet_with_size(16));
assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);
assert_eq!(parser.ipfix_cache_info().metrics.pending_dropped, 1);
}
#[test]
fn test_zero_pending_cache_size_rejected() {
    // A zero-capacity pending cache is invalid via every builder entry point.
    assert!(
        NetflowParser::builder()
            .with_pending_flows(PendingFlowsConfig::new(0))
            .build()
            .is_err(),
        "max_pending_flows=0 should be rejected"
    );
    assert!(
        NetflowParser::builder()
            .with_v9_pending_flows(PendingFlowsConfig::new(0))
            .build()
            .is_err(),
        "V9 max_pending_flows=0 should be rejected"
    );
    assert!(
        NetflowParser::builder()
            .with_ipfix_pending_flows(PendingFlowsConfig::new(0))
            .build()
            .is_err(),
        "IPFIX max_pending_flows=0 should be rejected"
    );
}
#[test]
fn test_dropped_cache_entry_keeps_no_template_in_output() {
    // With room for one entry per template, the first NoTemplate flowset is
    // cached (and removed from output) while the second is dropped and must
    // remain visible in the parsed packet.
    let mut config = PendingFlowsConfig::new(256);
    config.max_entries_per_template = 1;
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(config)
        .build()
        .expect("Failed to build parser");

    let first = parser.parse_bytes(&v9_data_packet());
    if let Some(NetflowPacket::V9(v9)) = first.packets.first() {
        let remaining = v9
            .flowsets
            .iter()
            .filter(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)))
            .count();
        assert_eq!(
            remaining, 0,
            "Successfully cached entry should be removed from output"
        );
    }

    let second = parser.parse_bytes(&v9_data_packet());
    match second.packets.first() {
        Some(NetflowPacket::V9(v9)) => {
            let remaining = v9
                .flowsets
                .iter()
                .filter(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)))
                .count();
            assert_eq!(
                remaining, 1,
                "Dropped entry should keep NoTemplate in output"
            );
        }
        _ => panic!("Expected V9 packet"),
    }

    assert_eq!(parser.v9_cache_info().metrics.pending_cached, 1);
    assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 1);
}
#[test]
fn test_partial_cache_same_template_in_single_packet() {
    // One packet holds two data flowsets for the same unknown template; with
    // a per-template cap of 1 the first is cached (removed from output) and
    // the second stays in the output as NoTemplate with its bytes intact.
    let mut config = PendingFlowsConfig::new(256);
    config.max_entries_per_template = 1;
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(config)
        .build()
        .expect("Failed to build parser");
    // 20-byte header (2 records) + two data flowsets for template 256.
    let packet = vec![
        0x00, 0x09, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
        0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x41,
        0x01, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x42,
    ];
    let result = parser.parse_bytes(&packet);
    let v9 = match result.packets.first() {
        Some(NetflowPacket::V9(v9)) => v9,
        _ => panic!("Expected V9 packet"),
    };
    let retained: Vec<_> = v9
        .flowsets
        .iter()
        .filter(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)))
        .collect();
    assert_eq!(
        retained.len(),
        1,
        "Dropped NoTemplate flowset should remain in output"
    );
    if let V9FlowSetBody::NoTemplate(info) = &retained[0].body {
        assert!(
            !info.raw_data.is_empty(),
            "Retained NoTemplate must preserve raw_data"
        );
    }
    assert_eq!(parser.v9_cache_info().metrics.pending_cached, 1);
    assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 1);
}
#[test]
fn test_resize_trims_excess_entries_per_template() {
// Reconfiguring with a stricter per-template cap must trim already-cached
// entries and count the trimmed ones as dropped.
let mut config = PendingFlowsConfig::new(256);
config.max_entries_per_template = 10;
let mut parser = NetflowParser::builder()
.with_v9_pending_flows(config)
.build()
.expect("Failed to build parser");
// Five entries fit comfortably under the initial cap of 10.
for _ in 0..5 {
let _ = parser.parse_bytes(&v9_data_packet());
}
assert_eq!(parser.v9_cache_info().pending_flow_count, 5);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 5);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 0);
// Tighten the cap to 2: three entries must be trimmed.
let mut stricter = PendingFlowsConfig::new(256);
stricter.max_entries_per_template = 2;
parser
.v9_parser_mut()
.set_pending_flows_config(Some(stricter))
.expect("reconfigure should succeed");
assert_eq!(parser.v9_cache_info().pending_flow_count, 2);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 3);
}
#[test]
fn test_resize_trims_oversize_entries() {
// Reconfiguring with a smaller max_entry_size_bytes must evict entries
// that no longer fit and count them as dropped.
let mut config = PendingFlowsConfig::new(256);
config.max_entry_size_bytes = 1024;
let mut parser = NetflowParser::builder()
.with_v9_pending_flows(config)
.build()
.expect("Failed to build parser");
// Cache one small and one 16-byte-payload entry under the generous limit.
let _ = parser.parse_bytes(&v9_data_packet()); let _ = parser.parse_bytes(&v9_data_packet_with_size(16)); assert_eq!(parser.v9_cache_info().pending_flow_count, 2);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 0);
// Tighten the size limit to 8 bytes: the larger entry must be evicted.
let mut stricter = PendingFlowsConfig::new(256);
stricter.max_entry_size_bytes = 8;
parser
.v9_parser_mut()
.set_pending_flows_config(Some(stricter))
.expect("reconfigure should succeed");
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 1);
}
#[test]
fn test_resize_trims_ipfix_pending_flows() {
// IPFIX analogue of resize trimming: tightening the per-template cap to 1
// evicts three of the four cached entries.
let mut config = PendingFlowsConfig::new(256);
config.max_entries_per_template = 10;
let mut parser = NetflowParser::builder()
.with_ipfix_pending_flows(config)
.build()
.expect("Failed to build parser");
for _ in 0..4 {
let _ = parser.parse_bytes(&ipfix_data_packet());
}
assert_eq!(parser.ipfix_cache_info().pending_flow_count, 4);
assert_eq!(parser.ipfix_cache_info().metrics.pending_dropped, 0);
// Tighten the cap to 1: three entries must be trimmed.
let mut stricter = PendingFlowsConfig::new(256);
stricter.max_entries_per_template = 1;
parser
.ipfix_parser_mut()
.set_pending_flows_config(Some(stricter))
.expect("reconfigure should succeed");
assert_eq!(parser.ipfix_cache_info().pending_flow_count, 1);
assert_eq!(parser.ipfix_cache_info().metrics.pending_dropped, 3);
}
#[test]
fn test_cache_prunes_expired_entries_for_touched_template() {
// With a zero TTL, each new data packet for a template prunes that
// template's already-expired entry before caching the new one, so the
// live count stays at 1 while cached/dropped totals keep growing.
let mut parser = NetflowParser::builder()
.with_v9_pending_flows(PendingFlowsConfig::with_ttl(256, Duration::ZERO))
.build()
.expect("Failed to build parser");
// First entry: cached, nothing to prune yet.
let _ = parser.parse_bytes(&v9_data_packet());
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 0);
// Second entry: the expired first one is pruned, the new one cached.
let _ = parser.parse_bytes(&v9_data_packet());
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 2);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 1);
// Third entry: same pattern again.
let _ = parser.parse_bytes(&v9_data_packet());
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 3);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 2);
}
#[test]
fn test_cache_purges_globally_before_lru_eviction() {
// Capacity 2 with zero TTL: when the cache is full and a third template's
// entry arrives, both expired entries are purged globally (2 dropped)
// before any LRU eviction is needed — only the new entry remains.
let mut config = PendingFlowsConfig::with_ttl(2, Duration::ZERO); config.max_entries_per_template = 10;
let mut parser = NetflowParser::builder()
.with_v9_pending_flows(config)
.build()
.expect("Failed to build parser");
// Fill the cache with entries for templates 256 and 257 (bytes 20..22 in
// the packet are the flowset id).
let _ = parser.parse_bytes(&v9_data_packet()); let mut pkt_257 = v9_data_packet();
pkt_257[20] = 0x01;
pkt_257[21] = 0x01; let _ = parser.parse_bytes(&pkt_257);
assert_eq!(parser.v9_cache_info().pending_flow_count, 2);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 2);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 0);
// A third template's entry triggers the global purge of expired entries.
let mut pkt_258 = v9_data_packet();
pkt_258[20] = 0x01;
pkt_258[21] = 0x02; let _ = parser.parse_bytes(&pkt_258);
assert_eq!(parser.v9_cache_info().pending_flow_count, 1);
assert_eq!(parser.v9_cache_info().metrics.pending_cached, 3);
assert_eq!(parser.v9_cache_info().metrics.pending_dropped, 2);
}
#[test]
fn test_oversized_entry_truncated_at_parse_time_v9() {
    // An entry larger than `max_entry_size_bytes` must not be cached; it is
    // surfaced as a NoTemplate flowset whose raw_data is truncated to
    // `max_error_sample_size`, and the cache records it as dropped.
    let mut config = PendingFlowsConfig::new(256);
    config.max_entry_size_bytes = 8;
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(config)
        .with_max_error_sample_size(4)
        .build()
        .expect("Failed to build parser");
    // V9 header followed by a 0x18-byte data flowset (larger than the 8-byte
    // entry cap) referencing template 256, which has not been seen.
    let oversized_packet = vec![
        0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x18, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88,
        0x99, 0x00, 0xAA, 0xBB, 0xCC, 0xDD,
    ];
    let result = parser.parse_bytes(&oversized_packet);
    let v9 = match result.packets.first() {
        Some(NetflowPacket::V9(v9)) => v9,
        _ => panic!("Expected V9 packet"),
    };
    let no_templates: Vec<_> = v9
        .flowsets
        .iter()
        .filter(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)))
        .collect();
    assert_eq!(
        no_templates.len(),
        1,
        "Oversized entry should stay in output"
    );
    // Use `match` rather than `if let` so a non-NoTemplate body fails the
    // test instead of silently skipping the truncation assertion.
    match &no_templates[0].body {
        V9FlowSetBody::NoTemplate(info) => assert_eq!(
            info.raw_data.len(),
            4,
            "raw_data should be truncated to max_error_sample_size"
        ),
        _ => unreachable!("filter above guarantees a NoTemplate body"),
    }
    // The oversized entry was rejected from the cache and counted as dropped.
    let stats = parser.v9_cache_info();
    assert_eq!(stats.pending_flow_count, 0);
    assert_eq!(stats.metrics.pending_cached, 0);
    assert_eq!(stats.metrics.pending_dropped, 1);
}
#[test]
fn test_oversized_entry_truncated_at_parse_time_ipfix() {
    // IPFIX variant: an entry larger than `max_entry_size_bytes` must not be
    // cached; it is surfaced as a NoTemplate flowset whose raw_data is
    // truncated to `max_error_sample_size`, and recorded as dropped.
    use netflow_parser::variable_versions::ipfix::FlowSetBody as IpfixFlowSetBody;

    let mut config = PendingFlowsConfig::new(256);
    config.max_entry_size_bytes = 8;
    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(config)
        .with_max_error_sample_size(4)
        .build()
        .expect("Failed to build parser");
    // IPFIX header followed by a 0x18-byte data set (larger than the 8-byte
    // entry cap) referencing template 256, which has not been seen.
    let oversized_packet = vec![
        0x00, 0x0A, 0x00, 0x28, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x18, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88,
        0x99, 0x00, 0xAA, 0xBB, 0xCC, 0xDD,
    ];
    let result = parser.parse_bytes(&oversized_packet);
    let ipfix = match result.packets.first() {
        Some(NetflowPacket::IPFix(ipfix)) => ipfix,
        _ => panic!("Expected IPFIX packet"),
    };
    let no_templates: Vec<_> = ipfix
        .flowsets
        .iter()
        .filter(|fs| matches!(&fs.body, IpfixFlowSetBody::NoTemplate(_)))
        .collect();
    assert_eq!(
        no_templates.len(),
        1,
        "Oversized entry should stay in output"
    );
    // Use `match` rather than `if let` so a non-NoTemplate body fails the
    // test instead of silently skipping the truncation assertion.
    match &no_templates[0].body {
        IpfixFlowSetBody::NoTemplate(info) => assert_eq!(
            info.raw_data.len(),
            4,
            "raw_data should be truncated to max_error_sample_size"
        ),
        _ => unreachable!("filter above guarantees a NoTemplate body"),
    }
    // The oversized entry was rejected from the cache and counted as dropped.
    let stats = parser.ipfix_cache_info();
    assert_eq!(stats.pending_flow_count, 0);
    assert_eq!(stats.metrics.pending_cached, 0);
    assert_eq!(stats.metrics.pending_dropped, 1);
}
#[test]
fn test_rejected_entry_raw_data_truncated_v9() {
    // With a per-template cap of 1, the first unmatched record is cached and
    // the second is rejected; the rejected record's NoTemplate raw_data must
    // be truncated to `max_error_sample_size`.
    let mut config = PendingFlowsConfig::new(256);
    config.max_entries_per_template = 1;
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(config)
        .with_max_error_sample_size(4)
        .build()
        .expect("Failed to build parser");
    // V9 header + a 0x18-byte data flowset for unknown template 256.
    let big_body_packet = vec![
        0x00, 0x09, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x18, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88,
        0x99, 0x00, 0xAA, 0xBB, 0xCC, 0xDD,
    ];
    // First parse: the record fits the cache, so no NoTemplate is emitted.
    let result = parser.parse_bytes(&big_body_packet);
    if let Some(NetflowPacket::V9(v9)) = result.packets.first() {
        assert_eq!(
            v9.flowsets
                .iter()
                .filter(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)))
                .count(),
            0,
            "First entry should be cached"
        );
    }
    // Second parse: the cap is hit, so the record surfaces as NoTemplate.
    let result = parser.parse_bytes(&big_body_packet);
    let v9 = match result.packets.first() {
        Some(NetflowPacket::V9(v9)) => v9,
        _ => panic!("Expected V9 packet"),
    };
    let no_templates: Vec<_> = v9
        .flowsets
        .iter()
        .filter(|fs| matches!(&fs.body, V9FlowSetBody::NoTemplate(_)))
        .collect();
    assert_eq!(no_templates.len(), 1);
    // Use `match` rather than `if let` so a non-NoTemplate body fails the
    // test instead of silently skipping the truncation assertion.
    match &no_templates[0].body {
        V9FlowSetBody::NoTemplate(info) => assert_eq!(
            info.raw_data.len(),
            4,
            "Rejected entry raw_data should be truncated to max_error_sample_size"
        ),
        _ => unreachable!("filter above guarantees a NoTemplate body"),
    }
}
#[test]
fn test_rejected_entry_raw_data_truncated_ipfix() {
    // IPFIX variant: with a per-template cap of 1, the second unmatched
    // record is rejected and its NoTemplate raw_data must be truncated to
    // `max_error_sample_size`.
    use netflow_parser::variable_versions::ipfix::FlowSetBody as IpfixFlowSetBody;

    let mut config = PendingFlowsConfig::new(256);
    config.max_entries_per_template = 1;
    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(config)
        .with_max_error_sample_size(4)
        .build()
        .expect("Failed to build parser");
    // IPFIX header + a 0x18-byte data set for unknown template 256.
    let big_body_packet = vec![
        0x00, 0x0A, 0x00, 0x28, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x18, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88,
        0x99, 0x00, 0xAA, 0xBB, 0xCC, 0xDD,
    ];
    // First parse fills the single cache slot; second parse is rejected.
    let _ = parser.parse_bytes(&big_body_packet);
    let result = parser.parse_bytes(&big_body_packet);
    let ipfix = match result.packets.first() {
        Some(NetflowPacket::IPFix(ipfix)) => ipfix,
        _ => panic!("Expected IPFIX packet"),
    };
    let no_templates: Vec<_> = ipfix
        .flowsets
        .iter()
        .filter(|fs| matches!(&fs.body, IpfixFlowSetBody::NoTemplate(_)))
        .collect();
    assert_eq!(no_templates.len(), 1);
    // Use `match` rather than `if let` so a non-NoTemplate body fails the
    // test instead of silently skipping the truncation assertion.
    match &no_templates[0].body {
        IpfixFlowSetBody::NoTemplate(info) => assert_eq!(
            info.raw_data.len(),
            4,
            "Rejected entry raw_data should be truncated to max_error_sample_size"
        ),
        _ => unreachable!("filter above guarantees a NoTemplate body"),
    }
}
#[test]
fn test_would_accept_accounts_for_ttl_v9() {
    // Zero TTL with a one-entry-per-template cap: the second record should
    // replace the first (already expired) entry rather than be rejected by
    // the capacity check.
    let mut config = PendingFlowsConfig::with_ttl(256, Duration::ZERO);
    config.max_entries_per_template = 1;
    let mut parser = NetflowParser::builder()
        .with_v9_pending_flows(config)
        .build()
        .expect("Failed to build parser");

    let _ = parser.parse_bytes(&v9_data_packet());
    let first = parser.v9_cache_info();
    assert_eq!(first.pending_flow_count, 1);
    assert_eq!(first.metrics.pending_cached, 1);

    let _ = parser.parse_bytes(&v9_data_packet());
    let second = parser.v9_cache_info();
    assert_eq!(second.pending_flow_count, 1);
    assert_eq!(second.metrics.pending_cached, 2);
    assert_eq!(second.metrics.pending_dropped, 1);
}
#[test]
fn test_would_accept_accounts_for_ttl_ipfix() {
    // IPFIX variant: zero TTL with a one-entry-per-template cap means the
    // second record replaces the first (already expired) entry rather than
    // being rejected by the capacity check.
    let mut config = PendingFlowsConfig::with_ttl(256, Duration::ZERO);
    config.max_entries_per_template = 1;
    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(config)
        .build()
        .expect("Failed to build parser");

    let _ = parser.parse_bytes(&ipfix_data_packet());
    let first = parser.ipfix_cache_info();
    assert_eq!(first.pending_flow_count, 1);
    assert_eq!(first.metrics.pending_cached, 1);

    let _ = parser.parse_bytes(&ipfix_data_packet());
    let second = parser.ipfix_cache_info();
    assert_eq!(second.pending_flow_count, 1);
    assert_eq!(second.metrics.pending_cached, 2);
    assert_eq!(second.metrics.pending_dropped, 1);
}
#[test]
fn test_ipfix_cache_prunes_expired_entries() {
    // IPFIX counterpart of the v9 pruning test: a zero TTL expires each
    // entry immediately, so a second record for the same template evicts
    // the first while still being cached itself.
    let mut parser = NetflowParser::builder()
        .with_ipfix_pending_flows(PendingFlowsConfig::with_ttl(256, Duration::ZERO))
        .build()
        .expect("Failed to build parser");

    let _ = parser.parse_bytes(&ipfix_data_packet());
    let first = parser.ipfix_cache_info();
    assert_eq!(first.pending_flow_count, 1);
    // Assert pending_cached here too, for parity with the v9 variant of
    // this test (it previously went unchecked after the first parse).
    assert_eq!(first.metrics.pending_cached, 1);
    assert_eq!(first.metrics.pending_dropped, 0);

    let _ = parser.parse_bytes(&ipfix_data_packet());
    let second = parser.ipfix_cache_info();
    assert_eq!(second.pending_flow_count, 1);
    assert_eq!(second.metrics.pending_cached, 2);
    assert_eq!(second.metrics.pending_dropped, 1);
}