Skip to main content

stackforge_core/sniffer/
channel.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread::{self, JoinHandle};
4use std::time::Instant;
5
6use bytes::Bytes;
7use crossbeam_channel::{Receiver, Sender, bounded};
8
9use super::capture::{RawPacket, open_capture};
10use super::config::SnifferConfig;
11use super::error::SnifferError;
12
13/// Capture statistics from libpcap.
14#[derive(Debug, Clone, Default)]
15pub struct CaptureStats {
16    pub packets_received: u32,
17    pub packets_dropped: u32,
18    pub packets_if_dropped: u32,
19}
20
21/// A threaded packet sniffer that pushes captured packets into a channel.
22///
23/// The capture runs on a dedicated OS thread. Consumers read packets from
24/// the receiver end of a bounded crossbeam channel.
25pub struct SnifferHandle {
26    receiver: Receiver<RawPacket>,
27    stop_flag: Arc<AtomicBool>,
28    thread: Option<JoinHandle<CaptureStats>>,
29}
30
31impl SnifferHandle {
32    /// Start a new sniffer with the given configuration.
33    pub fn start(config: SnifferConfig) -> Result<Self, SnifferError> {
34        let (sender, receiver) = bounded(config.channel_capacity);
35        let stop_flag = Arc::new(AtomicBool::new(false));
36        let thread_stop = Arc::clone(&stop_flag);
37
38        // Open capture on the main thread so errors propagate immediately
39        let mut capture = open_capture(&config)?;
40
41        let count = config.count;
42        let timeout = config.timeout;
43
44        let thread = thread::Builder::new()
45            .name(format!("sniffer-{}", config.iface))
46            .spawn(move || capture_loop(&mut capture, &sender, &thread_stop, count, timeout))
47            .map_err(|e| SnifferError::CaptureError(format!("failed to spawn thread: {e}")))?;
48
49        Ok(Self {
50            receiver,
51            stop_flag,
52            thread: Some(thread),
53        })
54    }
55
56    /// Receive the next captured packet, blocking until one is available.
57    ///
58    /// Returns `None` when the capture has ended (count/timeout reached or stopped).
59    pub fn recv(&self) -> Option<RawPacket> {
60        self.receiver.recv().ok()
61    }
62
63    /// Try to receive a packet without blocking.
64    pub fn try_recv(&self) -> Option<RawPacket> {
65        self.receiver.try_recv().ok()
66    }
67
68    /// Signal the capture thread to stop.
69    pub fn stop(&self) {
70        self.stop_flag.store(true, Ordering::Relaxed);
71    }
72
73    /// Check if the sniffer has been signaled to stop.
74    #[must_use]
75    pub fn is_stopped(&self) -> bool {
76        self.stop_flag.load(Ordering::Relaxed)
77    }
78
79    /// Wait for the capture thread to finish and return stats.
80    pub fn join(mut self) -> CaptureStats {
81        self.stop();
82        if let Some(handle) = self.thread.take() {
83            handle.join().unwrap_or_default()
84        } else {
85            CaptureStats::default()
86        }
87    }
88
89    /// Get a reference to the receiver for use with `select!` or direct iteration.
90    #[must_use]
91    pub fn receiver(&self) -> &Receiver<RawPacket> {
92        &self.receiver
93    }
94}
95
96impl Drop for SnifferHandle {
97    fn drop(&mut self) {
98        self.stop_flag.store(true, Ordering::Relaxed);
99        if let Some(handle) = self.thread.take() {
100            let _ = handle.join();
101        }
102    }
103}
104
105/// The main capture loop running on a dedicated thread.
106fn capture_loop(
107    capture: &mut pcap::Capture<pcap::Active>,
108    sender: &Sender<RawPacket>,
109    stop_flag: &AtomicBool,
110    count: usize,
111    timeout: Option<std::time::Duration>,
112) -> CaptureStats {
113    let start = Instant::now();
114    let mut captured = 0usize;
115
116    loop {
117        // Check stop conditions
118        if stop_flag.load(Ordering::Relaxed) {
119            break;
120        }
121        if count > 0 && captured >= count {
122            break;
123        }
124        if let Some(t) = timeout {
125            if start.elapsed() >= t {
126                break;
127            }
128        }
129
130        // Read next packet (with the 100ms read timeout from open_capture)
131        match capture.next_packet() {
132            Ok(packet) => {
133                let raw = RawPacket {
134                    data: Bytes::copy_from_slice(packet.data),
135                    timestamp_us: packet.header.ts.tv_sec * 1_000_000
136                        + i64::from(packet.header.ts.tv_usec),
137                };
138                // If the channel is full or closed, stop
139                if sender.send(raw).is_err() {
140                    break;
141                }
142                captured += 1;
143            },
144            Err(pcap::Error::TimeoutExpired) => {
145                // Read timeout — just loop and check stop conditions
146                continue;
147            },
148            Err(_) => {
149                // Other errors — stop the capture
150                break;
151            },
152        }
153    }
154
155    // Collect stats before we're done
156    let stats = capture.stats().unwrap_or(pcap::Stat {
157        received: 0,
158        dropped: 0,
159        if_dropped: 0,
160    });
161
162    CaptureStats {
163        packets_received: stats.received,
164        packets_dropped: stats.dropped,
165        packets_if_dropped: stats.if_dropped,
166    }
167}