mod cli;
mod config;
mod error;
mod generator;
mod transmitter;
use clap::Parser;
use cli::Cli;
use config::{FlowConfig, parse_yaml_file, validate_config};
use error::Result;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
/// Key identifying one logical exporter; flows sharing a key are generated
/// together and share one sequence counter.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum ExporterId {
    /// NetFlow V5 exporter, keyed by the header's engine type and engine id.
    V5 { engine_type: u8, engine_id: u8 },
    /// NetFlow V7 flow, keyed by its position in the flow list.
    V7(usize),
    /// NetFlow V9 exporter, keyed by source id.
    V9(u32),
    /// IPFIX exporter, keyed by observation domain id.
    IPFix(u32),
}
fn main() -> Result<()> {
let args = Cli::parse();
rayon::ThreadPoolBuilder::new()
.num_threads(args.threads)
.build_global()
.map_err(|e| {
error::NetflowError::Configuration(format!("Failed to configure thread pool: {}", e))
})?;
if args.verbose {
println!("NetFlow Generator starting...");
println!("Using {} threads for parallel processing", args.threads);
}
if args.once {
run_once(&args)?;
} else {
let interval_secs = args.interval.unwrap_or(2);
if args.verbose {
println!(
"Continuous mode: sending flows every {} seconds (Ctrl+C to stop)",
interval_secs
);
}
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
ctrlc::set_handler(move || {
shutdown_clone.store(true, Ordering::Relaxed);
})
.map_err(|e| {
error::NetflowError::Configuration(format!("Failed to set Ctrl+C handler: {}", e))
})?;
let config = if let Some(ref config_path) = args.config {
if args.verbose {
println!("Loading configuration from {:?}", config_path);
}
let cfg = parse_yaml_file(config_path)?;
validate_config(&cfg)?;
if args.verbose {
println!("Configuration loaded: {} flow(s)", cfg.flows.len());
}
Some(cfg)
} else {
if args.verbose {
println!("No configuration provided, using default samples");
}
None
};
let destination = parse_destination(&args)?;
let mut pcap_writer = if let Some(ref output_path) = args.output {
Some(transmitter::PersistentPcapWriter::new(
output_path,
destination,
args.verbose,
)?)
} else {
None
};
let mut v5_sequence_numbers: HashMap<(u8, u8), u32> = HashMap::new();
let mut v9_sequence_numbers: HashMap<u32, u32> = HashMap::new();
let mut ipfix_sequence_numbers: HashMap<u32, u32> = HashMap::new();
let mut last_template_send = std::time::Instant::now();
const TEMPLATE_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
let mut iteration = 1;
loop {
if shutdown.load(Ordering::Relaxed) {
if args.verbose {
println!("\nReceived shutdown signal, exiting gracefully...");
}
break;
}
if args.verbose {
println!("\n--- Iteration {} ---", iteration);
}
let send_templates =
iteration == 1 || last_template_send.elapsed() >= TEMPLATE_REFRESH_INTERVAL;
if send_templates && iteration > 1 {
if args.verbose {
println!(
"Template refresh: {} seconds since last send",
last_template_send.elapsed().as_secs()
);
}
last_template_send = std::time::Instant::now();
} else if iteration == 1 && args.verbose {
println!("Sending initial templates");
}
let packets = if let Some(ref cfg) = config {
generate_packets_from_config(
cfg,
&mut v5_sequence_numbers,
&mut v9_sequence_numbers,
&mut ipfix_sequence_numbers,
send_templates,
args.verbose,
)?
} else {
let v9_seq = *v9_sequence_numbers.get(&1).unwrap_or(&0);
let ipfix_seq = *ipfix_sequence_numbers.get(&2).unwrap_or(&0);
let (packets, next_v9_seq, next_ipfix_seq) =
generator::generate_all_samples_with_seq(v9_seq, ipfix_seq, send_templates)?;
v9_sequence_numbers.insert(1, next_v9_seq);
ipfix_sequence_numbers.insert(2, next_ipfix_seq);
packets
};
if args.verbose {
println!("Generated {} packet(s)", packets.len());
}
if let Some(ref mut writer) = pcap_writer {
writer.write_packets(&packets)?;
} else {
if args.verbose {
println!("Transmitting packets to {}", destination);
}
transmitter::send_udp(&packets, destination, args.source_port, args.verbose)?;
}
iteration += 1;
let sleep_start = std::time::Instant::now();
let sleep_duration = Duration::from_secs(interval_secs);
while sleep_start.elapsed() < sleep_duration {
if shutdown.load(Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(100));
}
}
if let Some(writer) = pcap_writer {
writer.close()?;
}
if args.verbose {
println!("Shutdown complete.");
}
}
Ok(())
}
/// Performs a single generation pass and emits the result once.
///
/// Packets come from the YAML configuration when `args.config` is set
/// (templates are always requested and sequence counters start empty), and
/// from the built-in samples otherwise. The batch is written to `args.output`
/// when given, and sent over UDP to the parsed destination when not.
///
/// # Errors
///
/// Propagates configuration, generation, destination-parsing and
/// transmission errors.
fn run_once(args: &Cli) -> Result<()> {
    let verbose = args.verbose;

    let packets = match args.config {
        Some(ref config_path) => {
            if verbose {
                println!("Loading configuration from {:?}", config_path);
            }
            let loaded = parse_yaml_file(config_path)?;
            validate_config(&loaded)?;
            if verbose {
                println!("Configuration loaded: {} flow(s)", loaded.flows.len());
            }

            // A one-shot run has no history, so every counter starts empty.
            let mut v5_seqs = HashMap::new();
            let mut v9_seqs = HashMap::new();
            let mut ipfix_seqs = HashMap::new();
            generate_packets_from_config(
                &loaded,
                &mut v5_seqs,
                &mut v9_seqs,
                &mut ipfix_seqs,
                true,
                verbose,
            )?
        }
        None => {
            if verbose {
                println!("No configuration provided, using default samples");
            }
            generator::generate_all_samples()?
        }
    };

    if verbose {
        println!("Generated {} packet(s)", packets.len());
    }

    let destination = parse_destination(args)?;

    match args.output {
        Some(ref output_path) => {
            transmitter::write_to_file(&packets, output_path, destination, verbose, true)?;
        }
        None => {
            if verbose {
                println!("Transmitting packets to {}", destination);
            }
            transmitter::send_udp(&packets, destination, args.source_port, verbose)?;
        }
    }

    if verbose {
        println!("Done!");
    }
    Ok(())
}
fn generate_packets_from_config(
config: &config::Config,
v5_sequence_numbers: &mut HashMap<(u8, u8), u32>,
v9_sequence_numbers: &mut HashMap<u32, u32>,
ipfix_sequence_numbers: &mut HashMap<u32, u32>,
send_templates: bool,
verbose: bool,
) -> Result<Vec<Vec<u8>>> {
use rayon::prelude::*;
if config.flows.is_empty() {
return Ok(Vec::new());
}
let grouped_flows = group_flows_by_exporter(&config.flows);
if verbose {
println!(
"Processing {} exporter group(s) in parallel",
grouped_flows.len()
);
}
let results: Vec<(ExporterId, Vec<Vec<u8>>, u32)> = grouped_flows
.par_iter()
.map(|(exporter_id, flows)| {
let initial_seq = match exporter_id {
ExporterId::V5 {
engine_type,
engine_id,
} => *v5_sequence_numbers
.get(&(*engine_type, *engine_id))
.unwrap_or(&0),
ExporterId::V7(_) => 0, ExporterId::V9(source_id) => *v9_sequence_numbers.get(source_id).unwrap_or(&0),
ExporterId::IPFix(obs_domain_id) => {
*ipfix_sequence_numbers.get(obs_domain_id).unwrap_or(&0)
}
};
if verbose {
match exporter_id {
ExporterId::V5 {
engine_type,
engine_id,
} => {
println!(
"Processing V5 exporter (engine_type={}, engine_id={}) with {} flow(s)",
engine_type,
engine_id,
flows.len()
);
}
ExporterId::V7(index) => {
println!("Processing V7 flow #{}", index);
}
ExporterId::V9(source_id) => {
println!(
"Processing V9 exporter (source_id={}) with {} flow(s)",
source_id,
flows.len()
);
}
ExporterId::IPFix(obs_domain_id) => {
println!(
"Processing IPFIX exporter (observation_domain_id={}) with {} flow(s)",
obs_domain_id,
flows.len()
);
}
}
}
let (packets, next_seq) =
process_exporter_group(flows, initial_seq, send_templates, verbose)?;
Ok((*exporter_id, packets, next_seq))
})
.collect::<Result<Vec<_>>>()?;
let mut all_packets = Vec::new();
for (exporter_id, packets, next_seq) in results {
all_packets.extend(packets);
match exporter_id {
ExporterId::V5 {
engine_type,
engine_id,
} => {
v5_sequence_numbers.insert((engine_type, engine_id), next_seq);
}
ExporterId::V9(source_id) => {
v9_sequence_numbers.insert(source_id, next_seq);
}
ExporterId::IPFix(obs_domain_id) => {
ipfix_sequence_numbers.insert(obs_domain_id, next_seq);
}
ExporterId::V7(_) => {
}
}
}
if verbose {
println!("Generated {} packet(s) total", all_packets.len());
}
Ok(all_packets)
}
/// Resolves the socket address packets are sent to.
///
/// Uses `args.dest` when it is set and falls back to `127.0.0.1:2055`
/// otherwise.
///
/// # Errors
///
/// Returns `InvalidDestination` when the chosen string does not parse as a
/// socket address.
fn parse_destination(args: &Cli) -> Result<SocketAddr> {
    const DEFAULT_DESTINATION: &str = "127.0.0.1:2055";

    match &args.dest {
        Some(dest_str) => dest_str.parse::<SocketAddr>().map_err(|e| {
            error::NetflowError::InvalidDestination(format!(
                "Invalid destination '{}': {}",
                dest_str, e
            ))
        }),
        None => DEFAULT_DESTINATION.parse::<SocketAddr>().map_err(|e| {
            error::NetflowError::InvalidDestination(format!("Invalid default destination: {}", e))
        }),
    }
}
/// Derives the exporter key for a single flow definition.
///
/// - V5: engine type and engine id from the optional header, each defaulting
///   to 0.
/// - V7: the flow's own `index`, so every V7 flow gets a distinct key.
/// - V9: the header's source id, defaulting to 1.
/// - IPFIX: the header's observation domain id, defaulting to 1.
fn extract_exporter_id(flow: &FlowConfig, index: usize) -> ExporterId {
    match flow {
        FlowConfig::V5(v5) => {
            // `Option<&_>` is `Copy`, so one lookup serves both fields.
            let header = v5.header.as_ref();
            ExporterId::V5 {
                engine_type: header.and_then(|h| h.engine_type).unwrap_or(0),
                engine_id: header.and_then(|h| h.engine_id).unwrap_or(0),
            }
        }
        FlowConfig::V7(_) => ExporterId::V7(index),
        FlowConfig::V9(v9) => {
            let header = v9.header.as_ref();
            ExporterId::V9(header.and_then(|h| h.source_id).unwrap_or(1))
        }
        FlowConfig::IPFix(ipfix) => {
            let header = ipfix.header.as_ref();
            ExporterId::IPFix(header.and_then(|h| h.observation_domain_id).unwrap_or(1))
        }
    }
}
/// Buckets flows by exporter key.
///
/// Each flow is cloned into the bucket for the key computed from the flow and
/// its position in `flows`; flows within a bucket keep their input order.
fn group_flows_by_exporter(flows: &[FlowConfig]) -> HashMap<ExporterId, Vec<FlowConfig>> {
    let empty: HashMap<ExporterId, Vec<FlowConfig>> = HashMap::new();

    flows
        .iter()
        .enumerate()
        .fold(empty, |mut buckets, (position, flow)| {
            buckets
                .entry(extract_exporter_id(flow, position))
                .or_insert_with(Vec::new)
                .push(flow.clone());
            buckets
        })
}
/// Builds the packets for all flows of one exporter, threading a single
/// sequence counter through them.
///
/// Starting from `initial_sequence`, each flow is handed the current counter:
///
/// - V5: one packet per flow; the counter then advances by the number of
///   entries in the flow's `flowsets`, wrapping around at `u32::MAX`.
/// - V7: one packet per flow; the counter is neither passed nor advanced.
/// - V9 / IPFIX: the generator returns a batch of packets together with the
///   next counter value, which is adopted as-is.
///
/// Returns the packets in flow order and the counter value after the last
/// flow.
///
/// # Errors
///
/// Propagates generator errors, and returns a `Generation` error when a V5
/// flow has more `flowsets` entries than fit in a `u32`.
fn process_exporter_group(
    flows: &[FlowConfig],
    initial_sequence: u32,
    send_templates: bool,
    verbose: bool,
) -> Result<(Vec<Vec<u8>>, u32)> {
    // Suffix for the verbose V9/IPFIX progress lines; constant for the call.
    let template_msg = if send_templates {
        " (with templates)"
    } else {
        ""
    };

    let mut packets = Vec::new();
    let mut current_seq = initial_sequence;

    for flow in flows {
        match flow {
            FlowConfig::V5(v5_config) => {
                if verbose {
                    println!("  Generating NetFlow V5 packet...");
                }
                let packet = generator::build_v5_packet(v5_config.clone(), Some(current_seq))?;
                packets.push(packet);

                let num_records = u32::try_from(v5_config.flowsets.len()).map_err(|_| {
                    error::NetflowError::Generation("Too many V5 flowsets".to_string())
                })?;
                // The sequence field is a 32-bit counter that rolls over; a
                // long-running generator must wrap rather than abort.
                current_seq = current_seq.wrapping_add(num_records);
            }
            FlowConfig::V7(v7_config) => {
                if verbose {
                    println!("  Generating NetFlow V7 packet...");
                }
                let packet = generator::build_v7_packet(v7_config.clone())?;
                packets.push(packet);
            }
            FlowConfig::V9(v9_config) => {
                if verbose {
                    println!("  Generating NetFlow V9 packet(s){}...", template_msg);
                }
                let (batch, next_seq) = generator::build_v9_packets(
                    v9_config.clone(),
                    Some(current_seq),
                    send_templates,
                )?;
                packets.extend(batch);
                current_seq = next_seq;
            }
            FlowConfig::IPFix(ipfix_config) => {
                if verbose {
                    println!("  Generating IPFIX packet(s){}...", template_msg);
                }
                let (batch, next_seq) = generator::build_ipfix_packets(
                    ipfix_config.clone(),
                    Some(current_seq),
                    send_templates,
                )?;
                packets.extend(batch);
                current_seq = next_seq;
            }
        }
    }

    Ok((packets, current_seq))
}