use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use chrono::Utc;
use crossbeam_channel::{Receiver, RecvTimeoutError, unbounded};
use crate::alert::{AlertEngine, FlowEvent};
use crate::capture::RawPacket;
use crate::decode::decode_packet;
use crate::enrich::Enricher;
use crate::enrich::names::NameResolver;
use crate::flow::tracker::FlowTracker;
use crate::flow::{Dir, FlowKey, FlowStatsSnapshot, FlowUpdate};
use crate::identity::BinaryIdentity;
use crate::ipc::protocol::{ConnectionDto, Event};
use crate::ipc::server::{Bus, LiveState};
use crate::notify::Notifier;
use crate::process::lookup_socket_proc;
use crate::store::Store;
use crate::store::models::ConnectionRow;
pub struct WorkerCtx {
pub store: Arc<Mutex<Store>>,
pub engine: AlertEngine,
pub enricher: Arc<dyn Enricher>,
pub live: Arc<Mutex<LiveState>>,
pub bus: Arc<Bus>,
pub notifier: Arc<dyn Notifier>,
pub local_addrs: HashSet<IpAddr>,
}
struct FlowMeta {
id: u64,
outbound: bool,
process_id: Option<i64>,
dest_id: Option<i64>,
}
pub fn run_worker(raw_rx: Receiver<RawPacket>, ctx: WorkerCtx, stop: Arc<AtomicBool>) {
let (done_tx, done_rx) = unbounded();
let mut tracker = FlowTracker::new().with_completion_sink(done_tx);
tracker.set_local_addrs(ctx.local_addrs.clone());
let mut resolver = NameResolver::new();
let mut metas: HashMap<FlowKey, FlowMeta> = HashMap::new();
let mut next_id: u64 = 1;
loop {
if stop.load(Ordering::Relaxed) {
break;
}
match raw_rx.recv_timeout(Duration::from_millis(200)) {
Ok(raw) => {
let mut decoded = decode_packet(&raw);
resolver.observe_packet(&decoded);
let ts_ms = decoded.timestamp.timestamp_millis();
if let Some(upd) = tracker.update(&mut decoded) {
handle_update(&ctx, &resolver, &mut metas, &mut next_id, upd, ts_ms);
}
while let Ok(snap) = done_rx.try_recv() {
persist_closed(&ctx, &mut metas, snap);
}
}
Err(RecvTimeoutError::Timeout) => {
ctx.bus.broadcast(&Event::Heartbeat {
ts_ms: Utc::now().timestamp_millis(),
});
}
Err(RecvTimeoutError::Disconnected) => break,
}
}
while let Ok(snap) = done_rx.try_recv() {
persist_closed(&ctx, &mut metas, snap);
}
}
fn handle_update(
ctx: &WorkerCtx,
resolver: &NameResolver,
metas: &mut HashMap<FlowKey, FlowMeta>,
next_id: &mut u64,
upd: FlowUpdate,
ts_ms: i64,
) {
let key = upd.key.clone();
let (src_addr, src_port, dst_addr, dst_port) = match upd.dir {
Dir::LowToHigh => (key.addr_low, key.port_low, key.addr_high, key.port_high),
Dir::HighToLow => (key.addr_high, key.port_high, key.addr_low, key.port_low),
};
let dst_local = ctx.local_addrs.contains(&dst_addr);
let src_local = ctx.local_addrs.contains(&src_addr);
let (local_addr, local_port, remote_addr, remote_port, pkt_outbound) =
if dst_local && !src_local {
(dst_addr, dst_port, src_addr, src_port, false)
} else {
(src_addr, src_port, dst_addr, dst_port, true)
};
let bytes = upd.bytes as u64;
let (up_delta, down_delta) = if pkt_outbound { (bytes, 0) } else { (0, bytes) };
let (id, outbound) = {
let meta = metas.entry(key.clone()).or_insert_with(|| {
let id = *next_id;
*next_id += 1;
FlowMeta {
id,
outbound: pkt_outbound,
process_id: None,
dest_id: None,
}
});
(meta.id, meta.outbound)
};
let socketproc = lookup_socket_proc(key.protocol, local_addr, local_port);
let identity = socketproc
.as_ref()
.and_then(|p| p.exe_path.as_ref())
.map(|exe| BinaryIdentity::identify(exe));
let name = resolver.name_for(remote_addr).map(|s| s.to_string());
let geo = ctx.enricher.lookup(remote_addr);
let ev = FlowEvent {
ts: ts_ms,
key: key.clone(),
proc_: socketproc.clone(),
identity,
dest_ip: remote_addr,
dest_port: remote_port,
local_port,
protocol: key.protocol,
name: name.clone(),
geo: geo.clone(),
bytes_up: up_delta,
bytes_down: down_delta,
flow_first_seen: upd.first_seen,
outbound,
};
let outcome = {
let store = ctx.store.lock().unwrap();
ctx.engine.evaluate_full(&store, &ev)
};
if let Ok(o) = outcome {
if let Some(meta) = metas.get_mut(&key) {
meta.process_id = o.process_id;
meta.dest_id = Some(o.dest_id);
}
for a in &o.alerts {
ctx.bus.broadcast(&Event::Alert(a.clone()));
ctx.notifier.notify(a);
}
}
let dto = {
let mut live = ctx.live.lock().unwrap();
let dto = live.conns.entry(key).or_insert_with(|| ConnectionDto {
id,
process: socketproc
.as_ref()
.map(|p| p.name.clone())
.unwrap_or_else(|| "?".into()),
pid: socketproc.as_ref().map(|p| p.pid),
dest_name: name.clone().unwrap_or_else(|| remote_addr.to_string()),
remote_ip: remote_addr.to_string(),
remote_port,
local_port,
protocol: key_protocol(&ev),
country: geo.country.clone(),
asn: geo.asn,
as_org: geo.as_org.clone(),
bytes_up: 0,
bytes_down: 0,
first_seen_ms: ts_ms,
last_seen_ms: ts_ms,
outbound,
});
dto.bytes_up += up_delta;
dto.bytes_down += down_delta;
dto.last_seen_ms = ts_ms;
if let Some(n) = &name {
dto.dest_name = n.clone();
}
if geo.country.is_some() {
dto.country = geo.country.clone();
}
if geo.asn.is_some() {
dto.asn = geo.asn;
dto.as_org = geo.as_org.clone();
}
if let Some(p) = &socketproc {
dto.process = p.name.clone();
dto.pid = Some(p.pid);
}
dto.clone()
};
ctx.bus.broadcast(&Event::Flow(dto));
}
fn key_protocol(ev: &FlowEvent) -> u8 {
ev.protocol
}
fn persist_closed(
ctx: &WorkerCtx,
metas: &mut HashMap<FlowKey, FlowMeta>,
snap: FlowStatsSnapshot,
) {
let meta = metas.remove(&snap.key);
let name = {
let mut live = ctx.live.lock().unwrap();
let n = live.conns.get(&snap.key).map(|d| d.dest_name.clone());
live.conns.remove(&snap.key);
n
};
if let Some(m) = &meta {
ctx.bus.broadcast(&Event::Closed { id: m.id });
}
let row = ConnectionRow {
id: 0,
process_id: meta.as_ref().and_then(|m| m.process_id),
dest_id: meta.as_ref().and_then(|m| m.dest_id),
proto: snap.protocol,
local_port: snap.local_port.unwrap_or(0),
remote_port: snap.remote_port.unwrap_or(0),
bytes_up: snap.bytes_up,
bytes_down: snap.bytes_down,
name,
ts_start_ms: snap.first_seen.timestamp_millis(),
ts_end_ms: snap.last_seen.timestamp_millis(),
};
let _ = ctx.store.lock().unwrap().insert_connection(&row);
}