async_pcap/async_pcap.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use pcap::{Active, Capture, Error, PacketHeader};
5use tokio::sync::Mutex;
6use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7
8/// Represents a network packet with its header and raw data.
9#[derive(Debug, Clone)]
10pub struct Packet {
11 /// Packet header information provided by pcap
12 pub header: PacketHeader,
13 /// Raw packet data
14 pub data: Vec<u8>,
15}
16
17/// An asynchronous wrapper around a `pcap::Capture`.
18///
19/// `AsyncCapture` owns the receiver side of a channel that receives
20/// captured packets or a stop signal. It allows async code to
21/// `await` new packets without blocking a thread.
22pub struct AsyncCapture {
23 rx: Mutex<UnboundedReceiver<PacketOrStop>>,
24}
25
26/// Enum used internally to represent either a captured packet
27/// or a stop signal to terminate the capture.
28enum PacketOrStop {
29 /// A captured packet
30 Packet(Result<Packet, Error>),
31 /// Signal that capture has stopped
32 Stop,
33}
34
35/// Handle to control the asynchronous capture.
36///
37/// `AsyncCaptureHandle` allows stopping the capture from another
38/// thread or async task.
39#[derive(Clone)]
40pub struct AsyncCaptureHandle {
41 tx: UnboundedSender<PacketOrStop>,
42 stop_flag: Arc<AtomicBool>,
43}
44
45impl AsyncCapture {
46 /// Creates a new asynchronous capture from a `pcap::Capture<Active>`.
47 ///
48 /// Spawns a background thread that reads packets and sends them
49 /// through a channel for async consumption.
50 ///
51 /// Returns a tuple of `(AsyncCapture, AsyncCaptureHandle)`.
52 pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
53 let (tx, rx) = unbounded_channel::<PacketOrStop>();
54 let stop_flag = Arc::new(AtomicBool::new(false));
55 let handle = AsyncCaptureHandle {
56 tx: tx.clone(),
57 stop_flag: stop_flag.clone(),
58 };
59
60 std::thread::spawn(move || {
61 loop {
62 if stop_flag.load(Ordering::Relaxed) {
63 log::info!("AsyncCapture thread is aborted.");
64 break;
65 }
66 let res = cap.next_packet();
67
68 let owned = res.map(|packet| Packet {
69 header: *packet.header,
70 data: packet.data.to_vec(),
71 });
72 if let Err(e) = tx.send(PacketOrStop::Packet(owned)) {
73 // Receiver dropped, exit thread
74 log::debug!("{e}");
75 break;
76 }
77 }
78 // Send a Stop message when capture thread ends
79 let _ = tx.send(PacketOrStop::Stop);
80 });
81
82 (Self { rx: Mutex::new(rx) }, handle)
83 }
84
85 /// Waits for the next packet asynchronously.
86 ///
87 /// Returns `Some(Result<Packet, Error>)` if a packet is received,
88 /// or `None` if the capture has stopped.
89 pub async fn next_packet(&self) -> Option<Result<Packet, Error>> {
90 let mut rx = self.rx.lock().await;
91 match rx.recv().await {
92 Some(PacketOrStop::Packet(pkt)) => Some(pkt),
93 Some(PacketOrStop::Stop) | None => None,
94 }
95 }
96}
97
98impl AsyncCaptureHandle {
99 /// Stops the capture from another thread or asynchronous task.
100 ///
101 /// This method sets the internal stop flag, signaling the background
102 /// capture thread to terminate gracefully. It also sends a `Stop`
103 /// message through the internal channel to ensure that any awaiting
104 /// calls to [`AsyncCapture::next_packet()`] will return `None`.
105 ///
106 /// # Notes
107 ///
108 /// - Calling this method multiple times is safe and idempotent.
109 /// - Once stopped, the background thread will no longer produce packets.
110 /// - After calling `stop`, any future calls to
111 /// [`AsyncCapture::next_packet()`] will immediately return `None`.
112 pub fn stop(&self) {
113 if !self.stop_flag.swap(true, Ordering::Relaxed) {
114 let _ = self.tx.send(PacketOrStop::Stop);
115 }
116 }
117}
118
119impl Drop for AsyncCaptureHandle {
120 /// Automatically stops the capture when the last handle is dropped.
121 ///
122 /// This ensures that the background capture thread is terminated
123 /// even if [`AsyncCaptureHandle::stop()`] was not called explicitly.
124 ///
125 /// When the last instance of this handle is dropped, the stop flag
126 /// is set, and a `Stop` signal is sent to notify all waiting receivers.
127 ///
128 /// # Notes
129 ///
130 /// - Dropping cloned handles does **not** stop the capture as long as
131 /// other handles still exist.
132 /// - The capture thread will only be stopped automatically when the
133 /// **last** handle is dropped.
134 fn drop(&mut self) {
135 // Auto-stop when the last handle is dropped
136 if Arc::strong_count(&self.stop_flag) == 1 {
137 self.stop();
138 }
139 }
140}