Skip to main content

freeswitch_sofia_trace_parser/
frame.rs

1use std::io::Read;
2
3use memchr::memmem;
4use tracing::{debug, info, trace, warn};
5
6use crate::types::{
7    Direction, Frame, ParseStats, SkipReason, SkipTracking, Timestamp, Transport, UnparsedRegion,
8};
9
/// Leading bytes of a header line describing a received message
/// (`recv <N> bytes from ...`).
const RECV_PREFIX: &[u8] = b"recv ";
/// Leading bytes of a header line describing a sent message
/// (`sent <N> bytes to ...`).
const SENT_PREFIX: &[u8] = b"sent ";
/// Maximum skip size classified as a partial first frame.
/// Based on IP max datagram size (65535) plus the `\x0B\n` boundary (2 bytes).
const MAX_PARTIAL_FRAME: usize = 65537;
15
/// Errors surfaced while splitting a trace into frames and parsing them.
#[derive(Debug)]
pub enum ParseError {
    /// A frame header line did not match the expected layout; the payload
    /// names the check that failed.
    InvalidHeader(String),
    /// A frame's payload was not a valid SIP message.
    /// NOTE(review): never constructed in this file — presumably produced by a
    /// higher parsing layer; confirm against callers.
    InvalidMessage(String),
    /// Non-SIP bytes were seen on a connection.
    /// NOTE(review): never constructed in this file — confirm where it is raised.
    TransportNoise {
        /// Number of non-SIP bytes.
        bytes: usize,
        /// Transport the bytes arrived on.
        transport: Transport,
        /// Peer address as it appeared in the frame header.
        address: String,
    },
    /// The underlying reader failed.
    Io(std::io::Error),
}
27
28impl fmt::Display for ParseError {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        match self {
31            ParseError::InvalidHeader(msg) => write!(f, "invalid frame header: {msg}"),
32            ParseError::InvalidMessage(msg) => write!(f, "invalid SIP message: {msg}"),
33            ParseError::TransportNoise {
34                bytes,
35                transport,
36                address,
37            } => write!(
38                f,
39                "transport noise: {bytes} bytes of non-SIP data from {transport}/{address}"
40            ),
41            ParseError::Io(e) => write!(f, "I/O error: {e}"),
42        }
43    }
44}
45
46impl std::error::Error for ParseError {
47    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
48        match self {
49            ParseError::Io(e) => Some(e),
50            _ => None,
51        }
52    }
53}
54
55impl From<std::io::Error> for ParseError {
56    fn from(e: std::io::Error) -> Self {
57        ParseError::Io(e)
58    }
59}
60
61use std::fmt;
62
/// Convert one ASCII decimal digit to its numeric value.
///
/// Returns `None` for any byte outside `b'0'..=b'9'`.
fn digit(b: u8) -> Option<u8> {
    // Bytes below b'0' fail the subtraction; bytes above b'9' fail the range filter.
    b.checked_sub(b'0').filter(|&value| value < 10)
}
69
70fn parse_u8(bytes: &[u8]) -> Option<u8> {
71    if bytes.is_empty() || bytes.len() > 3 {
72        return None;
73    }
74    let mut val: u8 = 0;
75    for &b in bytes {
76        val = val.checked_mul(10)?.checked_add(digit(b)?)?;
77    }
78    Some(val)
79}
80
81fn parse_u16(bytes: &[u8]) -> Option<u16> {
82    if bytes.is_empty() || bytes.len() > 5 {
83        return None;
84    }
85    let mut val: u16 = 0;
86    for &b in bytes {
87        val = val.checked_mul(10)?.checked_add(u16::from(digit(b)?))?;
88    }
89    Some(val)
90}
91
92fn parse_u32(bytes: &[u8]) -> Option<u32> {
93    if bytes.is_empty() || bytes.len() > 10 {
94        return None;
95    }
96    let mut val: u32 = 0;
97    for &b in bytes {
98        val = val.checked_mul(10)?.checked_add(u32::from(digit(b)?))?;
99    }
100    Some(val)
101}
102
103fn parse_usize(bytes: &[u8]) -> Option<usize> {
104    if bytes.is_empty() || bytes.len() > 10 {
105        return None;
106    }
107    let mut val: usize = 0;
108    for &b in bytes {
109        val = val.checked_mul(10)?.checked_add(usize::from(digit(b)?))?;
110    }
111    Some(val)
112}
113
114/// Parse timestamp from bytes: either `HH:MM:SS.usec` or `YYYY-MM-DD HH:MM:SS.usec`
115fn parse_timestamp(bytes: &[u8]) -> Option<Timestamp> {
116    // Try full datetime first: YYYY-MM-DD HH:MM:SS.usec (min 26 bytes)
117    if bytes.len() >= 26 && bytes[4] == b'-' && bytes[7] == b'-' && bytes[10] == b' ' {
118        let year = parse_u16(&bytes[0..4])?;
119        let month = parse_u8(&bytes[5..7])?;
120        let day = parse_u8(&bytes[8..10])?;
121        let ts = parse_time_part(&bytes[11..])?;
122        return Some(Timestamp::DateTime {
123            year,
124            month,
125            day,
126            hour: ts.0,
127            min: ts.1,
128            sec: ts.2,
129            usec: ts.3,
130        });
131    }
132    // Time-only: HH:MM:SS.usec (min 15 bytes)
133    let (hour, min, sec, usec) = parse_time_part(bytes)?;
134    Some(Timestamp::TimeOnly {
135        hour,
136        min,
137        sec,
138        usec,
139    })
140}
141
142/// Parse `HH:MM:SS.usec` from bytes, returns (hour, min, sec, usec)
143fn parse_time_part(bytes: &[u8]) -> Option<(u8, u8, u8, u32)> {
144    if bytes.len() < 15 {
145        return None;
146    }
147    if bytes[2] != b':' || bytes[5] != b':' || bytes[8] != b'.' {
148        return None;
149    }
150    let hour = parse_u8(&bytes[0..2])?;
151    let min = parse_u8(&bytes[3..5])?;
152    let sec = parse_u8(&bytes[6..8])?;
153    let usec = parse_u32(&bytes[9..15])?;
154    Some((hour, min, sec, usec))
155}
156
157/// Parse a frame header line from `&[u8]`.
158///
159/// Expected format:
160/// `(recv|sent) <N> bytes (from|to) <transport>/<address> at <timestamp>:\n`
161///
162/// Returns `(Frame header fields, header_len)` where header_len includes the trailing `\n`.
163pub fn parse_frame_header(
164    data: &[u8],
165) -> Result<(Direction, usize, Transport, String, Timestamp, usize), ParseError> {
166    let newline_pos = memchr::memchr(b'\n', data)
167        .ok_or_else(|| ParseError::InvalidHeader("no newline in header".into()))?;
168    let line = &data[..newline_pos];
169    // Strip trailing \r if present
170    let line = line.strip_suffix(b"\r").unwrap_or(line);
171    // Must end with ':'
172    let line = line
173        .strip_suffix(b":")
174        .ok_or_else(|| ParseError::InvalidHeader("header does not end with ':'".into()))?;
175
176    // Direction — both "recv " and "sent " are 5 bytes
177    let direction = if line.starts_with(RECV_PREFIX) {
178        Direction::Recv
179    } else if line.starts_with(SENT_PREFIX) {
180        Direction::Sent
181    } else {
182        return Err(ParseError::InvalidHeader(
183            "expected 'recv' or 'sent'".into(),
184        ));
185    };
186    let mut pos = 5;
187
188    // Byte count: digits until ' '
189    let space = memchr::memchr(b' ', &line[pos..])
190        .ok_or_else(|| ParseError::InvalidHeader("no space after byte count".into()))?;
191    let byte_count = parse_usize(&line[pos..pos + space])
192        .ok_or_else(|| ParseError::InvalidHeader("invalid byte count".into()))?;
193    pos += space + 1;
194
195    // " bytes from/to "
196    let expected_recv = b"bytes from ";
197    let expected_sent = b"bytes to ";
198    if direction == Direction::Recv {
199        if !line[pos..].starts_with(expected_recv) {
200            return Err(ParseError::InvalidHeader("expected 'bytes from '".into()));
201        }
202        pos += expected_recv.len();
203    } else {
204        if !line[pos..].starts_with(expected_sent) {
205            return Err(ParseError::InvalidHeader("expected 'bytes to '".into()));
206        }
207        pos += expected_sent.len();
208    }
209
210    // Transport: tcp/ udp/ tls/ wss/
211    let transport = if line[pos..].starts_with(b"tcp/") {
212        pos += 4;
213        Transport::Tcp
214    } else if line[pos..].starts_with(b"udp/") {
215        pos += 4;
216        Transport::Udp
217    } else if line[pos..].starts_with(b"tls/") {
218        pos += 4;
219        Transport::Tls
220    } else if line[pos..].starts_with(b"wss/") {
221        pos += 4;
222        Transport::Wss
223    } else {
224        return Err(ParseError::InvalidHeader("unknown transport".into()));
225    };
226
227    // Address: until " at "
228    let at_marker = b" at ";
229    let at_pos = memmem::find(&line[pos..], at_marker)
230        .ok_or_else(|| ParseError::InvalidHeader("no ' at ' in header".into()))?;
231    let address = String::from_utf8_lossy(&line[pos..pos + at_pos]).into_owned();
232    pos += at_pos + at_marker.len();
233
234    // Timestamp: rest of line (after stripping trailing ':' already done)
235    let timestamp = parse_timestamp(&line[pos..])
236        .ok_or_else(|| ParseError::InvalidHeader("invalid timestamp".into()))?;
237
238    Ok((
239        direction,
240        byte_count,
241        transport,
242        address,
243        timestamp,
244        newline_pos + 1,
245    ))
246}
247
248/// Check if data at given position looks like a valid frame header start.
249/// Used to validate `\x0B\n` boundaries.
250pub fn is_frame_header(data: &[u8]) -> bool {
251    if data.len() < 20 {
252        return false;
253    }
254    let starts_valid = data.starts_with(RECV_PREFIX) || data.starts_with(SENT_PREFIX);
255    if !starts_valid {
256        return false;
257    }
258    // Check that after direction there are digits followed by " bytes "
259    let rest = &data[5..];
260    let space = match memchr::memchr(b' ', rest) {
261        Some(p) => p,
262        None => return false,
263    };
264    if space == 0 || space > 10 {
265        return false;
266    }
267    for &b in &rest[..space] {
268        if !b.is_ascii_digit() {
269            return false;
270        }
271    }
272    rest[space..].starts_with(b" bytes ")
273}
274
/// Number of bytes requested from the reader on each buffer refill.
const READ_BUF_SIZE: usize = 32 * 1024;

