net_replay/capture.rs
//! Capture TCP packets
//!
//! [`Capture`] can be used to capture TCP traffic on an interface. Starting a capture spawns two new threads: one
//! that snoops traffic using a raw socket and another that filters and stores those packets.
//!
//! Each capture requires a [`Filter`] that decides which packets are included in the capture. That trait has a
//! single method that needs to be implemented: [`Filter::filter`].
//!
//! Once a capture is completed, a vector of all the captured packets is returned. You can use
//! [`crate::ip::write_pcap_file`] to write the capture to a pcap file or use the [`crate::replay`] module to replay
//! that data over a TCP connection.
12
13use crate::ip::IpPacket;
14use crate::sock::{Raw, RSockErr};
15
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering;
18use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
19use std::sync::Arc;
20use std::thread::{spawn, JoinHandle};
21use std::time::Duration;
22
23use thiserror::Error;
24
25/// Errors that can occur when starting a capture session
26#[derive(Debug, Error)]
27pub enum CaptureError {
28 /// Failed to join one of the threads
29 #[error("Failed to join one of the threads")]
30 JoinError,
31 /// Failed to do a socket operation
32 #[error("Failed to do a socket operation")]
33 SockError(#[from] RSockErr),
34}
35
36/// Trait for filtering out the kind of packet(s) you want to store
37///
38/// Any struct that implements this trait can be used to filter out packets during a capture.
39///
40/// # NOTE
41/// If you want to end a capture after some packet or packet sequence has been seen, you would implement that
42/// functionality in this struct. You would put some shareable condition in this struct and set it when that condition
43/// is seen. The controlling thread would then know to stop the capture.
44pub trait Filter: Send {
45 /// Method to determine whether or not a packet will be kept or discarded
46 ///
47 /// Return true to keep the packet or false to discard the packet.
48 fn filter(&mut self, packet: &IpPacket) -> bool;
49}
50
51/// Pre-made filter that will keep all seen packets
52pub struct NoFilter;
53
54impl Filter for NoFilter {
55 fn filter(&mut self, _packet: &IpPacket) -> bool {
56 true
57 }
58}
59
60/// Session state struct for before the capture has started
61///
62/// Has all of the data needed to start a capture: the filter to use and an optional interface to bind to.
63pub struct Ready {
64 filter: Box<dyn Filter>,
65 interface: Option<String>,
66}
67
68/// Session state struct for an active capture
69///
70/// Has all of the data needed to maintain a capture: the thread handle for the snooper, the thread handle for the
71/// packet store, and the signal bool to stop.
72pub struct Capturing {
73 snooper: JoinHandle<()>,
74 store: JoinHandle<Vec<IpPacket>>,
75 cond: Arc<AtomicBool>,
76}
77
/// A TCP packet capture on an interface
///
/// The type parameter is the session state: [`Ready`] before the capture is started and [`Capturing`] while it
/// is running.
pub struct Capture<T> {
    /// State data for the current phase of the session
    state: T,
}
82
83impl Capture<Ready> {
84 /// Create a new Capture struct with a filter and interface to capture on
85 ///
86 /// This simply sets up a struct without actually starting to capture.
87 #[must_use]
88 pub fn new(filter: Box<dyn Filter>, interface: Option<String>) -> Self {
89 Self {
90 state: Ready { filter, interface },
91 }
92 }
93
94 /// Start capturing packets
95 ///
96 /// This method will consume the object and return a new one.
97 ///
98 /// # Errors
99 /// Will return an error if something goes wrong with creating the socket or joining a thread after a socket error
100 /// has occurred.
101 pub fn start(self) -> Result<Capture<Capturing>, CaptureError> {
102 let (send, recv) = channel::<IpPacket>();
103 let signal = Arc::new(AtomicBool::new(false));
104 let cap_signal = Arc::clone(&signal);
105 let capture = spawn(move || {
106 let capture = Store {
107 chan: recv,
108 cond: cap_signal,
109 store: Vec::with_capacity(1000),
110 filter: self.state.filter,
111 };
112 capture.start()
113 });
114
115 let sock = Raw::new();
116 let mut sock = match sock {
117 Ok(s) => s,
118 Err(e) => {
119 // I believe the relaxed ordering is fine because the exact timing of the threads stopping is not
120 // super important. As long as they get the message shortly after the bool is set. That should happen
121 // with relaxed ordering.
122 signal.store(true, Ordering::Relaxed);
123 capture.join().map_err(|_| CaptureError::JoinError)?;
124 return Err(CaptureError::SockError(e));
125 }
126 };
127
128 if let Some(iface) = self.state.interface {
129 sock.bind_interface(&iface)
130 .map_err(CaptureError::SockError)?;
131 };
132
133 let snoop_signal = Arc::clone(&signal);
134 let snoop = spawn(move || {
135 let snooper = Snooper {
136 chan: send,
137 cond: snoop_signal,
138 sock,
139 };
140 snooper.start();
141 });
142
143 let cap_state = Capturing {
144 snooper: snoop,
145 store: capture,
146 cond: signal,
147 };
148 Ok(Capture { state: cap_state })
149 }
150}
151
152impl Capture<Capturing> {
153 /// Stop capturing packets and get captured packets
154 ///
155 /// This will signal each thread to stop and get the captured packets.
156 ///
157 /// # Errors
158 /// If there is an error in joining the thread that stores the packets, an error will be returned. No error in
159 /// joining the snooping thread will be returned as long as the store thread returns properly.
160 pub fn end(self) -> Result<Vec<IpPacket>, CaptureError> {
161 // The atomic bool could most likley be changed to just a pointer and some volatile reads and writes and
162 // everything would still just work.
163 self.state.cond.store(true, Ordering::Relaxed);
164 let _cap_res = self.state.snooper.join();
165 let store_res = self.state.store.join();
166 store_res.map_err(|_| CaptureError::JoinError)
167 }
168}
169
170struct Snooper {
171 chan: Sender<IpPacket>,
172 cond: Arc<AtomicBool>,
173 sock: Raw,
174}
175
176impl Snooper {
177 fn start(mut self) {
178 const RECV_WAIT: Duration = Duration::new(0, 50_000_000);
179 let mut buf = Box::new([0u8; 65536]);
180 'recv_loop: loop {
181 if self.cond.load(Ordering::Relaxed) {
182 break 'recv_loop;
183 }
184 let try_recv = self.sock.read_timeout(&mut buf[..], &RECV_WAIT);
185 if let Ok(n) = try_recv {
186 let packet = IpPacket::parse_from_bytes(&&buf[0..n], None);
187 if let Ok(p) = packet {
188 self.chan.send(p).unwrap();
189 }
190 }
191 }
192 }
193}
194
195struct Store {
196 chan: Receiver<IpPacket>,
197 cond: Arc<AtomicBool>,
198 store: Vec<IpPacket>,
199 filter: Box<dyn Filter>,
200}
201
202impl Store {
203 fn start(mut self) -> Vec<IpPacket> {
204 const RECV_WAIT: Duration = Duration::new(0, 50_000_000);
205 'recv_loop: loop {
206 if self.cond.load(Ordering::Relaxed) {
207 break 'recv_loop;
208 }
209 let packet = self.chan.recv_timeout(RECV_WAIT);
210 match packet {
211 Err(RecvTimeoutError::Timeout) => {
212 },
213 Err(RecvTimeoutError::Disconnected) => break 'recv_loop,
214 Ok(ip_packet) => {
215 if self.filter.filter(&ip_packet) {
216 self.store.push(ip_packet);
217 }
218 }
219 }
220 }
221 self.store
222 }
223}