net_replay/
capture.rs

1//! Capture TCP packets
2//!
3//! [`Capture`] can be used to capture TCP traffic on an interface. Capturing traffic will start two new threads: one
4//! for snooping traffic using a raw socket and another one for filtering and storing those packets.
5//!
6//! Each capture requires a [`Filter`] that will decide which packets will be included in the capture. That trait has a
7//! single method that needs to be implemented: [`Filter::filter`].
8//!
9//! Once a capture is completed, a vector of all the captured packets will be returned. You can use
10//! [`crate::ip::write_pcap_file`] to write the capture to a pcap file or use the [`crate::replay`] module to replay
11//! that data over a TCP connection.
12
13use crate::ip::IpPacket;
14use crate::sock::{Raw, RSockErr};
15
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering;
18use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
19use std::sync::Arc;
20use std::thread::{spawn, JoinHandle};
21use std::time::Duration;
22
23use thiserror::Error;
24
/// Errors that can occur when starting or ending a capture session
///
/// Returned by both [`Capture::start`] and [`Capture::end`].
#[derive(Debug, Error)]
pub enum CaptureError {
    /// Failed to join one of the threads
    ///
    /// Produced when joining one of the capture's worker threads reports an error.
    #[error("Failed to join one of the threads")]
    JoinError,
    /// Failed to do a socket operation
    ///
    /// Carries the underlying [`RSockErr`] reported by the raw socket. The `#[from]` attribute also provides the
    /// `From<RSockErr>` conversion, so socket errors can be propagated with `?`.
    #[error("Failed to do a socket operation")]
    SockError(#[from] RSockErr),
}
35
/// Trait for filtering out the kind of packet(s) you want to store
///
/// Any struct that implements this trait can be used to filter out packets during a capture. The `Send` bound is
/// required because the boxed filter is moved into the thread that stores the packets.
///
/// # NOTE
/// If you want to end a capture after some packet or packet sequence has been seen, you would implement that
/// functionality in this struct. You would put some shareable condition in this struct and set it when that condition
/// is seen. The controlling thread would then know to stop the capture.
pub trait Filter: Send {
    /// Method to determine whether or not a packet will be kept or discarded
    ///
    /// Takes `&mut self` so that an implementation can keep state between packets.
    ///
    /// Return true to keep the packet or false to discard the packet.
    fn filter(&mut self, packet: &IpPacket) -> bool;
}
50
/// Pre-made filter that will keep all seen packets
///
/// Use this when every packet handed to the filter should end up in the capture.
pub struct NoFilter;

impl Filter for NoFilter {
    /// Accept every packet: the packet is never inspected and `true` is always returned.
    fn filter(&mut self, _packet: &IpPacket) -> bool {
        true
    }
}
59
/// Session state struct for before the capture has started
///
/// Has all of the data needed to start a capture: the filter to use and an optional interface to bind to.
pub struct Ready {
    // Decides which packets are kept; handed over to the store thread when the capture starts.
    filter: Box<dyn Filter>,
    // Name of the interface to bind the socket to; when `None` no bind is performed.
    interface: Option<String>,
}
67
/// Session state struct for an active capture
///
/// Has all of the data needed to maintain a capture: the thread handle for the snooper, the thread handle for the
/// packet store, and the signal bool to stop.
pub struct Capturing {
    // Handle of the thread that reads packets from the socket and sends them over the channel.
    snooper: JoinHandle<()>,
    // Handle of the thread that filters and stores packets; joining it yields the captured packets.
    store: JoinHandle<Vec<IpPacket>>,
    // Stop flag shared with both threads; setting it to `true` asks them to finish.
    cond: Arc<AtomicBool>,
}
77
/// Capture TCP packets on an interface
///
/// The type parameter records which stage the capture is in: [`Ready`] before it has been started and
/// [`Capturing`] while it is running. Each stage only exposes the methods that make sense for it.
pub struct Capture<T> {
    // Stage-specific data (see `Ready` and `Capturing`).
    state: T,
}
82
83impl Capture<Ready> {
84    /// Create a new Capture struct with a filter and interface to capture on
85    ///
86    /// This simply sets up a struct without actually starting to capture.
87    #[must_use]
88    pub fn new(filter: Box<dyn Filter>, interface: Option<String>) -> Self {
89        Self {
90            state: Ready { filter, interface },
91        }
92    }
93
94    /// Start capturing packets
95    ///
96    /// This method will consume the object and return a new one.
97    ///
98    /// # Errors
99    /// Will return an error if something goes wrong with creating the socket or joining a thread after a socket error
100    /// has occurred.
101    pub fn start(self) -> Result<Capture<Capturing>, CaptureError> {
102        let (send, recv) = channel::<IpPacket>();
103        let signal = Arc::new(AtomicBool::new(false));
104        let cap_signal = Arc::clone(&signal);
105        let capture = spawn(move || {
106            let capture = Store {
107                chan: recv,
108                cond: cap_signal,
109                store: Vec::with_capacity(1000),
110                filter: self.state.filter,
111            };
112            capture.start()
113        });
114
115        let sock = Raw::new();
116        let mut sock = match sock {
117            Ok(s) => s,
118            Err(e) => {
119                // I believe the relaxed ordering is fine because the exact timing of the threads stopping is not
120                // super important. As long as they get the message shortly after the bool is set. That should happen
121                // with relaxed ordering.
122                signal.store(true, Ordering::Relaxed);
123                capture.join().map_err(|_| CaptureError::JoinError)?;
124                return Err(CaptureError::SockError(e));
125            }
126        };
127
128        if let Some(iface) = self.state.interface {
129            sock.bind_interface(&iface)
130                .map_err(CaptureError::SockError)?;
131        };
132
133        let snoop_signal = Arc::clone(&signal);
134        let snoop = spawn(move || {
135            let snooper = Snooper {
136                chan: send,
137                cond: snoop_signal,
138                sock,
139            };
140            snooper.start();
141        });
142
143        let cap_state = Capturing {
144            snooper: snoop,
145            store: capture,
146            cond: signal,
147        };
148        Ok(Capture { state: cap_state })
149    }
150}
151
152impl Capture<Capturing> {
153    /// Stop capturing packets and get captured packets
154    ///
155    /// This will signal each thread to stop and get the captured packets.
156    ///
157    /// # Errors
158    /// If there is an error in joining the thread that stores the packets, an error will be returned. No error in
159    /// joining the snooping thread will be returned as long as the store thread returns properly.
160    pub fn end(self) -> Result<Vec<IpPacket>, CaptureError> {
161        // The atomic bool could most likley be changed to just a pointer and some volatile reads and writes and
162        // everything would still just work.
163        self.state.cond.store(true, Ordering::Relaxed);
164        let _cap_res = self.state.snooper.join();
165        let store_res = self.state.store.join();
166        store_res.map_err(|_| CaptureError::JoinError)
167    }
168}
169
170struct Snooper {
171    chan: Sender<IpPacket>,
172    cond: Arc<AtomicBool>,
173    sock: Raw,
174}
175
176impl Snooper {
177    fn start(mut self) {
178        const RECV_WAIT: Duration = Duration::new(0, 50_000_000);
179        let mut buf = Box::new([0u8; 65536]);
180        'recv_loop: loop {
181            if self.cond.load(Ordering::Relaxed) {
182                break 'recv_loop;
183            }
184            let try_recv = self.sock.read_timeout(&mut buf[..], &RECV_WAIT);
185            if let Ok(n) = try_recv {
186                let packet = IpPacket::parse_from_bytes(&&buf[0..n], None);
187                if let Ok(p) = packet {
188                    self.chan.send(p).unwrap();
189                }
190            }
191        }
192    }
193}
194
195struct Store {
196    chan: Receiver<IpPacket>,
197    cond: Arc<AtomicBool>,
198    store: Vec<IpPacket>,
199    filter: Box<dyn Filter>,
200}
201
202impl Store {
203    fn start(mut self) -> Vec<IpPacket> {
204        const RECV_WAIT: Duration = Duration::new(0, 50_000_000);
205        'recv_loop: loop {
206            if self.cond.load(Ordering::Relaxed) {
207                break 'recv_loop;
208            }
209            let packet = self.chan.recv_timeout(RECV_WAIT);
210            match packet {
211                Err(RecvTimeoutError::Timeout) => {
212                },
213                Err(RecvTimeoutError::Disconnected) => break 'recv_loop,
214                Ok(ip_packet) => {
215                    if self.filter.filter(&ip_packet) {
216                        self.store.push(ip_packet);
217                    }
218                }
219            }
220        }
221        self.store
222    }
223}