parse_tcp/
handler.rs

1use std::convert::Infallible;
2use std::fs::File;
3use std::io::{BufWriter, Seek, SeekFrom, Write};
4use std::ops::Range;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use eyre::Context;
9use parking_lot::Mutex;
10use tracing::{debug, info, trace};
11use uuid::Uuid;
12
13use crate::connection::{Connection, Direction};
14use crate::flow_table::Flow;
15use crate::serialized::{PacketExtra, ConnInfo, SerializedSegment};
16use crate::stream::{SegmentInfo, SegmentType};
17use crate::ConnectionHandler;
18
/// Readable bytes (per `readable_buffered_length`) allowed to pile up in one
/// direction before they are written out.
const BUFFER_READABLE_THRESHOLD: usize = 64 * 1024;
/// Number of buffered segment-info records allowed in one direction before
/// they are written out.
const BUFFER_SEGMENTS_THRESHOLD: usize = 16 * 1024;
/// Total buffered bytes (per `total_buffered_length`) allowed in one direction
/// before a forced write.
const BUFFER_TOTAL_THRESHOLD: usize = 256 * 1024;
/// Number of bytes written out when `BUFFER_TOTAL_THRESHOLD` is exceeded.
const BUFFER_TOTAL_THRESHOLD_ADVANCE: usize = 64 * 1024;
27
/// Print `buf` to stdout, replacing every byte that is neither printable ASCII
/// (space through `~`) nor `\n` with `.`.
///
/// When `newline` is true, a trailing `\n` is printed after the data.
///
/// # Panics
///
/// Panics if writing to or flushing stdout fails.
pub fn dump_as_readable_ascii(buf: &[u8], newline: bool) {
    // lock once so the whole dump is emitted without interleaving from other threads
    let mut writer = BufWriter::new(std::io::stdout().lock());
    write_readable_ascii(&mut writer, buf, newline).expect("failed write");
    // flush explicitly: BufWriter's Drop ignores flush errors
    writer.flush().expect("failed write");
}

/// Map a byte to itself if it is printable ASCII or `\n`, otherwise to `.`.
fn readable_ascii_byte(byte: u8) -> u8 {
    if (b' '..=b'~').contains(&byte) || byte == b'\n' {
        byte
    } else {
        b'.'
    }
}

