1use std::io::Read;
2
3use memchr::memmem;
4use tracing::{debug, info, trace, warn};
5
6use crate::types::{
7 Direction, Frame, ParseStats, SkipReason, SkipTracking, Timestamp, Transport, UnparsedRegion,
8};
9
/// Prefix of a header line describing a received frame.
const RECV_PREFIX: &[u8] = b"recv ";
/// Prefix of a header line describing a sent frame.
const SENT_PREFIX: &[u8] = b"sent ";
/// Length threshold, in bytes, used when classifying skipped data: a skipped
/// region longer than this is reported as an oversized frame rather than as a
/// partial (first-frame) or mid-stream skip.
// NOTE(review): 65537 is 64 KiB + 1 — presumably tied to a maximum message
// size; confirm the intended origin of this value.
const MAX_PARTIAL_FRAME: usize = 65537;
15
16#[derive(Debug)]
17pub enum ParseError {
18 InvalidHeader(String),
19 InvalidMessage(String),
20 TransportNoise {
21 bytes: usize,
22 transport: Transport,
23 address: String,
24 },
25 Io(std::io::Error),
26}
27
28impl fmt::Display for ParseError {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 ParseError::InvalidHeader(msg) => write!(f, "invalid frame header: {msg}"),
32 ParseError::InvalidMessage(msg) => write!(f, "invalid SIP message: {msg}"),
33 ParseError::TransportNoise {
34 bytes,
35 transport,
36 address,
37 } => write!(
38 f,
39 "transport noise: {bytes} bytes of non-SIP data from {transport}/{address}"
40 ),
41 ParseError::Io(e) => write!(f, "I/O error: {e}"),
42 }
43 }
44}
45
46impl std::error::Error for ParseError {
47 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
48 match self {
49 ParseError::Io(e) => Some(e),
50 _ => None,
51 }
52 }
53}
54
55impl From<std::io::Error> for ParseError {
56 fn from(e: std::io::Error) -> Self {
57 ParseError::Io(e)
58 }
59}
60
61use std::fmt;
62
/// Value of an ASCII decimal digit, or `None` for any other byte.
fn digit(b: u8) -> Option<u8> {
    if b.is_ascii_digit() {
        Some(b - b'0')
    } else {
        None
    }
}

/// Parses an unsigned ASCII decimal made of 1..=`max_digits` digits and
/// narrows it to `T`.
///
/// Returns `None` for empty input, too many digits, any non-digit byte (no
/// sign or whitespace is accepted), or a value that does not fit in `T`.
fn parse_decimal<T: TryFrom<u64>>(bytes: &[u8], max_digits: usize) -> Option<T> {
    if bytes.is_empty() || bytes.len() > max_digits {
        return None;
    }
    // Accumulate in u64 (checked), then narrow: a value fits in `T` exactly
    // when every prefix of it does, so this matches digit-by-digit checking.
    let wide = bytes.iter().try_fold(0u64, |acc, &b| {
        acc.checked_mul(10)?.checked_add(u64::from(digit(b)?))
    })?;
    T::try_from(wide).ok()
}

/// Parses 1–3 ASCII digits into a `u8`.
fn parse_u8(bytes: &[u8]) -> Option<u8> {
    parse_decimal(bytes, 3)
}

/// Parses 1–5 ASCII digits into a `u16`.
fn parse_u16(bytes: &[u8]) -> Option<u16> {
    parse_decimal(bytes, 5)
}

/// Parses 1–10 ASCII digits into a `u32`.
fn parse_u32(bytes: &[u8]) -> Option<u32> {
    parse_decimal(bytes, 10)
}

