1use std::convert::Infallible;
2use std::fs::File;
3use std::io::{BufWriter, Seek, SeekFrom, Write};
4use std::ops::Range;
5use std::path::PathBuf;
6use std::sync::Arc;
7
8use eyre::Context;
9use parking_lot::Mutex;
10use tracing::{debug, info, trace};
11use uuid::Uuid;
12
13use crate::connection::{Connection, Direction};
14use crate::flow_table::Flow;
15use crate::serialized::{PacketExtra, ConnInfo, SerializedSegment};
16use crate::stream::{SegmentInfo, SegmentType};
17use crate::ConnectionHandler;
18
/// Readable-byte count (64 KiB) above which `data_received` flushes a stream.
const BUFFER_READABLE_THRESHOLD: usize = 64 * 1024;
/// Buffered segment-record count above which `data_received` flushes a stream.
const BUFFER_SEGMENTS_THRESHOLD: usize = 16 * 1024;
/// Total buffered length (256 KiB) above which `data_received` forces a stream forward.
const BUFFER_TOTAL_THRESHOLD: usize = 256 * 1024;
/// Number of bytes (64 KiB) requested when `BUFFER_TOTAL_THRESHOLD` is exceeded.
const BUFFER_TOTAL_THRESHOLD_ADVANCE: usize = 64 * 1024;
27
/// Prints `buf` to stdout with every non-printable byte replaced by `.`.
///
/// Bytes in `b' '..=b'~'` and `b'\n'` are emitted unchanged; every other byte
/// becomes `b'.'`. When `newline` is true a trailing `\n` is written afterwards.
///
/// The sanitised bytes are assembled first and handed to stdout in a single
/// `write_all` under one lock, instead of issuing one write call per byte.
///
/// # Panics
///
/// Panics with "failed write" if the sanitised data cannot be written to
/// stdout. A failure to write the optional trailing newline is ignored.
pub fn dump_as_readable_ascii(buf: &[u8], newline: bool) {
    let printable: Vec<u8> = buf
        .iter()
        .map(|&byte| match byte {
            b' '..=b'~' | b'\n' => byte,
            _ => b'.',
        })
        .collect();

    let mut out = std::io::stdout().lock();
    out.write_all(&printable).expect("failed write");
    if newline {
        // Best effort, matching the previous behaviour for the trailing newline.
        let _ = out.write_all(b"\n");
    }
}
44
45pub struct DumpHandler {
47 pub gaps: Vec<Range<u64>>,
48 pub segments: Vec<SegmentInfo>,
49 pub buf: Vec<u8>,
50 pub forward_has_data: bool,
51 pub reverse_has_data: bool,
52}
53
54impl DumpHandler {
55 pub fn dump_stream_segments(&self) {
56 debug!("segments (length {})", self.segments.len());
57 for segment in &self.segments {
58 debug!(" offset: {}", segment.offset);
59 debug!(" reverse acked: {}", segment.reverse_acked);
60 match segment.data {
61 SegmentType::Data { len, is_retransmit } => {
62 debug!(" type: data");
63 debug!(" len {len}, retransmit {is_retransmit}");
64 }
65 SegmentType::Ack { window } => {
66 debug!(" type: ack");
67 debug!(" window: {window}");
68 }
69 SegmentType::Fin { end_offset } => {
70 debug!(" type: fin");
71 debug!(" end offset: {end_offset}");
72 }
73 SegmentType::Rst => {
74 debug!(" type: rst");
75 }
76 }
77 }
78 }
79
80 pub fn dump_stream(
81 &mut self,
82 connection: &mut Connection<Self>,
83 direction: Direction,
84 maybe_dump_len: Option<usize>,
85 ) {
86 self.gaps.clear();
87 self.segments.clear();
88 self.buf.clear();
89 let mut flow = connection.forward_flow.clone();
91 if direction == Direction::Reverse {
92 flow.reverse();
93 }
94 let uuid = connection.uuid;
95 let stream = connection.get_stream(direction);
96
97 let dump_len = if let Some(dump_len) = maybe_dump_len {
98 debug_assert!(dump_len > 0);
99 dump_len
100 } else {
101 trace!("dumping remaining segments for direction {direction}");
103 stream.read_segments_until(None, &mut self.segments);
104 stream.total_buffered_length()
106 };
107
108 let start_offset = stream.buffer_start();
109 let end_offset = start_offset + dump_len as u64;
110 if dump_len > 0 {
111 trace!("requesting {dump_len} bytes for direction {direction}");
112 stream.read_next(end_offset, &mut self.segments, &mut self.gaps, |slice| {
113 let (a, b) = slice.as_slices();
114 self.buf.extend_from_slice(a);
115 if let Some(b) = b {
116 self.buf.extend_from_slice(b);
117 }
118 });
119
120 if !self.gaps.is_empty() {
121 debug!("gaps (length {})", self.gaps.len());
122 for gap in &self.gaps {
123 debug!(" gap {} -> {}", gap.start, gap.end);
124 }
125 }
126 self.dump_stream_segments();
127
128 debug!("data (length {})", self.buf.len());
129 println!("\n====================\n{} ({})", flow, uuid);
130 println!(" offset: {start_offset}");
131 println!(" length: {dump_len}\n");
132 if !self.gaps.is_empty() {
133 let gaps_len: u64 = self.gaps.iter().map(|r| r.end - r.start).sum();
134 println!(" gap bytes: {gaps_len}");
135 }
136 dump_as_readable_ascii(&self.buf, true);
137 } else {
138 debug!("no new data, dumping segments only");
140 self.dump_stream_segments();
141 }
142 }
143
144 pub fn write_remaining(&mut self, connection: &mut Connection<Self>, direction: Direction) {
145 debug!(
146 "connection {} direction {direction} writing remaining segments",
147 connection.uuid
148 );
149 self.dump_stream(connection, direction, None);
150 }
151}
152
153impl ConnectionHandler for DumpHandler {
154 type InitialData = ();
155 type ConstructError = Infallible;
156 fn new(_init: (), conn: &mut Connection<Self>) -> Result<Self, Infallible> {
157 info!("new connection: {} ({})", conn.uuid, conn.forward_flow);
158 Ok(DumpHandler {
159 gaps: Vec::new(),
160 segments: Vec::new(),
161 buf: Vec::new(),
162 forward_has_data: false,
163 reverse_has_data: false,
164 })
165 }
166
167 fn data_received(&mut self, connection: &mut Connection<Self>, direction: Direction) {
168 let (fwd_data, rev_data) = match direction {
169 Direction::Forward => (&mut self.forward_has_data, &mut self.reverse_has_data),
170 Direction::Reverse => (&mut self.reverse_has_data, &mut self.forward_has_data),
171 };
172 let fwd_readable_len = connection.get_stream(direction).readable_buffered_length();
173 *fwd_data = fwd_readable_len > 0;
174
175 if *rev_data {
177 let rev_stream = connection.get_stream(direction.swap());
178 let readable = rev_stream.readable_buffered_length();
179 if readable > 0 {
180 trace!("reverse stream has data, will dump");
181 self.dump_stream(connection, direction.swap(), Some(readable));
182 }
183 }
184
185 let fwd_stream = connection.get_stream(direction);
187 if fwd_readable_len > BUFFER_READABLE_THRESHOLD
188 || fwd_stream.segments_info.len() > BUFFER_SEGMENTS_THRESHOLD
189 {
190 trace!("forward stream exceeded threshold, will dump");
191 self.dump_stream(connection, direction, Some(fwd_readable_len));
192 } else if fwd_stream.total_buffered_length() > BUFFER_TOTAL_THRESHOLD {
193 trace!("forward stream exceeded total buffer size threshold, will dump");
194 self.dump_stream(connection, direction, Some(BUFFER_TOTAL_THRESHOLD_ADVANCE));
195 }
196 }
197
198 fn rst_received(
199 &mut self,
200 connection: &mut Connection<Self>,
201 direction: Direction,
202 _extra: PacketExtra,
203 ) {
204 debug!("{direction} ({}) received reset", connection.uuid);
205 }
206
207 fn will_retire(&mut self, connection: &mut Connection<Self>) {
208 info!(
209 "removing connection: {} ({})",
210 connection.forward_flow, connection.uuid
211 );
212 self.write_remaining(connection, Direction::Forward);
213 self.write_remaining(connection, Direction::Reverse);
214 }
215}
216
217pub struct DirectoryOutputSharedInfoInner {
219 pub base_dir: PathBuf,
220 pub conn_info_file: Mutex<File>,
221}
222
223#[derive(Clone)]
224pub struct DirectoryOutputSharedInfo {
225 pub inner: Arc<DirectoryOutputSharedInfoInner>,
226 pub errors: crossbeam_channel::Sender<eyre::Report>,
227}
228
229pub type ErrorReceiver = crossbeam_channel::Receiver<eyre::Report>;
230impl DirectoryOutputSharedInfo {
231 pub fn new(base_dir: PathBuf) -> std::io::Result<(Self, ErrorReceiver)> {
233 let mut conn_info_file = File::create(base_dir.join("connections.json"))?;
234 conn_info_file.write_all(b"[\n")?;
235 let (error_tx, error_rx) = crossbeam_channel::unbounded();
236 Ok((
237 DirectoryOutputSharedInfo {
238 inner: Arc::new(DirectoryOutputSharedInfoInner {
239 base_dir,
240 conn_info_file: Mutex::new(conn_info_file),
241 }),
242 errors: error_tx,
243 },
244 error_rx,
245 ))
246 }
247
248 pub fn record_conn_info(&self, uuid: Uuid, flow: &Flow) -> std::io::Result<()> {
250 let mut serialized = serde_json::to_string(&ConnInfo::new(uuid, flow))
251 .expect("failed to serialize ConnInfo");
252 serialized += ",\n";
253 let mut file = self.inner.conn_info_file.lock();
254 file.write_all(serialized.as_bytes())
255 }
256
257 pub fn close(self) -> std::io::Result<()> {
259 let mut conn_info_file = Arc::into_inner(self.inner)
260 .unwrap()
261 .conn_info_file
262 .into_inner();
263 let current_pos = conn_info_file.stream_position()?;
264 if current_pos > 2 {
265 conn_info_file.seek(SeekFrom::Current(-2))?;
267 conn_info_file.write_all(b"\n]\n")?;
268 } else {
269 conn_info_file.write_all(b"]\n")?;
271 }
272 Ok(())
273 }
274
275 pub fn capture_errors<T>(&self, func: impl FnOnce() -> eyre::Result<T>) -> Option<T> {
277 match func() {
278 Ok(r) => Some(r),
279 Err(e) => {
280 self.errors.send(e).expect("could not forward error");
281 None
282 }
283 }
284 }
285}
286
/// The four per-connection output files: a data file and a segment-record
/// file for each direction.
pub struct DirectoryOutputHandlerFiles {
    /// Stream bytes of the forward direction.
    pub forward_data: File,
    /// One JSON record per line describing forward segments and gaps.
    pub forward_segments: File,
    /// Stream bytes of the reverse direction.
    pub reverse_data: File,
    /// One JSON record per line describing reverse segments and gaps.
    pub reverse_segments: File,
}
294
295pub struct DirectoryOutputHandler {
297 pub shared_info: DirectoryOutputSharedInfo,
298 pub id: Uuid,
299 pub gaps: Vec<Range<u64>>,
300 pub segments: Vec<SegmentInfo>,
301 pub got_handshake_done: bool,
303 pub files: Option<DirectoryOutputHandlerFiles>,
304}
305
306impl DirectoryOutputHandler {
307 pub fn write_stream_data(
308 &mut self,
309 connection: &mut Connection<Self>,
310 direction: Direction,
311 maybe_dump_len: Option<usize>,
312 ) -> std::io::Result<()> {
313 self.gaps.clear();
314 self.segments.clear();
315
316 let files = self.files.as_mut().expect("files not available!");
317 let (data_file, mut segments_file) = match direction {
318 Direction::Forward => (
319 &mut files.forward_data,
320 BufWriter::new(&mut files.forward_segments),
321 ),
322 Direction::Reverse => (
323 &mut files.reverse_data,
324 BufWriter::new(&mut files.reverse_segments),
325 ),
326 };
327
328 let stream = connection.get_stream(direction);
329 let dump_len = if let Some(dump_len) = maybe_dump_len {
330 debug_assert!(dump_len > 0);
331 dump_len
332 } else {
333 stream.read_segments_until(None, &mut self.segments);
335 stream.total_buffered_length()
337 };
338 if dump_len > 0 {
339 trace!("write_stream_data: requesting {dump_len} bytes from stream for {direction}");
340 let start_offset = stream.buffer_start();
341 let end_offset = start_offset + dump_len as u64;
342 stream
343 .read_next(end_offset, &mut self.segments, &mut self.gaps, |slice| {
344 let (a, b) = slice.as_slices();
345 trace!("write_stream_data: writing {} data bytes", a.len());
346 data_file.write_all(a)?;
347 if let Some(b) = b {
348 trace!("write_stream_data: writing {} data bytes", b.len());
349 data_file.write_all(b)?;
350 }
351 Result::<(), std::io::Error>::Ok(())
352 })
353 .expect("read_next cannot fulfill range")?;
354 }
355
356 let mut gaps_iter = self.gaps.iter().peekable();
358 let mut segments_iter = self.segments.iter().peekable();
359 loop {
360 enum WhichNext {
361 Gap,
362 Segment,
363 }
364 let which = match (gaps_iter.peek(), segments_iter.peek()) {
366 (None, None) => break,
367 (None, Some(_)) => WhichNext::Segment,
368 (Some(_), None) => WhichNext::Gap,
369 (Some(&gap), Some(&segment)) => {
370 if gap.start < segment.offset {
371 WhichNext::Gap
372 } else {
373 WhichNext::Segment
374 }
375 }
376 };
377
378 match which {
380 WhichNext::Gap => {
381 let gap = gaps_iter.next().unwrap();
382 let info = SerializedSegment::new_gap(gap.start, gap.end - gap.start);
383 serde_json::to_writer(&mut segments_file, &info)?;
384 segments_file.write_all(b"\n")?;
385 }
386 WhichNext::Segment => {
387 let segment = segments_iter.next().unwrap();
388 let info: SerializedSegment = segment.into();
389 serde_json::to_writer(&mut segments_file, &info)?;
390 segments_file.write_all(b"\n")?;
391 }
392 }
393 }
394
395 self.gaps.clear();
396 self.segments.clear();
397 Ok(())
398 }
399}
400
/// Evaluates `$result` and, if it is an `Err`, logs the error at error level
/// as `"<$what>: <error:?>"`. An `Ok` value is discarded.
macro_rules! log_error {
    ($result:expr, $what:expr) => {
        match $result {
            Err(e) => {
                ::tracing::error!(concat!($what, ": {:?}"), e);
            }
            Ok(_) => {}
        }
    };
}
408
409impl ConnectionHandler for DirectoryOutputHandler {
410 type InitialData = DirectoryOutputSharedInfo;
411 type ConstructError = eyre::Report;
412 fn new(
413 shared_info: Self::InitialData,
414 connection: &mut Connection<Self>,
415 ) -> eyre::Result<Self> {
416 debug!(
417 "connection created: {} ({})",
418 connection.forward_flow, connection.uuid
419 );
420 Ok(DirectoryOutputHandler {
421 shared_info,
422 id: connection.uuid,
423 gaps: Vec::new(),
424 segments: Vec::new(),
425 got_handshake_done: false,
426 files: None,
427 })
428 }
429
430 fn handshake_done(&mut self, connection: &mut Connection<Self>) {
431 info!(
432 "writing data for new connection: {} ({})",
433 connection.forward_flow, connection.uuid
434 );
435 if !self.got_handshake_done {
436 self.got_handshake_done = true;
437 }
438 log_error!(
439 self.shared_info
440 .record_conn_info(connection.uuid, &connection.forward_flow),
441 "failed to write connection info"
442 );
443
444 self.shared_info.capture_errors(|| {
445 let id = connection.uuid;
446 let base_dir = &self.shared_info.inner.base_dir;
447 trace!("creating files for connection {id}");
448 let forward_data = File::create(base_dir.join(format!("{id}.f.data")))
449 .wrap_err("creating forward data file")?;
450 let forward_segments = File::create(base_dir.join(format!("{id}.f.jsonl")))
451 .wrap_err("creating forward segments file")?;
452 let reverse_data = File::create(base_dir.join(format!("{id}.r.data")))
453 .wrap_err("creating reverse data file")?;
454 let reverse_segments = File::create(base_dir.join(format!("{id}.r.jsonl")))
455 .wrap_err("creating reverse segments file")?;
456 self.files = Some(DirectoryOutputHandlerFiles {
457 forward_data,
458 forward_segments,
459 reverse_data,
460 reverse_segments,
461 });
462 Ok(())
463 });
464 }
465
466 fn data_received(&mut self, connection: &mut Connection<Self>, direction: Direction) {
467 let stream = connection.get_stream(direction);
468 let readable_len = stream.readable_buffered_length();
469 if readable_len > BUFFER_READABLE_THRESHOLD
470 || stream.segments_info.len() > BUFFER_SEGMENTS_THRESHOLD
471 {
472 log_error!(
473 self.write_stream_data(connection, direction, Some(readable_len)),
474 "failed to write stream data"
475 );
476 } else if stream.total_buffered_length() > BUFFER_TOTAL_THRESHOLD {
477 log_error!(
478 self.write_stream_data(connection, direction, Some(BUFFER_TOTAL_THRESHOLD_ADVANCE)),
479 "failed to write stream data"
480 );
481 }
482 }
483
484 fn will_retire(&mut self, connection: &mut Connection<Self>) {
485 info!(
486 "removing connection: {} ({})",
487 connection.forward_flow, connection.uuid
488 );
489 if !self.got_handshake_done {
490 return;
492 }
493 log_error!(
494 self.write_stream_data(connection, Direction::Forward, None),
495 "failed to write final forward stream data"
496 );
497 log_error!(
498 self.write_stream_data(connection, Direction::Reverse, None),
499 "failed to write final reverse stream data"
500 );
501 }
502}