/// Streaming iterator that splits a byte stream into [`Frame`]s.
///
/// Frames are a header line followed by content and terminated by `\x0B\n`.
/// Data is pulled from `reader` on demand into an internal buffer.
pub struct FrameIterator<R> {
    /// Source of the raw trace bytes.
    reader: R,
    /// Bytes read from `reader` that have not been consumed yet.
    buf: Vec<u8>,
    /// Set once `reader` has returned a zero-length read.
    eof: bool,
    /// Number of frames successfully yielded so far.
    frame_count: u64,
    /// Total number of bytes consumed from the front of `buf`, i.e. the
    /// stream offset of `buf[0]`.
    offset: u64,
    /// Running counters and recorded unparsed regions.
    stats: ParseStats,
    /// How skipped regions are recorded in `stats`.
    skip_tracking: SkipTracking,
}
286
287impl<R: Read> FrameIterator<R> {
288    pub fn new(reader: R) -> Self {
289        FrameIterator {
290            reader,
291            buf: Vec::with_capacity(READ_BUF_SIZE * 2),
292            eof: false,
293            frame_count: 0,
294            offset: 0,
295            stats: ParseStats::default(),
296            skip_tracking: SkipTracking::CountOnly,
297        }
298    }
299
300    pub fn capture_skipped(mut self, enable: bool) -> Self {
301        if enable {
302            self.skip_tracking = SkipTracking::CaptureData;
303        }
304        self
305    }
306
307    pub fn skip_tracking(mut self, tracking: SkipTracking) -> Self {
308        self.skip_tracking = tracking;
309        self
310    }
311
312    pub fn stats(&self) -> &ParseStats {
313        &self.stats
314    }
315
316    pub fn stats_mut(&mut self) -> &mut ParseStats {
317        &mut self.stats
318    }
319
320    pub fn drain_unparsed(&mut self) -> Vec<UnparsedRegion> {
321        self.stats.drain_regions()
322    }
323
324    fn consume(&mut self, n: usize) {
325        self.buf.drain(..n);
326        self.offset += n as u64;
327    }
328
329    fn consume_skipped(&mut self, n: usize, reason: SkipReason) {
330        if self.skip_tracking != SkipTracking::CountOnly {
331            let data = if self.skip_tracking == SkipTracking::CaptureData {
332                Some(self.buf[..n].to_vec())
333            } else {
334                None
335            };
336            self.stats.unparsed_regions.push(UnparsedRegion {
337                offset: self.offset,
338                length: n as u64,
339                reason,
340                data,
341            });
342        }
343        self.stats.bytes_skipped += n as u64;
344        self.consume(n);
345    }
346
347    fn fill_buf(&mut self) -> Result<bool, std::io::Error> {
348        if self.eof {
349            return Ok(false);
350        }
351        let old_len = self.buf.len();
352        self.buf.resize(old_len + READ_BUF_SIZE, 0);
353        let n = self.reader.read(&mut self.buf[old_len..])?;
354        self.buf.truncate(old_len + n);
355        if n == 0 {
356            self.eof = true;
357            return Ok(false);
358        }
359        self.stats.bytes_read += n as u64;
360        Ok(true)
361    }
362
363    /// Check if skipped content is a truncated frame from a logrotate file boundary.
364    ///
365    /// Detection: the skipped content ends with `\r\n\r\n\x0B\n` — the SIP header/body
366    /// terminator followed by the frame boundary marker. This pattern indicates the tail
367    /// of a SIP frame that was split across logrotated dump files.
368    fn is_replay(&self, skipped: &[u8]) -> bool {
369        if self.frame_count == 0 {
370            return false;
371        }
372        skipped.ends_with(b"\r\n\r\n\x0B\n")
373    }
374
375    /// Find the next `\x0B\n` boundary that is followed by a valid frame header.
376    fn find_boundary(&self, start: usize) -> Option<usize> {
377        let finder = memmem::Finder::new(b"\x0B\n");
378        let mut search_from = start;
379        loop {
380            let pos = finder.find(&self.buf[search_from..])?;
381            let abs_pos = search_from + pos;
382            let after = abs_pos + 2;
383            if after >= self.buf.len() {
384                // Boundary at very end — could be real, but we can't validate header yet
385                // If EOF, accept it as boundary (content ends at \x0B)
386                if self.eof {
387                    return Some(abs_pos);
388                }
389                return None; // Need more data
390            }
391            if is_frame_header(&self.buf[after..]) {
392                return Some(abs_pos);
393            }
394            // \x0B\n in content, not a boundary — skip past it
395            trace!(
396                offset = abs_pos,
397                "found \\x0B\\n in content (not a boundary), skipping"
398            );
399            search_from = abs_pos + 2;
400        }
401    }
402
403    /// Skip to the first valid frame header in the buffer (for partial first frames).
404    fn skip_to_first_header(&mut self) -> Option<usize> {
405        if is_frame_header(&self.buf) {
406            return Some(0);
407        }
408        // Look for \x0B\n followed by a valid header
409        let finder = memmem::Finder::new(b"\x0B\n");
410        let mut search_from = 0;
411        loop {
412            if let Some(pos) = finder.find(&self.buf[search_from..]) {
413                let abs_pos = search_from + pos;
414                let after = abs_pos + 2;
415                if after < self.buf.len() && is_frame_header(&self.buf[after..]) {
416                    info!(skipped_bytes = after, "skipped partial first frame");
417                    return Some(after);
418                }
419                search_from = abs_pos + 2;
420            } else {
421                return None;
422            }
423        }
424    }
425}
426
427impl<R: Read> Iterator for FrameIterator<R> {
428    type Item = Result<Frame, ParseError>;
429
430    fn next(&mut self) -> Option<Self::Item> {
431        // Ensure we have data
432        if self.buf.is_empty() && !self.eof {
433            if let Err(e) = self.fill_buf() {
434                return Some(Err(ParseError::Io(e)));
435            }
436        }
437
438        if self.buf.is_empty() {
439            return None;
440        }
441
442        // On first call, skip to first valid header if needed
443        if self.frame_count == 0 {
444            loop {
445                match self.skip_to_first_header() {
446                    Some(offset) => {
447                        if offset > 0 {
448                            let reason = if offset <= MAX_PARTIAL_FRAME {
449                                SkipReason::PartialFirstFrame
450                            } else {
451                                SkipReason::OversizedFrame
452                            };
453                            self.consume_skipped(offset, reason);
454                        }
455                        break;
456                    }
457                    None => {
458                        if self.eof {
459                            debug!("no valid frame header found in entire input");
460                            return None;
461                        }
462                        if let Err(e) = self.fill_buf() {
463                            return Some(Err(ParseError::Io(e)));
464                        }
465                    }
466                }
467            }
468        }
469
470        if self.buf.is_empty() {
471            return None;
472        }
473
474        // Strip inter-frame newline padding (\n or \r\n between frames)
475        let mut strip = 0;
476        while strip < self.buf.len() {
477            if self.buf[strip] == b'\n' {
478                strip += 1;
479            } else if strip + 1 < self.buf.len()
480                && self.buf[strip] == b'\r'
481                && self.buf[strip + 1] == b'\n'
482            {
483                strip += 2;
484            } else {
485                break;
486            }
487        }
488        if strip > 0 {
489            self.consume(strip);
490            if self.buf.is_empty() {
491                return self.next();
492            }
493        }
494
495        // Parse frame header — may need more data if header spans buffer boundary
496        let (direction, byte_count, transport, address, timestamp, header_len) = loop {
497            match parse_frame_header(&self.buf) {
498                Ok(h) => break h,
499                Err(ParseError::InvalidHeader(ref msg)) if msg == "no newline in header" => {
500                    if self.eof {
501                        debug!("truncated frame header at EOF");
502                        return None;
503                    }
504                    if let Err(e) = self.fill_buf() {
505                        return Some(Err(ParseError::Io(e)));
506                    }
507                }
508                Err(e) => {
509                    let header_preview: String = self
510                        .buf
511                        .iter()
512                        .take(200)
513                        .take_while(|&&b| b != b'\n')
514                        .map(|&b| {
515                            if b.is_ascii_graphic() || b == b' ' {
516                                b as char
517                            } else {
518                                '.'
519                            }
520                        })
521                        .collect();
522                    if header_preview.starts_with("dump started at ") {
523                        let skip = memchr::memchr(b'\n', &self.buf)
524                            .map(|p| {
525                                let mut end = p + 1;
526                                while end < self.buf.len() && self.buf[end] == b'\n' {
527                                    end += 1;
528                                }
529                                end
530                            })
531                            .unwrap_or(self.buf.len());
532                        debug!(
533                            header = %header_preview,
534                            skipped_bytes = skip,
535                            "skipped dump restart marker",
536                        );
537                        self.consume(skip);
538                        return self.next();
539                    }
540                    let skip = if let Some(b) = self.find_boundary(0) {
541                        b + 2
542                    } else {
543                        memchr::memchr(b'\n', &self.buf)
544                            .map(|p| p + 1)
545                            .unwrap_or(self.buf.len())
546                    };
547                    let reason =
548                        if self.buf.starts_with(RECV_PREFIX) || self.buf.starts_with(SENT_PREFIX) {
549                            SkipReason::InvalidHeader
550                        } else if skip > MAX_PARTIAL_FRAME {
551                            SkipReason::OversizedFrame
552                        } else if self.frame_count == 0 {
553                            SkipReason::PartialFirstFrame
554                        } else {
555                            let skipped = &self.buf[..skip];
556                            if self.is_replay(skipped) {
557                                SkipReason::ReplayedFrame
558                            } else {
559                                SkipReason::MidStreamSkip
560                            }
561                        };
562                    self.consume_skipped(skip, reason);
563                    return Some(Err(e));
564                }
565            }
566        };
567
568        let content_start = header_len;
569        let expected_end = content_start + byte_count;
570
571        // Find the boundary for this frame.
572        // Strategy: first check at the expected position (content_start + byte_count),
573        // then fall back to scanning. This handles file concatenation where \x0B\n
574        // is followed by garbage from the next file's truncated first frame.
575        loop {
576            // Ensure we have enough data to check the expected position
577            while self.buf.len() <= expected_end + 1 && !self.eof {
578                if let Err(e) = self.fill_buf() {
579                    return Some(Err(ParseError::Io(e)));
580                }
581            }
582
583            // Check at expected position first (byte_count hint)
584            if expected_end < self.buf.len() && self.buf[expected_end] == 0x0B {
585                let has_newline =
586                    expected_end + 1 < self.buf.len() && self.buf[expected_end + 1] == b'\n';
587                let at_eof = expected_end + 1 >= self.buf.len() && self.eof;
588
589                if has_newline || at_eof {
590                    let content = self.buf[content_start..expected_end].to_vec();
591                    let drain_to = if has_newline {
592                        expected_end + 2
593                    } else {
594                        expected_end + 1
595                    };
596                    self.consume(drain_to);
597                    self.frame_count += 1;
598                    return Some(Ok(Frame {
599                        direction,
600                        byte_count,
601                        transport,
602                        address,
603                        timestamp,
604                        content,
605                    }));
606                }
607            }
608
609            // Fall back to scanning for \x0B\n + valid header
610            if let Some(boundary_pos) = self.find_boundary(content_start) {
611                let content = self.buf[content_start..boundary_pos].to_vec();
612                let drain_to = boundary_pos + 2;
613                self.consume(drain_to);
614                self.frame_count += 1;
615
616                if content.len() != byte_count {
617                    debug!(
618                        frame = self.frame_count,
619                        expected = byte_count,
620                        actual = content.len(),
621                        "frame content size mismatch"
622                    );
623                }
624
625                return Some(Ok(Frame {
626                    direction,
627                    byte_count,
628                    transport,
629                    address,
630                    timestamp,
631                    content,
632                }));
633            }
634
635            if self.eof {
636                // Last frame — no trailing \x0B\n
637                let end = if self.buf.last() == Some(&0x0B) {
638                    self.buf.len() - 1
639                } else {
640                    self.buf.len()
641                };
642                let content = self.buf[content_start..end].to_vec();
643                let len = self.buf.len();
644                self.consume(len);
645                self.frame_count += 1;
646
647                if content.len() < byte_count {
648                    let missing = byte_count - content.len();
649                    warn!(
650                        frame = self.frame_count,
651                        expected = byte_count,
652                        actual = content.len(),
653                        missing,
654                        "incomplete frame at EOF"
655                    );
656                    if self.skip_tracking != SkipTracking::CountOnly {
657                        self.stats.unparsed_regions.push(UnparsedRegion {
658                            offset: self.offset,
659                            length: missing as u64,
660                            reason: SkipReason::IncompleteFrame,
661                            data: None,
662                        });
663                    }
664                } else if content.len() != byte_count {
665                    debug!(
666                        frame = self.frame_count,
667                        expected = byte_count,
668                        actual = content.len(),
669                        "last frame content size mismatch"
670                    );
671                }
672
673                return Some(Ok(Frame {
674                    direction,
675                    byte_count,
676                    transport,
677                    address,
678                    timestamp,
679                    content,
680                }));
681            }
682
683            if let Err(e) = self.fill_buf() {
684                return Some(Err(ParseError::Io(e)));
685            }
686        }
687    }
688}
689
690#[cfg(test)]
691mod tests {
692    use super::*;
693    use crate::types::SkipTracking;
694
695    #[test]
696    fn parse_recv_ipv4_tcp() {
697        let header = b"recv 100 bytes from tcp/192.168.1.1:5060 at 00:00:01.350874:\n";
698        let (dir, count, transport, addr, ts, len) = parse_frame_header(header).unwrap();
699        assert_eq!(dir, Direction::Recv);
700        assert_eq!(count, 100);
701        assert_eq!(transport, Transport::Tcp);
702        assert_eq!(addr, "192.168.1.1:5060");
703        assert_eq!(
704            ts,
705            Timestamp::TimeOnly {
706                hour: 0,
707                min: 0,
708                sec: 1,
709                usec: 350874
710            }
711        );
712        assert_eq!(len, header.len());
713    }
714
715    #[test]
716    fn parse_recv_ipv6_tcp() {
717        let header = b"recv 1440 bytes from tcp/[2001:4958:10:14::4]:30046 at 13:03:21.674883:\n";
718        let (dir, count, transport, addr, ts, _) = parse_frame_header(header).unwrap();
719        assert_eq!(dir, Direction::Recv);
720        assert_eq!(count, 1440);
721        assert_eq!(transport, Transport::Tcp);
722        assert_eq!(addr, "[2001:4958:10:14::4]:30046");
723        assert_eq!(
724            ts,
725            Timestamp::TimeOnly {
726                hour: 13,
727                min: 3,
728                sec: 21,
729                usec: 674883
730            }
731        );
732    }
733
734    #[test]
735    fn parse_sent_ipv6_tcp() {
736        let header = b"sent 681 bytes to tcp/[2001:4958:10:14::4]:30046 at 13:03:21.675500:\n";
737        let (dir, count, transport, addr, _, _) = parse_frame_header(header).unwrap();
738        assert_eq!(dir, Direction::Sent);
739        assert_eq!(count, 681);
740        assert_eq!(transport, Transport::Tcp);
741        assert_eq!(addr, "[2001:4958:10:14::4]:30046");
742    }
743
744    #[test]
745    fn parse_recv_udp() {
746        let header = b"recv 457 bytes from udp/10.0.0.1:5060 at 00:19:47.123456:\n";
747        let (dir, _, transport, _, _, _) = parse_frame_header(header).unwrap();
748        assert_eq!(dir, Direction::Recv);
749        assert_eq!(transport, Transport::Udp);
750    }
751
752    #[test]
753    fn parse_sent_tls() {
754        let header = b"sent 500 bytes to tls/10.0.0.1:5061 at 12:00:00.000000:\n";
755        let (dir, count, transport, _, _, _) = parse_frame_header(header).unwrap();
756        assert_eq!(dir, Direction::Sent);
757        assert_eq!(count, 500);
758        assert_eq!(transport, Transport::Tls);
759    }
760
761    #[test]
762    fn parse_full_datetime_timestamp() {
763        let header = b"recv 100 bytes from tcp/192.168.1.1:5060 at 2026-02-01 10:00:00.000000:\n";
764        let (_, _, _, _, ts, _) = parse_frame_header(header).unwrap();
765        assert_eq!(
766            ts,
767            Timestamp::DateTime {
768                year: 2026,
769                month: 2,
770                day: 1,
771                hour: 10,
772                min: 0,
773                sec: 0,
774                usec: 0
775            }
776        );
777    }
778
779    #[test]
780    fn parse_invalid_header() {
781        assert!(parse_frame_header(b"invalid header\n").is_err());
782        assert!(
783            parse_frame_header(b"recv abc bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n")
784                .is_err()
785        );
786    }
787
788    #[test]
789    fn is_frame_header_valid() {
790        assert!(is_frame_header(
791            b"recv 100 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n"
792        ));
793        assert!(is_frame_header(
794            b"sent 681 bytes to tcp/[::1]:5060 at 00:00:00.000000:\n"
795        ));
796        assert!(!is_frame_header(b"not a header"));
797        assert!(!is_frame_header(b"recv abc bytes"));
798        assert!(!is_frame_header(b""));
799    }
800
801    #[test]
802    fn frame_iterator_single_frame() {
803        let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
804        let frames: Vec<Frame> = FrameIterator::new(&data[..])
805            .collect::<Result<Vec<_>, _>>()
806            .unwrap();
807        assert_eq!(frames.len(), 1);
808        assert_eq!(frames[0].content, b"hello");
809        assert_eq!(frames[0].byte_count, 5);
810    }
811
812    #[test]
813    fn frame_iterator_multiple_frames() {
814        let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\nsent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
815        let frames: Vec<Frame> = FrameIterator::new(&data[..])
816            .collect::<Result<Vec<_>, _>>()
817            .unwrap();
818        assert_eq!(frames.len(), 2);
819        assert_eq!(frames[0].content, b"hello");
820        assert_eq!(frames[0].direction, Direction::Recv);
821        assert_eq!(frames[1].content, b"world");
822        assert_eq!(frames[1].direction, Direction::Sent);
823    }
824
825    #[test]
826    fn frame_iterator_vt_in_content() {
827        // \x0B in content but not followed by valid header — should NOT split
828        let mut data = Vec::new();
829        data.extend_from_slice(b"recv 15 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n");
830        data.extend_from_slice(b"he\x0B\nllo world!!");
831        data.extend_from_slice(b"\x0B\n");
832        let frames: Vec<Frame> = FrameIterator::new(&data[..])
833            .collect::<Result<Vec<_>, _>>()
834            .unwrap();
835        assert_eq!(frames.len(), 1);
836        assert_eq!(frames[0].content, b"he\x0B\nllo world!!");
837    }
838
839    #[test]
840    fn frame_iterator_eof_without_boundary() {
841        let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello";
842        let frames: Vec<Frame> = FrameIterator::new(&data[..])
843            .collect::<Result<Vec<_>, _>>()
844            .unwrap();
845        assert_eq!(frames.len(), 1);
846        assert_eq!(frames[0].content, b"hello");
847    }
848
849    #[test]
850    fn frame_iterator_eof_with_lone_vt() {
851        let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B";
852        let frames: Vec<Frame> = FrameIterator::new(&data[..])
853            .collect::<Result<Vec<_>, _>>()
854            .unwrap();
855        assert_eq!(frames.len(), 1);
856        assert_eq!(frames[0].content, b"hello");
857    }
858
859    #[test]
860    fn frame_iterator_partial_first_frame() {
861        // Data starts with garbage, then a valid boundary + frame
862        let mut data = Vec::new();
863        data.extend_from_slice(b"partial garbage data");
864        data.extend_from_slice(b"\x0B\n");
865        data.extend_from_slice(
866            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
867        );
868        let frames: Vec<Frame> = FrameIterator::new(&data[..])
869            .collect::<Result<Vec<_>, _>>()
870            .unwrap();
871        assert_eq!(frames.len(), 1);
872        assert_eq!(frames[0].content, b"hello");
873    }
874
875    #[test]
876    fn frame_iterator_truncated_last_frame() {
877        // Complete frame followed by truncated frame at EOF (no \x0B\n)
878        let mut data = Vec::new();
879        data.extend_from_slice(
880            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
881        );
882        data.extend_from_slice(b"sent 3 bytes to tcp/1.1.1.1:5060 at 00:00:01.000000:\nbye");
883        let frames: Vec<Frame> = FrameIterator::new(&data[..])
884            .collect::<Result<Vec<_>, _>>()
885            .unwrap();
886        assert_eq!(frames.len(), 2);
887        assert_eq!(frames[0].content, b"hello");
888        assert_eq!(frames[1].content, b"bye");
889    }
890
891    #[test]
892    fn frame_iterator_file_concatenation() {
893        // Simulates `cat dump.20 dump.21 | parser`
894        // File 1: truncated start + valid frame + complete end
895        // File 2: truncated start (no header) + valid frame
896        let mut data = Vec::new();
897
898        // File 1: starts with valid header
899        data.extend_from_slice(
900            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
901        );
902        data.extend_from_slice(
903            b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n",
904        );
905
906        // File 2: starts with truncated frame data (no header), then boundary, then valid frame
907        data.extend_from_slice(b"some truncated SIP content from previous rotation\r\n\r\n");
908        data.extend_from_slice(b"\x0B\n");
909        data.extend_from_slice(
910            b"recv 3 bytes from tcp/2.2.2.2:5060 at 01:00:00.000000:\nfoo\x0B\n",
911        );
912
913        let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
914        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
915        assert_eq!(frames.len(), 3);
916        assert_eq!(frames[0].content, b"hello");
917        assert_eq!(frames[1].content, b"world");
918        assert_eq!(frames[2].content, b"foo");
919        assert_eq!(frames[2].address, "2.2.2.2:5060");
920    }
921
922    #[test]
923    fn frame_iterator_file_concatenation_mid_stream_garbage() {
924        // The join point between files produces garbage that looks like:
925        // ...last_content\x0B\ntruncated_first_of_next_file\x0B\nvalid_header...
926        // The truncated part is NOT a valid header, so recovery should skip it
927        let mut data = Vec::new();
928
929        // Last frame of file 1
930        data.extend_from_slice(
931            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
932        );
933
934        // Truncated first frame of file 2 (mid-SIP content, no frame header)
935        data.extend_from_slice(b"Content-Type: application/sdp\r\n\r\nv=0\r\n");
936        data.extend_from_slice(b"\x0B\n");
937
938        // Valid second frame of file 2
939        data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
940
941        let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
942        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
943        assert_eq!(frames.len(), 2);
944        assert_eq!(frames[0].content, b"hello");
945        assert_eq!(frames[1].content, b"bar");
946    }
947
948    #[test]
949    fn frame_iterator_empty_input() {
950        let data: &[u8] = b"";
951        let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(data).collect();
952        assert!(frames.is_empty());
953    }
954
955    #[test]
956    fn frame_iterator_only_garbage() {
957        let data = b"this is not a SIP trace dump at all, just garbage text";
958        let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
959        assert!(frames.is_empty());
960    }
961
962    #[test]
963    fn frame_iterator_dump_marker_at_eof() {
964        // A dump restart marker at the end of input (with trailing \n\n as in real dumps)
965        // should be silently consumed, not returned as an error.
966        let mut data = Vec::new();
967        data.extend_from_slice(
968            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
969        );
970        data.extend_from_slice(b"dump started at Thu Aug 22 11:38:11 2024\n\n\n");
971
972        let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
973        assert_eq!(frames.len(), 1);
974        assert!(frames[0].is_ok());
975        assert_eq!(frames[0].as_ref().unwrap().content, b"hello");
976    }
977
978    #[test]
979    fn frame_iterator_dump_marker_mid_stream() {
980        // A dump restart marker between two valid frames (with trailing \n\n as in real dumps)
981        // should be skipped, and both frames should parse successfully.
982        let mut data = Vec::new();
983        data.extend_from_slice(
984            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
985        );
986        data.extend_from_slice(b"dump started at Thu Aug 22 11:38:11 2024\n\n\n");
987        data.extend_from_slice(b"sent 3 bytes to tcp/2.2.2.2:5060 at 00:00:01.000000:\nbye\x0B\n");
988
989        let frames: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
990        assert_eq!(frames.len(), 2);
991        assert_eq!(frames[0].as_ref().unwrap().content, b"hello");
992        assert_eq!(frames[1].as_ref().unwrap().content, b"bye");
993    }
994
995    #[test]
996    fn frame_iterator_extra_newline_after_boundary() {
997        // Some dump files have \x0B\n\n between frames (extra \n after boundary).
998        // The extra \n should be stripped, not trigger recovery warnings.
999        let mut data = Vec::new();
1000        data.extend_from_slice(
1001            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1002        );
1003        data.push(b'\n');
1004        data.extend_from_slice(
1005            b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n",
1006        );
1007
1008        let frames: Vec<Frame> = FrameIterator::new(&data[..])
1009            .collect::<Result<Vec<_>, _>>()
1010            .unwrap();
1011        assert_eq!(frames.len(), 2);
1012        assert_eq!(frames[0].content, b"hello");
1013        assert_eq!(frames[1].content, b"world");
1014    }
1015
1016    #[test]
1017    fn frame_iterator_multiple_newlines_after_boundary() {
1018        // Multiple \n and \r\n between frames should all be stripped.
1019        let mut data = Vec::new();
1020        data.extend_from_slice(
1021            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1022        );
1023        data.extend_from_slice(b"\n\r\n\n");
1024        data.extend_from_slice(
1025            b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n",
1026        );
1027
1028        let frames: Vec<Frame> = FrameIterator::new(&data[..])
1029            .collect::<Result<Vec<_>, _>>()
1030            .unwrap();
1031        assert_eq!(frames.len(), 2);
1032        assert_eq!(frames[0].content, b"hello");
1033        assert_eq!(frames[1].content, b"world");
1034    }
1035
1036    #[test]
1037    fn stats_clean_input() {
1038        let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
1039        let mut iter = FrameIterator::new(&data[..]);
1040        let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1041        assert_eq!(frames.len(), 1);
1042        let stats = iter.stats();
1043        assert_eq!(stats.bytes_read, data.len() as u64);
1044        assert_eq!(stats.bytes_skipped, 0);
1045        assert!(stats.unparsed_regions.is_empty());
1046    }
1047
1048    #[test]
1049    fn stats_multiple_frames() {
1050        let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\nsent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
1051        let mut iter = FrameIterator::new(&data[..]);
1052        let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1053        assert_eq!(frames.len(), 2);
1054        let stats = iter.stats();
1055        assert_eq!(stats.bytes_read, data.len() as u64);
1056        assert_eq!(stats.bytes_skipped, 0);
1057        assert!(stats.unparsed_regions.is_empty());
1058    }
1059
1060    #[test]
1061    fn stats_partial_first_frame() {
1062        let mut data = Vec::new();
1063        data.extend_from_slice(b"partial garbage data");
1064        data.extend_from_slice(b"\x0B\n");
1065        data.extend_from_slice(
1066            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1067        );
1068        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1069        let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1070        assert_eq!(frames.len(), 1);
1071        let stats = iter.stats();
1072        assert_eq!(stats.bytes_read, data.len() as u64);
1073        // "partial garbage data" + "\x0B\n" = 21 bytes skipped
1074        let skipped = b"partial garbage data\x0B\n".len() as u64;
1075        assert_eq!(stats.bytes_skipped, skipped);
1076        assert_eq!(stats.unparsed_regions.len(), 1);
1077        assert_eq!(stats.unparsed_regions[0].offset, 0);
1078        assert_eq!(stats.unparsed_regions[0].length, skipped);
1079        assert_eq!(
1080            stats.unparsed_regions[0].reason,
1081            crate::types::SkipReason::PartialFirstFrame
1082        );
1083        assert!(stats.unparsed_regions[0].data.is_none());
1084    }
1085
1086    #[test]
1087    fn stats_partial_first_frame_capture() {
1088        let mut data = Vec::new();
1089        data.extend_from_slice(b"partial garbage data");
1090        data.extend_from_slice(b"\x0B\n");
1091        data.extend_from_slice(
1092            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1093        );
1094        let mut iter = FrameIterator::new(&data[..]).capture_skipped(true);
1095        let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1096        assert_eq!(frames.len(), 1);
1097        let stats = iter.stats();
1098        assert_eq!(stats.unparsed_regions.len(), 1);
1099        let region = &stats.unparsed_regions[0];
1100        assert_eq!(
1101            region.data.as_deref(),
1102            Some(b"partial garbage data\x0B\n".as_slice())
1103        );
1104    }
1105
1106    #[test]
1107    fn stats_mid_stream_partial_frame() {
1108        // SIP content between valid frames (file concatenation scenario)
1109        let mut data = Vec::new();
1110        data.extend_from_slice(
1111            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1112        );
1113        data.extend_from_slice(b"Content-Type: application/sdp\r\n\r\nv=0\r\n");
1114        data.extend_from_slice(b"\x0B\n");
1115        data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
1116
1117        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1118        let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1119        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1120        assert_eq!(frames.len(), 2);
1121        let stats = iter.stats();
1122        assert!(stats.bytes_skipped > 0);
1123        assert_eq!(stats.unparsed_regions.len(), 1);
1124        assert_eq!(
1125            stats.unparsed_regions[0].reason,
1126            crate::types::SkipReason::MidStreamSkip
1127        );
1128    }
1129
1130    #[test]
1131    fn stats_replayed_frame() {
1132        // Simulate logrotate: a frame's tail (SIP headers ending with \r\n\r\n\x0B\n)
1133        // appears between two valid frames at a file boundary.
1134        let frame1 = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
1135        let replay = b"Route: <sip:10.0.0.1:5060;lr>\r\nContent-Length: 0\r\n\r\n\x0B\n";
1136        let frame2 = b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n";
1137
1138        let mut data = Vec::new();
1139        data.extend_from_slice(frame1);
1140        data.extend_from_slice(replay);
1141        data.extend_from_slice(frame2);
1142
1143        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1144        let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1145        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1146        assert_eq!(frames.len(), 2);
1147        let stats = iter.stats();
1148        assert_eq!(stats.unparsed_regions.len(), 1);
1149        assert_eq!(
1150            stats.unparsed_regions[0].reason,
1151            crate::types::SkipReason::ReplayedFrame
1152        );
1153    }
1154
1155    #[test]
1156    fn stats_incomplete_frame_at_eof() {
1157        // Frame header says 100 bytes but only 20 bytes available before EOF
1158        let mut data = Vec::new();
1159        data.extend_from_slice(
1160            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1161        );
1162        data.extend_from_slice(b"recv 100 bytes from tcp/2.2.2.2:5060 at 01:00:00.000000:\n");
1163        data.extend_from_slice(b"partial content only");
1164        // No \x0B\n boundary — EOF truncation
1165
1166        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1167        let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1168        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1169        assert_eq!(frames.len(), 2, "truncated frame should still be returned");
1170        assert_eq!(frames[1].content, b"partial content only");
1171        assert_eq!(frames[1].byte_count, 100);
1172        let stats = iter.stats();
1173        assert_eq!(stats.unparsed_regions.len(), 1);
1174        assert_eq!(
1175            stats.unparsed_regions[0].reason,
1176            crate::types::SkipReason::IncompleteFrame
1177        );
1178    }
1179
1180    #[test]
1181    fn stats_invalid_header_skip() {
1182        // Malformed frame header (starts with recv/sent but unparseable)
1183        let mut data = Vec::new();
1184        data.extend_from_slice(
1185            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1186        );
1187        data.extend_from_slice(b"recv CORRUPT HEADER garbage\n");
1188        data.extend_from_slice(b"\x0B\n");
1189        data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
1190
1191        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1192        let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1193        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1194        assert_eq!(frames.len(), 2);
1195        let stats = iter.stats();
1196        assert!(stats.bytes_skipped > 0);
1197        assert_eq!(stats.unparsed_regions.len(), 1);
1198        assert_eq!(
1199            stats.unparsed_regions[0].reason,
1200            crate::types::SkipReason::InvalidHeader
1201        );
1202    }
1203
1204    #[test]
1205    fn stats_oversized_frame_at_start() {
1206        let mut data = Vec::new();
1207        data.resize(MAX_PARTIAL_FRAME + 1, b'x');
1208        data.extend_from_slice(b"\x0B\n");
1209        data.extend_from_slice(
1210            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1211        );
1212        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1213        let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1214        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1215        assert_eq!(frames.len(), 1);
1216        let stats = iter.stats();
1217        assert_eq!(stats.unparsed_regions.len(), 1);
1218        assert_eq!(
1219            stats.unparsed_regions[0].reason,
1220            crate::types::SkipReason::OversizedFrame
1221        );
1222    }
1223
1224    #[test]
1225    fn stats_oversized_frame_mid_stream() {
1226        let mut data = Vec::new();
1227        data.extend_from_slice(
1228            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1229        );
1230        let garbage_len = MAX_PARTIAL_FRAME + 1;
1231        data.resize(data.len() + garbage_len, b'x');
1232        data.extend_from_slice(b"\x0B\n");
1233        data.extend_from_slice(b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n");
1234        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1235        let items: Vec<Result<Frame, ParseError>> = iter.by_ref().collect();
1236        let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
1237        assert_eq!(frames.len(), 2);
1238        let stats = iter.stats();
1239        assert_eq!(stats.unparsed_regions.len(), 1);
1240        assert_eq!(
1241            stats.unparsed_regions[0].reason,
1242            crate::types::SkipReason::OversizedFrame
1243        );
1244    }
1245
1246    #[test]
1247    fn stats_partial_first_frame_within_limit() {
1248        // Content + \x0B\n boundary = MAX_PARTIAL_FRAME, should still be PartialFirstFrame
1249        let mut data = Vec::new();
1250        data.resize(MAX_PARTIAL_FRAME - 2, b'x');
1251        data.extend_from_slice(b"\x0B\n");
1252        data.extend_from_slice(
1253            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1254        );
1255        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1256        let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1257        assert_eq!(frames.len(), 1);
1258        let stats = iter.stats();
1259        assert_eq!(stats.unparsed_regions.len(), 1);
1260        assert_eq!(
1261            stats.unparsed_regions[0].reason,
1262            crate::types::SkipReason::PartialFirstFrame
1263        );
1264    }
1265
1266    #[test]
1267    fn stats_dump_restart_marker() {
1268        let mut data = Vec::new();
1269        data.extend_from_slice(
1270            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1271        );
1272        data.extend_from_slice(b"dump started at Thu Aug 22 11:38:11 2024\n\n\n");
1273        data.extend_from_slice(b"sent 3 bytes to tcp/2.2.2.2:5060 at 00:00:01.000000:\nbye\x0B\n");
1274
1275        let mut iter = FrameIterator::new(&data[..]);
1276        let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1277        assert_eq!(frames.len(), 2);
1278        let stats = iter.stats();
1279        // Dump restart marker is structural, not skipped
1280        assert_eq!(stats.bytes_skipped, 0);
1281        assert!(stats.unparsed_regions.is_empty());
1282    }
1283
1284    #[test]
1285    fn stats_track_regions_no_data() {
1286        let mut data = Vec::new();
1287        data.extend_from_slice(b"partial garbage data");
1288        data.extend_from_slice(b"\x0B\n");
1289        data.extend_from_slice(
1290            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1291        );
1292        let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
1293        let _: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1294        let stats = iter.stats();
1295        assert_eq!(stats.unparsed_regions.len(), 1);
1296        assert!(stats.unparsed_regions[0].data.is_none());
1297    }
1298
1299    #[test]
1300    fn stats_count_only_no_regions() {
1301        let mut data = Vec::new();
1302        data.extend_from_slice(b"partial garbage data");
1303        data.extend_from_slice(b"\x0B\n");
1304        data.extend_from_slice(
1305            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1306        );
1307        let mut iter = FrameIterator::new(&data[..]);
1308        let frames: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1309        assert_eq!(frames.len(), 1);
1310        let stats = iter.stats();
1311        let skipped = b"partial garbage data\x0B\n".len() as u64;
1312        assert_eq!(stats.bytes_skipped, skipped);
1313        assert!(
1314            stats.unparsed_regions.is_empty(),
1315            "CountOnly should not accumulate regions"
1316        );
1317    }
1318
1319    #[test]
1320    fn frame_iterator_trailing_newlines_at_eof() {
1321        // Trailing newlines after the last boundary at EOF should not cause errors.
1322        let mut data = Vec::new();
1323        data.extend_from_slice(
1324            b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
1325        );
1326        data.extend_from_slice(b"\n\n");
1327
1328        let frames: Vec<Frame> = FrameIterator::new(&data[..])
1329            .collect::<Result<Vec<_>, _>>()
1330            .unwrap();
1331        assert_eq!(frames.len(), 1);
1332        assert_eq!(frames[0].content, b"hello");
1333    }
1334}