use crate::{
ConfigError, NetflowError, NetflowPacket, NetflowParser, NetflowParserBuilder, ParseResult,
ParserCacheInfo,
};
use lru::LruCache;
use std::hash::Hash;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
/// Default capacity of the per-source parser caches. Once this many distinct
/// sources are tracked, the least-recently-used parser is evicted.
pub const DEFAULT_MAX_SOURCES: usize = 10_000;
/// A collection of per-source [`NetflowParser`]s keyed by an arbitrary
/// hashable key (e.g. a `SocketAddr` or a router name), with LRU eviction
/// once the configured capacity is reached.
#[derive(Debug)]
pub struct RouterScopedParser<K: Hash + Eq> {
    // Each entry pairs a parser with the `Instant` it was last used;
    // `prune_idle_sources` consults the timestamp.
    parsers: LruCache<K, (NetflowParser, Instant)>,
    // Optional template for newly created per-source parsers; `None` means
    // `NetflowParser::default()` is used.
    parser_builder: Option<NetflowParserBuilder>,
    // Configured capacity; mirrors the LRU cache's size set via
    // `with_max_sources`.
    max_sources: usize,
}
impl<K: Hash + Eq> Default for RouterScopedParser<K> {
    /// Equivalent to [`RouterScopedParser::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<K: Hash + Eq> RouterScopedParser<K> {
pub fn new() -> Self {
Self {
parsers: LruCache::new(
NonZeroUsize::new(DEFAULT_MAX_SOURCES)
.expect("DEFAULT_MAX_SOURCES is non-zero"),
),
parser_builder: None,
max_sources: DEFAULT_MAX_SOURCES,
}
}
pub fn try_with_builder(builder: NetflowParserBuilder) -> Result<Self, ConfigError> {
builder.validate()?;
Ok(Self {
parsers: LruCache::new(
NonZeroUsize::new(DEFAULT_MAX_SOURCES)
.expect("DEFAULT_MAX_SOURCES is non-zero"),
),
parser_builder: Some(builder),
max_sources: DEFAULT_MAX_SOURCES,
})
}
pub fn with_max_sources(mut self, max: usize) -> Result<Self, ConfigError> {
if max == 0 {
return Err(ConfigError::InvalidMaxSources(0));
}
self.max_sources = max;
self.parsers
.resize(NonZeroUsize::new(max).expect("max is non-zero"));
Ok(self)
}
pub fn parse_from_source(&mut self, source: K, data: &[u8]) -> ParseResult
where
K: Clone,
{
if let Some((parser, last_seen)) = self.parsers.get_mut(&source) {
*last_seen = Instant::now();
return parser.parse_bytes(data);
}
let p = if let Some(ref builder) = self.parser_builder {
match builder.clone().build() {
Ok(p) => p,
Err(err) => {
return ParseResult {
packets: vec![],
error: Some(NetflowError::Partial {
message: format!("Failed to build parser for source: {err}"),
}),
};
}
}
} else {
NetflowParser::default()
};
let now = Instant::now();
self.parsers.push(source.clone(), (p, now));
let (parser, _) = self.parsers.get_mut(&source).expect("just inserted");
parser.parse_bytes(data)
}
pub fn iter_packets_from_source<'a>(
&'a mut self,
source: K,
data: &'a [u8],
) -> Result<impl Iterator<Item = Result<NetflowPacket, NetflowError>> + 'a, NetflowError>
where
K: Clone,
{
if self.parsers.contains(&source) {
let (parser, last_seen) =
self.parsers.get_mut(&source).expect("checked by contains");
*last_seen = Instant::now();
return Ok(parser.iter_packets(data));
}
let p = if let Some(ref builder) = self.parser_builder {
builder
.clone()
.build()
.map_err(|err| NetflowError::Partial {
message: format!("Failed to build parser for source: {err}"),
})?
} else {
NetflowParser::default()
};
let now = Instant::now();
self.parsers.push(source.clone(), (p, now));
let (parser, _) = self.parsers.get_mut(&source).expect("just inserted");
Ok(parser.iter_packets(data))
}
pub fn prune_idle_sources(&mut self, older_than: Duration) -> usize
where
K: Clone,
{
let now = Instant::now();
let keys_to_remove: Vec<K> = self
.parsers
.iter()
.filter(|(_, (_, last_seen))| now.duration_since(*last_seen) >= older_than)
.map(|(k, _)| k.clone())
.collect();
let count = keys_to_remove.len();
for key in keys_to_remove {
self.parsers.pop(&key);
}
count
}
pub fn get_source_info(&mut self, source: &K) -> Option<ParserCacheInfo> {
self.parsers
.peek(source)
.map(|(parser, _)| ParserCacheInfo {
v9: parser.v9_cache_info(),
ipfix: parser.ipfix_cache_info(),
})
}
pub fn source_count(&self) -> usize {
self.parsers.len()
}
pub fn sources(&self) -> Vec<&K> {
self.parsers.iter().map(|(k, _)| k).collect()
}
pub fn all_info(&self) -> Vec<(&K, ParserCacheInfo)> {
self.parsers
.iter()
.map(|(source, (parser, _))| {
(
source,
ParserCacheInfo {
v9: parser.v9_cache_info(),
ipfix: parser.ipfix_cache_info(),
},
)
})
.collect()
}
pub fn clear_source_templates(&mut self, source: &K) {
if let Some((parser, _)) = self.parsers.peek_mut(source) {
parser.clear_v9_templates();
parser.clear_ipfix_templates();
}
}
pub fn clear_all_templates(&mut self) {
for (_, (parser, _)) in self.parsers.iter_mut() {
parser.clear_v9_templates();
parser.clear_ipfix_templates();
}
}
pub fn remove_source(&mut self, source: &K) -> Option<NetflowParser> {
self.parsers.pop(source).map(|(parser, _)| parser)
}
pub fn get_parser(&mut self, source: &K) -> Option<&NetflowParser> {
self.parsers.peek(source).map(|(parser, _)| parser)
}
pub fn get_parser_mut(&mut self, source: &K) -> Option<&mut NetflowParser> {
self.parsers.peek_mut(source).map(|(parser, _)| parser)
}
}
/// Scoping information extracted from the first bytes of a flow export
/// packet, used to decide which per-source parser should handle it.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScopingInfo {
    /// NetFlow v9: scoped by the header's source ID (header bytes 16..20).
    V9 { source_id: u32 },
    /// IPFIX (version 10): scoped by the observation domain ID (header
    /// bytes 12..16).
    IPFix { observation_domain_id: u32 },
    /// NetFlow v5/v7: no in-header scoping field is read; the sender
    /// address alone identifies the source.
    Legacy,
    /// Version not recognized, or the packet is too short to read the
    /// scoping field.
    Unknown,
}
/// Cache key for an IPFIX stream: sender address plus observation domain ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IpfixSourceKey {
    pub addr: SocketAddr,
    pub observation_domain_id: u32,
}
/// Cache key for a NetFlow v9 stream: sender address plus header source ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct V9SourceKey {
    pub addr: SocketAddr,
    pub source_id: u32,
}
/// Sniffs the version field and scoping identifier from the start of a flow
/// export packet without fully parsing it.
///
/// Returns [`ScopingInfo::Unknown`] when the buffer is too short for the
/// fields being read or the version is unrecognized.
pub fn extract_scoping_info(data: &[u8]) -> ScopingInfo {
    // The version number occupies the first two big-endian bytes.
    let version = match data.get(..2) {
        Some(bytes) => u16::from_be_bytes([bytes[0], bytes[1]]),
        None => return ScopingInfo::Unknown,
    };
    match version {
        // v5/v7 carry no scoping field worth reading here.
        5 | 7 => ScopingInfo::Legacy,
        // v9: source ID lives at header bytes 16..20.
        9 => match data.get(16..20) {
            Some(bytes) => ScopingInfo::V9 {
                source_id: u32::from_be_bytes(bytes.try_into().expect("slice is 4 bytes")),
            },
            None => ScopingInfo::Unknown,
        },
        // IPFIX (v10): observation domain ID lives at header bytes 12..16.
        10 => match data.get(12..16) {
            Some(bytes) => ScopingInfo::IPFix {
                observation_domain_id: u32::from_be_bytes(
                    bytes.try_into().expect("slice is 4 bytes"),
                ),
            },
            None => ScopingInfo::Unknown,
        },
        _ => ScopingInfo::Unknown,
    }
}
/// A parser collection that scopes automatically by export protocol:
/// IPFIX streams are keyed by (address, observation domain ID), NetFlow v9
/// streams by (address, source ID), and legacy v5/v7 streams by address
/// alone.
#[derive(Debug)]
pub struct AutoScopedParser {
    // Per-protocol LRU caches of (parser, last-seen instant) pairs.
    ipfix_parsers: LruCache<IpfixSourceKey, (NetflowParser, Instant)>,
    v9_parsers: LruCache<V9SourceKey, (NetflowParser, Instant)>,
    legacy_parsers: LruCache<SocketAddr, (NetflowParser, Instant)>,
    // Optional template for newly built parsers; `None` means defaults.
    parser_builder: Option<NetflowParserBuilder>,
    // Combined cap across all three maps, enforced in
    // `get_or_create_parser` (each map is also individually capped).
    max_sources: usize,
}
impl Default for AutoScopedParser {
    /// Equivalent to [`AutoScopedParser::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl AutoScopedParser {
    /// Creates an auto-scoping parser with the default per-kind capacity
    /// ([`DEFAULT_MAX_SOURCES`]) and default parser settings.
    pub fn new() -> Self {
        let cap =
            NonZeroUsize::new(DEFAULT_MAX_SOURCES).expect("DEFAULT_MAX_SOURCES is non-zero");
        Self {
            ipfix_parsers: LruCache::new(cap),
            v9_parsers: LruCache::new(cap),
            legacy_parsers: LruCache::new(cap),
            parser_builder: None,
            max_sources: DEFAULT_MAX_SOURCES,
        }
    }