/// Write the readable-ASCII rendering of `buf` to `out`, optionally followed by `\n`.
fn write_readable_ascii<W: Write>(out: &mut W, buf: &[u8], newline: bool) -> std::io::Result<()> {
    let rendered: Vec<u8> = buf.iter().copied().map(readable_ascii_byte).collect();
    out.write_all(&rendered)?;
    if newline {
        out.write_all(b"\n")?;
    }
    Ok(())
}
44
45/// ConnectionHandler to dump data to stdout
46pub struct DumpHandler {
47    pub gaps: Vec<Range<u64>>,
48    pub segments: Vec<SegmentInfo>,
49    pub buf: Vec<u8>,
50    pub forward_has_data: bool,
51    pub reverse_has_data: bool,
52}
53
54impl DumpHandler {
55    pub fn dump_stream_segments(&self) {
56        debug!("segments (length {})", self.segments.len());
57        for segment in &self.segments {
58            debug!("  offset: {}", segment.offset);
59            debug!("  reverse acked: {}", segment.reverse_acked);
60            match segment.data {
61                SegmentType::Data { len, is_retransmit } => {
62                    debug!("  type: data");
63                    debug!("    len {len}, retransmit {is_retransmit}");
64                }
65                SegmentType::Ack { window } => {
66                    debug!("  type: ack");
67                    debug!("    window: {window}");
68                }
69                SegmentType::Fin { end_offset } => {
70                    debug!("  type: fin");
71                    debug!("    end offset: {end_offset}");
72                }
73                SegmentType::Rst => {
74                    debug!("  type: rst");
75                }
76            }
77        }
78    }
79
80    pub fn dump_stream(
81        &mut self,
82        connection: &mut Connection<Self>,
83        direction: Direction,
84        maybe_dump_len: Option<usize>,
85    ) {
86        self.gaps.clear();
87        self.segments.clear();
88        self.buf.clear();
89        // indiscriminately dump everything to stdout
90        let mut flow = connection.forward_flow.clone();
91        if direction == Direction::Reverse {
92            flow.reverse();
93        }
94        let uuid = connection.uuid;
95        let stream = connection.get_stream(direction);
96
97        let dump_len = if let Some(dump_len) = maybe_dump_len {
98            debug_assert!(dump_len > 0);
99            dump_len
100        } else {
101            // explicitly dump all remaining segments
102            trace!("dumping remaining segments for direction {direction}");
103            stream.read_segments_until(None, &mut self.segments);
104            // dump everything remaining
105            stream.total_buffered_length()
106        };
107
108        let start_offset = stream.buffer_start();
109        let end_offset = start_offset + dump_len as u64;
110        if dump_len > 0 {
111            trace!("requesting {dump_len} bytes for direction {direction}");
112            stream.read_next(end_offset, &mut self.segments, &mut self.gaps, |slice| {
113                let (a, b) = slice.as_slices();
114                self.buf.extend_from_slice(a);
115                if let Some(b) = b {
116                    self.buf.extend_from_slice(b);
117                }
118            });
119
120            if !self.gaps.is_empty() {
121                debug!("gaps (length {})", self.gaps.len());
122                for gap in &self.gaps {
123                    debug!(" gap {} -> {}", gap.start, gap.end);
124                }
125            }
126            self.dump_stream_segments();
127
128            debug!("data (length {})", self.buf.len());
129            println!("\n====================\n{} ({})", flow, uuid);
130            println!("  offset: {start_offset}");
131            println!("  length: {dump_len}\n");
132            if !self.gaps.is_empty() {
133                let gaps_len: u64 = self.gaps.iter().map(|r| r.end - r.start).sum();
134                println!("  gap bytes: {gaps_len}");
135            }
136            dump_as_readable_ascii(&self.buf, true);
137        } else {
138            // read segments only
139            debug!("no new data, dumping segments only");
140            self.dump_stream_segments();
141        }
142    }
143
144    pub fn write_remaining(&mut self, connection: &mut Connection<Self>, direction: Direction) {
145        debug!(
146            "connection {} direction {direction} writing remaining segments",
147            connection.uuid
148        );
149        self.dump_stream(connection, direction, None);
150    }
151}
152
153impl ConnectionHandler for DumpHandler {
154    type InitialData = ();
155    type ConstructError = Infallible;
156    fn new(_init: (), conn: &mut Connection<Self>) -> Result<Self, Infallible> {
157        info!("new connection: {} ({})", conn.uuid, conn.forward_flow);
158        Ok(DumpHandler {
159            gaps: Vec::new(),
160            segments: Vec::new(),
161            buf: Vec::new(),
162            forward_has_data: false,
163            reverse_has_data: false,
164        })
165    }
166
167    fn data_received(&mut self, connection: &mut Connection<Self>, direction: Direction) {
168        let (fwd_data, rev_data) = match direction {
169            Direction::Forward => (&mut self.forward_has_data, &mut self.reverse_has_data),
170            Direction::Reverse => (&mut self.reverse_has_data, &mut self.forward_has_data),
171        };
172        let fwd_readable_len = connection.get_stream(direction).readable_buffered_length();
173        *fwd_data = fwd_readable_len > 0;
174
175        // dump reverse stream buffer if it has data
176        if *rev_data {
177            let rev_stream = connection.get_stream(direction.swap());
178            let readable = rev_stream.readable_buffered_length();
179            if readable > 0 {
180                trace!("reverse stream has data, will dump");
181                self.dump_stream(connection, direction.swap(), Some(readable));
182            }
183        }
184
185        // dump forward stream if limits hit
186        let fwd_stream = connection.get_stream(direction);
187        if fwd_readable_len > BUFFER_READABLE_THRESHOLD
188            || fwd_stream.segments_info.len() > BUFFER_SEGMENTS_THRESHOLD
189        {
190            trace!("forward stream exceeded threshold, will dump");
191            self.dump_stream(connection, direction, Some(fwd_readable_len));
192        } else if fwd_stream.total_buffered_length() > BUFFER_TOTAL_THRESHOLD {
193            trace!("forward stream exceeded total buffer size threshold, will dump");
194            self.dump_stream(connection, direction, Some(BUFFER_TOTAL_THRESHOLD_ADVANCE));
195        }
196    }
197
198    fn rst_received(
199        &mut self,
200        connection: &mut Connection<Self>,
201        direction: Direction,
202        _extra: PacketExtra,
203    ) {
204        debug!("{direction} ({}) received reset", connection.uuid);
205    }
206
207    fn will_retire(&mut self, connection: &mut Connection<Self>) {
208        info!(
209            "removing connection: {} ({})",
210            connection.forward_flow, connection.uuid
211        );
212        self.write_remaining(connection, Direction::Forward);
213        self.write_remaining(connection, Direction::Reverse);
214    }
215}
216
217/// shared state for DirectoryOutputHandler
218pub struct DirectoryOutputSharedInfoInner {
219    pub base_dir: PathBuf,
220    pub conn_info_file: Mutex<File>,
221}
222
/// Shared state handed to every DirectoryOutputHandler.
///
/// Cloning copies the `Arc` and the channel sender, so all clones refer to the
/// same output directory and connection-info file.
#[derive(Clone)]
pub struct DirectoryOutputSharedInfo {
    /// output directory and connection-info file shared by all clones
    pub inner: Arc<DirectoryOutputSharedInfoInner>,
    /// sending side of the error channel used by `capture_errors`
    pub errors: crossbeam_channel::Sender<eyre::Report>,
}
228
229pub type ErrorReceiver = crossbeam_channel::Receiver<eyre::Report>;
230impl DirectoryOutputSharedInfo {
231    /// create with output path
232    pub fn new(base_dir: PathBuf) -> std::io::Result<(Self, ErrorReceiver)> {
233        let mut conn_info_file = File::create(base_dir.join("connections.json"))?;
234        conn_info_file.write_all(b"[\n")?;
235        let (error_tx, error_rx) = crossbeam_channel::unbounded();
236        Ok((
237            DirectoryOutputSharedInfo {
238                inner: Arc::new(DirectoryOutputSharedInfoInner {
239                    base_dir,
240                    conn_info_file: Mutex::new(conn_info_file),
241                }),
242                errors: error_tx,
243            },
244            error_rx,
245        ))
246    }
247
248    /// write connection info
249    pub fn record_conn_info(&self, uuid: Uuid, flow: &Flow) -> std::io::Result<()> {
250        let mut serialized = serde_json::to_string(&ConnInfo::new(uuid, flow))
251            .expect("failed to serialize ConnInfo");
252        serialized += ",\n";
253        let mut file = self.inner.conn_info_file.lock();
254        file.write_all(serialized.as_bytes())
255    }
256
257    /// close connection info file
258    pub fn close(self) -> std::io::Result<()> {
259        let mut conn_info_file = Arc::into_inner(self.inner)
260            .unwrap()
261            .conn_info_file
262            .into_inner();
263        let current_pos = conn_info_file.stream_position()?;
264        if current_pos > 2 {
265            // overwrite trailing comma and close array
266            conn_info_file.seek(SeekFrom::Current(-2))?;
267            conn_info_file.write_all(b"\n]\n")?;
268        } else {
269            // no connections, just close the array
270            conn_info_file.write_all(b"]\n")?;
271        }
272        Ok(())
273    }
274
275    /// run a closure, sending errors through the error channel
276    pub fn capture_errors<T>(&self, func: impl FnOnce() -> eyre::Result<T>) -> Option<T> {
277        match func() {
278            Ok(r) => Some(r),
279            Err(e) => {
280                self.errors.send(e).expect("could not forward error");
281                None
282            }
283        }
284    }
285}
286
/// stream files for DirectoryOutputHandler
///
/// One raw-data file and one JSON-lines segment file per direction.
#[derive(Debug)]
pub struct DirectoryOutputHandlerFiles {
    /// raw stream bytes of the forward direction
    pub forward_data: File,
    /// one JSON object per gap/segment of the forward direction
    pub forward_segments: File,
    /// raw stream bytes of the reverse direction
    pub reverse_data: File,
    /// one JSON object per gap/segment of the reverse direction
    pub reverse_segments: File,
}
294
/// ConnectionHandler to write data to a directory
///
/// After the handshake, each direction's stream bytes and segment metadata are
/// written to files named after the connection uuid inside the shared base
/// directory.
pub struct DirectoryOutputHandler {
    /// shared output directory, connection-info file and error channel
    pub shared_info: DirectoryOutputSharedInfo,
    /// connection uuid, copied from the connection when the handler is created
    pub id: Uuid,
    /// scratch buffer for gap ranges; cleared by each `write_stream_data` call
    pub gaps: Vec<Range<u64>>,
    /// scratch buffer for segment info; cleared by each `write_stream_data` call
    pub segments: Vec<SegmentInfo>,
    /// whether we received the handshake_done event
    pub got_handshake_done: bool,
    /// per-direction output files; `None` until `handshake_done` creates them
    pub files: Option<DirectoryOutputHandlerFiles>,
}
305
306impl DirectoryOutputHandler {
307    pub fn write_stream_data(
308        &mut self,
309        connection: &mut Connection<Self>,
310        direction: Direction,
311        maybe_dump_len: Option<usize>,
312    ) -> std::io::Result<()> {
313        self.gaps.clear();
314        self.segments.clear();
315
316        let files = self.files.as_mut().expect("files not available!");
317        let (data_file, mut segments_file) = match direction {
318            Direction::Forward => (
319                &mut files.forward_data,
320                BufWriter::new(&mut files.forward_segments),
321            ),
322            Direction::Reverse => (
323                &mut files.reverse_data,
324                BufWriter::new(&mut files.reverse_segments),
325            ),
326        };
327
328        let stream = connection.get_stream(direction);
329        let dump_len = if let Some(dump_len) = maybe_dump_len {
330            debug_assert!(dump_len > 0);
331            dump_len
332        } else {
333            // explicitly dump all remaining segments
334            stream.read_segments_until(None, &mut self.segments);
335            // dump everything remaining
336            stream.total_buffered_length()
337        };
338        if dump_len > 0 {
339            trace!("write_stream_data: requesting {dump_len} bytes from stream for {direction}");
340            let start_offset = stream.buffer_start();
341            let end_offset = start_offset + dump_len as u64;
342            stream
343                .read_next(end_offset, &mut self.segments, &mut self.gaps, |slice| {
344                    let (a, b) = slice.as_slices();
345                    trace!("write_stream_data: writing {} data bytes", a.len());
346                    data_file.write_all(a)?;
347                    if let Some(b) = b {
348                        trace!("write_stream_data: writing {} data bytes", b.len());
349                        data_file.write_all(b)?;
350                    }
351                    Result::<(), std::io::Error>::Ok(())
352                })
353                .expect("read_next cannot fulfill range")?;
354        }
355
356        // write gaps and segments in order
357        let mut gaps_iter = self.gaps.iter().peekable();
358        let mut segments_iter = self.segments.iter().peekable();
359        loop {
360            enum WhichNext {
361                Gap,
362                Segment,
363            }
364            // figure out which to write next
365            let which = match (gaps_iter.peek(), segments_iter.peek()) {
366                (None, None) => break,
367                (None, Some(_)) => WhichNext::Segment,
368                (Some(_), None) => WhichNext::Gap,
369                (Some(&gap), Some(&segment)) => {
370                    if gap.start < segment.offset {
371                        WhichNext::Gap
372                    } else {
373                        WhichNext::Segment
374                    }
375                }
376            };
377
378            // serialize and write
379            match which {
380                WhichNext::Gap => {
381                    let gap = gaps_iter.next().unwrap();
382                    let info = SerializedSegment::new_gap(gap.start, gap.end - gap.start);
383                    serde_json::to_writer(&mut segments_file, &info)?;
384                    segments_file.write_all(b"\n")?;
385                }
386                WhichNext::Segment => {
387                    let segment = segments_iter.next().unwrap();
388                    let info: SerializedSegment = segment.into();
389                    serde_json::to_writer(&mut segments_file, &info)?;
390                    segments_file.write_all(b"\n")?;
391                }
392            }
393        }
394
395        self.gaps.clear();
396        self.segments.clear();
397        Ok(())
398    }
399}
400
/// Log the error of a `Result` at error level, prefixed with the literal `$what`.
///
/// `$result` is evaluated exactly once; an `Ok` value is simply discarded.
macro_rules! log_error {
    ($result:expr, $what:expr) => {
        if let Some(err) = $result.err() {
            ::tracing::error!(concat!($what, ": {:?}"), err);
        }
    };
}
408
409impl ConnectionHandler for DirectoryOutputHandler {
410    type InitialData = DirectoryOutputSharedInfo;
411    type ConstructError = eyre::Report;
412    fn new(
413        shared_info: Self::InitialData,
414        connection: &mut Connection<Self>,
415    ) -> eyre::Result<Self> {
416        debug!(
417            "connection created: {} ({})",
418            connection.forward_flow, connection.uuid
419        );
420        Ok(DirectoryOutputHandler {
421            shared_info,
422            id: connection.uuid,
423            gaps: Vec::new(),
424            segments: Vec::new(),
425            got_handshake_done: false,
426            files: None,
427        })
428    }
429
430    fn handshake_done(&mut self, connection: &mut Connection<Self>) {
431        info!(
432            "writing data for new connection: {} ({})",
433            connection.forward_flow, connection.uuid
434        );
435        if !self.got_handshake_done {
436            self.got_handshake_done = true;
437        }
438        log_error!(
439            self.shared_info
440                .record_conn_info(connection.uuid, &connection.forward_flow),
441            "failed to write connection info"
442        );
443
444        self.shared_info.capture_errors(|| {
445            let id = connection.uuid;
446            let base_dir = &self.shared_info.inner.base_dir;
447            trace!("creating files for connection {id}");
448            let forward_data = File::create(base_dir.join(format!("{id}.f.data")))
449                .wrap_err("creating forward data file")?;
450            let forward_segments = File::create(base_dir.join(format!("{id}.f.jsonl")))
451                .wrap_err("creating forward segments file")?;
452            let reverse_data = File::create(base_dir.join(format!("{id}.r.data")))
453                .wrap_err("creating reverse data file")?;
454            let reverse_segments = File::create(base_dir.join(format!("{id}.r.jsonl")))
455                .wrap_err("creating reverse segments file")?;
456            self.files = Some(DirectoryOutputHandlerFiles {
457                forward_data,
458                forward_segments,
459                reverse_data,
460                reverse_segments,
461            });
462            Ok(())
463        });
464    }
465
466    fn data_received(&mut self, connection: &mut Connection<Self>, direction: Direction) {
467        let stream = connection.get_stream(direction);
468        let readable_len = stream.readable_buffered_length();
469        if readable_len > BUFFER_READABLE_THRESHOLD
470            || stream.segments_info.len() > BUFFER_SEGMENTS_THRESHOLD
471        {
472            log_error!(
473                self.write_stream_data(connection, direction, Some(readable_len)),
474                "failed to write stream data"
475            );
476        } else if stream.total_buffered_length() > BUFFER_TOTAL_THRESHOLD {
477            log_error!(
478                self.write_stream_data(connection, direction, Some(BUFFER_TOTAL_THRESHOLD_ADVANCE)),
479                "failed to write stream data"
480            );
481        }
482    }
483
484    fn will_retire(&mut self, connection: &mut Connection<Self>) {
485        info!(
486            "removing connection: {} ({})",
487            connection.forward_flow, connection.uuid
488        );
489        if !self.got_handshake_done {
490            // nothing to write if no data
491            return;
492        }
493        log_error!(
494            self.write_stream_data(connection, Direction::Forward, None),
495            "failed to write final forward stream data"
496        );
497        log_error!(
498            self.write_stream_data(connection, Direction::Reverse, None),
499            "failed to write final reverse stream data"
500        );
501    }
502}