use crossbeam_channel::Receiver;
use polars::prelude::*;
use crate::parser::ParsedPacket;
pub fn run_analytics(r: Receiver<ParsedPacket>) {
println!("Analytics thread started.");
let mut timestamps: Vec<u64> = Vec::new();
let mut interface_names = Vec::new();
let mut source_ips: Vec<Option<String>> = Vec::new();
let mut destination_ips: Vec<Option<String>> = Vec::new();
let mut source_ports: Vec<Option<u16>> = Vec::new();
let mut destination_ports: Vec<Option<u16>> = Vec::new();
let mut protocols = Vec::new();
let mut packet_sizes = Vec::new();
for packet in r {
timestamps.push(packet.timestamp);
interface_names.push(packet.interface_name);
source_ips.push(packet.source_ip.map(|ip| ip.to_string()));
destination_ips.push(packet.destination_ip.map(|ip| ip.to_string()));
source_ports.push(packet.source_port);
destination_ports.push(packet.destination_port);
protocols.push(packet.protocol);
packet_sizes.push(packet.packet_size as u32);
}
println!("Analytics thread finished receiving. Creating DataFrame.");
let df_result = DataFrame::new::<Series>(vec![
Series::new("timestamp".into(), timestamps).into(),
Series::new("interface".into(), &interface_names).into(),
Series::new("source_ip".into(), &source_ips).into(),
Series::new("destination_ip".into(), &destination_ips).into(),
Series::new("source_port".into(), source_ports.into_iter().map(|p| p.map(|x| x as i32)).collect::<Vec<Option<i32>>>()).into(),
Series::new("destination_port".into(), destination_ports.into_iter().map(|p| p.map(|x| x as i32)).collect::<Vec<Option<i32>>>()).into(),
Series::new("protocol".into(), &protocols).into(),
Series::new("packet_size".into(), &packet_sizes).into(),
]);
match df_result {
Ok(dataframe) => {
println!("Successfully created DataFrame:");
println!("{}", dataframe);
let min_timestamp: u64 = dataframe["timestamp"].as_series().min::<u64>().transpose().unwrap().unwrap_or(0);
let max_timestamp: u64 = dataframe["timestamp"].as_series().max::<u64>().transpose().unwrap().unwrap_or(0);
let capture_duration_nanos = max_timestamp - min_timestamp;
let capture_duration_secs = capture_duration_nanos as f64 / 1_000_000_000.0;
if capture_duration_secs > 0.0 {
} else {
println!("\nNo packets captured or capture duration too short to calculate rates.");
}
println!("\n--- Protocol Distribution ---");
let protocol_distribution = dataframe
.group_by(["protocol"])
.unwrap()
.select(["protocol"])
.count()
.unwrap()
.sort(["protocol_count"], SortMultipleOptions::default()) .unwrap();
println!("{}", protocol_distribution);
}
Err(e) => {
eprintln!("Error creating DataFrame: {}", e);
}
}
}