lk-inside 0.3.1

A terminal user interface (TUI) application for interactive data analysis.
Documentation
use crossbeam_channel::Receiver;
use polars::prelude::*;
use crate::parser::ParsedPacket;

pub fn run_analytics(r: Receiver<ParsedPacket>) {
    println!("Analytics thread started.");
    let mut timestamps: Vec<u64> = Vec::new();
    let mut interface_names = Vec::new();
    let mut source_ips: Vec<Option<String>> = Vec::new();
    let mut destination_ips: Vec<Option<String>> = Vec::new();
    let mut source_ports: Vec<Option<u16>> = Vec::new();
    let mut destination_ports: Vec<Option<u16>> = Vec::new();
    let mut protocols = Vec::new();
    let mut packet_sizes = Vec::new();

    for packet in r {
        timestamps.push(packet.timestamp);
        interface_names.push(packet.interface_name);
        source_ips.push(packet.source_ip.map(|ip| ip.to_string()));
        destination_ips.push(packet.destination_ip.map(|ip| ip.to_string()));
        source_ports.push(packet.source_port);
        destination_ports.push(packet.destination_port);
        protocols.push(packet.protocol);
        packet_sizes.push(packet.packet_size as u32);
    }

    println!("Analytics thread finished receiving. Creating DataFrame.");

    let df_result = DataFrame::new::<Series>(vec![
        Series::new("timestamp".into(), timestamps).into(),
        Series::new("interface".into(), &interface_names).into(),
        Series::new("source_ip".into(), &source_ips).into(),
        Series::new("destination_ip".into(), &destination_ips).into(),
        Series::new("source_port".into(), source_ports.into_iter().map(|p| p.map(|x| x as i32)).collect::<Vec<Option<i32>>>()).into(),
        Series::new("destination_port".into(), destination_ports.into_iter().map(|p| p.map(|x| x as i32)).collect::<Vec<Option<i32>>>()).into(),
        Series::new("protocol".into(), &protocols).into(),
        Series::new("packet_size".into(), &packet_sizes).into(),
    ]);

    match df_result {
        Ok(dataframe) => {
            println!("Successfully created DataFrame:");
            println!("{}", dataframe);

            // Calculate total capture duration
            let min_timestamp: u64 = dataframe["timestamp"].as_series().min::<u64>().transpose().unwrap().unwrap_or(0);
            let max_timestamp: u64 = dataframe["timestamp"].as_series().max::<u64>().transpose().unwrap().unwrap_or(0);
            let capture_duration_nanos = max_timestamp - min_timestamp;
            let capture_duration_secs = capture_duration_nanos as f64 / 1_000_000_000.0;

                        if capture_duration_secs > 0.0 {
                            // The problematic series calculations have been removed.
                        } else {
                            println!("\nNo packets captured or capture duration too short to calculate rates.");
                        }

            // Protocol Distribution
            println!("\n--- Protocol Distribution ---");
            let protocol_distribution = dataframe
                .group_by(["protocol"])
                .unwrap()
                .select(["protocol"])
                .count()
                .unwrap()
                .sort(["protocol_count"], SortMultipleOptions::default()) // Sort by count descending
                .unwrap();
            println!("{}", protocol_distribution);

        }
        Err(e) => {
            eprintln!("Error creating DataFrame: {}", e);
        }
    }
}