use netflow_parser::{
AutoScopedParser, InMemoryTemplateStore, NetflowPacket, NetflowParser, TemplateEvent,
TemplateKind, TemplateProtocol, TemplateStore, TemplateStoreError, TemplateStoreKey,
};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
/// Build a minimal NetFlow v9 export packet containing a single template
/// flowset (flowset id 0) that defines `template_id` with the given
/// `(field_type, field_length)` pairs. All header counters are zeroed.
fn v9_template_packet(template_id: u16, fields: &[(u16, u16)]) -> Vec<u8> {
    // Template record: id (2) + field count (2) + 4 bytes per field spec.
    let record_len = 4 + fields.len() * 4;
    // Flowset: id (2) + length (2) + record body.
    let flowset_len = (4 + record_len) as u16;
    let mut out = Vec::with_capacity(20 + flowset_len as usize);
    // v9 header: version, record count, then uptime / unix secs / sequence /
    // source id, all zero.
    out.extend_from_slice(&9u16.to_be_bytes());
    out.extend_from_slice(&1u16.to_be_bytes());
    for _ in 0..4 {
        out.extend_from_slice(&0u32.to_be_bytes());
    }
    // Flowset id 0 marks a template flowset.
    out.extend_from_slice(&0u16.to_be_bytes());
    out.extend_from_slice(&flowset_len.to_be_bytes());
    out.extend_from_slice(&template_id.to_be_bytes());
    out.extend_from_slice(&(fields.len() as u16).to_be_bytes());
    for &(field_type, field_len) in fields {
        out.extend_from_slice(&field_type.to_be_bytes());
        out.extend_from_slice(&field_len.to_be_bytes());
    }
    out
}
/// Build a minimal NetFlow v9 export packet containing one data flowset
/// whose flowset id is `template_id` and whose body is `payload` verbatim.
fn v9_data_packet(template_id: u16, payload: &[u8]) -> Vec<u8> {
    // Flowset: id (2) + length (2) + raw payload.
    let flowset_len = (4 + payload.len()) as u16;
    let mut out = Vec::with_capacity(20 + flowset_len as usize);
    // v9 header: version 9, one record, then four zeroed 32-bit counters
    // (uptime, unix secs, sequence, source id).
    out.extend_from_slice(&9u16.to_be_bytes());
    out.extend_from_slice(&1u16.to_be_bytes());
    out.extend_from_slice(&[0u8; 16]);
    // Data flowsets carry the id of the template they were encoded with.
    out.extend_from_slice(&template_id.to_be_bytes());
    out.extend_from_slice(&flowset_len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}
/// Build a minimal IPFIX (v10) message carrying one template set (set id 2)
/// that defines `template_id` with the given `(field_type, field_length)`
/// specs. Export time, sequence, and observation domain are all 1.
fn ipfix_template_packet(template_id: u16, fields: &[(u16, u16)]) -> Vec<u8> {
    // Set: header (4) + template header (4) + 4 bytes per field spec.
    let set_len = (4 + 4 + fields.len() * 4) as u16;
    let msg_len = 16 + set_len;
    let mut out = Vec::with_capacity(msg_len as usize);
    // IPFIX message header.
    out.extend_from_slice(&0x000Au16.to_be_bytes());
    out.extend_from_slice(&msg_len.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    // Set id 2 marks a template set.
    out.extend_from_slice(&2u16.to_be_bytes());
    out.extend_from_slice(&set_len.to_be_bytes());
    out.extend_from_slice(&template_id.to_be_bytes());
    out.extend_from_slice(&(fields.len() as u16).to_be_bytes());
    for &(field_type, field_len) in fields {
        out.extend_from_slice(&field_type.to_be_bytes());
        out.extend_from_slice(&field_len.to_be_bytes());
    }
    out
}
/// Build a minimal IPFIX (v10) message carrying one data set whose set id
/// is `template_id` and whose body is `payload` verbatim.
fn ipfix_data_packet(template_id: u16, payload: &[u8]) -> Vec<u8> {
    // Set: header (4) + raw payload.
    let set_len = payload.len() as u16 + 4;
    let msg_len = 16 + set_len;
    let mut out = Vec::with_capacity(msg_len as usize);
    // IPFIX header: version, length, export time 1, sequence 2, domain 1.
    out.extend_from_slice(&0x000Au16.to_be_bytes());
    out.extend_from_slice(&msg_len.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    out.extend_from_slice(&2u32.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    // Data sets use the template id as the set id.
    out.extend_from_slice(&template_id.to_be_bytes());
    out.extend_from_slice(&set_len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}
/// Learning a v9 template must immediately write it through to the
/// configured secondary template store under the unscoped ("") key.
#[test]
fn v9_template_is_written_through_on_learn() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let mut parser = NetflowParser::builder()
        .with_template_store(Arc::clone(&store))
        .build()
        .expect("build");
    let result = parser.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    assert!(
        result.error.is_none(),
        "template parse error: {:?}",
        result.error
    );
    assert!(!result.packets.is_empty());
    // Unscoped parser → empty scope component in the store key.
    let key = TemplateStoreKey::new("", TemplateKind::V9Data, 256);
    let persisted = store
        .get(&key)
        .expect("get")
        .expect("template should be persisted");
    assert!(!persisted.is_empty());
    assert_eq!(store.len(), 1);
}
/// A fresh parser (simulating a restarted replica) must decode v9 data
/// records by reading the template back from the shared store.
#[test]
fn v9_data_record_is_decoded_via_read_through_after_replica_restart() {
    let store = Arc::new(InMemoryTemplateStore::new());
    // First parser learns the template and writes it through, then drops.
    {
        let mut a = NetflowParser::builder()
            .with_template_store(store.clone())
            .build()
            .expect("build");
        let tmpl = v9_template_packet(256, &[(1, 4)]);
        assert!(a.parse_bytes(&tmpl).error.is_none());
    }
    // Second parser starts cold and must restore via read-through.
    let mut b = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let result = b.parse_bytes(&v9_data_packet(256, &[0, 0, 0, 0x2A]));
    assert!(
        result.error.is_none(),
        "data parse error: {:?}",
        result.error
    );
    let v9 = match result.packets.into_iter().next().expect("one packet") {
        NetflowPacket::V9(v) => v,
        other => panic!("expected V9, got {:?}", other),
    };
    let body = v9.flowsets.into_iter().next().expect("one flowset").body;
    assert!(
        matches!(
            body,
            netflow_parser::variable_versions::v9::FlowSetBody::Data(_)
        ),
        "expected Data flowset, got {:?}",
        body
    );
    assert!(b.has_v9_template(256));
}
/// Without a secondary store configured, the parser still learns templates
/// exactly as before — the store integration is strictly opt-in.
#[test]
fn v9_no_store_baseline_unchanged() {
    let mut parser = NetflowParser::default();
    let result = parser.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    assert!(result.error.is_none());
    assert!(parser.has_v9_template(256));
}
/// Clearing the in-memory v9 template cache must also drop the entries it
/// wrote through to the secondary store.
#[test]
fn v9_clear_templates_propagates_to_store() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let learned = parser.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    assert!(learned.error.is_none());
    assert_eq!(store.len(), 1);
    parser.clear_v9_templates();
    assert_eq!(store.len(), 0);
}
/// Learning an IPFIX template must write it through to the secondary store
/// under the IPFIX-specific template kind.
#[test]
fn ipfix_template_is_written_through_on_learn() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let result = parser.parse_bytes(&ipfix_template_packet(300, &[(1, 4), (2, 4)]));
    assert!(
        result.error.is_none(),
        "template parse error: {:?}",
        result.error
    );
    let key = TemplateStoreKey::new("", TemplateKind::IpfixData, 300);
    assert!(
        store.get(&key).expect("get").is_some(),
        "IPFIX template should be persisted"
    );
}
/// A cold parser must decode IPFIX data records by restoring the template
/// definition from the shared store.
#[test]
fn ipfix_data_record_is_decoded_via_read_through() {
    let store = Arc::new(InMemoryTemplateStore::new());
    // Seed the store through a throwaway parser, then drop it.
    {
        let mut a = NetflowParser::builder()
            .with_template_store(store.clone())
            .build()
            .expect("build");
        let tmpl = ipfix_template_packet(300, &[(1, 4)]);
        assert!(a.parse_bytes(&tmpl).error.is_none());
    }
    let mut b = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let result = b.parse_bytes(&ipfix_data_packet(300, &[0, 0, 0, 0x2A]));
    assert!(
        result.error.is_none(),
        "data parse error: {:?}",
        result.error
    );
    let ipfix = match result.packets.into_iter().next().expect("one packet") {
        NetflowPacket::IPFix(v) => v,
        other => panic!("expected IPFIX, got {:?}", other),
    };
    let body = ipfix.flowsets.into_iter().next().expect("one set").body;
    assert!(
        matches!(
            body,
            netflow_parser::variable_versions::ipfix::FlowSetBody::Data(_)
        ),
        "expected Data set, got {:?}",
        body
    );
    assert!(b.has_ipfix_template(300));
}
/// An IPFIX template withdrawal (a template record with field count 0) must
/// evict the template from the secondary store, not just the local cache.
#[test]
fn ipfix_template_withdrawal_evicts_from_store() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let announce = parser.parse_bytes(&ipfix_template_packet(300, &[(1, 4)]));
    assert!(announce.error.is_none());
    let key = TemplateStoreKey::new("", TemplateKind::IpfixData, 300);
    assert!(store.get(&key).unwrap().is_some());
    // Hand-build the withdrawal: a template set (id 2) whose record for
    // template 300 carries a zero field count.
    let set_len: u16 = 8;
    let msg_len: u16 = 16 + set_len;
    let mut withdrawal = Vec::new();
    withdrawal.extend_from_slice(&0x000Au16.to_be_bytes());
    withdrawal.extend_from_slice(&msg_len.to_be_bytes());
    withdrawal.extend_from_slice(&1u32.to_be_bytes());
    withdrawal.extend_from_slice(&2u32.to_be_bytes());
    withdrawal.extend_from_slice(&1u32.to_be_bytes());
    withdrawal.extend_from_slice(&2u16.to_be_bytes());
    withdrawal.extend_from_slice(&set_len.to_be_bytes());
    withdrawal.extend_from_slice(&300u16.to_be_bytes());
    withdrawal.extend_from_slice(&0u16.to_be_bytes());
    let _ = parser.parse_bytes(&withdrawal);
    assert!(
        store.get(&key).unwrap().is_none(),
        "withdrawal should remove template from store"
    );
}
/// The auto-scoped wrapper must key templates per source address, so two
/// exporters announcing the same template id do not clobber each other.
#[test]
fn auto_scoped_parser_uses_per_source_scope() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let builder = NetflowParser::builder().with_template_store(store.clone());
    let mut scoped = AutoScopedParser::try_with_builder(builder).expect("valid");
    let src_a: SocketAddr = "10.0.0.1:2055".parse().unwrap();
    let src_b: SocketAddr = "10.0.0.2:2055".parse().unwrap();
    // Same template id, different definitions, different sources.
    let _ = scoped.parse_from_source(src_a, &v9_template_packet(256, &[(1, 4)]));
    let _ = scoped.parse_from_source(src_b, &v9_template_packet(256, &[(2, 4), (3, 4)]));
    assert_eq!(store.len(), 2);
    let payloads: Vec<Vec<u8>> = ["v9:10.0.0.1:2055/0", "v9:10.0.0.2:2055/0"]
        .into_iter()
        .map(|scope| {
            let key = TemplateStoreKey::new(scope, TemplateKind::V9Data, 256);
            store
                .get(&key)
                .unwrap()
                .unwrap_or_else(|| panic!("missing entry for scope {}", scope))
        })
        .collect();
    assert_ne!(
        payloads[0], payloads[1],
        "scoped store entries must hold the distinct templates each source announced"
    );
}
/// In-memory `TemplateStore` test double with fault injection: each
/// `fail_*` counter arms that many upcoming failures for the matching
/// operation, and `backend_errors` tallies every failure actually served.
#[derive(Debug, Default)]
struct FaultStore {
    /// Successfully stored key → serialized-template bytes.
    inner: Mutex<std::collections::HashMap<TemplateStoreKey, Vec<u8>>>,
    /// Number of upcoming `get` calls that should fail.
    fail_get: AtomicUsize,
    /// Number of upcoming `put` calls that should fail.
    fail_put: AtomicUsize,
    /// Number of upcoming `remove` calls that should fail.
    fail_remove: AtomicUsize,
    /// Count of injected failures observed across all operations.
    backend_errors: AtomicUsize,
}
impl FaultStore {
    /// Fresh store with no stored templates and no armed failures.
    fn new() -> Self {
        Self::default()
    }
    /// Arm the next `n` calls to `get` to return a backend error.
    fn inject_get_failures(&self, n: usize) {
        self.fail_get.store(n, Ordering::SeqCst);
    }
    /// Arm the next `n` calls to `put` to return a backend error.
    fn inject_put_failures(&self, n: usize) {
        self.fail_put.store(n, Ordering::SeqCst);
    }
    /// Arm the next `n` calls to `remove` to return a backend error.
    /// Kept for symmetry even though no test currently arms it.
    #[allow(dead_code)]
    fn inject_remove_failures(&self, n: usize) {
        self.fail_remove.store(n, Ordering::SeqCst);
    }
    /// Total number of injected failures handed out so far.
    fn observed_errors(&self) -> usize {
        self.backend_errors.load(Ordering::SeqCst)
    }
}
/// Each operation first consumes one armed failure (if any), bumps the
/// shared error tally, and returns a backend error; otherwise it proxies to
/// the inner map.
impl TemplateStore for FaultStore {
    /// Look up a stored template, failing if a `get` fault is armed.
    fn get(&self, key: &TemplateStoreKey) -> Result<Option<Vec<u8>>, TemplateStoreError> {
        if self.fail_get.load(Ordering::SeqCst) > 0 {
            // NOTE(review): load + fetch_sub is not atomic as a pair; looks
            // fine while tests drive the store single-threaded — confirm
            // before reusing this fixture concurrently.
            self.fail_get.fetch_sub(1, Ordering::SeqCst);
            self.backend_errors.fetch_add(1, Ordering::SeqCst);
            return Err(TemplateStoreError::Backend("injected".into()));
        }
        Ok(self.inner.lock().unwrap().get(key).cloned())
    }
    /// Store a template, failing if a `put` fault is armed.
    fn put(&self, key: &TemplateStoreKey, value: &[u8]) -> Result<(), TemplateStoreError> {
        if self.fail_put.load(Ordering::SeqCst) > 0 {
            self.fail_put.fetch_sub(1, Ordering::SeqCst);
            self.backend_errors.fetch_add(1, Ordering::SeqCst);
            return Err(TemplateStoreError::Backend("injected".into()));
        }
        self.inner
            .lock()
            .unwrap()
            .insert(key.clone(), value.to_vec());
        Ok(())
    }
    /// Remove a template, failing if a `remove` fault is armed. Removing a
    /// missing key is not an error.
    fn remove(&self, key: &TemplateStoreKey) -> Result<(), TemplateStoreError> {
        if self.fail_remove.load(Ordering::SeqCst) > 0 {
            self.fail_remove.fetch_sub(1, Ordering::SeqCst);
            self.backend_errors.fetch_add(1, Ordering::SeqCst);
            return Err(TemplateStoreError::Backend("injected".into()));
        }
        self.inner.lock().unwrap().remove(key);
        Ok(())
    }
}
/// A failing `put` during template learn is surfaced only through the
/// metrics — parsing succeeds and the in-memory cache still learns.
#[test]
fn put_backend_error_is_counted_and_does_not_abort_parsing() {
    let store = Arc::new(FaultStore::new());
    store.inject_put_failures(1);
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let result = parser.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    assert!(result.error.is_none());
    // The local cache learned the template despite the store failure.
    assert!(parser.has_v9_template(256));
    assert_eq!(
        parser.v9_cache_info().metrics.template_store_backend_errors,
        1,
        "backend error must be counted"
    );
    assert_eq!(store.observed_errors(), 1);
}
/// A failing `get` during read-through must be reflected in the backend
/// error metric rather than aborting the parse.
#[test]
fn get_backend_error_during_read_through_is_counted() {
    let store = Arc::new(FaultStore::new());
    // Seed the store with a valid template via a throwaway parser.
    {
        let mut seed = NetflowParser::builder()
            .with_template_store(store.clone())
            .build()
            .expect("build");
        let _ = seed.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    }
    store.inject_get_failures(1);
    let mut b = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let _ = b.parse_bytes(&v9_data_packet(256, &[0, 0, 0, 0x2A]));
    assert!(b.v9_cache_info().metrics.template_store_backend_errors >= 1);
}
/// A store entry that fails to decode is counted as a codec error and
/// evicted, so a later template announcement can repopulate the key.
#[test]
fn corrupted_payload_is_counted_and_removed_from_store() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let key = TemplateStoreKey::new("", TemplateKind::V9Data, 256);
    // Seed garbage bytes under the key the parser will look up.
    store
        .put(&key, &[0xff, 0xff, 0xff, 0xff, 0xff])
        .expect("seed");
    assert!(store.get(&key).unwrap().is_some());
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let _ = parser.parse_bytes(&v9_data_packet(256, &[0, 0, 0, 0x2A]));
    assert_eq!(
        parser.v9_cache_info().metrics.template_store_codec_errors,
        1,
        "codec error must be counted"
    );
    assert!(
        store.get(&key).unwrap().is_none(),
        "corrupted entry must be removed so a fresh announce can repopulate"
    );
}
/// With a bounded cache, evicting the least-recently-used template must
/// also remove its write-through entry from the secondary store.
#[test]
fn lru_eviction_on_full_cache_propagates_to_store() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .with_cache_size(2)
        .build()
        .expect("build");
    // Announce three templates into a cache that only holds two.
    for &(id, field_type) in &[(256u16, 1u16), (257, 2), (258, 3)] {
        assert!(
            parser
                .parse_bytes(&v9_template_packet(id, &[(field_type, 4)]))
                .error
                .is_none()
        );
    }
    // 256 was announced first, so it is the eviction victim.
    assert!(!parser.has_v9_template(256));
    let evicted = TemplateStoreKey::new("", TemplateKind::V9Data, 256);
    assert!(
        store.get(&evicted).unwrap().is_none(),
        "LRU eviction must remove the entry from the secondary store"
    );
    // assert_eq! (rather than assert!(a == b)) reports both values on failure.
    assert_eq!(store.len(), 2);
}
/// Build a minimal IPFIX message carrying one options-template set (set id
/// 3) for `template_id`: two fixed 4-byte fields, the first being the
/// single scope field.
fn ipfix_options_template_packet(template_id: u16) -> Vec<u8> {
    // Set body: template id (2) + field count (2) + scope count (2) + two
    // 4-byte field specs (8) = 14, plus the 4-byte set header.
    let set_len: u16 = 4 + 14;
    let msg_len: u16 = 16 + set_len;
    let mut out = Vec::with_capacity(msg_len as usize);
    // IPFIX header: version 10, length, export time / sequence / domain = 1.
    out.extend_from_slice(&0x000Au16.to_be_bytes());
    out.extend_from_slice(&msg_len.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    out.extend_from_slice(&1u32.to_be_bytes());
    // Set id 3 marks an options template set.
    out.extend_from_slice(&3u16.to_be_bytes());
    out.extend_from_slice(&set_len.to_be_bytes());
    out.extend_from_slice(&template_id.to_be_bytes());
    // Two fields total, one of which is a scope field.
    out.extend_from_slice(&2u16.to_be_bytes());
    out.extend_from_slice(&1u16.to_be_bytes());
    // Scope field (type 1, len 4) followed by a regular field (type 3, len 4).
    for &(field_type, field_len) in &[(1u16, 4u16), (3, 4)] {
        out.extend_from_slice(&field_type.to_be_bytes());
        out.extend_from_slice(&field_len.to_be_bytes());
    }
    out
}
/// Options templates must also round-trip through the secondary store and
/// enable read-through decoding in a cold parser.
#[test]
fn ipfix_options_template_read_through_works() {
    let store = Arc::new(InMemoryTemplateStore::new());
    // Seed the store via a throwaway parser.
    {
        let mut a = NetflowParser::builder()
            .with_template_store(store.clone())
            .build()
            .expect("build");
        let _ = a.parse_bytes(&ipfix_options_template_packet(400));
    }
    let mut b = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let result = b.parse_bytes(&ipfix_data_packet(400, &[0, 0, 0, 1, 0, 0, 0, 2]));
    assert!(
        result.error.is_none(),
        "options-data parse: {:?}",
        result.error
    );
    assert!(b.has_ipfix_template(400));
}
/// A successful read-through must emit a `Restored` template event carrying
/// the protocol and the restored template id.
#[test]
fn read_through_fires_restored_event() {
    let store = Arc::new(InMemoryTemplateStore::new());
    // Seed the store, then drop the learning parser.
    {
        let mut a = NetflowParser::builder()
            .with_template_store(store.clone())
            .build()
            .expect("build");
        let _ = a.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    }
    let restored: Arc<Mutex<Vec<(TemplateProtocol, u16)>>> = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&restored);
    let mut b = NetflowParser::builder()
        .with_template_store(store.clone())
        .on_template_event(move |event| {
            // Record only Restored events that carry a template id.
            if let TemplateEvent::Restored {
                template_id: Some(id),
                protocol,
            } = event
            {
                sink.lock().unwrap().push((*protocol, *id));
            }
            Ok(())
        })
        .build()
        .expect("build");
    let _ = b.parse_bytes(&v9_data_packet(256, &[0, 0, 0, 0x2A]));
    assert_eq!(
        restored.lock().unwrap().clone(),
        vec![(TemplateProtocol::V9, 256)]
    );
}
/// With pending flows enabled, a read-through restore satisfies the data
/// packet directly: the restore is counted, and nothing needs replaying.
#[test]
fn read_through_drives_pending_flow_replay() {
    use netflow_parser::PendingFlowsConfig;
    let store = Arc::new(InMemoryTemplateStore::new());
    // Seed the store with the template, then drop the learning parser.
    {
        let mut a = NetflowParser::builder()
            .with_template_store(store.clone())
            .build()
            .expect("build");
        let _ = a.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    }
    let mut b = NetflowParser::builder()
        .with_template_store(store.clone())
        .with_pending_flows(PendingFlowsConfig::default())
        .build()
        .expect("build");
    let result = b.parse_bytes(&v9_data_packet(256, &[0, 0, 0, 0x2A]));
    assert!(result.error.is_none());
    let metrics = b.v9_cache_info().metrics;
    assert_eq!(metrics.template_store_restored, 1);
    // The template was available at parse time, so no flows were queued.
    assert_eq!(metrics.pending_replayed, 0);
    assert_eq!(metrics.pending_replay_failed, 0);
}
/// Re-announcing a template id with a different definition must overwrite
/// the stored bytes under the same key rather than adding a second entry.
#[test]
fn duplicate_template_id_write_through_overwrites() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    let key = TemplateStoreKey::new("", TemplateKind::V9Data, 256);
    let _ = parser.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    let bytes_v1 = store.get(&key).unwrap().expect("first write");
    let _ = parser.parse_bytes(&v9_template_packet(256, &[(2, 4), (3, 4)]));
    let bytes_v2 = store.get(&key).unwrap().expect("second write");
    assert_ne!(bytes_v1, bytes_v2, "store must reflect the new definition");
    assert_eq!(store.len(), 1, "still one entry under the same key");
}
/// Changing the store scope at runtime must apply to subsequent writes,
/// keying new templates under the freshly set scope.
#[test]
fn set_template_store_scope_retrofit_changes_keys() {
    let store = Arc::new(InMemoryTemplateStore::new());
    let mut parser = NetflowParser::builder()
        .with_template_store(store.clone())
        .build()
        .expect("build");
    // First template lands under the default (empty) scope.
    let _ = parser.parse_bytes(&v9_template_packet(256, &[(1, 4)]));
    let unscoped = TemplateStoreKey::new("", TemplateKind::V9Data, 256);
    assert!(store.get(&unscoped).unwrap().is_some());
    // After retrofitting a scope, new templates are keyed under it.
    parser.set_template_store_scope("collector-eu");
    let _ = parser.parse_bytes(&v9_template_packet(257, &[(2, 4)]));
    let scoped = TemplateStoreKey::new("collector-eu", TemplateKind::V9Data, 257);
    assert!(store.get(&scoped).unwrap().is_some());
}