async_pcap/async_pcap.rs
1use pcap::{Active, Capture, PacketHeader};
2use tokio::sync::Mutex;
3use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
4
5/// Represents a network packet with its header and raw data.
6#[derive(Debug, Clone)]
7pub struct Packet {
8 /// Packet header information provided by pcap
9 pub header: PacketHeader,
10 /// Raw packet data
11 pub data: Vec<u8>,
12}
13
14/// An asynchronous wrapper around a `pcap::Capture`.
15///
16/// `AsyncCapture` owns the receiver side of a channel that receives
17/// captured packets or a stop signal. It allows async code to
18/// `await` new packets without blocking a thread.
19pub struct AsyncCapture {
20 rx: Mutex<UnboundedReceiver<PacketOrStop>>,
21}
22
23/// Enum used internally to represent either a captured packet
24/// or a stop signal to terminate the capture.
25enum PacketOrStop {
26 /// A captured packet
27 Packet(Packet),
28 /// Signal that capture has stopped
29 Stop,
30}
31
32/// Handle to control the asynchronous capture.
33///
34/// `AsyncCaptureHandle` allows stopping the capture from another
35/// thread or async task.
36#[derive(Clone)]
37pub struct AsyncCaptureHandle {
38 tx: UnboundedSender<PacketOrStop>,
39}
40
41impl AsyncCapture {
42 /// Creates a new asynchronous capture from a `pcap::Capture<Active>`.
43 ///
44 /// Spawns a background thread that reads packets and sends them
45 /// through a channel for async consumption.
46 ///
47 /// Returns a tuple of `(AsyncCapture, AsyncCaptureHandle)`.
48 pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
49 let (tx, rx) = unbounded_channel::<PacketOrStop>();
50 let handle = AsyncCaptureHandle { tx: tx.clone() };
51
52 std::thread::spawn(move || {
53 while let Ok(packet) = cap.next_packet() {
54 let owned = Packet {
55 header: *packet.header,
56 data: packet.data.to_vec(),
57 };
58 if tx.send(PacketOrStop::Packet(owned)).is_err() {
59 // Receiver dropped, exit thread
60 break;
61 }
62 }
63 // Send a Stop message when capture thread ends
64 let _ = tx.send(PacketOrStop::Stop);
65 });
66
67 (Self { rx: Mutex::new(rx) }, handle)
68 }
69
70 /// Waits for the next packet asynchronously.
71 ///
72 /// Returns `Some(Packet)` if a packet is received,
73 /// or `None` if the capture has stopped.
74 pub async fn next_packet(&self) -> Option<Packet> {
75 let mut rx = self.rx.lock().await;
76 match rx.recv().await {
77 Some(PacketOrStop::Packet(pkt)) => Some(pkt),
78 Some(PacketOrStop::Stop) | None => None,
79 }
80 }
81}
82
83impl AsyncCaptureHandle {
84 /// Stops the capture from another thread or async task.
85 ///
86 /// Sends a stop signal to the capture thread, causing
87 /// `AsyncCapture::next_packet()` to return `None`.
88 pub fn stop(&self) {
89 let _ = self.tx.send(PacketOrStop::Stop);
90 }
91}