use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use pcap::{Active, Capture, Error, PacketHeader};
use tokio::sync::Mutex;
use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
#[derive(Debug, Clone)]
pub struct Packet {
pub header: PacketHeader,
pub data: Vec<u8>,
}
pub struct AsyncCapture {
rx: Mutex<UnboundedReceiver<PacketOrStop>>,
}
enum PacketOrStop {
Packet(Result<Packet, Error>),
Stop,
}
#[derive(Clone)]
pub struct AsyncCaptureHandle {
stop_flag: Arc<AtomicBool>,
}
impl AsyncCapture {
pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
let (tx, rx) = unbounded_channel::<PacketOrStop>();
let stop_flag = Arc::new(AtomicBool::new(false));
let handle = AsyncCaptureHandle {
stop_flag: stop_flag.clone(),
};
std::thread::spawn(move || {
loop {
if stop_flag.load(Ordering::Relaxed) {
eprintln!("AsyncCapture thread is aborted.");
break;
}
let res = cap.next_packet();
let owned = res.map(|packet| Packet {
header: *packet.header,
data: packet.data.to_vec(),
});
if let Err(e) = tx.send(PacketOrStop::Packet(owned)) {
eprintln!("{e}");
break;
}
}
let _ = tx.send(PacketOrStop::Stop);
});
(Self { rx: Mutex::new(rx) }, handle)
}
pub async fn next_packet(&self) -> Option<Result<Packet, Error>> {
let mut rx = self.rx.lock().await;
match rx.recv().await {
Some(PacketOrStop::Packet(pkt)) => Some(pkt),
Some(PacketOrStop::Stop) | None => None,
}
}
}
impl AsyncCaptureHandle {
pub fn stop(&self) {
self.stop_flag.store(true, Ordering::Relaxed);
}
}
impl Drop for AsyncCaptureHandle {
fn drop(&mut self) {
self.stop();
}
}