async_pcap/
async_pcap.rs

1use pcap::{Active, Capture, PacketHeader};
2use tokio::sync::Mutex;
3use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
4
5/// Represents a network packet with its header and raw data.
6#[derive(Debug, Clone)]
7pub struct Packet {
8    /// Packet header information provided by pcap
9    pub header: PacketHeader,
10    /// Raw packet data
11    pub data: Vec<u8>,
12}
13
14/// An asynchronous wrapper around a `pcap::Capture`.
15///  
16/// `AsyncCapture` owns the receiver side of a channel that receives
17/// captured packets or a stop signal. It allows async code to
18/// `await` new packets without blocking a thread.
19pub struct AsyncCapture {
20    rx: Mutex<UnboundedReceiver<PacketOrStop>>,
21}
22
23/// Enum used internally to represent either a captured packet
24/// or a stop signal to terminate the capture.
25enum PacketOrStop {
26    /// A captured packet
27    Packet(Packet),
28    /// Signal that capture has stopped
29    Stop,
30}
31
32/// Handle to control the asynchronous capture.
33///  
34/// `AsyncCaptureHandle` allows stopping the capture from another
35/// thread or async task.
36#[derive(Clone)]
37pub struct AsyncCaptureHandle {
38    tx: UnboundedSender<PacketOrStop>,
39}
40
41impl AsyncCapture {
42    /// Creates a new asynchronous capture from a `pcap::Capture<Active>`.
43    ///
44    /// Spawns a background thread that reads packets and sends them
45    /// through a channel for async consumption.
46    ///
47    /// Returns a tuple of `(AsyncCapture, AsyncCaptureHandle)`.
48    pub fn new(mut cap: Capture<Active>) -> (Self, AsyncCaptureHandle) {
49        let (tx, rx) = unbounded_channel::<PacketOrStop>();
50        let handle = AsyncCaptureHandle { tx: tx.clone() };
51
52        std::thread::spawn(move || {
53            while let Ok(packet) = cap.next_packet() {
54                let owned = Packet {
55                    header: *packet.header,
56                    data: packet.data.to_vec(),
57                };
58                if tx.send(PacketOrStop::Packet(owned)).is_err() {
59                    // Receiver dropped, exit thread
60                    break;
61                }
62            }
63            // Send a Stop message when capture thread ends
64            let _ = tx.send(PacketOrStop::Stop);
65        });
66
67        (Self { rx: Mutex::new(rx) }, handle)
68    }
69
70    /// Waits for the next packet asynchronously.
71    ///
72    /// Returns `Some(Packet)` if a packet is received,
73    /// or `None` if the capture has stopped.
74    pub async fn next_packet(&self) -> Option<Packet> {
75        let mut rx = self.rx.lock().await;
76        match rx.recv().await {
77            Some(PacketOrStop::Packet(pkt)) => Some(pkt),
78            Some(PacketOrStop::Stop) | None => None,
79        }
80    }
81}
82
83impl AsyncCaptureHandle {
84    /// Stops the capture from another thread or async task.
85    ///
86    /// Sends a stop signal to the capture thread, causing
87    /// `AsyncCapture::next_packet()` to return `None`.
88    pub fn stop(&self) {
89        let _ = self.tx.send(PacketOrStop::Stop);
90    }
91}