async_pcap/
async_pcap.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use pcap::{Active, Capture, Error, PacketHeader};
5use tokio::sync::Mutex;
6use tokio::sync::mpsc::{UnboundedReceiver, unbounded_channel};
7
8/// Represents a network packet with its header and raw data.
9#[derive(Debug, Clone)]
10pub struct Packet {
11    /// Packet header information provided by pcap
12    pub header: PacketHeader,
13    /// Raw packet data
14    pub data: Vec<u8>,
15}
16
17/// An asynchronous wrapper around a `pcap::Capture`.
18///  
19/// `AsyncCapture` owns the receiver side of a channel that receives
20/// captured packets or a stop signal. It allows async code to
21/// `await` new packets without blocking a thread.
22pub struct AsyncCapture {
23    rx: Mutex<UnboundedReceiver<PacketOrStop>>,
24}
25
26/// Enum used internally to represent either a captured packet
27/// or a stop signal to terminate the capture.
28enum PacketOrStop {
29    /// A captured packet
30    Packet(Result<Packet, Error>),
31    /// Signal that capture has stopped
32    Stop,
33}
34
/// Handle to control the asynchronous capture.
///
/// `AsyncCaptureHandle` allows stopping the capture from another
/// thread or async task. Cloning a handle yields another handle that
/// refers to the same stop flag.
#[derive(Debug, Clone)]
pub struct AsyncCaptureHandle {
    /// Shared flag polled by the capture thread; `true` once a stop has
    /// been requested.
    stop_flag: Arc<AtomicBool>,
}
43
44impl AsyncCapture {
45    /// Creates a new asynchronous capture from a `pcap::Capture<Active>`.
46    ///
47    /// Spawns a background thread that reads packets and sends them
48    /// through a channel for async consumption.
49    ///
50    /// Returns a tuple of `(AsyncCapture, AsyncCaptureHandle)`.
51    pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
52        let (tx, rx) = unbounded_channel::<PacketOrStop>();
53        let stop_flag = Arc::new(AtomicBool::new(false));
54        let handle = AsyncCaptureHandle {
55            stop_flag: stop_flag.clone(),
56        };
57
58        std::thread::spawn(move || {
59            loop {
60                if stop_flag.load(Ordering::Relaxed) {
61                    eprintln!("AsyncCapture thread is aborted.");
62                    break;
63                }
64                let res = cap.next_packet();
65
66                let owned = res.map(|packet| Packet {
67                    header: *packet.header,
68                    data: packet.data.to_vec(),
69                });
70                if let Err(e) = tx.send(PacketOrStop::Packet(owned)) {
71                    // Receiver dropped, exit thread
72                    eprintln!("{e}");
73                    break;
74                }
75            }
76            // Send a Stop message when capture thread ends
77            let _ = tx.send(PacketOrStop::Stop);
78        });
79
80        (Self { rx: Mutex::new(rx) }, handle)
81    }
82
83    /// Waits for the next packet asynchronously.
84    ///
85    /// Returns `Some(Result<Packet, Error>)` if a packet is received,
86    /// or `None` if the capture has stopped.
87    pub async fn next_packet(&self) -> Option<Result<Packet, Error>> {
88        let mut rx = self.rx.lock().await;
89        match rx.recv().await {
90            Some(PacketOrStop::Packet(pkt)) => Some(pkt),
91            Some(PacketOrStop::Stop) | None => None,
92        }
93    }
94}
95
96impl AsyncCaptureHandle {
97    /// Stops the capture from another thread or asynchronous task.
98    ///
99    /// This method sets the internal stop flag, signaling the background
100    /// capture thread to terminate gracefully. It also sends a `Stop`
101    /// message through the internal channel to ensure that any awaiting
102    /// calls to [`AsyncCapture::next_packet()`] will return `None`.
103    ///
104    /// # Notes
105    ///
106    /// - Calling this method multiple times is safe and idempotent.
107    /// - Once stopped, the background thread will no longer produce packets.
108    /// - After calling `stop`, any future calls to
109    ///   [`AsyncCapture::next_packet()`] will immediately return `None`.
110    pub fn stop(&self) {
111        self.stop_flag.store(true, Ordering::Relaxed);
112    }
113}
114
115impl Drop for AsyncCaptureHandle {
116    /// Automatically stops the capture when the last handle is dropped.
117    ///
118    /// This ensures that the background capture thread is terminated
119    /// even if [`AsyncCaptureHandle::stop()`] was not called explicitly.
120    ///
121    /// When the last instance of this handle is dropped, the stop flag
122    /// is set, and a `Stop` signal is sent to notify all waiting receivers.
123    ///
124    /// # Notes
125    ///
126    /// - Dropping cloned handles does **not** stop the capture as long as
127    ///   other handles still exist.
128    /// - The capture thread will only be stopped automatically when the
129    ///   **last** handle is dropped.
130    fn drop(&mut self) {
131        self.stop();
132    }
133}