use std::{
io::{ErrorKind, Read},
os::fd::AsRawFd,
sync::mpsc::Receiver
};
#[cfg(feature = "inspect")]
use std::time::Instant;
#[cfg(feature = "idle")]
use {crate::idlemon, std::sync::Arc};
use mio::{
unix::{pipe, SourceFd},
Events, Interest, Poll, Token
};
use pcap::Active;
use super::PacketHandler;
use crate::{err::Error, Msg};
pub(super) struct RunParams {
pub(super) ctl_rx: pipe::Receiver,
#[cfg(feature = "idle")]
pub(super) idle_sh: Option<Arc<idlemon::Shared>>,
pub(super) ch_rx: Receiver<Msg>
}
const PKTREADER: Token = Token(0);
const CONTROLLER: Token = Token(1);
pub(super) fn run<E>(
cap: pcap::Capture<Active>,
mut handler: impl PacketHandler<Error = E> + Send + 'static,
mut rp: RunParams
) -> Result<(), Error<E>> {
let mut poll = Poll::new().unwrap();
let mut events = Events::with_capacity(128);
let mut cap = cap.setnonblock().unwrap();
rp.ctl_rx.set_nonblocking(true)?;
poll
.registry()
.register(
&mut SourceFd(&cap.as_raw_fd()),
PKTREADER,
Interest::READABLE
)
.unwrap();
poll
.registry()
.register(&mut rp.ctl_rx, CONTROLLER, Interest::READABLE)
.unwrap();
#[cfg(feature = "inspect")]
let (start_time, mut num_raw_pkts, mut raw_pkt_bytes) =
(Instant::now(), 0u64, 0u64);
let res = 'outer: loop {
poll.poll(&mut events, None).unwrap();
for evt in events.iter() {
match evt.token() {
PKTREADER => loop {
let res = cap.next_packet();
match res {
Ok(pkt) => {
#[cfg(feature = "inspect")]
{
num_raw_pkts = num_raw_pkts.wrapping_add(1);
raw_pkt_bytes = raw_pkt_bytes.wrapping_add(pkt.len() as u64);
}
#[cfg(feature = "idle")]
if let Some(ref idle_sh) = rp.idle_sh {
idle_sh.touch();
}
if let Err(e) = handler.proc(pkt) {
break 'outer Err(Error::App(e));
}
}
Err(pcap::Error::TimeoutExpired) => {
break;
}
Err(_e) => {
break;
}
}
},
CONTROLLER => {
let mut cmd = [0u8];
'readctrls: loop {
match rp.ctl_rx.read(&mut cmd) {
Ok(_n) => {
}
Err(e) => {
let ek = e.kind();
if ek == ErrorKind::WouldBlock {
break 'readctrls;
} else {
eprintln!("Unexpected error {}", e);
break 'readctrls;
}
}
}
match cmd[0] {
b't' => {
break 'outer Ok(());
}
#[cfg(feature = "inspect")]
b'i' => {
let stats = cap.stats().unwrap();
let info = super::RecvInfo {
overflow_dropped: stats.dropped,
if_dropped: stats.if_dropped,
num_raw_pkts,
raw_pkt_bytes,
runtime: Instant::now() - start_time
};
handler.inspect(&info);
}
#[cfg(feature = "idle")]
b'd' => {
let _ = handler.idle();
}
b'c' => {
while let Ok(msg) = rp.ch_rx.try_recv() {
match msg {
#[cfg(feature = "inspect")]
Msg::Inspect(tx) => {
let stats = cap.stats().unwrap();
let info = super::RecvInfo {
overflow_dropped: stats.dropped,
if_dropped: stats.if_dropped,
num_raw_pkts,
raw_pkt_bytes,
runtime: Instant::now() - start_time
};
if let Err(e) = tx.send(info) {
break 'outer Err(Error::Internal(format!(
"Unable to respond to channel request; {}",
e
)));
}
}
}
}
}
_ => {
unreachable!()
}
}
}
}
_ => unreachable!()
}
}
};
handler.shutdown();
res
}