#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(unix)]
mod unix;
#[cfg(windows)]
mod win;
#[cfg(feature = "idle")]
mod idlemon;
mod cmdsig;
mod err;
use std::{
sync::{mpsc::Sender, Arc},
thread
};
#[cfg(any(feature = "idle", feature = "inspect"))]
use std::time::Duration;
#[cfg(unix)]
use parking_lot::Mutex;
#[cfg(windows)]
use std::sync::atomic::AtomicU32;
pub use err::Error;
use cmdsig::CmdSignal;
enum Msg {
#[cfg(feature = "inspect")]
Inspect(std::sync::mpsc::Sender<RecvInfo>)
}
#[cfg(feature = "inspect")]
#[derive(Debug)]
pub struct RecvInfo {
pub overflow_dropped: u32,
pub if_dropped: u32,
pub num_raw_pkts: u64,
pub raw_pkt_bytes: u64,
pub runtime: Duration
}
pub trait PacketHandler {
type Error;
fn init(&mut self) -> Result<(), Self::Error> {
Ok(())
}
fn proc(&mut self, pkt: pcap::Packet) -> Result<(), Self::Error>;
#[cfg(feature = "idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
fn idle(&mut self) -> Result<(), Self::Error> {
Ok(())
}
#[cfg(feature = "inspect")]
#[cfg_attr(docsrs, doc(cfg(feature = "inspect")))]
#[allow(unused_variables)]
fn inspect(&self, info: &RecvInfo) {}
fn shutdown(&mut self) {}
}
pub struct RecvThread {
devname: String,
bufsize: i32,
#[cfg(feature = "idle")]
idle_dur: Option<Duration>
}
impl RecvThread {
pub fn new(devname: &str) -> Self {
Self {
devname: devname.to_string(),
bufsize: 16 * 1024 * 1024,
#[cfg(feature = "idle")]
idle_dur: None
}
}
pub fn bufsize<E>(mut self, bufsize: usize) -> Result<Self, Error<E>> {
self.bufsize_r(bufsize)?;
Ok(self)
}
pub fn bufsize_r<E>(
&mut self,
bufsize: usize
) -> Result<&mut Self, Error<E>> {
const MIN: usize = 256 * 1024;
if bufsize < MIN {
let m = format!("Buffer size must be at least {}", MIN);
Err(Error::bad_arg(m))
} else {
self.bufsize = i32::try_from(bufsize).map_err(|e| Error::bad_arg(e))?;
Ok(self)
}
}
#[cfg(feature = "idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
pub fn idle_duration(mut self, dur: Duration) -> Self {
self.idle_dur = Some(dur);
self
}
#[cfg(feature = "idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "idle")))]
pub fn idle_duration_r(&mut self, dur: Duration) -> &mut Self {
self.idle_dur = Some(dur);
self
}
}
impl RecvThread {
pub fn run<E>(
self,
mut handler: impl PacketHandler<Error = E> + Send + 'static
) -> Result<Controller<E>, Error<E>>
where
E: Send + 'static
{
let cap = pcap::Capture::from_device(self.devname.as_ref())
.map_err(|e| Error::NetIface(e.to_string()))?
.buffer_size(self.bufsize);
#[cfg(windows)]
let cap = cap.immediate_mode(true);
let cap = cap.open().map_err(|e| Error::NetIface(e.to_string()))?;
#[cfg(unix)]
let (ctl_tx, ctl_rx) = mio::unix::pipe::new().unwrap();
#[cfg(unix)]
let ctl_tx = Arc::new(Mutex::new(ctl_tx));
#[cfg(windows)]
let (cmdreq, hev_wakeup) =
(Arc::new(AtomicU32::new(0)), unsafe { cap.get_event() });
let cmdsig = CmdSignal {
#[cfg(unix)]
ctl_tx,
#[cfg(windows)]
cmdreq: Arc::clone(&cmdreq),
#[cfg(windows)]
hev_wakeup
};
let (ch_tx, ch_rx) = std::sync::mpsc::channel::<Msg>();
#[cfg(feature = "idle")]
let cmdsig2 = cmdsig.clone();
let jh = thread::spawn(move || {
handler.init().map_err(|e| Error::App(e))?;
#[cfg(feature = "idle")]
let idle_res = if let Some(dur) = self.idle_dur {
let r = idlemon::run(dur, cmdsig2);
Some(r)
} else {
None
};
#[cfg(unix)]
let ret = {
#[cfg(feature = "idle")]
let idle_sh = if let Some((idle_sh, _)) = &idle_res {
Some(Arc::clone(idle_sh))
} else {
None
};
let rp = unix::RunParams {
ctl_rx,
#[cfg(feature = "idle")]
idle_sh,
ch_rx
};
unix::run(cap, handler, rp)
};
#[cfg(windows)]
let ret = {
#[cfg(feature = "idle")]
let idle_sh = if let Some((idle_sh, _)) = &idle_res {
Some(Arc::clone(idle_sh))
} else {
None
};
let rp = win::RunParams {
cmdreq,
#[cfg(feature = "idle")]
idle_sh,
ch_rx
};
win::run(cap, handler, rp)
};
#[cfg(feature = "idle")]
if let Some((idle_sh, idle_jh)) = idle_res {
idle_sh.kill();
let _ = idle_jh.join();
}
ret
});
Ok(Controller {
jh: Some(jh),
cmdsig,
ch_tx
})
}
}
pub struct Controller<E> {
jh: Option<thread::JoinHandle<Result<(), Error<E>>>>,
cmdsig: CmdSignal,
#[allow(dead_code)]
ch_tx: Sender<Msg>
}
impl<E> Controller<E>
where
E: 'static
{
#[cfg(feature = "inspect")]
#[cfg_attr(docsrs, doc(cfg(feature = "inspect")))]
pub fn signal_inspect(&mut self) -> Result<(), Error<E>> {
self.cmdsig.inspect();
Ok(())
}
#[cfg(feature = "inspect")]
#[cfg_attr(docsrs, doc(cfg(feature = "inspect")))]
pub fn inspect(&self) -> Result<RecvInfo, Error<E>> {
let (tx, rx) = std::sync::mpsc::channel();
self
.ch_tx
.send(Msg::Inspect(tx))
.map_err(|e| Error::Internal(e.to_string()))?;
self.cmdsig.channel();
rx.recv().map_err(|e| Error::Internal(e.to_string()))
}
pub fn signal_shutdown(&self) -> Result<(), Error<E>> {
if let Some(ref _jh) = self.jh {
self.cmdsig.kill()?;
Ok(())
} else {
Ok(())
}
}
pub fn wait(mut self) -> Result<(), Error<E>> {
if let Some(jh) = self.jh.take() {
jh.join().map_err(|e| *e.downcast::<Error<E>>().unwrap())?
} else {
Ok(())
}
}
pub fn shutdown(mut self) -> Result<(), Error<E>> {
if let Some(jh) = self.jh.take() {
self.cmdsig.kill()?;
jh.join().map_err(|e| *e.downcast::<Error<E>>().unwrap())?
} else {
Ok(())
}
}
}
impl<E> Drop for Controller<E> {
fn drop(&mut self) {
if let Some(jh) = self.jh.take() {
match self.cmdsig.kill::<E>() {
Ok(_) => {
if let Err(e) = jh.join() {
eprintln!("Receiver thread returned error; {:?}", e);
}
}
Err(e) => {
eprintln!(
"Unable to send termination request to receiver thread; {}",
e
);
}
}
}
}
}