use netflow_parser::{NetflowParser, RouterScopedParser};
use std::sync::{Arc, Mutex};
use std::thread;
const V5_PACKET: [u8; 72] = [
0, 5, 0, 1, 3, 0, 4, 0, 5, 0, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5,
6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5,
6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,
];
#[test]
fn test_shared_scoped_parser_across_threads() {
let parser = Arc::new(Mutex::new(RouterScopedParser::<String>::new()));
let mut handles = Vec::new();
for i in 0..4 {
let parser = Arc::clone(&parser);
handles.push(thread::spawn(move || {
let source = format!("router-{}", i);
for _ in 0..10 {
let mut p = parser.lock().unwrap();
let packets = p.parse_from_source(source.clone(), &V5_PACKET).packets;
assert_eq!(packets.len(), 1);
}
}));
}
for h in handles {
h.join().expect("thread panicked");
}
let p = parser.lock().unwrap();
assert_eq!(p.source_count(), 4);
}
#[test]
fn test_independent_parsers_per_thread() {
let mut handles = Vec::new();
for _ in 0..4 {
handles.push(thread::spawn(|| {
let mut parser = NetflowParser::default();
for _ in 0..20 {
let result = parser.parse_bytes(&V5_PACKET);
assert!(result.is_ok());
assert_eq!(result.packets.len(), 1);
}
}));
}
for h in handles {
h.join().expect("thread panicked");
}
}
#[test]
fn test_shared_parser_concurrent_v9_templates() {
let parser = Arc::new(Mutex::new(NetflowParser::default()));
let mut handles = Vec::new();
for i in 0u16..4 {
let parser = Arc::clone(&parser);
handles.push(thread::spawn(move || {
let template_id = 256 + i;
let tid_bytes = template_id.to_be_bytes();
let template_packet: Vec<u8> = vec![
0,
9,
0,
1, 0,
0,
0,
0, 0,
0,
0,
0, 0,
0,
0,
(i + 1) as u8, 0,
0,
0,
1, 0,
0, 0,
12, tid_bytes[0],
tid_bytes[1], 0,
1, 0,
1, 0,
4, ];
let data_packet: Vec<u8> = vec![
0,
9,
0,
1, 0,
0,
0,
0, 0,
0,
0,
0, 0,
0,
0,
(i + 10) as u8, 0,
0,
0,
1, tid_bytes[0],
tid_bytes[1], 0,
8, 0,
0,
0,
(i + 1) as u8, ];
{
let mut p = parser.lock().unwrap();
let result = p.parse_bytes(&template_packet);
assert_eq!(
result.packets.len(),
1,
"Template parse should produce 1 packet"
);
}
{
let mut p = parser.lock().unwrap();
let result = p.parse_bytes(&data_packet);
assert_eq!(
result.packets.len(),
1,
"Data parse should produce 1 packet"
);
assert!(result.error.is_none(), "Data parse should not error");
}
}));
}
for h in handles {
h.join().expect("thread panicked");
}
let p = parser.lock().unwrap();
for i in 0u16..4 {
assert!(
p.has_v9_template(256 + i),
"Template {} should be cached",
256 + i
);
}
}