/// Parses 1–10 ASCII digits into a `usize`.
fn parse_usize(bytes: &[u8]) -> Option<usize> {
    parse_decimal(bytes, 10)
}
113
114fn parse_timestamp(bytes: &[u8]) -> Option<Timestamp> {
116 if bytes.len() >= 26 && bytes[4] == b'-' && bytes[7] == b'-' && bytes[10] == b' ' {
118 let year = parse_u16(&bytes[0..4])?;
119 let month = parse_u8(&bytes[5..7])?;
120 let day = parse_u8(&bytes[8..10])?;
121 let ts = parse_time_part(&bytes[11..])?;
122 return Some(Timestamp::DateTime {
123 year,
124 month,
125 day,
126 hour: ts.0,
127 min: ts.1,
128 sec: ts.2,
129 usec: ts.3,
130 });
131 }
132 let (hour, min, sec, usec) = parse_time_part(bytes)?;
134 Some(Timestamp::TimeOnly {
135 hour,
136 min,
137 sec,
138 usec,
139 })
140}
141
142fn parse_time_part(bytes: &[u8]) -> Option<(u8, u8, u8, u32)> {
144 if bytes.len() < 15 {
145 return None;
146 }
147 if bytes[2] != b':' || bytes[5] != b':' || bytes[8] != b'.' {
148 return None;
149 }
150 let hour = parse_u8(&bytes[0..2])?;
151 let min = parse_u8(&bytes[3..5])?;
152 let sec = parse_u8(&bytes[6..8])?;
153 let usec = parse_u32(&bytes[9..15])?;
154 Some((hour, min, sec, usec))
155}
156
157pub fn parse_frame_header(
164 data: &[u8],
165) -> Result<(Direction, usize, Transport, String, Timestamp, usize), ParseError> {
166 let newline_pos = memchr::memchr(b'\n', data)
167 .ok_or_else(|| ParseError::InvalidHeader("no newline in header".into()))?;
168 let line = &data[..newline_pos];
169 let line = line.strip_suffix(b"\r").unwrap_or(line);
171 let line = line
173 .strip_suffix(b":")
174 .ok_or_else(|| ParseError::InvalidHeader("header does not end with ':'".into()))?;
175
176 let direction = if line.starts_with(RECV_PREFIX) {
178 Direction::Recv
179 } else if line.starts_with(SENT_PREFIX) {
180 Direction::Sent
181 } else {
182 return Err(ParseError::InvalidHeader(
183 "expected 'recv' or 'sent'".into(),
184 ));
185 };
186 let mut pos = 5;
187
188 let space = memchr::memchr(b' ', &line[pos..])
190 .ok_or_else(|| ParseError::InvalidHeader("no space after byte count".into()))?;
191 let byte_count = parse_usize(&line[pos..pos + space])
192 .ok_or_else(|| ParseError::InvalidHeader("invalid byte count".into()))?;
193 pos += space + 1;
194
195 let expected_recv = b"bytes from ";
197 let expected_sent = b"bytes to ";
198 if direction == Direction::Recv {
199 if !line[pos..].starts_with(expected_recv) {
200 return Err(ParseError::InvalidHeader("expected 'bytes from '".into()));
201 }
202 pos += expected_recv.len();
203 } else {
204 if !line[pos..].starts_with(expected_sent) {
205 return Err(ParseError::InvalidHeader("expected 'bytes to '".into()));
206 }
207 pos += expected_sent.len();
208 }
209
210 let transport = if line[pos..].starts_with(b"tcp/") {
212 pos += 4;
213 Transport::Tcp
214 } else if line[pos..].starts_with(b"udp/") {
215 pos += 4;
216 Transport::Udp
217 } else if line[pos..].starts_with(b"tls/") {
218 pos += 4;
219 Transport::Tls
220 } else if line[pos..].starts_with(b"wss/") {
221 pos += 4;
222 Transport::Wss
223 } else {
224 return Err(ParseError::InvalidHeader("unknown transport".into()));
225 };
226
227 let at_marker = b" at ";
229 let at_pos = memmem::find(&line[pos..], at_marker)
230 .ok_or_else(|| ParseError::InvalidHeader("no ' at ' in header".into()))?;
231 let address = String::from_utf8_lossy(&line[pos..pos + at_pos]).into_owned();
232 pos += at_pos + at_marker.len();
233
234 let timestamp = parse_timestamp(&line[pos..])
236 .ok_or_else(|| ParseError::InvalidHeader("invalid timestamp".into()))?;
237
238 Ok((
239 direction,
240 byte_count,
241 transport,
242 address,
243 timestamp,
244 newline_pos + 1,
245 ))
246}
247
248pub fn is_frame_header(data: &[u8]) -> bool {
251 if data.len() < 20 {
252 return false;
253 }
254 let starts_valid = data.starts_with(RECV_PREFIX) || data.starts_with(SENT_PREFIX);
255 if !starts_valid {
256 return false;
257 }
258 let rest = &data[5..];
260 let space = match memchr::memchr(b' ', rest) {
261 Some(p) => p,
262 None => return false,
263 };
264 if space == 0 || space > 10 {
265 return false;
266 }
267 for &b in &rest[..space] {
268 if !b.is_ascii_digit() {
269 return false;
270 }
271 }
272 rest[space..].starts_with(b" bytes ")
273}
274
/// Number of bytes requested from the reader per buffer refill (32 KiB).
const READ_BUF_SIZE: usize = 32 * 1024;
276
277pub struct FrameIterator<R> {
278 reader: R,
279 buf: Vec<u8>,
280 eof: bool,
281 frame_count: u64,
282 offset: u64,
283 stats: ParseStats,
284 skip_tracking: SkipTracking,
285}
286
287impl<R: Read> FrameIterator<R> {
288 pub fn new(reader: R) -> Self {
289 FrameIterator {
290 reader,
291 buf: Vec::with_capacity(READ_BUF_SIZE * 2),
292 eof: false,
293 frame_count: 0,
294 offset: 0,
295 stats: ParseStats::default(),
296 skip_tracking: SkipTracking::CountOnly,
297 }
298 }
299
300 pub fn capture_skipped(mut self, enable: bool) -> Self {
301 if enable {
302 self.skip_tracking = SkipTracking::CaptureData;
303 }
304 self
305 }
306
307 pub fn skip_tracking(mut self, tracking: SkipTracking) -> Self {
308 self.skip_tracking = tracking;
309 self
310 }
311
312 pub fn stats(&self) -> &ParseStats {
313 &self.stats
314 }
315
316 pub fn stats_mut(&mut self) -> &mut ParseStats {
317 &mut self.stats
318 }
319
320 pub fn drain_unparsed(&mut self) -> Vec<UnparsedRegion> {
321 self.stats.drain_regions()
322 }
323
324 fn consume(&mut self, n: usize) {
325 self.buf.drain(..n);
326 self.offset += n as u64;
327 }
328
329 fn consume_skipped(&mut self, n: usize, reason: SkipReason) {
330 if self.skip_tracking != SkipTracking::CountOnly {
331 let data = if self.skip_tracking == SkipTracking::CaptureData {
332 Some(self.buf[..n].to_vec())
333 } else {
334 None
335 };
336 self.stats.unparsed_regions.push(UnparsedRegion {
337 offset: self.offset,
338 length: n as u64,
339 reason,
340 data,
341 });
342 }
343 self.stats.bytes_skipped += n as u64;
344 self.consume(n);
345 }
346
347 fn fill_buf(&mut self) -> Result<bool, std::io::Error> {
348 if self.eof {
349 return Ok(false);
350 }
351 let old_len = self.buf.len();
352 self.buf.resize(old_len + READ_BUF_SIZE, 0);
353 let n = self.reader.read(&mut self.buf[old_len..])?;
354 self.buf.truncate(old_len + n);
355 if n == 0 {
356 self.eof = true;
357 return Ok(false);
358 }
359 self.stats.bytes_read += n as u64;
360 Ok(true)
361 }
362
363 fn is_replay(&self, skipped: &[u8]) -> bool {
369 if self.frame_count == 0 {
370 return false;
371 }
372 skipped.ends_with(b"\r\n\r\n\x0B\n")
373 }
374
375 fn find_boundary(&self, start: usize) -> Option<usize> {
377 let finder = memmem::Finder::new(b"\x0B\n");
378 let mut search_from = start;
379 loop {
380 let pos = finder.find(&self.buf[search_from..])?;
381 let abs_pos = search_from + pos;
382 let after = abs_pos + 2;
383 if after >= self.buf.len() {
384 if self.eof {
387 return Some(abs_pos);
388 }
389 return None; }
391 if is_frame_header(&self.buf[after..]) {
392 return Some(abs_pos);
393 }
394 trace!(
396 offset = abs_pos,
397 "found \\x0B\\n in content (not a boundary), skipping"
398 );
399 search_from = abs_pos + 2;
400 }
401 }
402
403 fn skip_to_first_header(&mut self) -> Option<usize> {
405 if is_frame_header(&self.buf) {
406 return Some(0);
407 }
408 let finder = memmem::Finder::new(b"\x0B\n");
410 let mut search_from = 0;
411 loop {
412 if let Some(pos) = finder.find(&self.buf[search_from..]) {
413 let abs_pos = search_from + pos;
414 let after = abs_pos + 2;
415 if after < self.buf.len() && is_frame_header(&self.buf[after..]) {
416 info!(skipped_bytes = after, "skipped partial first frame");
417 return Some(after);
418 }
419 search_from = abs_pos + 2;
420 } else {
421 return None;
422 }
423 }
424 }
425}
426
427impl<R: Read> Iterator for FrameIterator<R> {
428 type Item = Result<Frame, ParseError>;
429
430 fn next(&mut self) -> Option<Self::Item> {
431 if self.buf.is_empty() && !self.eof {
433 if let Err(e) = self.fill_buf() {
434 return Some(Err(ParseError::Io(e)));
435 }
436 }
437
438 if self.buf.is_empty() {
439 return None;
440 }
441
442 if self.frame_count == 0 {
444 loop {
445 match self.skip_to_first_header() {
446 Some(offset) => {
447 if offset > 0 {
448 let reason = if offset <= MAX_PARTIAL_FRAME {
449 SkipReason::PartialFirstFrame
450 } else {
451 SkipReason::OversizedFrame
452 };
453 self.consume_skipped(offset, reason);
454 }
455 break;
456 }
457 None => {
458 if self.eof {
459 debug!("no valid frame header found in entire input");
460 return None;
461 }
462 if let Err(e) = self.fill_buf() {
463 return Some(Err(ParseError::Io(e)));
464 }
465 }
466 }
467 }
468 }
469
470 if self.buf.is_empty() {
471 return None;
472 }
473
474 let mut strip = 0;
476 while strip < self.buf.len() {
477 if self.buf[strip] == b'\n' {
478 strip += 1;
479 } else if strip + 1 < self.buf.len()
480 && self.buf[strip] == b'\r'
481 && self.buf[strip + 1] == b'\n'
482 {
483 strip += 2;
484 } else {
485 break;
486 }
487 }
488 if strip > 0 {
489 self.consume(strip);
490 if self.buf.is_empty() {
491 return self.next();
492 }
493 }
494
495 let (direction, byte_count, transport, address, timestamp, header_len) = loop {
497 match parse_frame_header(&self.buf) {
498 Ok(h) => break h,
499 Err(ParseError::InvalidHeader(ref msg)) if msg == "no newline in header" => {
500 if self.eof {
501 debug!("truncated frame header at EOF");
502 return None;
503 }
504 if let Err(e) = self.fill_buf() {
505 return Some(Err(ParseError::Io(e)));
506 }
507 }
508 Err(e) => {
509 let header_preview: String = self
510 .buf
511 .iter()
512 .take(200)
513 .take_while(|&&b| b != b'\n')
514 .map(|&b| {
515 if b.is_ascii_graphic() || b == b' ' {
516 b as char
517 } else {
518 '.'
519 }
520 })
521 .collect();
522 if header_preview.starts_with("dump started at ") {
523 let skip = memchr::memchr(b'\n', &self.buf)
524 .map(|p| {
525 let mut end = p + 1;
526 while end < self.buf.len() && self.buf[end] == b'\n' {
527 end += 1;
528 }
529 end
530 })
531 .unwrap_or(self.buf.len());
532 debug!(
533 header = %header_preview,
534 skipped_bytes = skip,
535 "skipped dump restart marker",
536 );
537 self.consume(skip);
538 return self.next();
539 }
540 let skip = if let Some(b) = self.find_boundary(0) {
541 b + 2
542 } else {
543 memchr::memchr(b'\n', &self.buf)
544 .map(|p| p + 1)
545 .unwrap_or(self.buf.len())
546 };
547 let reason =
548 if self.buf.starts_with(RECV_PREFIX) || self.buf.starts_with(SENT_PREFIX) {
549 SkipReason::InvalidHeader
550 } else if skip > MAX_PARTIAL_FRAME {
551 SkipReason::OversizedFrame
552 } else if self.frame_count == 0 {
553 SkipReason::PartialFirstFrame
554 } else {
555 let skipped = &self.buf[..skip];
556 if self.is_replay(skipped) {
557 SkipReason::ReplayedFrame
558 } else {
559 SkipReason::MidStreamSkip
560 }
561 };
562 self.consume_skipped(skip, reason);
563 return Some(Err(e));
564 }
565 }
566 };
567
568 let content_start = header_len;
569 let expected_end = content_start + byte_count;
570
571 loop {
576 while self.buf.len() <= expected_end + 1 && !self.eof {
578 if let Err(e) = self.fill_buf() {
579 return Some(Err(ParseError::Io(e)));
580 }
581 }
582
583 if expected_end < self.buf.len() && self.buf[expected_end] == 0x0B {
585 let has_newline =
586 expected_end + 1 < self.buf.len() && self.buf[expected_end + 1] == b'\n';
587 let at_eof = expected_end + 1 >= self.buf.len() && self.eof;
588
589 if has_newline || at_eof {
590 let content = self.buf[content_start..expected_end].to_vec();
591 let drain_to = if has_newline {
592 expected_end + 2
593 } else {
594 expected_end + 1
595 };
596 self.consume(drain_to);
597 self.frame_count += 1;
598 return Some(Ok(Frame {
599 direction,
600 byte_count,
601 transport,
602 address,
603 timestamp,
604 content,
605 }));
606 }
607 }
608
609 if let Some(boundary_pos) = self.find_boundary(content_start) {
611 let content = self.buf[content_start..boundary_pos].to_vec();
612 let drain_to = boundary_pos + 2;
613 self.consume(drain_to);
614 self.frame_count += 1;
615
616 if content.len() != byte_count {
617 debug!(
618 frame = self.frame_count,
619 expected = byte_count,
620 actual = content.len(),
621 "frame content size mismatch"
622 );
623 }
624
625 return Some(Ok(Frame {
626 direction,
627 byte_count,
628 transport,
629 address,
630 timestamp,
631 content,
632 }));
633 }
634
635 if self.eof {
636 let end = if self.buf.last() == Some(&0x0B) {
638 self.buf.len() - 1
639 } else {
640 self.buf.len()
641 };
642 let content = self.buf[content_start..end].to_vec();
643 let len = self.buf.len();
644 self.consume(len);
645 self.frame_count += 1;
646
647 if content.len() < byte_count {
648 let missing = byte_count - content.len();
649 warn!(
650 frame = self.frame_count,
651 expected = byte_count,
652 actual = content.len(),
653 missing,
654 "incomplete frame at EOF"
655 );
656 if self.skip_tracking != SkipTracking::CountOnly {
657 self.stats.unparsed_regions.push(UnparsedRegion {
658 offset: self.offset,
659 length: missing as u64,
660 reason: SkipReason::IncompleteFrame,
661 data: None,
662 });
663 }
664 } else if content.len() != byte_count {
665 debug!(
666 frame = self.frame_count,
667 expected = byte_count,
668 actual = content.len(),
669 "last frame content size mismatch"
670 );
671 }
672
673 return Some(Ok(Frame {
674 direction,
675 byte_count,
676 transport,
677 address,
678 timestamp,
679 content,
680 }));
681 }
682
683 if let Err(e) = self.fill_buf() {
684 return Some(Err(ParseError::Io(e)));
685 }
686 }
687 }
688}
689
690#[cfg(test)]
691mod tests {
692 use super::*;
693 use crate::types::SkipTracking;
694
695 #[test]
696 fn parse_recv_ipv4_tcp() {
697 let header = b"recv 100 bytes from tcp/192.168.1.1:5060 at 00:00:01.350874:\n";
698 let (dir, count, transport, addr, ts, len) = parse_frame_header(header).unwrap();
699 assert_eq!(dir, Direction::Recv);
700 assert_eq!(count, 100);
701 assert_eq!(transport, Transport::Tcp);
702 assert_eq!(addr, "192.168.1.1:5060");
703 assert_eq!(
704 ts,
705 Timestamp::TimeOnly {
706 hour: 0,
707 min: 0,
708 sec: 1,
709 usec: 350874
710 }
711 );
712 assert_eq!(len, header.len());
713 }
714
715 #[test]
716 fn parse_recv_ipv6_tcp() {
717 let header = b"recv 1440 bytes from tcp/[2001:4958:10:14::4]:30046 at 13:03:21.674883:\n";
718 let (dir, count, transport, addr, ts, _) = parse_frame_header(header).unwrap();
719 assert_eq!(dir, Direction::Recv);
720 assert_eq!(count, 1440);
721 assert_eq!(transport, Transport::Tcp);
722 assert_eq!(addr, "[2001:4958:10:14::4]:30046");
723 assert_eq!(
724 ts,
725 Timestamp::TimeOnly {
726 hour: 13,
727 min: 3,
728 sec: 21,
729 usec: 674883
730 }
731 );
732 }
733
734 #[test]
735 fn parse_sent_ipv6_tcp() {
736 let header = b"sent 681 bytes to tcp/[2001:4958:10:14::4]:30046 at 13:03:21.675500:\n";
737 let (dir, count, transport, addr, _, _) = parse_frame_header(header).unwrap();
738 assert_eq!(dir, Direction::Sent);
739 assert_eq!(count, 681);
740 assert_eq!(transport, Transport::Tcp);
741 assert_eq!(addr, "[2001:4958:10:14::4]:30046");
742 }
743
744 #[test]
745 fn parse_recv_udp() {
746 let header = b"recv 457 bytes from udp/10.0.0.1:5060 at 00:19:47.123456:\n";
747 let (dir, _, transport, _, _, _) = parse_frame_header(header).unwrap();
748 assert_eq!(dir, Direction::Recv);
749 assert_eq!(transport, Transport::Udp);
750 }
751
752 #[test]
753 fn parse_sent_tls() {
754 let header = b"sent 500 bytes to tls/10.0.0.1:5061 at 12:00:00.000000:\n";
755 let (dir, count, transport, _, _, _) = parse_frame_header(header).unwrap();
756 assert_eq!(dir, Direction::Sent);
757 assert_eq!(count, 500);
758 assert_eq!(transport, Transport::Tls);
759 }
760
761 #[test]
762 fn parse_full_datetime_timestamp() {
763 let header = b"recv 100 bytes from tcp/192.168.1.1:5060 at 2026-02-01 10:00:00.000000:\n";
764 let (_, _, _, _, ts, _) = parse_frame_header(header).unwrap();
765 assert_eq!(
766 ts,
767 Timestamp::DateTime {
768 year: 2026,
769 month: 2,
770 day: 1,
771 hour: 10,
772 min: 0,
773 sec: 0,
774 usec: 0
775 }
776 );
777 }
778
779 #[test]
780 fn parse_invalid_header() {
781 assert!(parse_frame_header(b"invalid header\n").is_err());
782 assert!(
783 parse_frame_header(b"recv abc bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n")
784 .is_err()
785 );
786 }
787
788 #[test]
789 fn is_frame_header_valid() {
790 assert!(is_frame_header(
791 b"recv 100 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n"
792 ));
793 assert!(is_frame_header(
794 b"sent 681 bytes to tcp/[::1]:5060 at 00:00:00.000000:\n"
795 ));
796 assert!(!is_frame_header(b"not a header"));
797 assert!(!is_frame_header(b"recv abc bytes"));
798 assert!(!is_frame_header(b""));
799 }
800
801 #[test]
802 fn frame_iterator_single_frame() {
803 let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
804 let frames: Vec<Frame> = FrameIterator::new(&data[..])
805 .collect::<Result<Vec<_>, _>>()
806 .unwrap();
807 assert_eq!(frames.len(), 1);
808 assert_eq!(frames[0].content, b"hello");
809 assert_eq!(frames[0].byte_count, 5);
810 }
811
812 #[test]
813 fn frame_iterator_multiple_frames() {
814 let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\nsent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
815 let frames: Vec<Frame> = FrameIterator::new(&data[..])
816 .collect::<Result<Vec<_>, _>>()
817 .unwrap();
818 assert_eq!(frames.len(), 2);
819 assert_eq!(frames[0].content, b"hello");
820 assert_eq!(frames[0].direction, Direction::Recv);
821 assert_eq!(frames[1].content, b"world");
822 assert_eq!(frames[1].direction, Direction::Sent);
823 }
824
825 #[test]
826 fn frame_iterator_vt_in_content() {
827 let mut data = Vec::new();
829 data.extend_from_slice(b"recv 15 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n");
830 data.extend_from_slice(b"he\x0B\nllo world!!");
831 data.extend_from_slice(b"\x0B\n");
832 let frames: Vec<Frame> = FrameIterator::new(&data[..])
833 .collect::<Result<Vec<_>, _>>()
834 .unwrap();
835 assert_eq!(frames.len(), 1);
836 assert_eq!(frames[0].content, b"he\x0B\nllo world!!");
837 }
838
839 #[test]
840 fn frame_iterator_eof_without_boundary() {
841 let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello";
842 let frames: Vec<Frame> = FrameIterator::new(&data[..])
843 .collect::<Result<Vec<_>, _>>()
844 .unwrap();
845 assert_eq!(frames.len(), 1);
846 assert_eq!(frames[0].content, b"hello");
847 }
848
849 #[test]
850 fn frame_iterator_eof_with_lone_vt() {
851 let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B";
852 let frames: Vec<Frame> = FrameIterator::new(&data[..])
853 .collect::<Result<Vec<_>, _>>()
854 .unwrap();
855 assert_eq!(frames.len(), 1);
856 assert_eq!(frames[0].content, b"hello");
857 }
858
859 #[test]
860 fn frame_iterator_partial_first_frame() {
861 let mut data = Vec::new();
863 data.extend_from_slice(b"partial garbage data");
864 data.extend_from_slice(b"\x0B\n");
865 data.extend_from_slice(
866 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
867 );
868 let frames: Vec<Frame> = FrameIterator::new(&data[..])
869 .collect::<Result<Vec<_>, _>>()
870 .unwrap();
871 assert_eq!(frames.len(), 1);
872 assert_eq!(frames[0].content, b"hello");
873 }
874
875 #[test]
876 fn frame_iterator_truncated_last_frame() {
877 let mut data = Vec::new();
879 data.extend_from_slice(
880 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
881 );
882 data.extend_from_slice(b"sent 3 bytes to tcp/1.1.1.1:5060 at 00:00:01.000000:\nbye");
883 let frames: Vec<Frame> = FrameIterator::new(&data[..])
884 .collect::<Result<Vec<_>, _>>()
885 .unwrap();
886 assert_eq!(frames.len(), 2);
887 assert_eq!(frames[0].content, b"hello");
888 assert_eq!(frames[1].content, b"bye");
889 }
890
891 #[test]
892 fn frame_iterator_file_concatenation() {
893 let mut data = Vec::new();
897
898 data.extend_from_slice(
900 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
901 );
902 data.extend_from_slice(
903 b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n",
904 );
905
906 data.extend_from_slice(b"some truncated SIP content from previous rotation\r\n\r\n");
908 data.extend_from_slice(b"\x0B\n");
909 data.extend_from_slice(
910 b"recv 3 bytes from tcp/2.2.2.2:5060 at 01:00:00.000000:\nfoo\x0B\n",
911 );
912
913 let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
914 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
915 assert_eq!(frames.len(), 3);
916 assert_eq!(frames[0].content, b"hello");
917 assert_eq!(frames[1].content, b"world");
918 assert_eq!(frames[2].content, b"foo");
919 assert_eq!(frames[2].address, "2.2.2.2:5060");
920 }
921
922 #[test]
923 fn frame_iterator_file_concatenation_mid_stream_garbage() {
924 let mut data = Vec::new();
928
929 data.extend_from_slice(
931 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
932 );
933
934 data.extend_from_slice(b"Content-Type: application/sdp\r\n\r\nv=0\r\n");
936 data.extend_from_slice(b"\x0B\n");
937
938 data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
940
941 let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
942 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
943 assert_eq!(frames.len(), 2);
944 assert_eq!(frames[0].content, b"hello");
945 assert_eq!(frames[1].content, b"bar");
946 }
947
948 #[test]
949 fn frame_iterator_empty_input() {
950 let data: &[u8] = b"";
951 let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(data).collect();
952 assert!(frames.is_empty());
953 }
954
955 #[test]
956 fn frame_iterator_only_garbage() {
957 let data = b"this is not a SIP trace dump at all, just garbage text";
958 let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
959 assert!(frames.is_empty());
960 }
961
962 #[test]
963 fn frame_iterator_dump_marker_at_eof() {
964 let mut data = Vec::new();
967 data.extend_from_slice(
968 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
969 );
970 data.extend_from_slice(b"dump started at Thu Aug 22 11:38:11 2024\n\n\n");
971
972 let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
973 assert_eq!(frames.len(), 1);
974 assert!(frames[0].is_ok());
975 assert_eq!(frames[0].as_ref().unwrap().content, b"hello");
976 }
977
978 #[test]
979 fn frame_iterator_dump_marker_mid_stream() {
980 let mut data = Vec::new();
983 data.extend_from_slice(
984 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
985 );
986 data.extend_from_slice(b"dump started at Thu Aug 22 11:38:11 2024\n\n\n");
987 data.extend_from_slice(b"sent 3 bytes to tcp/2.2.2.2:5060 at 00:00:01.000000:\nbye\x0B\n");
988
989 let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
990 assert_eq!(frames.len(), 2);
991 assert_eq!(frames[0].as_ref().unwrap().content, b"hello");
992 assert_eq!(frames[1].as_ref().unwrap().content, b"bye");
993 }
994
995 #[test]
996 fn frame_iterator_extra_newline_after_boundary() {
997 let mut data = Vec::new();
1000 data.extend_from_slice(
1001 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1002 );
1003 data.push(b'\n');
1004 data.extend_from_slice(
1005 b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n",
1006 );
1007
1008 let frames: Vec<Frame> = FrameIterator::new(&data[..])
1009 .collect::<Result<Vec<_>, _>>()
1010 .unwrap();
1011 assert_eq!(frames.len(), 2);
1012 assert_eq!(frames[0].content, b"hello");
1013 assert_eq!(frames[1].content, b"world");
1014 }
1015
1016 #[test]
1017 fn frame_iterator_multiple_newlines_after_boundary() {
1018 let mut data = Vec::new();
1020 data.extend_from_slice(
1021 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1022 );
1023 data.extend_from_slice(b"\n\r\n\n");
1024 data.extend_from_slice(
1025 b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n",
1026 );
1027
1028 let frames: Vec<Frame> = FrameIterator::new(&data[..])
1029 .collect::<Result<Vec<_>, _>>()
1030 .unwrap();
1031 assert_eq!(frames.len(), 2);
1032 assert_eq!(frames[0].content, b"hello");
1033 assert_eq!(frames[1].content, b"world");
1034 }
1035
1036 #[test]
1037 fn stats_clean_input() {
1038 let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
1039 let mut iter = FrameIterator::new(&data[..]);
1040 let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1041 assert_eq!(frames.len(), 1);
1042 let stats = iter.stats();
1043 assert_eq!(stats.bytes_read, data.len() as u64);
1044 assert_eq!(stats.bytes_skipped, 0);
1045 assert!(stats.unparsed_regions.is_empty());
1046 }
1047
1048 #[test]
1049 fn stats_multiple_frames() {
1050 let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\nsent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
1051 let mut iter = FrameIterator::new(&data[..]);
1052 let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1053 assert_eq!(frames.len(), 2);
1054 let stats = iter.stats();
1055 assert_eq!(stats.bytes_read, data.len() as u64);
1056 assert_eq!(stats.bytes_skipped, 0);
1057 assert!(stats.unparsed_regions.is_empty());
1058 }
1059
1060 #[test]
1061 fn stats_partial_first_frame() {
1062 let mut data = Vec::new();
1063 data.extend_from_slice(b"partial garbage data");
1064 data.extend_from_slice(b"\x0B\n");
1065 data.extend_from_slice(
1066 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1067 );
1068 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1069 let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1070 assert_eq!(frames.len(), 1);
1071 let stats = iter.stats();
1072 assert_eq!(stats.bytes_read, data.len() as u64);
1073 let skipped = b"partial garbage data\x0B\n".len() as u64;
1075 assert_eq!(stats.bytes_skipped, skipped);
1076 assert_eq!(stats.unparsed_regions.len(), 1);
1077 assert_eq!(stats.unparsed_regions[0].offset, 0);
1078 assert_eq!(stats.unparsed_regions[0].length, skipped);
1079 assert_eq!(
1080 stats.unparsed_regions[0].reason,
1081 crate::types::SkipReason::PartialFirstFrame
1082 );
1083 assert!(stats.unparsed_regions[0].data.is_none());
1084 }
1085
1086 #[test]
1087 fn stats_partial_first_frame_capture() {
1088 let mut data = Vec::new();
1089 data.extend_from_slice(b"partial garbage data");
1090 data.extend_from_slice(b"\x0B\n");
1091 data.extend_from_slice(
1092 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1093 );
1094 let mut iter = FrameIterator::new(&data[..]).capture_skipped(true);
1095 let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1096 assert_eq!(frames.len(), 1);
1097 let stats = iter.stats();
1098 assert_eq!(stats.unparsed_regions.len(), 1);
1099 let region = &stats.unparsed_regions[0];
1100 assert_eq!(
1101 region.data.as_deref(),
1102 Some(b"partial garbage data\x0B\n".as_slice())
1103 );
1104 }
1105
1106 #[test]
1107 fn stats_mid_stream_partial_frame() {
1108 let mut data = Vec::new();
1110 data.extend_from_slice(
1111 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1112 );
1113 data.extend_from_slice(b"Content-Type: application/sdp\r\n\r\nv=0\r\n");
1114 data.extend_from_slice(b"\x0B\n");
1115 data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
1116
1117 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1118 let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1119 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1120 assert_eq!(frames.len(), 2);
1121 let stats = iter.stats();
1122 assert!(stats.bytes_skipped > 0);
1123 assert_eq!(stats.unparsed_regions.len(), 1);
1124 assert_eq!(
1125 stats.unparsed_regions[0].reason,
1126 crate::types::SkipReason::MidStreamSkip
1127 );
1128 }
1129
1130 #[test]
1131 fn stats_replayed_frame() {
1132 let frame1 = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
1135 let replay = b"Route: <sip:10.0.0.1:5060;lr>\r\nContent-Length: 0\r\n\r\n\x0B\n";
1136 let frame2 = b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n";
1137
1138 let mut data = Vec::new();
1139 data.extend_from_slice(frame1);
1140 data.extend_from_slice(replay);
1141 data.extend_from_slice(frame2);
1142
1143 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1144 let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1145 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1146 assert_eq!(frames.len(), 2);
1147 let stats = iter.stats();
1148 assert_eq!(stats.unparsed_regions.len(), 1);
1149 assert_eq!(
1150 stats.unparsed_regions[0].reason,
1151 crate::types::SkipReason::ReplayedFrame
1152 );
1153 }
1154
1155 #[test]
1156 fn stats_incomplete_frame_at_eof() {
1157 let mut data = Vec::new();
1159 data.extend_from_slice(
1160 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1161 );
1162 data.extend_from_slice(b"recv 100 bytes from tcp/2.2.2.2:5060 at 01:00:00.000000:\n");
1163 data.extend_from_slice(b"partial content only");
1164 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1167 let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1168 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1169 assert_eq!(frames.len(), 2, "truncated frame should still be returned");
1170 assert_eq!(frames[1].content, b"partial content only");
1171 assert_eq!(frames[1].byte_count, 100);
1172 let stats = iter.stats();
1173 assert_eq!(stats.unparsed_regions.len(), 1);
1174 assert_eq!(
1175 stats.unparsed_regions[0].reason,
1176 crate::types::SkipReason::IncompleteFrame
1177 );
1178 }
1179
1180 #[test]
1181 fn stats_invalid_header_skip() {
1182 let mut data = Vec::new();
1184 data.extend_from_slice(
1185 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1186 );
1187 data.extend_from_slice(b"recv CORRUPT HEADER garbage\n");
1188 data.extend_from_slice(b"\x0B\n");
1189 data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
1190
1191 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1192 let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1193 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1194 assert_eq!(frames.len(), 2);
1195 let stats = iter.stats();
1196 assert!(stats.bytes_skipped > 0);
1197 assert_eq!(stats.unparsed_regions.len(), 1);
1198 assert_eq!(
1199 stats.unparsed_regions[0].reason,
1200 crate::types::SkipReason::InvalidHeader
1201 );
1202 }
1203
1204 #[test]
1205 fn stats_oversized_frame_at_start() {
1206 let mut data = Vec::new();
1207 data.resize(MAX_PARTIAL_FRAME + 1, b'x');
1208 data.extend_from_slice(b"\x0B\n");
1209 data.extend_from_slice(
1210 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1211 );
1212 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1213 let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1214 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1215 assert_eq!(frames.len(), 1);
1216 let stats = iter.stats();
1217 assert_eq!(stats.unparsed_regions.len(), 1);
1218 assert_eq!(
1219 stats.unparsed_regions[0].reason,
1220 crate::types::SkipReason::OversizedFrame
1221 );
1222 }
1223
1224 #[test]
1225 fn stats_oversized_frame_mid_stream() {
1226 let mut data = Vec::new();
1227 data.extend_from_slice(
1228 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1229 );
1230 let garbage_len = MAX_PARTIAL_FRAME + 1;
1231 data.resize(data.len() + garbage_len, b'x');
1232 data.extend_from_slice(b"\x0B\n");
1233 data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
1234 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1235 let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1236 let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1237 assert_eq!(frames.len(), 2);
1238 let stats = iter.stats();
1239 assert_eq!(stats.unparsed_regions.len(), 1);
1240 assert_eq!(
1241 stats.unparsed_regions[0].reason,
1242 crate::types::SkipReason::OversizedFrame
1243 );
1244 }
1245
1246 #[test]
1247 fn stats_partial_first_frame_within_limit() {
1248 let mut data = Vec::new();
1250 data.resize(MAX_PARTIAL_FRAME - 2, b'x');
1251 data.extend_from_slice(b"\x0B\n");
1252 data.extend_from_slice(
1253 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1254 );
1255 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1256 let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1257 assert_eq!(frames.len(), 1);
1258 let stats = iter.stats();
1259 assert_eq!(stats.unparsed_regions.len(), 1);
1260 assert_eq!(
1261 stats.unparsed_regions[0].reason,
1262 crate::types::SkipReason::PartialFirstFrame
1263 );
1264 }
1265
1266 #[test]
1267 fn stats_dump_restart_marker() {
1268 let mut data = Vec::new();
1269 data.extend_from_slice(
1270 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1271 );
1272 data.extend_from_slice(b"dump started at Thu Aug 22 11:38:11 2024\n\n\n");
1273 data.extend_from_slice(b"sent 3 bytes to tcp/2.2.2.2:5060 at 00:00:01.000000:\nbye\x0B\n");
1274
1275 let mut iter = FrameIterator::new(&data[..]);
1276 let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1277 assert_eq!(frames.len(), 2);
1278 let stats = iter.stats();
1279 assert_eq!(stats.bytes_skipped, 0);
1281 assert!(stats.unparsed_regions.is_empty());
1282 }
1283
1284 #[test]
1285 fn stats_track_regions_no_data() {
1286 let mut data = Vec::new();
1287 data.extend_from_slice(b"partial garbage data");
1288 data.extend_from_slice(b"\x0B\n");
1289 data.extend_from_slice(
1290 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1291 );
1292 let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1293 let _: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1294 let stats = iter.stats();
1295 assert_eq!(stats.unparsed_regions.len(), 1);
1296 assert!(stats.unparsed_regions[0].data.is_none());
1297 }
1298
1299 #[test]
1300 fn stats_count_only_no_regions() {
1301 let mut data = Vec::new();
1302 data.extend_from_slice(b"partial garbage data");
1303 data.extend_from_slice(b"\x0B\n");
1304 data.extend_from_slice(
1305 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1306 );
1307 let mut iter = FrameIterator::new(&data[..]);
1308 let frames: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1309 assert_eq!(frames.len(), 1);
1310 let stats = iter.stats();
1311 let skipped = b"partial garbage data\x0B\n".len() as u64;
1312 assert_eq!(stats.bytes_skipped, skipped);
1313 assert!(
1314 stats.unparsed_regions.is_empty(),
1315 "CountOnly should not accumulate regions"
1316 );
1317 }
1318
1319 #[test]
1320 fn frame_iterator_trailing_newlines_at_eof() {
1321 let mut data = Vec::new();
1323 data.extend_from_slice(
1324 b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1325 );
1326 data.extend_from_slice(b"\n\n");
1327
1328 let frames: Vec<Frame> = FrameIterator::new(&data[..])
1329 .collect::<Result<Vec<_>, _>>()
1330 .unwrap();
1331 assert_eq!(frames.len(), 1);
1332 assert_eq!(frames[0].content, b"hello");
1333 }
1334}