use std::{sync::Arc, time::Duration};
use axum::{extract::State, routing::get, Json, Router};
use serde::Serialize;
use tokio::{net::TcpListener, sync::RwLock, time::sleep};
use std::net::{IpAddr, SocketAddr};
use std::fs::File;
use std::io::{BufRead, BufReader};
#[derive(Debug, Clone, Serialize, Hash, PartialEq, Eq)]
pub struct Connection {
pub proto: String,
pub src_ip: IpAddr,
pub src_port: u16,
pub dst_ip: IpAddr,
pub dst_port: u16,
pub state: String,
}
type SharedConnections = Arc<RwLock<Vec<Connection>>>;
#[derive(Clone)]
struct AppState {
connections: SharedConnections,
node_name: Option<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let conntrack_env = std::env::var("CONNTRACK_PATH").unwrap_or_else(|_| "auto".into());
let conntrack_path = resolve_conntrack_path(&conntrack_env);
let verbose = std::env::var("KFLOW_DEBUG").is_ok();
if verbose {
if conntrack_path.is_empty() {
println!("kflow daemon starting; CONNTRACK_PATH={} (no candidate found yet)", conntrack_env);
} else {
println!("kflow daemon starting; CONNTRACK_PATH={} -> {}", conntrack_env, conntrack_path);
}
}
let state: SharedConnections = Arc::new(RwLock::new(Vec::new()));
let node_name = std::env::var("KUBE_NODE_NAME").ok();
{
let state = state.clone();
let path = conntrack_path.clone();
tokio::spawn(async move {
use std::collections::HashSet;
let mut prev_set: HashSet<Connection> = HashSet::new();
loop {
let flows = read_conntrack(&path);
let new_set: HashSet<Connection> = flows.iter().cloned().collect();
for added in new_set.difference(&prev_set) {
println!("Added connection: {:?}", added);
}
for removed in prev_set.difference(&new_set) {
println!("Removed connection: {:?}", removed);
}
prev_set = new_set;
{
let mut w = state.write().await;
*w = flows;
}
sleep(Duration::from_secs(2)).await;
}
});
}
let app_state = AppState { connections: state, node_name };
let app = Router::new()
.route("/connections", get(list_connections))
.with_state(app_state);
let addr: SocketAddr = "0.0.0.0:8080".parse().unwrap();
println!("Listening on {addr}");
let listener = TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
fn resolve_conntrack_path(requested: &str) -> String {
use std::path::Path;
if requested == "auto" {
if let Some(found) = detect_conntrack_candidate() {
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("auto-detected conntrack path: {}", found);
}
return found;
}
return String::new();
}
if Path::new(requested).exists() {
return requested.to_string();
}
let alt_candidates = [
format!("/host{}", requested),
"/host/proc/net/nf_conntrack".into(),
"/host/proc/net/ip_conntrack".into(),
"/proc/net/nf_conntrack".into(),
"/proc/net/ip_conntrack".into(),
];
for c in &alt_candidates {
if Path::new(c).exists() {
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("resolved conntrack path '{}' -> '{}'", requested, c);
}
return c.to_string();
}
}
eprintln!("conntrack path '{}' does not exist and no alternatives found; will retry detection periodically", requested);
String::new()
}
fn detect_conntrack_candidate() -> Option<String> {
use std::path::Path;
use std::fs::File;
use std::io::BufRead;
let candidates = [
"/host/proc/net/nf_conntrack",
"/host/proc/net/ip_conntrack",
"/host/proc/net/nf_conntrack6",
"/proc/net/nf_conntrack",
"/proc/net/ip_conntrack",
"/proc/net/nf_conntrack6",
"/proc/net/ip_conntrack6",
];
for c in &candidates {
let p = Path::new(c);
if !p.exists() {
continue;
}
match File::open(p) {
Ok(f) => {
let mut reader = std::io::BufReader::new(f);
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(n) => {
if n > 0 && !line.trim().is_empty() {
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("candidate {} exists and has content", c);
}
return Some(c.to_string());
} else {
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("candidate {} exists but was empty; skipping", c);
}
continue;
}
}
Err(e) => {
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("failed reading {}: {}", c, e);
}
continue;
}
}
}
Err(e) => {
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("failed opening {}: {}", c, e);
}
continue;
}
}
}
None
}
#[derive(Debug, Serialize)]
struct ConnectionsResponse {
node_name: Option<String>,
connections: Vec<Connection>,
}
async fn list_connections(
State(state): State<AppState>,
) -> Json<ConnectionsResponse> {
let snapshot = state.connections.read().await;
Json(ConnectionsResponse {
node_name: state.node_name.clone(),
connections: snapshot.clone(),
})
}
fn read_conntrack(path: &str) -> Vec<Connection> {
let mut path_to_use = path.to_string();
if path_to_use.is_empty() {
if let Some(detected) = detect_conntrack_candidate() {
path_to_use = detected;
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("detected conntrack path during read: {}", path_to_use);
}
} else {
return vec![];
}
}
let file = match File::open(&path_to_use) {
Ok(f) => f,
Err(e) => {
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("Failed to open conntrack file {}: {}", path_to_use, e);
}
return vec![];
}
};
let reader = BufReader::new(file);
let lines: Vec<String> = reader.lines().filter_map(|l| l.ok()).collect();
if std::env::var("KFLOW_DEBUG").is_ok() {
eprintln!("read_conntrack: read {} lines from {}", lines.len(), path_to_use);
for (i, ln) in lines.iter().enumerate().take(5) {
eprintln!(" [{}] {}", i, ln);
}
}
lines
.into_iter()
.filter_map(|l| parse_conntrack_line(&l))
.collect()
}
fn parse_conntrack_line(line: &str) -> Option<Connection> {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 5 {
return None;
}
let mut proto: Option<String> = None;
let mut state: Option<String> = None;
let mut src_ip: Option<IpAddr> = None;
let mut dst_ip: Option<IpAddr> = None;
let mut src_port: Option<u16> = None;
let mut dst_port: Option<u16> = None;
for p in &parts {
if proto.is_none() && (*p == "tcp" || *p == "udp") {
proto = Some((*p).to_string());
} else if p.starts_with("src=") && src_ip.is_none() {
src_ip = p["src=".len()..].parse().ok();
} else if p.starts_with("dst=") && dst_ip.is_none() {
dst_ip = p["dst=".len()..].parse().ok();
} else if p.starts_with("sport=") && src_port.is_none() {
src_port = p["sport=".len()..].parse().ok();
} else if p.starts_with("dport=") && dst_port.is_none() {
dst_port = p["dport=".len()..].parse().ok();
} else if state.is_none()
&& (*p == "ESTABLISHED"
|| *p == "SYN_SENT"
|| *p == "SYN_RECV"
|| *p == "FIN_WAIT"
|| *p == "TIME_WAIT")
{
state = Some((*p).to_string());
}
}
Some(Connection {
proto: proto?,
src_ip: src_ip?,
src_port: src_port?,
dst_ip: dst_ip?,
dst_port: dst_port?,
state: state.unwrap_or_else(|| "UNKNOWN".into()),
})
}