stackforge_core/sniffer/
channel.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::thread::{self, JoinHandle};
4use std::time::Instant;
5
6use bytes::Bytes;
7use crossbeam_channel::{Receiver, Sender, bounded};
8
9use super::capture::{RawPacket, open_capture};
10use super::config::SnifferConfig;
11use super::error::SnifferError;
12
/// Counters reported by the capture backend when a sniffer thread finishes.
///
/// The values are copied from `pcap::Stat` at the end of the capture loop.
/// All fields are zero when the statistics could not be read (and for the
/// `Default` value in general).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CaptureStats {
    /// Packets the capture backend reports as received (`pcap::Stat::received`).
    pub packets_received: u32,
    /// Packets the capture backend reports as dropped (`pcap::Stat::dropped`).
    pub packets_dropped: u32,
    /// Packets reported as dropped at the interface level
    /// (`pcap::Stat::if_dropped`); not every platform fills this in.
    pub packets_if_dropped: u32,
}
20
21pub struct SnifferHandle {
26 receiver: Receiver<RawPacket>,
27 stop_flag: Arc<AtomicBool>,
28 thread: Option<JoinHandle<CaptureStats>>,
29}
30
31impl SnifferHandle {
32 pub fn start(config: SnifferConfig) -> Result<Self, SnifferError> {
34 let (sender, receiver) = bounded(config.channel_capacity);
35 let stop_flag = Arc::new(AtomicBool::new(false));
36 let thread_stop = Arc::clone(&stop_flag);
37
38 let mut capture = open_capture(&config)?;
40
41 let count = config.count;
42 let timeout = config.timeout;
43
44 let thread = thread::Builder::new()
45 .name(format!("sniffer-{}", config.iface))
46 .spawn(move || capture_loop(&mut capture, &sender, &thread_stop, count, timeout))
47 .map_err(|e| SnifferError::CaptureError(format!("failed to spawn thread: {e}")))?;
48
49 Ok(Self {
50 receiver,
51 stop_flag,
52 thread: Some(thread),
53 })
54 }
55
56 pub fn recv(&self) -> Option<RawPacket> {
60 self.receiver.recv().ok()
61 }
62
63 pub fn try_recv(&self) -> Option<RawPacket> {
65 self.receiver.try_recv().ok()
66 }
67
68 pub fn stop(&self) {
70 self.stop_flag.store(true, Ordering::Relaxed);
71 }
72
73 #[must_use]
75 pub fn is_stopped(&self) -> bool {
76 self.stop_flag.load(Ordering::Relaxed)
77 }
78
79 pub fn join(mut self) -> CaptureStats {
81 self.stop();
82 if let Some(handle) = self.thread.take() {
83 handle.join().unwrap_or_default()
84 } else {
85 CaptureStats::default()
86 }
87 }
88
89 #[must_use]
91 pub fn receiver(&self) -> &Receiver<RawPacket> {
92 &self.receiver
93 }
94}
95
96impl Drop for SnifferHandle {
97 fn drop(&mut self) {
98 self.stop_flag.store(true, Ordering::Relaxed);
99 if let Some(handle) = self.thread.take() {
100 let _ = handle.join();
101 }
102 }
103}
104
105fn capture_loop(
107 capture: &mut pcap::Capture<pcap::Active>,
108 sender: &Sender<RawPacket>,
109 stop_flag: &AtomicBool,
110 count: usize,
111 timeout: Option<std::time::Duration>,
112) -> CaptureStats {
113 let start = Instant::now();
114 let mut captured = 0usize;
115
116 loop {
117 if stop_flag.load(Ordering::Relaxed) {
119 break;
120 }
121 if count > 0 && captured >= count {
122 break;
123 }
124 if let Some(t) = timeout {
125 if start.elapsed() >= t {
126 break;
127 }
128 }
129
130 match capture.next_packet() {
132 Ok(packet) => {
133 let raw = RawPacket {
134 data: Bytes::copy_from_slice(packet.data),
135 timestamp_us: packet.header.ts.tv_sec * 1_000_000
136 + i64::from(packet.header.ts.tv_usec),
137 };
138 if sender.send(raw).is_err() {
140 break;
141 }
142 captured += 1;
143 },
144 Err(pcap::Error::TimeoutExpired) => {
145 continue;
147 },
148 Err(_) => {
149 break;
151 },
152 }
153 }
154
155 let stats = capture.stats().unwrap_or(pcap::Stat {
157 received: 0,
158 dropped: 0,
159 if_dropped: 0,
160 });
161
162 CaptureStats {
163 packets_received: stats.received,
164 packets_dropped: stats.dropped,
165 packets_if_dropped: stats.if_dropped,
166 }
167}