use std::sync::{
atomic::{AtomicU32, Ordering},
mpsc::Receiver,
Arc
};
#[cfg(feature = "inspect")]
use std::time::Instant;
use bitflags::bitflags;
use pcap::Active;
use super::PacketHandler;
use crate::err::Error;
#[cfg(feature = "idle")]
use crate::idlemon;
use super::Msg;
bitflags! {
pub(crate) struct CmdFlags: u32 {
const KILL = 0b00000001;
#[cfg(feature = "inspect")]
const INSPECT = 0b00000010;
#[cfg(feature = "idle")]
const IDLE = 0b00000100;
const CHANNEL = 0b00001000;
const ALL = {
let fl = 0;
let fl = fl | Self::KILL.bits();
#[cfg(feature = "inspect")]
let fl = fl | Self::INSPECT.bits();
#[cfg(feature = "idle")]
let fl = fl | Self::IDLE.bits();
let fl = fl | Self::CHANNEL.bits();
fl
};
}
}
pub(super) struct RunParams {
pub(super) cmdreq: Arc<AtomicU32>,
#[cfg(feature = "idle")]
pub(super) idle_sh: Option<Arc<idlemon::Shared>>,
pub(super) ch_rx: Receiver<Msg>
}
pub(super) fn run<E>(
mut cap: pcap::Capture<Active>,
mut handler: impl PacketHandler<Error = E> + Send + 'static,
rp: RunParams
) -> Result<(), Error<E>> {
#[cfg(feature = "inspect")]
let start_time = Instant::now();
#[cfg(feature = "inspect")]
let mut num_raw_pkts: u64 = 0;
#[cfg(feature = "inspect")]
let mut raw_pkt_bytes: u64 = 0;
let res = 'outer: loop {
let res = cap.next_packet();
match res {
Ok(pkt) => {
#[cfg(feature = "inspect")]
{
num_raw_pkts = num_raw_pkts.wrapping_add(1);
raw_pkt_bytes = raw_pkt_bytes.wrapping_add(pkt.len() as u64);
}
#[cfg(feature = "idle")]
if let Some(ref idle_sh) = rp.idle_sh {
idle_sh.touch();
}
if let Err(e) = handler.proc(pkt) {
break 'outer Err(Error::App(e));
}
}
Err(pcap::Error::TimeoutExpired) => {
}
Err(_e) => {
}
}
const MASK: u32 = !CmdFlags::ALL.bits();
let cmd = rp.cmdreq.fetch_and(MASK, Ordering::SeqCst);
if cmd & CmdFlags::KILL.bits() != 0 {
break Ok(());
}
#[cfg(feature = "inspect")]
if cmd & CmdFlags::INSPECT.bits() != 0 {
let stats = cap.stats().unwrap();
let info = super::RecvInfo {
overflow_dropped: stats.dropped,
if_dropped: stats.if_dropped,
num_raw_pkts,
raw_pkt_bytes,
runtime: Instant::now() - start_time
};
handler.inspect(&info);
}
#[cfg(feature = "idle")]
if cmd & CmdFlags::IDLE.bits() != 0 {
if let Err(e) = handler.idle() {
break 'outer Err(Error::App(e));
}
}
if cmd & CmdFlags::CHANNEL.bits() != 0 {
while let Ok(msg) = rp.ch_rx.try_recv() {
match msg {
#[cfg(feature = "inspect")]
Msg::Inspect(tx) => {
let stats = cap.stats().unwrap();
let info = super::RecvInfo {
overflow_dropped: stats.dropped,
if_dropped: stats.if_dropped,
num_raw_pkts,
raw_pkt_bytes,
runtime: Instant::now() - start_time
};
if let Err(e) = tx.send(info) {
break 'outer Err(Error::Internal(format!(
"Unable to respond to channel request; {}",
e
)));
}
}
}
}
}
};
handler.shutdown();
res
}