    /// Creates an auto-scoping parser whose per-source parsers are built
    /// from `builder`.
    ///
    /// # Errors
    /// Returns a [`ConfigError`] if the builder's configuration fails
    /// validation.
    pub fn try_with_builder(builder: NetflowParserBuilder) -> Result<Self, ConfigError> {
        builder.validate()?;
        let cap =
            NonZeroUsize::new(DEFAULT_MAX_SOURCES).expect("DEFAULT_MAX_SOURCES is non-zero");
        Ok(Self {
            ipfix_parsers: LruCache::new(cap),
            v9_parsers: LruCache::new(cap),
            legacy_parsers: LruCache::new(cap),
            parser_builder: Some(builder),
            max_sources: DEFAULT_MAX_SOURCES,
        })
    }

    /// Sets the maximum number of tracked sources, resizing all three LRU
    /// maps to that capacity.
    ///
    /// # Errors
    /// Returns [`ConfigError::InvalidMaxSources`] if `max` is zero.
    pub fn with_max_sources(mut self, max: usize) -> Result<Self, ConfigError> {
        if max == 0 {
            return Err(ConfigError::InvalidMaxSources(0));
        }
        self.max_sources = max;
        let cap = NonZeroUsize::new(max).expect("max is non-zero");
        self.ipfix_parsers.resize(cap);
        self.v9_parsers.resize(cap);
        self.legacy_parsers.resize(cap);
        Ok(self)
    }

