use pcap::{Active, Capture, PacketHeader};
use tokio::sync::Mutex;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
/// An owned copy of a single captured packet.
///
/// Holds the packet header by value and the packet bytes in an owned
/// `Vec<u8>`, so the value can be moved between threads and cloned freely.
#[derive(Debug, Clone)]
pub struct Packet {
/// Header copied from the packet returned by the capture.
pub header: PacketHeader,
/// Packet bytes copied into an owned buffer.
pub data: Vec<u8>,
}
/// Async-facing receiving side of a packet capture.
///
/// Packets arrive over an unbounded channel and are read with
/// `AsyncCapture::next_packet`.
pub struct AsyncCapture {
// Wrapped in an async mutex so `next_packet` can receive through `&self`.
rx: Mutex<UnboundedReceiver<PacketOrStop>>,
}
/// Message type carried on the internal channel between the capture thread,
/// the handles, and the async reader.
enum PacketOrStop {
/// A captured packet to hand to the reader.
Packet(Packet),
/// End-of-stream marker; `next_packet` maps it to `None`.
Stop,
}
/// Cloneable handle used to signal the reader of an `AsyncCapture`.
///
/// Holds a sender for the same channel the capture thread writes to.
#[derive(Clone)]
pub struct AsyncCaptureHandle {
// Sending half of the capture channel; used by `stop` to enqueue `Stop`.
tx: UnboundedSender<PacketOrStop>,
}
impl AsyncCapture {
pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
let (tx, rx) = unbounded_channel::<PacketOrStop>();
let handle = AsyncCaptureHandle { tx: tx.clone() };
std::thread::spawn(move || {
while let Ok(packet) = cap.next_packet() {
let owned = Packet {
header: *packet.header,
data: packet.data.to_vec(),
};
if tx.send(PacketOrStop::Packet(owned)).is_err() {
break;
}
}
let _ = tx.send(PacketOrStop::Stop);
});
(Self { rx: Mutex::new(rx) }, handle)
}
pub async fn next_packet(&self) -> Option<Packet> {
let mut rx = self.rx.lock().await;
match rx.recv().await {
Some(PacketOrStop::Packet(pkt)) => Some(pkt),
Some(PacketOrStop::Stop) | None => None,
}
}
}
impl AsyncCaptureHandle {
    /// Enqueues a `Stop` marker on the capture channel.
    ///
    /// This is best-effort: if the message cannot be delivered the failure is
    /// ignored and the call still returns normally.
    pub fn stop(&self) {
        // The send result is intentionally discarded.
        drop(self.tx.send(PacketOrStop::Stop));
    }
}