//! kflow 0.1.2
//!
//! Node-local daemon and TUI that read the kernel conntrack table and visualize per-node
//! network connections.
use clap::Parser;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::RwLock;

/// Contents of `k8s/daemonset.yaml` (relative to the crate manifest directory), embedded
/// into the binary at compile time.
///
/// NOTE(review): presumably the manifest applied/deleted by the install/uninstall
/// subcommands when no `--file` is given — confirm in the `kubectl` module.
pub const DEFAULT_DAEMONSET: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/k8s/daemonset.yaml"));

pub mod types;
pub mod kubectl;
pub mod fetch;
pub mod tui;

use types::Connection;
use kubectl::{run_kubectl_apply, run_kubectl_delete, discover_pods};
use fetch::{fetch_url, fetch_via_portforward};
use tui::run_tui;

/// Terminal UI client for kflow: polls kflow endpoints and shows per-node connections.
#[derive(Parser, Debug)]
#[command(name = "kflow-cli")]
struct Args {
    /// Discover pods with kubectl and fetch their connections through port-forwards.
    // NOTE(review): a boolean flag that defaults to `true` appears to be impossible to
    // switch off from the command line; confirm whether a `--kube=false` form is wanted.
    #[arg(long, default_value_t = true)]
    kube: bool,

    /// Comma-separated list of endpoints to poll; each is fetched at `<endpoint>/connections`.
    #[arg(long)]
    endpoints: Option<String>,

    /// First local port used for port-forwards; the pod at index `i` uses `start_port + i`.
    #[arg(long, default_value_t = 18080)]
    start_port: u16,

    /// Optional one-shot subcommand; when present it runs instead of the TUI.
    #[command(subcommand)]
    cmd: Option<CommandSub>,
}

// One-shot subcommands handled by `run_cli` before the TUI is started.
//
// Plain `//` comments are used here on purpose: `///` doc comments would be picked up by
// clap and change the generated `--help` output.
#[derive(clap::Subcommand, Debug)]
enum CommandSub {
    // Dispatched to `run_kubectl_apply(file, namespace, conntrack)`.
    Install {
        // Optional `--file` value, forwarded as-is.
        // NOTE(review): presumably a path to a manifest overriding the embedded default — confirm.
        #[arg(long)]
        file: Option<String>,
        // Optional `-n` / `--namespace` value, forwarded as-is.
        #[arg(short = 'n', long)]
        namespace: Option<String>,
        // Optional `--conntrack` value, forwarded as-is; its meaning is defined by the
        // `kubectl` module and is not visible from here.
        #[arg(long)]
        conntrack: Option<String>,
    },
    // Dispatched to `run_kubectl_delete(file, namespace, conntrack)`; takes the same
    // options as `Install`.
    Uninstall {
        #[arg(long)]
        file: Option<String>,
        #[arg(short = 'n', long)]
        namespace: Option<String>,
        #[arg(long)]
        conntrack: Option<String>,
    },
}

/// Entry point of the `kflow-cli` command line.
///
/// Parses the command line and then does one of two things:
///
/// * With an `install` / `uninstall` subcommand, runs the matching kubectl helper and
///   returns without starting the TUI.
/// * Otherwise, builds the list of endpoints to poll, spawns a background task that
///   refreshes the shared node -> connections map every two seconds, and runs the TUI
///   until it returns.
///
/// The endpoint list comes from `--endpoints` when given (comma-separated, blank entries
/// ignored); otherwise, when `--kube` is set, from `discover_pods()`. Explicit endpoints
/// are fetched over HTTP at `<endpoint>/connections`; discovered pods are fetched through
/// a port-forward on local port `start_port + index`.
///
/// # Errors
///
/// Returns any error produced by the kubectl helpers, by pod discovery, or by the TUI.
/// Fetch failures inside the polling task are not reported: the affected endpoint is
/// simply absent from that refresh.
pub async fn run_cli() -> anyhow::Result<()> {
    // Best effort: a failure to install the color_eyre hooks is deliberately ignored.
    let _ = color_eyre::install();
    let args = Args::parse();

    // One-shot subcommands: run kubectl and exit before any polling/TUI setup.
    match &args.cmd {
        Some(CommandSub::Install { file, namespace, conntrack }) => {
            run_kubectl_apply(file.as_deref(), namespace.as_deref(), conntrack.as_deref()).await?;
            return Ok(());
        }
        Some(CommandSub::Uninstall { file, namespace, conntrack }) => {
            run_kubectl_delete(file.as_deref(), namespace.as_deref(), conntrack.as_deref()).await?;
            return Ok(());
        }
        None => {}
    }

    let state: Arc<RwLock<HashMap<String, Vec<Connection>>>> = Arc::new(RwLock::new(HashMap::new()));

    // Explicit endpoints win over discovery. Blank entries (e.g. from "a,,b" or a trailing
    // comma) are dropped so they are never polled as an empty endpoint.
    let explicit_endpoints: Option<Vec<String>> = args.endpoints.as_deref().map(|raw| {
        raw.split(',')
            .map(str::trim)
            .filter(|ep| !ep.is_empty())
            .map(str::to_owned)
            .collect()
    });

    // Port-forwarding only makes sense for pods we discovered ourselves. Because `kube`
    // defaults to true, explicit endpoints used to be treated as pod names and the URL
    // branch below could never run.
    let kube_mode = args.kube && explicit_endpoints.is_none();

    let endpoints_list: Vec<String> = match explicit_endpoints {
        Some(list) => list,
        None if args.kube => discover_pods().await?,
        None => Vec::new(),
    };

    let did_fetch_once = Arc::new(AtomicBool::new(false));
    if args.kube || !endpoints_list.is_empty() {
        let state_clone = state.clone();
        let start_port = args.start_port;
        let did_fetch = did_fetch_once.clone();
        // Detached polling task: it lives until the runtime shuts down.
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(Duration::from_secs(2));
            loop {
                tick.tick().await;
                // Build the new snapshot off to the side so the lock is only held for the swap.
                let mut map = HashMap::new();
                if kube_mode {
                    for (i, pod) in endpoints_list.iter().enumerate() {
                        // `start_port + i` must stay within u16; a plain `+` would panic in
                        // debug builds and wrap to an unrelated low port in release builds.
                        let port = match u16::try_from(i)
                            .ok()
                            .and_then(|offset| start_port.checked_add(offset))
                        {
                            Some(port) => port,
                            // Every later index would overflow as well.
                            None => break,
                        };
                        if let Ok(resp) = fetch_via_portforward(pod, port).await {
                            let node = resp.node_name.unwrap_or_else(|| pod.clone());
                            map.insert(node, resp.connections);
                        }
                    }
                } else {
                    for ep in &endpoints_list {
                        if let Ok(resp) = fetch_url(&format!("{}/connections", ep)).await {
                            let node = resp.node_name.unwrap_or_else(|| ep.clone());
                            map.insert(node, resp.connections);
                        }
                    }
                }
                *state_clone.write().await = map;
                // Signals the TUI that at least one refresh cycle has completed.
                did_fetch.store(true, Ordering::SeqCst);
            }
        });
    }

    // NOTE(review): the TUI still receives the raw `--kube` flag, as before; confirm
    // whether it should instead be told the effective fetch mode.
    run_tui(state, args.kube, did_fetch_once).await?;
    Ok(())
}