    /// Parses `data` with the parser chosen by the packet's own scoping
    /// fields; failures surface through `ParseResult::error`.
    pub fn parse_from_source(&mut self, source: SocketAddr, data: &[u8]) -> ParseResult {
        let parser = match self.get_or_create_parser(source, data) {
            Ok(p) => p,
            Err(e) => {
                return ParseResult {
                    packets: vec![],
                    error: Some(e),
                };
            }
        };
        parser.parse_bytes(data)
    }

    /// Streams packets from `data` using the parser chosen by the packet's
    /// scoping fields.
    ///
    /// # Errors
    /// Propagates parser-construction failures and rejects packets whose
    /// scope cannot be determined.
    pub fn iter_packets_from_source<'a>(
        &'a mut self,
        source: SocketAddr,
        data: &'a [u8],
    ) -> Result<impl Iterator<Item = Result<NetflowPacket, NetflowError>> + 'a, NetflowError>
    {
        let parser = self.get_or_create_parser(source, data)?;
        Ok(parser.iter_packets(data))
    }

    /// Removes sources idle for at least `older_than` from all three maps,
    /// returning the total number pruned.
    pub fn prune_idle_sources(&mut self, older_than: Duration) -> usize {
        let now = Instant::now();
        // `LruCache` has no `retain`; collect stale keys per map, then pop.
        let ipfix_keys: Vec<IpfixSourceKey> = self
            .ipfix_parsers
            .iter()
            .filter(|(_, (_, last_seen))| now.duration_since(*last_seen) >= older_than)
            .map(|(k, _)| *k)
            .collect();
        let v9_keys: Vec<V9SourceKey> = self
            .v9_parsers
            .iter()
            .filter(|(_, (_, last_seen))| now.duration_since(*last_seen) >= older_than)
            .map(|(k, _)| *k)
            .collect();
        let legacy_keys: Vec<SocketAddr> = self
            .legacy_parsers
            .iter()
            .filter(|(_, (_, last_seen))| now.duration_since(*last_seen) >= older_than)
            .map(|(k, _)| *k)
            .collect();
        let count = ipfix_keys.len() + v9_keys.len() + legacy_keys.len();
        for key in ipfix_keys {
            self.ipfix_parsers.pop(&key);
        }
        for key in v9_keys {
            self.v9_parsers.pop(&key);
        }
        for key in legacy_keys {
            self.legacy_parsers.pop(&key);
        }
        count
    }

    /// Total number of tracked sources across all three maps.
    pub fn source_count(&self) -> usize {
        self.ipfix_parsers.len() + self.v9_parsers.len() + self.legacy_parsers.len()
    }

    /// Number of tracked IPFIX sources.
    pub fn ipfix_source_count(&self) -> usize {
        self.ipfix_parsers.len()
    }

    /// Number of tracked NetFlow v9 sources.
    pub fn v9_source_count(&self) -> usize {
        self.v9_parsers.len()
    }

    /// Number of tracked legacy (v5/v7) sources.
    pub fn legacy_source_count(&self) -> usize {
        self.legacy_parsers.len()
    }

    /// Stops tracking an IPFIX source, returning its parser if present.
    pub fn remove_ipfix_source(&mut self, key: &IpfixSourceKey) -> Option<NetflowParser> {
        self.ipfix_parsers.pop(key).map(|(parser, _)| parser)
    }

    /// Stops tracking a v9 source, returning its parser if present.
    pub fn remove_v9_source(&mut self, key: &V9SourceKey) -> Option<NetflowParser> {
        self.v9_parsers.pop(key).map(|(parser, _)| parser)
    }

    /// Stops tracking a legacy source, returning its parser if present.
    pub fn remove_legacy_source(&mut self, addr: &SocketAddr) -> Option<NetflowParser> {
        self.legacy_parsers.pop(addr).map(|(parser, _)| parser)
    }

    /// Clears the template caches of every tracked parser in all maps.
    pub fn clear_all_templates(&mut self) {
        for (_, (parser, _)) in self.ipfix_parsers.iter_mut() {
            parser.clear_v9_templates();
            parser.clear_ipfix_templates();
        }
        for (_, (parser, _)) in self.v9_parsers.iter_mut() {
            parser.clear_v9_templates();
            parser.clear_ipfix_templates();
        }
        for (_, (parser, _)) in self.legacy_parsers.iter_mut() {
            parser.clear_v9_templates();
            parser.clear_ipfix_templates();
        }
    }

    /// Template-cache statistics for every tracked IPFIX source.
    pub fn ipfix_info(&self) -> Vec<(&IpfixSourceKey, ParserCacheInfo)> {
        self.ipfix_parsers
            .iter()
            .map(|(key, (parser, _))| {
                (
                    key,
                    ParserCacheInfo {
                        v9: parser.v9_cache_info(),
                        ipfix: parser.ipfix_cache_info(),
                    },
                )
            })
            .collect()
    }

    /// Template-cache statistics for every tracked v9 source.
    pub fn v9_info(&self) -> Vec<(&V9SourceKey, ParserCacheInfo)> {
        self.v9_parsers
            .iter()
            .map(|(key, (parser, _))| {
                (
                    key,
                    ParserCacheInfo {
                        v9: parser.v9_cache_info(),
                        ipfix: parser.ipfix_cache_info(),
                    },
                )
            })
            .collect()
    }

    /// Template-cache statistics for every tracked legacy source.
    pub fn legacy_info(&self) -> Vec<(&SocketAddr, ParserCacheInfo)> {
        self.legacy_parsers
            .iter()
            .map(|(addr, (parser, _))| {
                (
                    addr,
                    ParserCacheInfo {
                        v9: parser.v9_cache_info(),
                        ipfix: parser.ipfix_cache_info(),
                    },
                )
            })
            .collect()
    }

    /// Looks up (or lazily creates) the parser responsible for `data`'s
    /// scope, enforcing the combined `max_sources` cap across all three
    /// maps.
    fn get_or_create_parser(
        &mut self,
        source: SocketAddr,
        data: &[u8],
    ) -> Result<&mut NetflowParser, NetflowError> {
        let scoping = extract_scoping_info(data);
        // Determine whether this packet would add a brand-new entry; only
        // then can a global eviction be required.
        let is_new = match &scoping {
            ScopingInfo::IPFix {
                observation_domain_id,
            } => !self.ipfix_parsers.contains(&IpfixSourceKey {
                addr: source,
                observation_domain_id: *observation_domain_id,
            }),
            ScopingInfo::V9 { source_id } => !self.v9_parsers.contains(&V9SourceKey {
                addr: source,
                source_id: *source_id,
            }),
            ScopingInfo::Legacy => !self.legacy_parsers.contains(&source),
            // Unknown packets never allocate a parser, so they cannot grow
            // the caches.
            ScopingInfo::Unknown => false,
        };
        // Each map is capped individually, but the *combined* count must
        // also stay within `max_sources`.
        if is_new && self.source_count() >= self.max_sources {
            self.evict_global_lru();
        }
        let builder = self.parser_builder.as_ref();
        let now = Instant::now();
        match scoping {
            ScopingInfo::IPFix {
                observation_domain_id,
            } => {
                let key = IpfixSourceKey {
                    addr: source,
                    observation_domain_id,
                };
                // `contains` + `push` + `get_mut` (rather than returning a
                // borrow from a `get_mut` branch) keeps the borrow checker
                // satisfied while returning `&mut` from this method.
                if !self.ipfix_parsers.contains(&key) {
                    let scope = format!("ipfix:{}/{}", source, observation_domain_id);
                    let parser = Self::build_parser(builder, &scope)?;
                    self.ipfix_parsers.push(key, (parser, now));
                }
                let (parser, last_seen) =
                    self.ipfix_parsers.get_mut(&key).expect("just ensured");
                *last_seen = now;
                Ok(parser)
            }
            ScopingInfo::V9 { source_id } => {
                let key = V9SourceKey {
                    addr: source,
                    source_id,
                };
                if !self.v9_parsers.contains(&key) {
                    let scope = format!("v9:{}/{}", source, source_id);
                    let parser = Self::build_parser(builder, &scope)?;
                    self.v9_parsers.push(key, (parser, now));
                }
                let (parser, last_seen) = self.v9_parsers.get_mut(&key).expect("just ensured");
                *last_seen = now;
                Ok(parser)
            }
            ScopingInfo::Legacy => {
                if !self.legacy_parsers.contains(&source) {
                    let scope = format!("legacy:{}", source);
                    let parser = Self::build_parser(builder, &scope)?;
                    self.legacy_parsers.push(source, (parser, now));
                }
                let (parser, last_seen) =
                    self.legacy_parsers.get_mut(&source).expect("just ensured");
                *last_seen = now;
                Ok(parser)
            }
            ScopingInfo::Unknown => {
                // Refuse to allocate a parser for data we cannot scope.
                Err(crate::NetflowError::Incomplete {
                    available: data.len(),
                    context: "packet too short or unrecognized version for scoping".into(),
                })
            }
        }
    }

    /// Evicts the least-recently-used entry across all three maps.
    ///
    /// Ties on the timestamp are broken toward the map holding more
    /// entries, so the fullest cache gives ground first.
    fn evict_global_lru(&mut self) {
        // Peek (don't pop) the LRU timestamp of each map.
        let ipfix_ts = self.ipfix_parsers.peek_lru().map(|(_, (_, ts))| *ts);
        let v9_ts = self.v9_parsers.peek_lru().map(|(_, (_, ts))| *ts);
        let legacy_ts = self.legacy_parsers.peek_lru().map(|(_, (_, ts))| *ts);
        // Tag which map a candidate came from so we know where to pop.
        #[derive(Clone, Copy)]
        enum MapKind {
            Ipfix,
            V9,
            Legacy,
        }
        let candidates = [
            ipfix_ts.map(|ts| (MapKind::Ipfix, ts, self.ipfix_parsers.len())),
            v9_ts.map(|ts| (MapKind::V9, ts, self.v9_parsers.len())),
            legacy_ts.map(|ts| (MapKind::Legacy, ts, self.legacy_parsers.len())),
        ];
        let oldest = candidates
            .iter()
            .flatten()
            .min_by(|(_, ts_a, len_a), (_, ts_b, len_b)| {
                // Oldest timestamp first; on a tie, prefer the larger map
                // (note the reversed `len_b.cmp(len_a)`).
                ts_a.cmp(ts_b).then_with(|| len_b.cmp(len_a))
            })
            .map(|(kind, _, _)| *kind);
        match oldest {
            Some(MapKind::Ipfix) => {
                self.ipfix_parsers.pop_lru();
            }
            Some(MapKind::V9) => {
                self.v9_parsers.pop_lru();
            }
            Some(MapKind::Legacy) => {
                self.legacy_parsers.pop_lru();
            }
            // All maps empty: nothing to evict.
            None => {}
        }
    }

    /// Builds a parser for a new source from `builder` (or the default
    /// configuration) and tags it with `scope` for template-store scoping.
    fn build_parser(
        builder: Option<&NetflowParserBuilder>,
        scope: &str,
    ) -> Result<NetflowParser, NetflowError> {
        let mut parser = if let Some(builder) = builder {
            builder
                .clone()
                .build()
                .map_err(|err| NetflowError::Partial {
                    message: format!("Failed to build parser for source: {err}"),
                })?
        } else {
            NetflowParser::default()
        };
        parser.set_template_store_scope(scope);
        Ok(parser)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    // Parsers are created lazily per source and tracked independently.
    #[test]
    fn test_scoped_parser_basic() {
        let mut scoped = RouterScopedParser::<SocketAddr>::new();
        assert_eq!(scoped.source_count(), 0);
        let source1: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        let data = vec![0u8; 100];
        let _ = scoped.parse_from_source(source1, &data);
        assert_eq!(scoped.source_count(), 1);
        assert!(scoped.get_source_info(&source1).is_some());
        let source2: SocketAddr = "192.168.1.2:2055".parse().unwrap();
        let _ = scoped.parse_from_source(source2, &data);
        assert_eq!(scoped.source_count(), 2);
        assert!(scoped.get_source_info(&source2).is_some());
    }

    // Any Hash + Eq key type works, not just socket addresses.
    #[test]
    fn test_scoped_parser_with_string_keys() {
        let mut scoped = RouterScopedParser::<String>::new();
        let router1 = "router-nyc-01".to_string();
        let router2 = "router-sfo-02".to_string();
        let data = vec![0u8; 100];
        let _ = scoped.parse_from_source(router1.clone(), &data);
        let _ = scoped.parse_from_source(router2.clone(), &data);
        assert_eq!(scoped.source_count(), 2);
        assert!(scoped.sources().contains(&&router1));
        assert!(scoped.sources().contains(&&router2));
    }

    // remove_source drops the tracked entry.
    #[test]
    fn test_remove_source() {
        let mut scoped = RouterScopedParser::<String>::new();
        let router = "router-1".to_string();
        let data = vec![0u8; 100];
        let _ = scoped.parse_from_source(router.clone(), &data);
        assert_eq!(scoped.source_count(), 1);
        scoped.remove_source(&router);
        assert_eq!(scoped.source_count(), 0);
    }

    // Template clearing (single source and global) must not panic.
    #[test]
    fn test_clear_templates() {
        let mut scoped = RouterScopedParser::<String>::new();
        let router = "router-1".to_string();
        let data = vec![0u8; 100];
        let _ = scoped.parse_from_source(router.clone(), &data);
        scoped.clear_source_templates(&router);
        scoped.clear_all_templates();
    }

    // v9 source ID is read big-endian from header bytes 16..20.
    #[test]
    fn test_extract_scoping_info_v9() {
        let mut data = vec![0u8; 20];
        data[0] = 0x00;
        data[1] = 0x09;
        data[16] = 0x12;
        data[17] = 0x34;
        data[18] = 0x56;
        data[19] = 0x78;
        let info = extract_scoping_info(&data);
        assert_eq!(
            info,
            ScopingInfo::V9 {
                source_id: 0x12345678
            }
        );
    }

    // IPFIX observation domain ID is read big-endian from bytes 12..16.
    #[test]
    fn test_extract_scoping_info_ipfix() {
        let mut data = vec![0u8; 16];
        data[0] = 0x00;
        data[1] = 0x0A;
        data[12] = 0xAB;
        data[13] = 0xCD;
        data[14] = 0xEF;
        data[15] = 0x01;
        let info = extract_scoping_info(&data);
        assert_eq!(
            info,
            ScopingInfo::IPFix {
                observation_domain_id: 0xABCDEF01
            }
        );
    }

    // Version 5 packets are classified as Legacy.
    #[test]
    fn test_extract_scoping_info_v5() {
        let mut data = vec![0u8; 24];
        data[0] = 0x00;
        data[1] = 0x05;
        let info = extract_scoping_info(&data);
        assert_eq!(info, ScopingInfo::Legacy);
    }

    // A v9 header truncated before the source ID yields Unknown.
    #[test]
    fn test_extract_scoping_info_truncated() {
        let data = vec![0x00, 0x09];
        let info = extract_scoping_info(&data);
        assert_eq!(info, ScopingInfo::Unknown);
    }

    // Unrecognized versions yield Unknown.
    #[test]
    fn test_extract_scoping_info_unknown_version() {
        let data = vec![0x00, 0xFF];
        let info = extract_scoping_info(&data);
        assert_eq!(info, ScopingInfo::Unknown);
    }

    // A fresh AutoScopedParser tracks nothing.
    #[test]
    fn test_auto_scoped_parser_basic() {
        let parser = AutoScopedParser::new();
        assert_eq!(parser.source_count(), 0);
        assert_eq!(parser.ipfix_source_count(), 0);
        assert_eq!(parser.v9_source_count(), 0);
        assert_eq!(parser.legacy_source_count(), 0);
    }

    // The same sender address gets separate parsers per protocol.
    #[test]
    fn test_auto_scoped_parser_routes_correctly() {
        let mut parser = AutoScopedParser::new();
        let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        let mut v9_data = vec![0u8; 20];
        v9_data[0] = 0x00;
        v9_data[1] = 0x09;
        v9_data[16] = 0x00;
        v9_data[17] = 0x00;
        v9_data[18] = 0x00;
        v9_data[19] = 0x01;
        let _ = parser.parse_from_source(source, &v9_data);
        assert_eq!(parser.v9_source_count(), 1);
        assert_eq!(parser.ipfix_source_count(), 0);
        let mut ipfix_data = vec![0u8; 16];
        ipfix_data[0] = 0x00;
        ipfix_data[1] = 0x0A;
        ipfix_data[12] = 0x00;
        ipfix_data[13] = 0x00;
        ipfix_data[14] = 0x00;
        ipfix_data[15] = 0x02;
        let _ = parser.parse_from_source(source, &ipfix_data);
        assert_eq!(parser.v9_source_count(), 1);
        assert_eq!(parser.ipfix_source_count(), 1);
        assert_eq!(parser.source_count(), 2);
    }

    // Distinct observation domains from one address are distinct sources.
    #[test]
    fn test_auto_scoped_parser_multiple_domains() {
        let mut parser = AutoScopedParser::new();
        let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        for domain_id in 1..=3 {
            let mut data = vec![0u8; 16];
            data[0] = 0x00;
            data[1] = 0x0A;
            data[12] = 0x00;
            data[13] = 0x00;
            data[14] = 0x00;
            data[15] = domain_id;
            let _ = parser.parse_from_source(source, &data);
        }
        assert_eq!(parser.ipfix_source_count(), 3);
    }

    // ipfix_info reports the key used to track each IPFIX source.
    #[test]
    fn test_auto_scoped_parser_stats() {
        let mut parser = AutoScopedParser::new();
        let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        let mut ipfix_data = vec![0u8; 16];
        ipfix_data[0] = 0x00;
        ipfix_data[1] = 0x0A;
        ipfix_data[15] = 0x01;
        let _ = parser.parse_from_source(source, &ipfix_data);
        let ipfix_info = parser.ipfix_info();
        assert_eq!(ipfix_info.len(), 1);
        assert_eq!(ipfix_info[0].0.observation_domain_id, 1);
    }

    // clear_all_templates resets template caches but keeps sources tracked.
    #[test]
    fn test_auto_scoped_parser_clear_all() {
        let mut parser = AutoScopedParser::new();
        let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        let mut ipfix_data = vec![0u8; 16];
        ipfix_data[0] = 0x00;
        ipfix_data[1] = 0x0A;
        let _ = parser.parse_from_source(source, &ipfix_data);
        parser.clear_all_templates();
        assert_eq!(parser.ipfix_source_count(), 1);
    }

    // A zero max_sources is rejected for RouterScopedParser.
    #[test]
    fn test_max_sources_zero_rejected_router() {
        let result = RouterScopedParser::<SocketAddr>::new().with_max_sources(0);
        assert!(result.is_err());
    }

    // A zero max_sources is rejected for AutoScopedParser.
    #[test]
    fn test_max_sources_zero_rejected_auto() {
        let result = AutoScopedParser::new().with_max_sources(0);
        assert!(result.is_err());
    }

    // At capacity, the least-recently-used source ("A") is evicted.
    #[test]
    fn test_router_scoped_evicts_oldest() {
        let mut scoped = RouterScopedParser::<String>::new()
            .with_max_sources(2)
            .expect("valid");
        let data = vec![0u8; 100];
        let _ = scoped.parse_from_source("A".to_string(), &data);
        let _ = scoped.parse_from_source("B".to_string(), &data);
        assert_eq!(scoped.source_count(), 2);
        let _ = scoped.parse_from_source("C".to_string(), &data);
        assert_eq!(scoped.source_count(), 2);
        assert!(scoped.get_parser(&"A".to_string()).is_none());
        assert!(scoped.get_parser(&"B".to_string()).is_some());
        assert!(scoped.get_parser(&"C".to_string()).is_some());
    }

    // The combined cap holds: a third IPFIX domain triggers eviction.
    #[test]
    fn test_auto_scoped_evicts_oldest() {
        let mut parser = AutoScopedParser::new().with_max_sources(2).expect("valid");
        let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        for domain_id in 1..=2u8 {
            let mut data = vec![0u8; 16];
            data[0] = 0x00;
            data[1] = 0x0A;
            data[15] = domain_id;
            let _ = parser.parse_from_source(source, &data);
        }
        assert_eq!(parser.source_count(), 2);
        let mut data = vec![0u8; 16];
        data[0] = 0x00;
        data[1] = 0x0A;
        data[15] = 3;
        let _ = parser.parse_from_source(source, &data);
        assert_eq!(parser.source_count(), 2);
    }

    // A recently-seen source survives an idle prune with a long threshold.
    #[test]
    fn test_router_prune_idle_sources() {
        let mut scoped = RouterScopedParser::<String>::new();
        let data = vec![0u8; 100];
        let _ = scoped.parse_from_source("A".to_string(), &data);
        assert_eq!(scoped.source_count(), 1);
        let pruned = scoped.prune_idle_sources(Duration::from_secs(3600));
        assert_eq!(pruned, 0);
        assert_eq!(scoped.source_count(), 1);
    }

    // Same as above, for the auto-scoped variant.
    #[test]
    fn test_auto_prune_idle_sources() {
        let mut parser = AutoScopedParser::new();
        let source: SocketAddr = "192.168.1.1:2055".parse().unwrap();
        let mut data = vec![0u8; 16];
        data[0] = 0x00;
        data[1] = 0x0A;
        data[15] = 1;
        let _ = parser.parse_from_source(source, &data);
        assert_eq!(parser.source_count(), 1);
        let pruned = parser.prune_idle_sources(Duration::from_secs(3600));
        assert_eq!(pruned, 0);
        assert_eq!(parser.source_count(), 1);
    }
}