use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{error, info};
use crate::config::schema::AppConfig;
use crate::metrics::{TrafficDirection, TrafficEvent};
use super::csv::CsvWriter;
use super::pcap::PcapWriter;
/// Resolved output locations for traffic capture.
///
/// Each field is `Some` only when the matching section of the application
/// configuration is present, enabled, and carries a non-empty path.
#[derive(Debug, Clone)]
pub struct CaptureConfig {
    /// Target file for PCAP output, if PCAP capture is configured.
    pub pcap_path: Option<PathBuf>,
    /// Target file for CSV output, if CSV capture is configured.
    pub csv_path: Option<PathBuf>,
}

impl CaptureConfig {
    /// Builds the capture configuration from the application config.
    ///
    /// A section contributes a path only when it exists, its `enabled` flag is
    /// set, and its `path` is not empty; otherwise the field is `None`.
    pub fn from_app_config(cfg: &AppConfig) -> Self {
        Self {
            pcap_path: cfg.pcap.as_ref().and_then(|section| {
                (section.enabled && !section.path.is_empty())
                    .then(|| PathBuf::from(&section.path))
            }),
            csv_path: cfg.csv.as_ref().and_then(|section| {
                (section.enabled && !section.path.is_empty())
                    .then(|| PathBuf::from(&section.path))
            }),
        }
    }

    /// Returns `true` when at least one output (PCAP or CSV) has a path.
    pub fn is_active(&self) -> bool {
        self.pcap_path
            .as_ref()
            .or(self.csv_path.as_ref())
            .is_some()
    }
}
/// Runtime on/off switches for the PCAP and CSV outputs.
///
/// The flags live behind `Arc`s, so cloning a `CaptureState` yields another
/// handle to the *same* flags: a toggle through one clone is observed by all
/// others.
#[derive(Debug, Clone)]
pub struct CaptureState {
    /// Whether PCAP output is currently switched on.
    pub pcap_enabled: Arc<AtomicBool>,
    /// Whether CSV output is currently switched on.
    pub csv_enabled: Arc<AtomicBool>,
}

impl CaptureState {
    /// Creates a fresh pair of flags with the given initial values.
    pub fn new(pcap_initially: bool, csv_initially: bool) -> Self {
        Self {
            pcap_enabled: Arc::new(AtomicBool::new(pcap_initially)),
            csv_enabled: Arc::new(AtomicBool::new(csv_initially)),
        }
    }

    /// Atomically inverts `flag` and returns the value it holds afterwards.
    fn flip(flag: &AtomicBool) -> bool {
        // `fetch_xor(true)` inverts the flag and hands back the previous
        // value, so the new value is its negation.
        !flag.fetch_xor(true, Ordering::Relaxed)
    }

    /// Flips the PCAP flag and returns the new value.
    pub fn toggle_pcap(&self) -> bool {
        Self::flip(&self.pcap_enabled)
    }

    /// Flips the CSV flag and returns the new value.
    pub fn toggle_csv(&self) -> bool {
        Self::flip(&self.csv_enabled)
    }

    /// Returns the current value of the PCAP flag.
    pub fn pcap_on(&self) -> bool {
        self.pcap_enabled.load(Ordering::Relaxed)
    }

    /// Returns the current value of the CSV flag.
    pub fn csv_on(&self) -> bool {
        self.csv_enabled.load(Ordering::Relaxed)
    }
}
#[allow(dead_code)]
pub struct TrafficSink {
config: CaptureConfig,
state: CaptureState,
traffic_rx: mpsc::Receiver<TrafficEvent>,
}
#[allow(dead_code)]
impl TrafficSink {
pub fn new(
config: CaptureConfig,
state: CaptureState,
traffic_rx: mpsc::Receiver<TrafficEvent>,
) -> Self {
Self { config, state, traffic_rx }
}
pub fn config(&self) -> &CaptureConfig { &self.config }
pub fn state(&self) -> &CaptureState { &self.state }
pub async fn run(mut self) {
let mut pcap: Option<PcapWriter> = None;
let mut csv: Option<CsvWriter> = None;
if let Some(p) = &self.config.pcap_path {
if self.state.pcap_on() {
match PcapWriter::create(&p.to_string_lossy()) {
Ok(w) => {
info!(path = %p.display(), "PCAP capture started");
pcap = Some(w);
}
Err(e) => error!(path = %p.display(), error = %e, "cannot open PCAP file"),
}
}
}
if let Some(p) = &self.config.csv_path {
if self.state.csv_on() {
match CsvWriter::create(&p.to_string_lossy()) {
Ok(w) => {
info!(path = %p.display(), "CSV capture started");
csv = Some(w);
}
Err(e) => error!(path = %p.display(), error = %e, "cannot open CSV file"),
}
}
}
let mut flush_counter = 0u32;
while let Some(event) = self.traffic_rx.recv().await {
if self.state.pcap_on() {
if pcap.is_none() {
if let Some(p) = &self.config.pcap_path {
match PcapWriter::create(&p.to_string_lossy()) {
Ok(w) => {
info!(path = %p.display(), "PCAP capture resumed");
pcap = Some(w);
}
Err(e) => error!(%e, "PCAP reopen failed"),
}
}
}
if let Some(w) = &mut pcap {
let upstream_rx = event.direction == TrafficDirection::UpstreamRx;
if let Err(e) = w.write_packet(event.timestamp, upstream_rx, &event.frame) {
error!(error = %e, "PCAP write error");
}
}
} else {
if pcap.is_some() {
if let Some(mut w) = pcap.take() {
w.flush().ok();
}
info!("PCAP capture paused");
}
}
if self.state.csv_on() {
if csv.is_none() {
if let Some(p) = &self.config.csv_path {
match CsvWriter::create(&p.to_string_lossy()) {
Ok(w) => {
info!(path = %p.display(), "CSV capture resumed");
csv = Some(w);
}
Err(e) => error!(%e, "CSV reopen failed"),
}
}
}
if let Some(w) = &mut csv {
if let Err(e) = w.write_event(&event) {
error!(error = %e, "CSV write error");
}
}
} else if csv.is_some() {
if let Some(mut w) = csv.take() {
w.flush().ok();
}
info!("CSV capture paused");
}
flush_counter = flush_counter.wrapping_add(1);
if flush_counter % 64 == 0 {
if let Some(w) = &mut pcap { w.flush().ok(); }
if let Some(w) = &mut csv { w.flush().ok(); }
}
}
if let Some(mut w) = pcap { w.flush().ok(); }
if let Some(mut w) = csv { w.flush().ok(); }
info!("traffic sink shut down");
}
}