Skip to main content

freeswitch_sofia_trace_parser/
frame.rs

1use std::io::Read;
2
3use memchr::memmem;
4use tracing::{debug, info, trace, warn};
5
6use crate::types::{
7    Direction, Frame, ParseStats, SkipReason, SkipTracking, Timestamp, Transport, UnparsedRegion,
8};
9
/// Header prefix that marks a received frame (`recv <N> bytes from ...`).
const RECV_PREFIX: &[u8] = b"recv ";
/// Header prefix that marks a sent frame (`sent <N> bytes to ...`).
const SENT_PREFIX: &[u8] = b"sent ";
/// Maximum skip size classified as a partial first frame.
/// Based on IP max datagram size (65535) plus the `\x0B\n` boundary (2 bytes).
const MAX_PARTIAL_FRAME: usize = 65537;
15
/// Errors produced during frame parsing (Level 1) and SIP parsing (Level 3).
///
/// Returned as `Iterator::Item = Result<T, ParseError>`. The caller decides
/// whether to skip, log, or fail on each error.
#[derive(Debug)]
pub enum ParseError {
    /// Frame header is malformed (e.g., missing colon, bad timestamp).
    /// The payload is a short human-readable reason.
    InvalidHeader(String),
    /// Reassembled content is not a valid SIP message.
    /// The payload is a short human-readable reason.
    InvalidMessage(String),
    /// Whitespace-only content from TLS/TCP keep-alive probes (RFC 5626).
    /// Not a parse failure; can be safely ignored.
    TransportNoise {
        /// Number of whitespace bytes.
        bytes: usize,
        /// Transport of the connection that produced the noise.
        transport: Transport,
        /// Remote address of the connection.
        address: String,
    },
    /// Underlying reader returned an I/O error.
    Io(std::io::Error),
}
39
40impl fmt::Display for ParseError {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        match self {
43            ParseError::InvalidHeader(msg) => write!(f, "invalid frame header: {msg}"),
44            ParseError::InvalidMessage(msg) => write!(f, "invalid SIP message: {msg}"),
45            ParseError::TransportNoise {
46                bytes,
47                transport,
48                address,
49            } => write!(
50                f,
51                "transport noise: {bytes} bytes of non-SIP data from {transport}/{address}"
52            ),
53            ParseError::Io(e) => write!(f, "I/O error: {e}"),
54        }
55    }
56}
57
58impl std::error::Error for ParseError {
59    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
60        match self {
61            ParseError::Io(e) => Some(e),
62            _ => None,
63        }
64    }
65}
66
67impl From<std::io::Error> for ParseError {
68    fn from(e: std::io::Error) -> Self {
69        ParseError::Io(e)
70    }
71}
72
73use std::fmt;
74
/// Convert one ASCII decimal digit to its numeric value.
///
/// Returns `None` for any byte outside `b'0'..=b'9'`.
fn digit(b: u8) -> Option<u8> {
    // Bytes below b'0' fail the subtraction; bytes above b'9' fail the filter.
    b.checked_sub(b'0').filter(|value| *value <= 9)
}
81
82fn parse_u8(bytes: &[u8]) -> Option<u8> {
83    if bytes.is_empty() || bytes.len() > 3 {
84        return None;
85    }
86    let mut val: u8 = 0;
87    for &b in bytes {
88        val = val.checked_mul(10)?.checked_add(digit(b)?)?;
89    }
90    Some(val)
91}
92
93fn parse_u16(bytes: &[u8]) -> Option<u16> {
94    if bytes.is_empty() || bytes.len() > 5 {
95        return None;
96    }
97    let mut val: u16 = 0;
98    for &b in bytes {
99        val = val.checked_mul(10)?.checked_add(u16::from(digit(b)?))?;
100    }
101    Some(val)
102}
103
104fn parse_u32(bytes: &[u8]) -> Option<u32> {
105    if bytes.is_empty() || bytes.len() > 10 {
106        return None;
107    }
108    let mut val: u32 = 0;
109    for &b in bytes {
110        val = val.checked_mul(10)?.checked_add(u32::from(digit(b)?))?;
111    }
112    Some(val)
113}
114
115fn parse_usize(bytes: &[u8]) -> Option<usize> {
116    if bytes.is_empty() || bytes.len() > 10 {
117        return None;
118    }
119    let mut val: usize = 0;
120    for &b in bytes {
121        val = val.checked_mul(10)?.checked_add(usize::from(digit(b)?))?;
122    }
123    Some(val)
124}
125
126/// Parse timestamp from bytes: either `HH:MM:SS.usec` or `YYYY-MM-DD HH:MM:SS.usec`
127fn parse_timestamp(bytes: &[u8]) -> Option<Timestamp> {
128    // Try full datetime first: YYYY-MM-DD HH:MM:SS.usec (min 26 bytes)
129    if bytes.len() >= 26 && bytes[4] == b'-' && bytes[7] == b'-' && bytes[10] == b' ' {
130        let year = parse_u16(&bytes[0..4])?;
131        let month = parse_u8(&bytes[5..7])?;
132        let day = parse_u8(&bytes[8..10])?;
133        let ts = parse_time_part(&bytes[11..])?;
134        return Some(Timestamp::DateTime {
135            year,
136            month,
137            day,
138            hour: ts.0,
139            min: ts.1,
140            sec: ts.2,
141            usec: ts.3,
142        });
143    }
144    // Time-only: HH:MM:SS.usec (min 15 bytes)
145    let (hour, min, sec, usec) = parse_time_part(bytes)?;
146    Some(Timestamp::TimeOnly {
147        hour,
148        min,
149        sec,
150        usec,
151    })
152}
153
154/// Parse `HH:MM:SS.usec` from bytes, returns (hour, min, sec, usec)
155fn parse_time_part(bytes: &[u8]) -> Option<(u8, u8, u8, u32)> {
156    if bytes.len() < 15 {
157        return None;
158    }
159    if bytes[2] != b':' || bytes[5] != b':' || bytes[8] != b'.' {
160        return None;
161    }
162    let hour = parse_u8(&bytes[0..2])?;
163    let min = parse_u8(&bytes[3..5])?;
164    let sec = parse_u8(&bytes[6..8])?;
165    let usec = parse_u32(&bytes[9..15])?;
166    Some((hour, min, sec, usec))
167}
168
169/// Parse a frame header line from `&[u8]`.
170///
171/// Expected format:
172/// `(recv|sent) <N> bytes (from|to) <transport>/<address> at <timestamp>:\n`
173///
174/// Returns `(Frame header fields, header_len)` where header_len includes the trailing `\n`.
175pub fn parse_frame_header(
176    data: &[u8],
177) -> Result<(Direction, usize, Transport, String, Timestamp, usize), ParseError> {
178    let newline_pos = memchr::memchr(b'\n', data)
179        .ok_or_else(|| ParseError::InvalidHeader("no newline in header".into()))?;
180    let line = &data[..newline_pos];
181    // Strip trailing \r if present
182    let line = line.strip_suffix(b"\r").unwrap_or(line);
183    // Must end with ':'
184    let line = line
185        .strip_suffix(b":")
186        .ok_or_else(|| ParseError::InvalidHeader("header does not end with ':'".into()))?;
187
188    // Direction — both "recv " and "sent " are 5 bytes
189    let direction = if line.starts_with(RECV_PREFIX) {
190        Direction::Recv
191    } else if line.starts_with(SENT_PREFIX) {
192        Direction::Sent
193    } else {
194        return Err(ParseError::InvalidHeader(
195            "expected 'recv' or 'sent'".into(),
196        ));
197    };
198    let mut pos = 5;
199
200    // Byte count: digits until ' '
201    let space = memchr::memchr(b' ', &line[pos..])
202        .ok_or_else(|| ParseError::InvalidHeader("no space after byte count".into()))?;
203    let byte_count = parse_usize(&line[pos..pos + space])
204        .ok_or_else(|| ParseError::InvalidHeader("invalid byte count".into()))?;
205    pos += space + 1;
206
207    // " bytes from/to "
208    let expected_recv = b"bytes from ";
209    let expected_sent = b"bytes to ";
210    if direction == Direction::Recv {
211        if !line[pos..].starts_with(expected_recv) {
212            return Err(ParseError::InvalidHeader("expected 'bytes from '".into()));
213        }
214        pos += expected_recv.len();
215    } else {
216        if !line[pos..].starts_with(expected_sent) {
217            return Err(ParseError::InvalidHeader("expected 'bytes to '".into()));
218        }
219        pos += expected_sent.len();
220    }
221
222    // Transport: tcp/ udp/ tls/ wss/
223    let transport = if line[pos..].starts_with(b"tcp/") {
224        pos += 4;
225        Transport::Tcp
226    } else if line[pos..].starts_with(b"udp/") {
227        pos += 4;
228        Transport::Udp
229    } else if line[pos..].starts_with(b"tls/") {
230        pos += 4;
231        Transport::Tls
232    } else if line[pos..].starts_with(b"wss/") {
233        pos += 4;
234        Transport::Wss
235    } else {
236        return Err(ParseError::InvalidHeader("unknown transport".into()));
237    };
238
239    // Address: until " at "
240    let at_marker = b" at ";
241    let at_pos = memmem::find(&line[pos..], at_marker)
242        .ok_or_else(|| ParseError::InvalidHeader("no ' at ' in header".into()))?;
243    let address = String::from_utf8_lossy(&line[pos..pos + at_pos]).into_owned();
244    pos += at_pos + at_marker.len();
245
246    // Timestamp: rest of line (after stripping trailing ':' already done)
247    let timestamp = parse_timestamp(&line[pos..])
248        .ok_or_else(|| ParseError::InvalidHeader("invalid timestamp".into()))?;
249
250    Ok((
251        direction,
252        byte_count,
253        transport,
254        address,
255        timestamp,
256        newline_pos + 1,
257    ))
258}
259
260/// Check if data at given position looks like a valid frame header start.
261/// Used to validate `\x0B\n` boundaries.
262pub fn is_frame_header(data: &[u8]) -> bool {
263    if data.len() < 20 {
264        return false;
265    }
266    let starts_valid = data.starts_with(RECV_PREFIX) || data.starts_with(SENT_PREFIX);
267    if !starts_valid {
268        return false;
269    }
270    // Check that after direction there are digits followed by " bytes "
271    let rest = &data[5..];
272    let space = match memchr::memchr(b' ', rest) {
273        Some(p) => p,
274        None => return false,
275    };
276    if space == 0 || space > 10 {
277        return false;
278    }
279    for &b in &rest[..space] {
280        if !b.is_ascii_digit() {
281            return false;
282        }
283    }
284    rest[space..].starts_with(b" bytes ")
285}
286
/// Number of bytes requested from the reader on each buffer refill (32 KiB).
const READ_BUF_SIZE: usize = 32 * 1024;
288
/// Level 1 streaming parser: splits raw dump bytes into [`Frame`]s.
///
/// Reads from any [`Read`] source and yields frames delimited by `\x0B\n`
/// boundaries. Handles truncated first/last frames, file concatenation,
/// and garbage recovery.
///
/// # Example
///
/// ```no_run
/// use std::fs::File;
/// use freeswitch_sofia_trace_parser::FrameIterator;
///
/// let file = File::open("profile.dump").unwrap();
/// for frame in FrameIterator::new(file) {
///     let frame = frame.unwrap();
///     println!("{} {} bytes {} {}",
///         frame.timestamp, frame.byte_count, frame.direction, frame.address);
/// }
/// ```
pub struct FrameIterator<R> {
    /// Underlying byte source.
    reader: R,
    /// Bytes read from `reader` that have not been consumed yet.
    buf: Vec<u8>,
    /// Set once `reader` has returned zero bytes; no further reads are attempted.
    eof: bool,
    /// Number of frames yielded so far.
    frame_count: u64,
    /// Total number of bytes consumed from the front of `buf`, i.e. the input
    /// offset of `buf[0]`.
    offset: u64,
    /// Accumulated byte counters and unparsed-region records.
    stats: ParseStats,
    /// Level of detail recorded for skipped regions.
    skip_tracking: SkipTracking,
}
317
318impl<R: Read> FrameIterator<R> {
319    /// Create a new frame iterator reading from the given source.
320    pub fn new(reader: R) -> Self {
321        FrameIterator {
322            reader,
323            buf: Vec::with_capacity(READ_BUF_SIZE * 2),
324            eof: false,
325            frame_count: 0,
326            offset: 0,
327            stats: ParseStats::default(),
328            skip_tracking: SkipTracking::CountOnly,
329        }
330    }
331
332    /// Enable capturing of skipped bytes (shorthand for
333    /// [`SkipTracking::CaptureData`]).
334    pub fn capture_skipped(mut self, enable: bool) -> Self {
335        if enable {
336            self.skip_tracking = SkipTracking::CaptureData;
337        }
338        self
339    }
340
341    /// Set the level of detail for unparsed region tracking.
342    pub fn skip_tracking(mut self, tracking: SkipTracking) -> Self {
343        self.skip_tracking = tracking;
344        self
345    }
346
347    /// Borrow the accumulated parse statistics.
348    pub fn stats(&self) -> &ParseStats {
349        &self.stats
350    }
351
352    /// Mutably borrow the parse statistics.
353    pub fn stats_mut(&mut self) -> &mut ParseStats {
354        &mut self.stats
355    }
356
357    /// Take all accumulated unparsed regions, leaving the list empty.
358    pub fn drain_unparsed(&mut self) -> Vec<UnparsedRegion> {
359        self.stats.drain_regions()
360    }
361
362    fn consume(&mut self, n: usize) {
363        self.buf.drain(..n);
364        self.offset += n as u64;
365    }
366
367    fn consume_skipped(&mut self, n: usize, reason: SkipReason) {
368        if self.skip_tracking != SkipTracking::CountOnly {
369            let data = if self.skip_tracking == SkipTracking::CaptureData {
370                Some(self.buf[..n].to_vec())
371            } else {
372                None
373            };
374            self.stats.unparsed_regions.push(UnparsedRegion {
375                offset: self.offset,
376                length: n as u64,
377                reason,
378                data,
379            });
380        }
381        self.stats.bytes_skipped += n as u64;
382        self.consume(n);
383    }
384
385    fn fill_buf(&mut self) -> Result<bool, std::io::Error> {
386        if self.eof {
387            return Ok(false);
388        }
389        let old_len = self.buf.len();
390        self.buf.resize(old_len + READ_BUF_SIZE, 0);
391        let n = self.reader.read(&mut self.buf[old_len..])?;
392        self.buf.truncate(old_len + n);
393        if n == 0 {
394            self.eof = true;
395            return Ok(false);
396        }
397        self.stats.bytes_read += n as u64;
398        Ok(true)
399    }
400
401    /// Check if skipped content is a truncated frame from a logrotate file boundary.
402    ///
403    /// Detection: the skipped content ends with `\r\n\r\n\x0B\n` — the SIP header/body
404    /// terminator followed by the frame boundary marker. This pattern indicates the tail
405    /// of a SIP frame that was split across logrotated dump files.
406    fn is_replay(&self, skipped: &[u8]) -> bool {
407        if self.frame_count == 0 {
408            return false;
409        }
410        skipped.ends_with(b"\r\n\r\n\x0B\n")
411    }
412
413    /// Find the next `\x0B\n` boundary that is followed by a valid frame header.
414    fn find_boundary(&self, start: usize) -> Option<usize> {
415        let finder = memmem::Finder::new(b"\x0B\n");
416        let mut search_from = start;
417        loop {
418            let pos = finder.find(&self.buf[search_from..])?;
419            let abs_pos = search_from + pos;
420            let after = abs_pos + 2;
421            if after >= self.buf.len() {
422                // Boundary at very end — could be real, but we can't validate header yet
423                // If EOF, accept it as boundary (content ends at \x0B)
424                if self.eof {
425                    return Some(abs_pos);
426                }
427                return None; // Need more data
428            }
429            if is_frame_header(&self.buf[after..]) {
430                return Some(abs_pos);
431            }
432            // \x0B\n in content, not a boundary — skip past it
433            trace!(
434                offset = abs_pos,
435                "found \\x0B\\n in content (not a boundary), skipping"
436            );
437            search_from = abs_pos + 2;
438        }
439    }
440
441    /// Skip to the first valid frame header in the buffer (for partial first frames).
442    fn skip_to_first_header(&mut self) -> Option<usize> {
443        if is_frame_header(&self.buf) {
444            return Some(0);
445        }
446        // Look for \x0B\n followed by a valid header
447        let finder = memmem::Finder::new(b"\x0B\n");
448        let mut search_from = 0;
449        loop {
450            if let Some(pos) = finder.find(&self.buf[search_from..]) {
451                let abs_pos = search_from + pos;
452                let after = abs_pos + 2;
453                if after < self.buf.len() && is_frame_header(&self.buf[after..]) {
454                    info!(skipped_bytes = after, "skipped partial first frame");
455                    return Some(after);
456                }
457                search_from = abs_pos + 2;
458            } else {
459                return None;
460            }
461        }
462    }
463}
464
465impl<R: Read> Iterator for FrameIterator<R> {
466    type Item = Result<Frame, ParseError>;
467
468    fn next(&mut self) -> Option<Self::Item> {
469        // Ensure we have data
470        if self.buf.is_empty() && !self.eof {
471            if let Err(e) = self.fill_buf() {
472                return Some(Err(ParseError::Io(e)));
473            }
474        }
475
476        if self.buf.is_empty() {
477            return None;
478        }
479
480        // On first call, skip to first valid header if needed
481        if self.frame_count == 0 {
482            loop {
483                match self.skip_to_first_header() {
484                    Some(offset) => {
485                        if offset > 0 {
486                            let reason = if offset <= MAX_PARTIAL_FRAME {
487                                SkipReason::PartialFirstFrame
488                            } else {
489                                SkipReason::OversizedFrame
490                            };
491                            self.consume_skipped(offset, reason);
492                        }
493                        break;
494                    }
495                    None => {
496                        if self.eof {
497                            debug!("no valid frame header found in entire input");
498                            let remaining = self.buf.len();
499                            if remaining > 0 {
500                                self.consume_skipped(remaining, SkipReason::InvalidHeader);
501                            }
502                            return None;
503                        }
504                        if let Err(e) = self.fill_buf() {
505                            return Some(Err(ParseError::Io(e)));
506                        }
507                    }
508                }
509            }
510        }
511
512        if self.buf.is_empty() {
513            return None;
514        }
515
516        // Strip inter-frame newline padding (\n or \r\n between frames)
517        let mut strip = 0;
518        while strip < self.buf.len() {
519            if self.buf[strip] == b'\n' {
520                strip += 1;
521            } else if strip + 1 < self.buf.len()
522                && self.buf[strip] == b'\r'
523                && self.buf[strip + 1] == b'\n'
524            {
525                strip += 2;
526            } else {
527                break;
528            }
529        }
530        if strip > 0 {
531            self.consume(strip);
532            if self.buf.is_empty() {
533                return self.next();
534            }
535        }
536
537        // Parse frame header — may need more data if header spans buffer boundary
538        let (direction, byte_count, transport, address, timestamp, header_len) = loop {
539            match parse_frame_header(&self.buf) {
540                Ok(h) => break h,
541                Err(ParseError::InvalidHeader(ref msg)) if msg == "no newline in header" => {
542                    if self.eof {
543                        debug!("truncated frame header at EOF");
544                        let remaining = self.buf.len();
545                        if remaining > 0 {
546                            self.consume_skipped(remaining, SkipReason::InvalidHeader);
547                        }
548                        return None;
549                    }
550                    if let Err(e) = self.fill_buf() {
551                        return Some(Err(ParseError::Io(e)));
552                    }
553                }
554                Err(e) => {
555                    let header_preview: String = self
556                        .buf
557                        .iter()
558                        .take(200)
559                        .take_while(|&&b| b != b'\n')
560                        .map(|&b| {
561                            if b.is_ascii_graphic() || b == b' ' {
562                                b as char
563                            } else {
564                                '.'
565                            }
566                        })
567                        .collect();
568                    if header_preview.starts_with("dump started at ") {
569                        let skip = memchr::memchr(b'\n', &self.buf)
570                            .map(|p| {
571                                let mut end = p + 1;
572                                while end < self.buf.len() && self.buf[end] == b'\n' {
573                                    end += 1;
574                                }
575                                end
576                            })
577                            .unwrap_or(self.buf.len());
578                        debug!(
579                            header = %header_preview,
580                            skipped_bytes = skip,
581                            "skipped dump restart marker",
582                        );
583                        self.consume(skip);
584                        return self.next();
585                    }
586                    let skip = if let Some(b) = self.find_boundary(0) {
587                        b + 2
588                    } else {
589                        memchr::memchr(b'\n', &self.buf)
590                            .map(|p| p + 1)
591                            .unwrap_or(self.buf.len())
592                    };
593                    let reason =
594                        if self.buf.starts_with(RECV_PREFIX) || self.buf.starts_with(SENT_PREFIX) {
595                            SkipReason::InvalidHeader
596                        } else if skip > MAX_PARTIAL_FRAME {
597                            SkipReason::OversizedFrame
598                        } else if self.frame_count == 0 {
599                            SkipReason::PartialFirstFrame
600                        } else {
601                            let skipped = &self.buf[..skip];
602                            if self.is_replay(skipped) {
603                                SkipReason::ReplayedFrame
604                            } else {
605                                SkipReason::MidStreamSkip
606                            }
607                        };
608                    self.consume_skipped(skip, reason);
609                    return Some(Err(e));
610                }
611            }
612        };
613
614        let content_start = header_len;
615        let expected_end = content_start + byte_count;
616
617        // Find the boundary for this frame.
618        // Strategy: first check at the expected position (content_start + byte_count),
619        // then fall back to scanning. This handles file concatenation where \x0B\n
620        // is followed by garbage from the next file's truncated first frame.
621        loop {
622            // Ensure we have enough data to check the expected position
623            while self.buf.len() <= expected_end + 1 && !self.eof {
624                if let Err(e) = self.fill_buf() {
625                    return Some(Err(ParseError::Io(e)));
626                }
627            }
628
629            // Check at expected position first (byte_count hint)
630            if expected_end < self.buf.len() && self.buf[expected_end] == 0x0B {
631                let has_newline =
632                    expected_end + 1 < self.buf.len() && self.buf[expected_end + 1] == b'\n';
633                let at_eof = expected_end + 1 >= self.buf.len() && self.eof;
634
635                if has_newline || at_eof {
636                    let content = self.buf[content_start..expected_end].to_vec();
637                    let drain_to = if has_newline {
638                        expected_end + 2
639                    } else {
640                        expected_end + 1
641                    };
642                    self.consume(drain_to);
643                    self.frame_count += 1;
644                    return Some(Ok(Frame {
645                        direction,
646                        byte_count,
647                        transport,
648                        address,
649                        timestamp,
650                        content,
651                    }));
652                }
653            }
654
655            // Fall back to scanning for \x0B\n + valid header
656            if let Some(boundary_pos) = self.find_boundary(content_start) {
657                let content = self.buf[content_start..boundary_pos].to_vec();
658                let drain_to = boundary_pos + 2;
659                self.consume(drain_to);
660                self.frame_count += 1;
661
662                if content.len() != byte_count {
663                    debug!(
664                        frame = self.frame_count,
665                        expected = byte_count,
666                        actual = content.len(),
667                        "frame content size mismatch"
668                    );
669                }
670
671                return Some(Ok(Frame {
672                    direction,
673                    byte_count,
674                    transport,
675                    address,
676                    timestamp,
677                    content,
678                }));
679            }
680
681            if self.eof {
682                // Last frame — no trailing \x0B\n
683                let end = if self.buf.last() == Some(&0x0B) {
684                    self.buf.len() - 1
685                } else {
686                    self.buf.len()
687                };
688                let content = self.buf[content_start..end].to_vec();
689                let len = self.buf.len();
690                self.consume(len);
691                self.frame_count += 1;
692
693                if content.len() < byte_count {
694                    let missing = byte_count - content.len();
695                    warn!(
696                        frame = self.frame_count,
697                        expected = byte_count,
698                        actual = content.len(),
699                        missing,
700                        "incomplete frame at EOF"
701                    );
702                    if self.skip_tracking != SkipTracking::CountOnly {
703                        self.stats.unparsed_regions.push(UnparsedRegion {
704                            offset: self.offset,
705                            length: missing as u64,
706                            reason: SkipReason::IncompleteFrame,
707                            data: None,
708                        });
709                    }
710                } else if content.len() != byte_count {
711                    debug!(
712                        frame = self.frame_count,
713                        expected = byte_count,
714                        actual = content.len(),
715                        "last frame content size mismatch"
716                    );
717                }
718
719                return Some(Ok(Frame {
720                    direction,
721                    byte_count,
722                    transport,
723                    address,
724                    timestamp,
725                    content,
726                }));
727            }
728
729            if let Err(e) = self.fill_buf() {
730                return Some(Err(ParseError::Io(e)));
731            }
732        }
733    }
734}
735
736#[cfg(test)]
737mod tests {
738    use super::*;
739    use crate::types::SkipTracking;
740
741    #[test]
742    fn parse_recv_ipv4_tcp() {
743        let header = b"recv 100 bytes from tcp/192.168.1.1:5060 at 00:00:01.350874:\n";
744        let (dir, count, transport, addr, ts, len) = parse_frame_header(header).unwrap();
745        assert_eq!(dir, Direction::Recv);
746        assert_eq!(count, 100);
747        assert_eq!(transport, Transport::Tcp);
748        assert_eq!(addr, "192.168.1.1:5060");
749        assert_eq!(
750            ts,
751            Timestamp::TimeOnly {
752                hour: 0,
753                min: 0,
754                sec: 1,
755                usec: 350874
756            }
757        );
758        assert_eq!(len, header.len());
759    }
760
761    #[test]
762    fn parse_recv_ipv6_tcp() {
763        let header = b"recv 1440 bytes from tcp/[2001:4958:10:14::4]:30046 at 13:03:21.674883:\n";
764        let (dir, count, transport, addr, ts, _) = parse_frame_header(header).unwrap();
765        assert_eq!(dir, Direction::Recv);
766        assert_eq!(count, 1440);
767        assert_eq!(transport, Transport::Tcp);
768        assert_eq!(addr, "[2001:4958:10:14::4]:30046");
769        assert_eq!(
770            ts,
771            Timestamp::TimeOnly {
772                hour: 13,
773                min: 3,
774                sec: 21,
775                usec: 674883
776            }
777        );
778    }
779
780    #[test]
781    fn parse_sent_ipv6_tcp() {
782        let header = b"sent 681 bytes to tcp/[2001:4958:10:14::4]:30046 at 13:03:21.675500:\n";
783        let (dir, count, transport, addr, _, _) = parse_frame_header(header).unwrap();
784        assert_eq!(dir, Direction::Sent);
785        assert_eq!(count, 681);
786        assert_eq!(transport, Transport::Tcp);
787        assert_eq!(addr, "[2001:4958:10:14::4]:30046");
788    }
789
790    #[test]
791    fn parse_recv_udp() {
792        let header = b"recv 457 bytes from udp/10.0.0.1:5060 at 00:19:47.123456:\n";
793        let (dir, _, transport, _, _, _) = parse_frame_header(header).unwrap();
794        assert_eq!(dir, Direction::Recv);
795        assert_eq!(transport, Transport::Udp);
796    }
797
798    #[test]
799    fn parse_sent_tls() {
800        let header = b"sent 500 bytes to tls/10.0.0.1:5061 at 12:00:00.000000:\n";
801        let (dir, count, transport, _, _, _) = parse_frame_header(header).unwrap();
802        assert_eq!(dir, Direction::Sent);
803        assert_eq!(count, 500);
804        assert_eq!(transport, Transport::Tls);
805    }
806
807    #[test]
808    fn parse_full_datetime_timestamp() {
809        let header = b"recv 100 bytes from tcp/192.168.1.1:5060 at 2026-02-01 10:00:00.000000:\n";
810        let (_, _, _, _, ts, _) = parse_frame_header(header).unwrap();
811        assert_eq!(
812            ts,
813            Timestamp::DateTime {
814                year: 2026,
815                month: 2,
816                day: 1,
817                hour: 10,
818                min: 0,
819                sec: 0,
820                usec: 0
821            }
822        );
823    }
824
825    #[test]
826    fn parse_invalid_header() {
827        assert!(parse_frame_header(b"invalid header\n").is_err());
828        assert!(
829            parse_frame_header(b"recv abc bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n")
830                .is_err()
831        );
832    }
833
834    #[test]
835    fn is_frame_header_valid() {
836        assert!(is_frame_header(
837            b"recv 100 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n"
838        ));
839        assert!(is_frame_header(
840            b"sent 681 bytes to tcp/[::1]:5060 at 00:00:00.000000:\n"
841        ));
842        assert!(!is_frame_header(b"not a header"));
843        assert!(!is_frame_header(b"recv abc bytes"));
844        assert!(!is_frame_header(b""));
845    }
846
847    #[test]
848    fn frame_iterator_single_frame() {
849        let data = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
850        let frames: Vec<Frame> = FrameIterator::new(&data[..])
851            .collect::<Result<Vec<_>, _>>()
852            .unwrap();
853        assert_eq!(frames.len(), 1);
854        assert_eq!(frames[0].content, b"hello");
855        assert_eq!(frames[0].byte_count, 5);
856    }
857
#[test]
fn frame_iterator_multiple_frames() {
    // A received frame immediately followed by a sent frame.
    let recv_frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let sent_frame: &[u8] = b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
    let input = [recv_frame, sent_frame].concat();

    let parsed = FrameIterator::new(input.as_slice())
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 2);
    let (first, second) = (&parsed[0], &parsed[1]);
    assert_eq!(first.content, b"hello");
    assert_eq!(first.direction, Direction::Recv);
    assert_eq!(second.content, b"world");
    assert_eq!(second.direction, Direction::Sent);
}
870
#[test]
fn frame_iterator_vt_in_content() {
    // The payload itself contains `\x0B\n`, but what follows it is not a valid
    // header, so the iterator must NOT split the frame there.
    let header: &[u8] = b"recv 15 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\n";
    let payload: &[u8] = b"he\x0B\nllo world!!";
    let boundary: &[u8] = b"\x0B\n";
    let input = [header, payload, boundary].concat();

    let parsed = FrameIterator::new(input.as_slice())
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].content, b"he\x0B\nllo world!!");
}
884
#[test]
fn frame_iterator_eof_without_boundary() {
    // Input stops right after the payload; no `\x0B\n` terminator is present.
    let input: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello";
    let parsed = FrameIterator::new(input)
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].content, b"hello");
}
894
#[test]
fn frame_iterator_eof_with_lone_vt() {
    // Input ends with a bare `\x0B` (no trailing `\n`); it must not leak into the content.
    let input: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B";
    let parsed = FrameIterator::new(input)
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].content, b"hello");
}
904
#[test]
fn frame_iterator_partial_first_frame() {
    // Leading garbage, then a boundary, then one well-formed frame.
    let garbage: &[u8] = b"partial garbage data";
    let boundary: &[u8] = b"\x0B\n";
    let frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let input = [garbage, boundary, frame].concat();

    let parsed = FrameIterator::new(input.as_slice())
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].content, b"hello");
}
920
#[test]
fn frame_iterator_truncated_last_frame() {
    // A complete frame, then a second frame that hits EOF with no `\x0B\n` after it.
    let complete: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let unterminated: &[u8] = b"sent 3 bytes to tcp/1.1.1.1:5060 at 00:00:01.000000:\nbye";
    let input = [complete, unterminated].concat();

    let parsed = FrameIterator::new(input.as_slice())
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed[0].content, b"hello");
    assert_eq!(parsed[1].content, b"bye");
}
936
#[test]
fn frame_iterator_file_concatenation() {
    // Simulates `cat dump.20 dump.21 | parser`.
    // File 1: starts with a valid header and holds two complete frames.
    // File 2: starts with truncated frame data (no header), then a boundary,
    //         then one valid frame.
    let mut data = Vec::new();

    // File 1: two complete frames.
    data.extend_from_slice(
        b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
    );
    data.extend_from_slice(
        b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n",
    );

    // File 2: headerless leftover content, boundary, then a valid frame.
    data.extend_from_slice(b"some truncated SIP content from previous rotation\r\n\r\n");
    data.extend_from_slice(b"\x0B\n");
    data.extend_from_slice(
        b"recv 3 bytes from tcp/2.2.2.2:5060 at 01:00:00.000000:\nfoo\x0B\n",
    );

    // Only successfully parsed frames are checked; any `Err` items are dropped here.
    let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(&data[..]).collect();
    let frames: Vec<Frame> = items.into_iter().filter_map(Result::ok).collect();
    assert_eq!(frames.len(), 3);
    assert_eq!(frames[0].content, b"hello");
    assert_eq!(frames[1].content, b"world");
    assert_eq!(frames[2].content, b"foo");
    assert_eq!(frames[2].address, "2.2.2.2:5060");
}
967
#[test]
fn frame_iterator_file_concatenation_mid_stream_garbage() {
    // At the join point between two files the stream looks like:
    //   ...last_content\x0B\n truncated_first_of_next_file \x0B\n valid_header...
    // The truncated piece is NOT a valid header, so recovery has to skip over it.

    // Last frame of file 1.
    let last_of_first: &[u8] =
        b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    // Truncated first frame of file 2: mid-SIP content with no frame header.
    let headerless: &[u8] = b"Content-Type: application/sdp\r\n\r\nv=0\r\n";
    let boundary: &[u8] = b"\x0B\n";
    // Valid second frame of file 2.
    let next_valid: &[u8] = b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n";
    let input = [last_of_first, headerless, boundary, next_valid].concat();

    let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(input.as_slice()).collect();
    let parsed: Vec<Frame> = items.into_iter().flatten().collect();

    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed[0].content, b"hello");
    assert_eq!(parsed[1].content, b"bar");
}
993
#[test]
fn frame_iterator_empty_input() {
    // Zero bytes in, zero items out.
    let empty: &[u8] = b"";
    let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(empty).collect();
    assert!(items.is_empty());
}
1000
#[test]
fn frame_iterator_only_garbage() {
    // Input without any frame header: nothing is yielded, and the whole input is
    // reported as a single skipped region.
    let input: &[u8] = b"this is not a SIP trace dump at all, just garbage text";
    let total = input.len() as u64;

    let mut reader = FrameIterator::new(input).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    assert!(items.is_empty());

    let stats = reader.stats();
    assert_eq!(stats.bytes_read, total);
    assert_eq!(stats.bytes_skipped, total);
    assert_eq!(stats.unparsed_regions.len(), 1);
    assert_eq!(stats.unparsed_regions[0].reason, SkipReason::InvalidHeader);
}
1013
#[test]
fn frame_iterator_truncated_header_at_eof() {
    // Input ends partway through a header line: no frame is produced and every
    // byte counts as both read and skipped.
    let input: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060";
    let total = input.len() as u64;

    let mut reader = FrameIterator::new(input).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    assert!(items.is_empty());

    let stats = reader.stats();
    assert_eq!(stats.bytes_read, total);
    assert_eq!(stats.bytes_skipped, total);
}
1024
#[test]
fn frame_iterator_dump_marker_at_eof() {
    // A "dump started at ..." restart marker closing the input (with the trailing
    // blank lines seen in real dumps) must be consumed silently rather than
    // surfacing as an error item.
    let frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let marker: &[u8] = b"dump started at Thu Aug 22 11:38:11 2024\n\n\n";
    let input = [frame, marker].concat();

    let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(input.as_slice()).collect();

    assert_eq!(items.len(), 1);
    let first = &items[0];
    assert!(first.is_ok());
    assert_eq!(first.as_ref().unwrap().content, b"hello");
}
1040
#[test]
fn frame_iterator_dump_marker_mid_stream() {
    // A restart marker (with trailing blank lines, as in real dumps) sitting between
    // two valid frames is skipped; both neighbours must still parse.
    let before: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let marker: &[u8] = b"dump started at Thu Aug 22 11:38:11 2024\n\n\n";
    let after: &[u8] = b"sent 3 bytes to tcp/2.2.2.2:5060 at 00:00:01.000000:\nbye\x0B\n";
    let input = [before, marker, after].concat();

    let items: Vec<Result<Frame, ParseError>> = FrameIterator::new(input.as_slice()).collect();

    assert_eq!(items.len(), 2);
    assert_eq!(items[0].as_ref().unwrap().content, b"hello");
    assert_eq!(items[1].as_ref().unwrap().content, b"bye");
}
1057
#[test]
fn frame_iterator_extra_newline_after_boundary() {
    // Some dump files separate frames with `\x0B\n\n` (one surplus `\n`).
    // The surplus byte should be stripped instead of triggering recovery.
    let first: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let surplus: &[u8] = b"\n";
    let second: &[u8] = b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
    let input = [first, surplus, second].concat();

    let parsed = FrameIterator::new(input.as_slice())
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed[0].content, b"hello");
    assert_eq!(parsed[1].content, b"world");
}
1078
#[test]
fn frame_iterator_multiple_newlines_after_boundary() {
    // A mixed run of `\n` and `\r\n` between frames should be stripped entirely.
    let first: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let blank_run: &[u8] = b"\n\r\n\n";
    let second: &[u8] = b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
    let input = [first, blank_run, second].concat();

    let parsed = FrameIterator::new(input.as_slice())
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed[0].content, b"hello");
    assert_eq!(parsed[1].content, b"world");
}
1098
#[test]
fn stats_clean_input() {
    // A single well-formed frame: all bytes read, none skipped, no regions recorded.
    let input: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";

    let mut reader = FrameIterator::new(input);
    let parsed = reader
        .by_ref()
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();
    assert_eq!(parsed.len(), 1);

    let stats = reader.stats();
    assert_eq!(stats.bytes_read, input.len() as u64);
    assert_eq!(stats.bytes_skipped, 0);
    assert!(stats.unparsed_regions.is_empty());
}
1110
#[test]
fn stats_multiple_frames() {
    // Two back-to-back well-formed frames: all bytes read, none skipped.
    let recv_frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let sent_frame: &[u8] = b"sent 5 bytes to tcp/1.1.1.1:5060 at 00:00:00.000001:\nworld\x0B\n";
    let input = [recv_frame, sent_frame].concat();

    let mut reader = FrameIterator::new(input.as_slice());
    let parsed = reader
        .by_ref()
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();
    assert_eq!(parsed.len(), 2);

    let stats = reader.stats();
    assert_eq!(stats.bytes_read, input.len() as u64);
    assert_eq!(stats.bytes_skipped, 0);
    assert!(stats.unparsed_regions.is_empty());
}
1122
#[test]
fn stats_partial_first_frame() {
    // Leading garbage up to and including the first boundary:
    // "partial garbage data" (20 bytes) + "\x0B\n" (2 bytes) = 22 bytes skipped.
    // Kept in one binding so the input and the expected length cannot drift apart.
    let garbage: &[u8] = b"partial garbage data\x0B\n";
    let frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let data = [garbage, frame].concat();

    let mut iter = FrameIterator::new(&data[..]).skip_tracking(SkipTracking::TrackRegions);
    let frames: Vec<Frame> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(frames.len(), 1);

    let stats = iter.stats();
    assert_eq!(stats.bytes_read, data.len() as u64);
    let skipped = garbage.len() as u64;
    assert_eq!(stats.bytes_skipped, skipped);

    // Exactly one region, starting at offset 0 and covering the garbage plus boundary.
    assert_eq!(stats.unparsed_regions.len(), 1);
    assert_eq!(stats.unparsed_regions[0].offset, 0);
    assert_eq!(stats.unparsed_regions[0].length, skipped);
    assert_eq!(
        stats.unparsed_regions[0].reason,
        crate::types::SkipReason::PartialFirstFrame
    );
    // With `TrackRegions` the skipped bytes themselves are not retained.
    assert!(stats.unparsed_regions[0].data.is_none());
}
1148
#[test]
fn stats_partial_first_frame_capture() {
    // With `capture_skipped(true)` the skipped region carries the raw bytes that
    // were dropped: the leading garbage together with its boundary.
    let garbage: &[u8] = b"partial garbage data\x0B\n";
    let frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let input = [garbage, frame].concat();

    let mut reader = FrameIterator::new(input.as_slice()).capture_skipped(true);
    let parsed = reader
        .by_ref()
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();
    assert_eq!(parsed.len(), 1);

    let stats = reader.stats();
    assert_eq!(stats.unparsed_regions.len(), 1);
    let captured = stats.unparsed_regions[0].data.as_deref();
    assert_eq!(captured, Some(garbage));
}
1168
#[test]
fn stats_mid_stream_partial_frame() {
    // Headerless SIP content wedged between two valid frames (the file
    // concatenation scenario) is expected to be reported as `MidStreamSkip`.
    let before: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let headerless: &[u8] = b"Content-Type: application/sdp\r\n\r\nv=0\r\n";
    let boundary: &[u8] = b"\x0B\n";
    let after: &[u8] = b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n";
    let input = [before, headerless, boundary, after].concat();

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    let parsed: Vec<Frame> = items.into_iter().flatten().collect();
    assert_eq!(parsed.len(), 2);

    let stats = reader.stats();
    assert!(stats.bytes_skipped > 0);
    assert_eq!(stats.unparsed_regions.len(), 1);
    let region = &stats.unparsed_regions[0];
    assert_eq!(region.reason, crate::types::SkipReason::MidStreamSkip);
}
1192
#[test]
fn stats_replayed_frame() {
    // Logrotate simulation: the tail of a frame (SIP headers ending in
    // `\r\n\r\n\x0B\n`) shows up between two valid frames at a file boundary.
    let leading: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let replayed_tail: &[u8] =
        b"Route: <sip:10.0.0.1:5060;lr>\r\nContent-Length: 0\r\n\r\n\x0B\n";
    let trailing: &[u8] = b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n";
    let input = [leading, replayed_tail, trailing].concat();

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    let parsed: Vec<Frame> = items.into_iter().flatten().collect();
    assert_eq!(parsed.len(), 2);

    let stats = reader.stats();
    assert_eq!(stats.unparsed_regions.len(), 1);
    let region = &stats.unparsed_regions[0];
    assert_eq!(region.reason, crate::types::SkipReason::ReplayedFrame);
}
1217
#[test]
fn stats_incomplete_frame_at_eof() {
    // The second header announces 100 bytes, but only 20 arrive before EOF and
    // there is no `\x0B\n` boundary after them.
    let complete: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let short_header: &[u8] = b"recv 100 bytes from tcp/2.2.2.2:5060 at 01:00:00.000000:\n";
    let short_body: &[u8] = b"partial content only";
    let input = [complete, short_header, short_body].concat();

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    let parsed: Vec<Frame> = items.into_iter().flatten().collect();

    // The short frame is still yielded, carrying the declared byte count.
    assert_eq!(parsed.len(), 2, "truncated frame should still be returned");
    let truncated = &parsed[1];
    assert_eq!(truncated.content, b"partial content only");
    assert_eq!(truncated.byte_count, 100);

    let stats = reader.stats();
    assert_eq!(stats.unparsed_regions.len(), 1);
    let region = &stats.unparsed_regions[0];
    assert_eq!(region.reason, crate::types::SkipReason::IncompleteFrame);
}
1242
#[test]
fn stats_invalid_header_skip() {
    // A line that begins with `recv` but cannot be parsed as a header sits
    // between two valid frames and should be recorded as `InvalidHeader`.
    let before: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let corrupt: &[u8] = b"recv CORRUPT HEADER garbage\n";
    let boundary: &[u8] = b"\x0B\n";
    let after: &[u8] = b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n";
    let input = [before, corrupt, boundary, after].concat();

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    let parsed: Vec<Frame> = items.into_iter().flatten().collect();
    assert_eq!(parsed.len(), 2);

    let stats = reader.stats();
    assert!(stats.bytes_skipped > 0);
    assert_eq!(stats.unparsed_regions.len(), 1);
    let region = &stats.unparsed_regions[0];
    assert_eq!(region.reason, crate::types::SkipReason::InvalidHeader);
}
1266
#[test]
fn stats_oversized_frame_at_start() {
    // Leading filler one byte longer than `MAX_PARTIAL_FRAME`, then a boundary and
    // a valid frame: the skip is expected to be classified as `OversizedFrame`.
    let mut input = vec![b'x'; MAX_PARTIAL_FRAME + 1];
    input.extend_from_slice(b"\x0B\n");
    input.extend_from_slice(
        b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
    );

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    let parsed: Vec<Frame> = items.into_iter().flatten().collect();
    assert_eq!(parsed.len(), 1);

    let stats = reader.stats();
    assert_eq!(stats.unparsed_regions.len(), 1);
    let region = &stats.unparsed_regions[0];
    assert_eq!(region.reason, crate::types::SkipReason::OversizedFrame);
}
1286
#[test]
fn stats_oversized_frame_mid_stream() {
    // Same oversized filler as the start-of-stream case, but placed between two
    // valid frames.
    let before: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let filler = vec![b'x'; MAX_PARTIAL_FRAME + 1];
    let boundary: &[u8] = b"\x0B\n";
    let after: &[u8] = b"sent 3 bytes to tcp/3.3.3.3:5060 at 02:00:00.000000:\nbar\x0B\n";
    let input = [before, filler.as_slice(), boundary, after].concat();

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    let items: Vec<Result<Frame, ParseError>> = reader.by_ref().collect();
    let parsed: Vec<Frame> = items.into_iter().flatten().collect();
    assert_eq!(parsed.len(), 2);

    let stats = reader.stats();
    assert_eq!(stats.unparsed_regions.len(), 1);
    let region = &stats.unparsed_regions[0];
    assert_eq!(region.reason, crate::types::SkipReason::OversizedFrame);
}
1308
#[test]
fn stats_partial_first_frame_within_limit() {
    // Filler plus the 2-byte `\x0B\n` boundary adds up to exactly
    // `MAX_PARTIAL_FRAME`, which should still count as `PartialFirstFrame`.
    let mut input = vec![b'x'; MAX_PARTIAL_FRAME - 2];
    input.extend_from_slice(b"\x0B\n");
    input.extend_from_slice(
        b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n",
    );

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    let parsed = reader
        .by_ref()
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();
    assert_eq!(parsed.len(), 1);

    let stats = reader.stats();
    assert_eq!(stats.unparsed_regions.len(), 1);
    let region = &stats.unparsed_regions[0];
    assert_eq!(region.reason, crate::types::SkipReason::PartialFirstFrame);
}
1328
#[test]
fn stats_dump_restart_marker() {
    // A restart marker between frames is structural: it must not be counted as
    // skipped bytes nor recorded as an unparsed region.
    let before: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let marker: &[u8] = b"dump started at Thu Aug 22 11:38:11 2024\n\n\n";
    let after: &[u8] = b"sent 3 bytes to tcp/2.2.2.2:5060 at 00:00:01.000000:\nbye\x0B\n";
    let input = [before, marker, after].concat();

    let mut reader = FrameIterator::new(input.as_slice());
    let parsed = reader
        .by_ref()
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();
    assert_eq!(parsed.len(), 2);

    let stats = reader.stats();
    assert_eq!(stats.bytes_skipped, 0);
    assert!(stats.unparsed_regions.is_empty());
}
1346
#[test]
fn stats_track_regions_no_data() {
    // `TrackRegions` records the skipped region but leaves its `data` empty.
    let garbage: &[u8] = b"partial garbage data";
    let boundary: &[u8] = b"\x0B\n";
    let frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let input = [garbage, boundary, frame].concat();

    let mut reader =
        FrameIterator::new(input.as_slice()).skip_tracking(SkipTracking::TrackRegions);
    // Drain the iterator; only the resulting stats matter here.
    let _: Vec<_> = reader.by_ref().collect::<Result<Vec<_>, _>>().unwrap();

    let stats = reader.stats();
    assert_eq!(stats.unparsed_regions.len(), 1);
    assert!(stats.unparsed_regions[0].data.is_none());
}
1361
#[test]
fn stats_count_only_no_regions() {
    // No tracking mode is configured on the iterator here: skipped bytes are
    // still counted, but no per-region entries are stored.
    let garbage: &[u8] = b"partial garbage data\x0B\n";
    let frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let input = [garbage, frame].concat();

    let mut reader = FrameIterator::new(input.as_slice());
    let parsed: Vec<_> = reader.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
    assert_eq!(parsed.len(), 1);

    let stats = reader.stats();
    assert_eq!(stats.bytes_skipped, garbage.len() as u64);
    assert!(
        stats.unparsed_regions.is_empty(),
        "CountOnly should not accumulate regions"
    );
}
1381
#[test]
fn frame_iterator_trailing_newlines_at_eof() {
    // Blank lines after the final boundary must not turn into error items.
    let frame: &[u8] = b"recv 5 bytes from tcp/1.1.1.1:5060 at 00:00:00.000000:\nhello\x0B\n";
    let trailing: &[u8] = b"\n\n";
    let input = [frame, trailing].concat();

    let parsed = FrameIterator::new(input.as_slice())
        .collect::<Result<Vec<Frame>, _>>()
        .unwrap();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].content, b"hello");
}
1397}