async_pcap/
async_pcap.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3
4use pcap::{Active, Capture, Error, PacketHeader};
5use tokio::sync::Mutex;
6use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
7
8/// Represents a network packet with its header and raw data.
9#[derive(Debug, Clone)]
10pub struct Packet {
11    /// Packet header information provided by pcap
12    pub header: PacketHeader,
13    /// Raw packet data
14    pub data: Vec<u8>,
15}
16
17/// An asynchronous wrapper around a `pcap::Capture`.
18///  
19/// `AsyncCapture` owns the receiver side of a channel that receives
20/// captured packets or a stop signal. It allows async code to
21/// `await` new packets without blocking a thread.
22pub struct AsyncCapture {
23    rx: Mutex<UnboundedReceiver<PacketOrStop>>,
24}
25
26/// Enum used internally to represent either a captured packet
27/// or a stop signal to terminate the capture.
28enum PacketOrStop {
29    /// A captured packet
30    Packet(Result<Packet, Error>),
31    /// Signal that capture has stopped
32    Stop,
33}
34
35/// Handle to control the asynchronous capture.
36///  
37/// `AsyncCaptureHandle` allows stopping the capture from another
38/// thread or async task.
39#[derive(Clone)]
40pub struct AsyncCaptureHandle {
41    tx: UnboundedSender<PacketOrStop>,
42    stop_flag: Arc<AtomicBool>,
43}
44
45impl AsyncCapture {
46    /// Creates a new asynchronous capture from a `pcap::Capture<Active>`.
47    ///
48    /// Spawns a background thread that reads packets and sends them
49    /// through a channel for async consumption.
50    ///
51    /// Returns a tuple of `(AsyncCapture, AsyncCaptureHandle)`.
52    pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
53        let (tx, rx) = unbounded_channel::<PacketOrStop>();
54        let stop_flag = Arc::new(AtomicBool::new(false));
55        let handle = AsyncCaptureHandle {
56            tx: tx.clone(),
57            stop_flag,
58        };
59
60        std::thread::spawn(move || {
61            loop {
62                let res = cap.next_packet();
63
64                let owned = res.map(|packet| Packet {
65                    header: *packet.header,
66                    data: packet.data.to_vec(),
67                });
68                if tx.send(PacketOrStop::Packet(owned)).is_err() {
69                    // Receiver dropped, exit thread
70                    break;
71                }
72            }
73            // Send a Stop message when capture thread ends
74            let _ = tx.send(PacketOrStop::Stop);
75        });
76
77        (Self { rx: Mutex::new(rx) }, handle)
78    }
79
80    /// Waits for the next packet asynchronously.
81    ///
82    /// Returns `Some(Result<Packet, Error>)` if a packet is received,
83    /// or `None` if the capture has stopped.
84    pub async fn next_packet(&self) -> Option<Result<Packet, Error>> {
85        let mut rx = self.rx.lock().await;
86        match rx.recv().await {
87            Some(PacketOrStop::Packet(pkt)) => Some(pkt),
88            Some(PacketOrStop::Stop) | None => None,
89        }
90    }
91}
92
93impl AsyncCaptureHandle {
94    /// Stops the capture from another thread or async task.
95    ///
96    /// Sends a stop signal to the capture thread, causing
97    /// `AsyncCapture::next_packet()` to return `None`.
98    pub fn stop(&self) {
99        self.stop_flag.store(true, Ordering::Relaxed);
100        let _ = self.tx.send(PacketOrStop::Stop);
101    }
102}