async_pcap/async_pcap.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use pcap::{Active, Capture, Error, PacketHeader};
5use tokio::sync::Mutex;
6use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
7
8/// Represents a network packet with its header and raw data.
9#[derive(Debug, Clone)]
10pub struct Packet {
11 /// Packet header information provided by pcap
12 pub header: PacketHeader,
13 /// Raw packet data
14 pub data: Vec<u8>,
15}
16
17/// An asynchronous wrapper around a `pcap::Capture`.
18///
19/// `AsyncCapture` owns the receiver side of a channel that receives
20/// captured packets or a stop signal. It allows async code to
21/// `await` new packets without blocking a thread.
22pub struct AsyncCapture {
23 rx: Mutex<UnboundedReceiver<PacketOrStop>>,
24}
25
26/// Enum used internally to represent either a captured packet
27/// or a stop signal to terminate the capture.
28enum PacketOrStop {
29 /// A captured packet
30 Packet(Result<Packet, Error>),
31 /// Signal that capture has stopped
32 Stop,
33}
34
/// Handle to control the asynchronous capture.
///
/// `AsyncCaptureHandle` allows stopping the capture from another
/// thread or async task. Cloning a handle is cheap: every clone shares the
/// same underlying stop flag through an `Arc`.
///
/// `Debug` is derived so the handle can be logged or embedded in other
/// `Debug` types.
#[derive(Clone, Debug)]
pub struct AsyncCaptureHandle {
    /// Flag polled by the capture loop; `true` requests termination.
    stop_flag: Arc<AtomicBool>,
}
43
44impl AsyncCapture {
45 /// Creates a new asynchronous capture from a `pcap::Capture<Active>`.
46 ///
47 /// Spawns a background thread that reads packets and sends them
48 /// through a channel for async consumption.
49 ///
50 /// Returns a tuple of `(AsyncCapture, AsyncCaptureHandle)`.
51 pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
52 let (tx, rx) = unbounded_channel::<PacketOrStop>();
53 let stop_flag = Arc::new(AtomicBool::new(false));
54 let handle = AsyncCaptureHandle {
55 stop_flag: stop_flag.clone(),
56 };
57
58 std::thread::spawn(move || {
59 loop {
60 if stop_flag.load(Ordering::Relaxed) {
61 eprintln!("AsyncCapture thread is aborted.");
62 break;
63 }
64 let res = cap.next_packet();
65
66 let owned = res.map(|packet| Packet {
67 header: *packet.header,
68 data: packet.data.to_vec(),
69 });
70 if let Err(e) = tx.send(PacketOrStop::Packet(owned)) {
71 // Receiver dropped, exit thread
72 eprintln!("{e}");
73 break;
74 }
75 }
76 // Send a Stop message when capture thread ends
77 let _ = tx.send(PacketOrStop::Stop);
78 });
79
80 (Self { rx: Mutex::new(rx) }, handle)
81 }
82
83 /// Waits for the next packet asynchronously.
84 ///
85 /// Returns `Some(Result<Packet, Error>)` if a packet is received,
86 /// or `None` if the capture has stopped.
87 pub async fn next_packet(&self) -> Option<Result<Packet, Error>> {
88 let mut rx = self.rx.lock().await;
89 match rx.recv().await {
90 Some(PacketOrStop::Packet(pkt)) => Some(pkt),
91 Some(PacketOrStop::Stop) | None => None,
92 }
93 }
94}
95
96impl AsyncCaptureHandle {
97 /// Stops the capture from another thread or asynchronous task.
98 ///
99 /// This method sets the internal stop flag, signaling the background
100 /// capture thread to terminate gracefully. It also sends a `Stop`
101 /// message through the internal channel to ensure that any awaiting
102 /// calls to [`AsyncCapture::next_packet()`] will return `None`.
103 ///
104 /// # Notes
105 ///
106 /// - Calling this method multiple times is safe and idempotent.
107 /// - Once stopped, the background thread will no longer produce packets.
108 /// - After calling `stop`, any future calls to
109 /// [`AsyncCapture::next_packet()`] will immediately return `None`.
110 pub fn stop(&self) {
111 self.stop_flag.store(true, Ordering::Relaxed);
112 }
113}
114
115impl Drop for AsyncCaptureHandle {
116 /// Automatically stops the capture when the last handle is dropped.
117 ///
118 /// This ensures that the background capture thread is terminated
119 /// even if [`AsyncCaptureHandle::stop()`] was not called explicitly.
120 ///
121 /// When the last instance of this handle is dropped, the stop flag
122 /// is set, and a `Stop` signal is sent to notify all waiting receivers.
123 ///
124 /// # Notes
125 ///
126 /// - Dropping cloned handles does **not** stop the capture as long as
127 /// other handles still exist.
128 /// - The capture thread will only be stopped automatically when the
129 /// **last** handle is dropped.
130 fn drop(&mut self) {
131 self.stop();
132 }
133}