Skip to main content

freeswitch_sofia_trace_parser/
message.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::LazyLock;
3
4use memchr::memmem;
5use tracing::{debug, trace, warn};
6
7use crate::frame::{FrameIterator, ParseError};
8use crate::types::{
9    Direction, ParseStats, SipMessage, SkipTracking, Timestamp, Transport, UnparsedRegion,
10};
11
/// Precompiled searcher for a single CRLF (`\r\n`) line terminator.
static CRLF: LazyLock<memmem::Finder<'static>> = LazyLock::new(|| memmem::Finder::new(b"\r\n"));
/// Precompiled searcher for the empty line (`\r\n\r\n`) that terminates a SIP
/// header block.
static CRLFCRLF: LazyLock<memmem::Finder<'static>> =
    LazyLock::new(|| memmem::Finder::new(b"\r\n\r\n"));
15
/// Inactivity timeout, in trace-time seconds, after which a TCP connection
/// buffer is considered stale and evicted.
///
/// Two hours mirrors the minimum default TCP keep-alive interval required by
/// RFC 1122 Section 4.2.3.6 (RFC 793 itself defines no keep-alive). Evicting
/// idle buffers prevents unbounded memory growth when processing multi-day
/// dump file streams with ephemeral TLS source ports.
const STALE_TIMEOUT_SECS: u64 = 7200;
21
/// Level 2 streaming parser: reassembles TCP segments into complete SIP messages.
///
/// Wraps a [`FrameIterator`] and groups non-UDP (TCP, TLS, WSS) frames by
/// `(Direction, Address)`. Messages are emitted when headers and Content-Length
/// body bytes are fully available. UDP frames pass through as-is (1:1 mapping).
///
/// Stale TCP connection buffers are evicted after 2 hours of inactivity to
/// keep memory bounded on multi-day dump streams. Inactivity is measured in
/// trace time (derived from the frames' own timestamps, with midnight
/// rollovers detected), not wall-clock time.
///
/// # Example
///
/// ```no_run
/// use std::fs::File;
/// use freeswitch_sofia_trace_parser::MessageIterator;
///
/// let file = File::open("profile.dump").unwrap();
/// for msg in MessageIterator::new(file) {
///     let msg = msg.unwrap();
///     println!("{} {} {} ({} frames)",
///         msg.timestamp, msg.direction, msg.address, msg.frame_count);
/// }
/// ```
pub struct MessageIterator<R> {
    /// Underlying frame parser that supplies raw frames.
    frames: FrameIterator<R>,
    /// Per-connection reassembly buffers, keyed by direction and peer address.
    buffers: HashMap<(Direction, String), ConnectionBuffer>,
    /// Completed messages waiting to be handed out by `next()`.
    ready: VecDeque<SipMessage>,
    /// Set once the frame source has ended and all buffers have been flushed.
    exhausted: bool,
    /// Number of midnight rollovers detected so far in the trace.
    current_day: u32,
    /// Time of day, in seconds, of the most recently tracked frame.
    last_time_secs: u32,
    /// Absolute trace time (`day * 86400 + secs`) of the last stale-buffer sweep.
    last_sweep_abs_secs: u64,
}
53
/// Reassembly state for one `(Direction, address)` stream of non-UDP frames.
struct ConnectionBuffer {
    /// Transport recorded from the frame that created this buffer.
    transport: Transport,
    /// Timestamp of the frame that began the currently buffered data.
    timestamp: Timestamp,
    /// Bytes received but not yet emitted as complete messages.
    content: Vec<u8>,
    /// Frames appended since the last message was extracted from this buffer.
    frame_count: usize,
    /// Day counter at the time this buffer last received a frame.
    last_seen_day: u32,
    /// Time of day, in seconds, at which this buffer last received a frame.
    last_seen_time_secs: u32,
}
62
63impl<R: std::io::Read> MessageIterator<R> {
64    /// Create a new message iterator reading from the given source.
65    pub fn new(reader: R) -> Self {
66        MessageIterator {
67            frames: FrameIterator::new(reader),
68            buffers: HashMap::new(),
69            ready: VecDeque::new(),
70            exhausted: false,
71            current_day: 0,
72            last_time_secs: 0,
73            last_sweep_abs_secs: 0,
74        }
75    }
76
77    /// Enable capturing of skipped bytes in the underlying [`FrameIterator`].
78    pub fn capture_skipped(mut self, enable: bool) -> Self {
79        self.frames = self.frames.capture_skipped(enable);
80        self
81    }
82
83    /// Set the level of detail for unparsed region tracking.
84    pub fn skip_tracking(mut self, tracking: SkipTracking) -> Self {
85        self.frames = self.frames.skip_tracking(tracking);
86        self
87    }
88
89    /// Borrow the accumulated parse statistics from the underlying frame parser.
90    pub fn parse_stats(&self) -> &ParseStats {
91        self.frames.stats()
92    }
93
94    /// Mutably borrow the parse statistics.
95    pub fn parse_stats_mut(&mut self) -> &mut ParseStats {
96        self.frames.stats_mut()
97    }
98
99    /// Take all accumulated unparsed regions, leaving the list empty.
100    pub fn drain_unparsed(&mut self) -> Vec<UnparsedRegion> {
101        self.frames.drain_unparsed()
102    }
103
104    fn update_time_tracking(&mut self, time_secs: u32) {
105        if time_secs < self.last_time_secs && self.last_time_secs - time_secs > 43200 {
106            self.current_day += 1;
107            debug!(
108                day = self.current_day,
109                prev_secs = self.last_time_secs,
110                curr_secs = time_secs,
111                "detected day rollover"
112            );
113        }
114        self.last_time_secs = time_secs;
115    }
116
117    fn current_abs_secs(&self) -> u64 {
118        self.current_day as u64 * 86400 + self.last_time_secs as u64
119    }
120
121    fn sweep_stale_buffers(&mut self) {
122        let current_abs = self.current_abs_secs();
123        self.buffers.retain(|key, buf| {
124            let buf_abs = buf.last_seen_day as u64 * 86400 + buf.last_seen_time_secs as u64;
125            let elapsed = current_abs.saturating_sub(buf_abs);
126            if elapsed > STALE_TIMEOUT_SECS {
127                if buf.content.is_empty() {
128                    trace!(
129                        address = %key.1,
130                        direction = %key.0,
131                        elapsed_secs = elapsed,
132                        "evicted empty stale connection buffer"
133                    );
134                } else {
135                    warn!(
136                        address = %key.1,
137                        direction = %key.0,
138                        elapsed_secs = elapsed,
139                        pending_bytes = buf.content.len(),
140                        "evicted stale connection buffer with incomplete data"
141                    );
142                }
143                return false;
144            }
145            true
146        });
147    }
148
149    fn flush_all(&mut self) {
150        let keys: Vec<_> = self.buffers.keys().cloned().collect();
151        for key in keys {
152            if let Some(buf) = self.buffers.get_mut(&key) {
153                let msgs = extract_complete(buf, &key);
154                self.ready.extend(msgs);
155
156                if !buf.content.is_empty() {
157                    let content = std::mem::take(&mut buf.content);
158                    self.ready.push_back(SipMessage {
159                        direction: key.0,
160                        transport: buf.transport,
161                        address: key.1.clone(),
162                        timestamp: buf.timestamp,
163                        content,
164                        frame_count: buf.frame_count,
165                    });
166                    buf.frame_count = 0;
167                }
168            }
169        }
170        self.buffers.clear();
171    }
172}
173
174impl<R: std::io::Read> Iterator for MessageIterator<R> {
175    type Item = Result<SipMessage, ParseError>;
176
177    fn next(&mut self) -> Option<Self::Item> {
178        if let Some(msg) = self.ready.pop_front() {
179            return Some(Ok(msg));
180        }
181
182        if self.exhausted {
183            return None;
184        }
185
186        loop {
187            match self.frames.next() {
188                Some(Ok(frame)) => {
189                    if frame.transport == Transport::Udp {
190                        return Some(Ok(SipMessage {
191                            direction: frame.direction,
192                            transport: frame.transport,
193                            address: frame.address,
194                            timestamp: frame.timestamp,
195                            content: frame.content,
196                            frame_count: 1,
197                        }));
198                    }
199
200                    let time_secs = frame.timestamp.time_of_day_secs();
201                    self.update_time_tracking(time_secs);
202
203                    let current_abs = self.current_abs_secs();
204                    if current_abs.saturating_sub(self.last_sweep_abs_secs) >= STALE_TIMEOUT_SECS {
205                        self.sweep_stale_buffers();
206                        self.last_sweep_abs_secs = current_abs;
207                    }
208
209                    let key = (frame.direction, frame.address);
210
211                    let buf = match self.buffers.get_mut(&key) {
212                        Some(buf) => buf,
213                        None => self.buffers.entry(key.clone()).or_insert(ConnectionBuffer {
214                            transport: frame.transport,
215                            timestamp: frame.timestamp,
216                            content: Vec::new(),
217                            frame_count: 0,
218                            last_seen_day: self.current_day,
219                            last_seen_time_secs: time_secs,
220                        }),
221                    };
222
223                    buf.last_seen_day = self.current_day;
224                    buf.last_seen_time_secs = time_secs;
225
226                    if buf.content.is_empty() {
227                        buf.timestamp = frame.timestamp;
228                    }
229
230                    trace!(
231                        frame = buf.frame_count + 1,
232                        bytes = frame.content.len(),
233                        address = %key.1,
234                        "buffering TCP frame"
235                    );
236
237                    buf.content.extend_from_slice(&frame.content);
238                    buf.frame_count += 1;
239
240                    let msgs = extract_complete(buf, &key);
241                    self.ready.extend(msgs);
242
243                    if buf.frame_count == 0 && buf.content.is_empty() {
244                        self.buffers.remove(&key);
245                    }
246
247                    if let Some(msg) = self.ready.pop_front() {
248                        return Some(Ok(msg));
249                    }
250                }
251                Some(Err(e)) => return Some(Err(e)),
252                None => {
253                    self.exhausted = true;
254                    self.flush_all();
255                    return self.ready.pop_front().map(Ok);
256                }
257            }
258        }
259    }
260}
261
262/// Extract complete SIP messages from a connection buffer.
263/// Messages are complete when we find headers (\r\n\r\n) and have
264/// Content-Length bytes of body available.
265fn extract_complete(buf: &mut ConnectionBuffer, key: &(Direction, String)) -> Vec<SipMessage> {
266    let mut messages = Vec::new();
267
268    loop {
269        if buf.content.is_empty() {
270            break;
271        }
272
273        // Skip non-SIP prefix (body fragments from incomplete prior messages)
274        if !is_sip_start(&buf.content) {
275            // Drain leading whitespace (CRLF padding, bare LF keep-alives, etc.)
276            let ws_len = buf
277                .content
278                .iter()
279                .position(|&b| !matches!(b, b'\r' | b'\n' | b' ' | b'\t'))
280                .unwrap_or(buf.content.len());
281
282            if ws_len > 0 {
283                if ws_len == buf.content.len() {
284                    trace!(
285                        bytes = ws_len,
286                        address = %key.1,
287                        "drained transport whitespace"
288                    );
289                    buf.content.clear();
290                    break;
291                }
292                if is_sip_start(&buf.content[ws_len..]) {
293                    trace!(bytes = ws_len, "drained inter-message whitespace padding");
294                    buf.content.drain(..ws_len);
295                    continue;
296                }
297            }
298
299            match find_sip_start(&buf.content) {
300                Some(offset) if offset > 0 => {
301                    warn!(
302                        skipped_bytes = offset,
303                        address = %key.1,
304                        "skipped non-SIP prefix in TCP buffer"
305                    );
306                    buf.content.drain(..offset);
307                    continue;
308                }
309                _ => break, // No SIP start found, wait for more data
310            }
311        }
312
313        // Find header/body boundary
314        let header_end = match CRLFCRLF.find(&buf.content) {
315            Some(offset) => offset,
316            None => break, // Headers incomplete, wait for more data
317        };
318        let body_start = header_end + 4;
319
320        let msg_end = match find_content_length(&buf.content) {
321            Some(cl) => {
322                let end = body_start + cl;
323                if end > buf.content.len() {
324                    break; // Body incomplete, wait for more data
325                }
326                end
327            }
328            None => body_start, // No CL = no body (RFC 3261 Section 18.3)
329        };
330
331        let remaining = buf.content.split_off(msg_end);
332        let msg_content = std::mem::replace(&mut buf.content, remaining);
333
334        // Skip trailing CRLF between messages
335        while buf.content.len() >= 2 && buf.content[0] == b'\r' && buf.content[1] == b'\n' {
336            buf.content.drain(..2);
337        }
338
339        let frame_count = if messages.is_empty() {
340            buf.frame_count
341        } else {
342            0
343        };
344
345        if frame_count > 1 {
346            debug!(
347                frame_count,
348                bytes = msg_content.len(),
349                address = %key.1,
350                "extracted reassembled TCP message"
351            );
352        }
353
354        messages.push(SipMessage {
355            direction: key.0,
356            transport: buf.transport,
357            address: key.1.clone(),
358            timestamp: buf.timestamp,
359            content: msg_content,
360            frame_count,
361        });
362
363        buf.frame_count = 0;
364    }
365
366    messages
367}
368
369/// Find Content-Length header value in SIP message bytes.
370/// Returns the value as usize if found.
371fn find_content_length(data: &[u8]) -> Option<usize> {
372    let header_end = CRLFCRLF.find(data)?;
373    let headers = &data[..header_end];
374
375    let mut pos = 0;
376    while pos < headers.len() {
377        let line_end = CRLF.find(&headers[pos..]).unwrap_or(headers.len() - pos);
378        let line = &headers[pos..pos + line_end];
379
380        if let Some(value) = extract_header_value(line, b"Content-Length") {
381            return parse_content_length(value);
382        }
383        if let Some(value) = extract_compact_header_value(line, b'l') {
384            return parse_content_length(value);
385        }
386
387        pos += line_end + 2; // skip \r\n
388    }
389    None
390}
391
/// Return the trimmed value of `line` if it is a `name: value` header line.
///
/// The name is compared ASCII case-insensitively (RFC 3261 Section 7.3.1) and
/// may be followed by spaces or tabs before the colon (the `HCOLON` rule of
/// RFC 3261 Section 25.1). Returns `None` for a different header, a longer
/// name that merely starts with `name`, or a line with nothing after the colon.
fn extract_header_value<'a>(line: &'a [u8], name: &[u8]) -> Option<&'a [u8]> {
    let head = line.get(..name.len())?;
    if !head.eq_ignore_ascii_case(name) {
        return None;
    }
    header_value_after_name(&line[name.len()..])
}

/// Like [`extract_header_value`], but for a single-character compact header
/// name such as `l` (`Content-Length`). Compact names are also matched
/// case-insensitively, so `L: 5` is accepted as well as `l: 5`.
fn extract_compact_header_value(line: &[u8], compact: u8) -> Option<&[u8]> {
    let (first, rest) = line.split_first()?;
    if !first.eq_ignore_ascii_case(&compact) {
        return None;
    }
    header_value_after_name(rest)
}

/// Parse what follows a header field name: optional spaces/tabs, a mandatory
/// `:`, then the value. Returns the trimmed value, or `None` when the colon
/// is missing or nothing follows it.
fn header_value_after_name(rest: &[u8]) -> Option<&[u8]> {
    let pad = rest.iter().take_while(|&&b| b == b' ' || b == b'\t').count();
    let value = rest[pad..].strip_prefix(b":")?;
    if value.is_empty() {
        return None;
    }
    Some(trim_bytes(value))
}

/// Strip leading and trailing spaces and horizontal tabs from `b`.
fn trim_bytes(b: &[u8]) -> &[u8] {
    let start = b
        .iter()
        .position(|&c| c != b' ' && c != b'\t')
        .unwrap_or(b.len());
    let end = b
        .iter()
        .rposition(|&c| c != b' ' && c != b'\t')
        .map_or(start, |p| p + 1);
    &b[start..end]
}
426
/// Interpret a header value as a non-negative byte count; `None` if the bytes
/// are not UTF-8 or not a valid `usize`.
fn parse_content_length(value: &[u8]) -> Option<usize> {
    std::str::from_utf8(value)
        .ok()
        .and_then(|text| text.parse::<usize>().ok())
}
431
/// Return `true` if `data` begins with a SIP response status line
/// (`SIP/2.0 `) or with a known request method followed by a space.
///
/// The method list covers the RFC 3261 core methods plus the common
/// extensions (PRACK, SUBSCRIBE, NOTIFY, PUBLISH, INFO, REFER, MESSAGE,
/// UPDATE). Matching is case-sensitive.
fn is_sip_start(data: &[u8]) -> bool {
    const METHODS: &[&[u8]] = &[
        b"INVITE ",
        b"ACK ",
        b"BYE ",
        b"CANCEL ",
        b"OPTIONS ",
        b"REGISTER ",
        b"PRACK ",
        b"SUBSCRIBE ",
        b"NOTIFY ",
        b"PUBLISH ",
        b"INFO ",
        b"REFER ",
        b"MESSAGE ",
        b"UPDATE ",
    ];

    let is_response = data.starts_with(b"SIP/2.0 ");
    is_response || METHODS.iter().any(|method| data.starts_with(method))
}
460
461/// Scan for the first SIP message start at a CRLF boundary within data.
462fn find_sip_start(data: &[u8]) -> Option<usize> {
463    if is_sip_start(data) {
464        return Some(0);
465    }
466    let mut pos = 0;
467    while let Some(offset) = CRLF.find(&data[pos..]) {
468        let candidate = pos + offset + 2;
469        if candidate >= data.len() {
470            break;
471        }
472        if is_sip_start(&data[candidate..]) {
473            return Some(candidate);
474        }
475        pos = candidate;
476    }
477    None
478}
479
480#[cfg(test)]
481mod tests {
482    use super::*;
483    use crate::types::Direction;
484
485    fn make_frame(
486        direction: Direction,
487        transport: Transport,
488        addr: &str,
489        content: &[u8],
490    ) -> Vec<u8> {
491        let dir_str = match direction {
492            Direction::Recv => "recv",
493            Direction::Sent => "sent",
494        };
495        let prep = match direction {
496            Direction::Recv => "from",
497            Direction::Sent => "to",
498        };
499        let transport_str = match transport {
500            Transport::Tcp => "tcp",
501            Transport::Udp => "udp",
502            Transport::Tls => "tls",
503            Transport::Wss => "wss",
504        };
505        let header = format!(
506            "{dir_str} {} bytes {prep} {transport_str}/{addr} at 00:00:00.000000:\n",
507            content.len()
508        );
509        let mut data = header.into_bytes();
510        data.extend_from_slice(content);
511        data.extend_from_slice(b"\x0B\n");
512        data
513    }
514
515    #[test]
516    fn single_udp_message() {
517        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
518        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
519        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
520            .collect::<Result<Vec<_>, _>>()
521            .unwrap();
522        assert_eq!(msgs.len(), 1);
523        assert_eq!(msgs[0].content, content);
524        assert_eq!(msgs[0].frame_count, 1);
525        assert_eq!(msgs[0].transport, Transport::Udp);
526    }
527
528    #[test]
529    fn tcp_reassembly_two_frames() {
530        let part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
531        let part2 = b"Content-Length: 0\r\n\r\n";
532        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", part1);
533        data.extend_from_slice(&make_frame(
534            Direction::Recv,
535            Transport::Tcp,
536            "[::1]:5060",
537            part2,
538        ));
539        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
540            .collect::<Result<Vec<_>, _>>()
541            .unwrap();
542        assert_eq!(msgs.len(), 1);
543        assert_eq!(msgs[0].frame_count, 2);
544        let mut expected = Vec::new();
545        expected.extend_from_slice(part1);
546        expected.extend_from_slice(part2);
547        assert_eq!(msgs[0].content, expected);
548    }
549
550    #[test]
551    fn tcp_reassembly_across_interleaved_frames() {
552        // Frame 1: recv from A (partial INVITE)
553        // Frame 2: sent to A (response on same connection — interrupts)
554        // Frame 3: recv from A (rest of INVITE)
555        let part1 = b"INVITE sip:user@host SIP/2.0\r\n";
556        let part2 = b"Content-Length: 3\r\n\r\nSDP";
557        let response = b"SIP/2.0 100 Trying\r\nContent-Length: 0\r\n\r\n";
558
559        let mut data = make_frame(Direction::Recv, Transport::Tcp, "10.0.0.1:5060", part1);
560        data.extend_from_slice(&make_frame(
561            Direction::Sent,
562            Transport::Tcp,
563            "10.0.0.1:5060",
564            response,
565        ));
566        data.extend_from_slice(&make_frame(
567            Direction::Recv,
568            Transport::Tcp,
569            "10.0.0.1:5060",
570            part2,
571        ));
572
573        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
574            .collect::<Result<Vec<_>, _>>()
575            .unwrap();
576
577        assert_eq!(msgs.len(), 2);
578
579        // 100 Trying completes first (single frame)
580        let trying = &msgs[0];
581        assert_eq!(trying.direction, Direction::Sent);
582        assert_eq!(trying.content, response);
583
584        // INVITE completes when frame 3 arrives (reassembled from frames 1+3)
585        let invite = &msgs[1];
586        assert_eq!(invite.direction, Direction::Recv);
587        let mut expected_invite = Vec::new();
588        expected_invite.extend_from_slice(part1);
589        expected_invite.extend_from_slice(part2);
590        assert_eq!(invite.content, expected_invite);
591    }
592
593    #[test]
594    fn tcp_reassembly_interleaved_different_addresses() {
595        // Two different addresses both sending multi-frame messages,
596        // frames arriving interleaved:
597        //   Frame 1: recv from A (partial INVITE)
598        //   Frame 2: recv from B (partial NOTIFY)
599        //   Frame 3: recv from A (rest of INVITE — completes A)
600        //   Frame 4: recv from B (rest of NOTIFY — completes B)
601        let a_part1 = b"INVITE sip:user@host SIP/2.0\r\n";
602        let a_part2 = b"Content-Length: 3\r\n\r\nSDP";
603        let b_part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
604        let b_part2 = b"Content-Length: 4\r\n\r\nBODY";
605
606        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", a_part1);
607        data.extend_from_slice(&make_frame(
608            Direction::Recv,
609            Transport::Tcp,
610            "[::2]:5060",
611            b_part1,
612        ));
613        data.extend_from_slice(&make_frame(
614            Direction::Recv,
615            Transport::Tcp,
616            "[::1]:5060",
617            a_part2,
618        ));
619        data.extend_from_slice(&make_frame(
620            Direction::Recv,
621            Transport::Tcp,
622            "[::2]:5060",
623            b_part2,
624        ));
625
626        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
627            .collect::<Result<Vec<_>, _>>()
628            .unwrap();
629
630        assert_eq!(msgs.len(), 2);
631
632        // INVITE from [::1] completes first (frame 3 arrives before frame 4)
633        assert_eq!(msgs[0].address, "[::1]:5060");
634        assert_eq!(msgs[0].frame_count, 2);
635        let mut expected_a = Vec::new();
636        expected_a.extend_from_slice(a_part1);
637        expected_a.extend_from_slice(a_part2);
638        assert_eq!(msgs[0].content, expected_a);
639
640        // NOTIFY from [::2] completes second
641        assert_eq!(msgs[1].address, "[::2]:5060");
642        assert_eq!(msgs[1].frame_count, 2);
643        let mut expected_b = Vec::new();
644        expected_b.extend_from_slice(b_part1);
645        expected_b.extend_from_slice(b_part2);
646        assert_eq!(msgs[1].content, expected_b);
647    }
648
649    #[test]
650    fn direction_change_splits_messages() {
651        let recv_content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
652        let sent_content = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
653        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", recv_content);
654        data.extend_from_slice(&make_frame(
655            Direction::Sent,
656            Transport::Tcp,
657            "[::1]:5060",
658            sent_content,
659        ));
660        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
661            .collect::<Result<Vec<_>, _>>()
662            .unwrap();
663        assert_eq!(msgs.len(), 2);
664        assert_eq!(msgs[0].direction, Direction::Recv);
665        assert_eq!(msgs[1].direction, Direction::Sent);
666    }
667
668    #[test]
669    fn address_change_splits_messages() {
670        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
671        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", content);
672        data.extend_from_slice(&make_frame(
673            Direction::Recv,
674            Transport::Tcp,
675            "[::2]:5060",
676            content,
677        ));
678        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
679            .collect::<Result<Vec<_>, _>>()
680            .unwrap();
681        assert_eq!(msgs.len(), 2);
682        assert_eq!(msgs[0].address, "[::1]:5060");
683        assert_eq!(msgs[1].address, "[::2]:5060");
684    }
685
686    #[test]
687    fn udp_no_reassembly() {
688        let content1 = b"OPTIONS sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
689        let content2 = b"OPTIONS sip:b SIP/2.0\r\nContent-Length: 0\r\n\r\n";
690        let mut data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content1);
691        data.extend_from_slice(&make_frame(
692            Direction::Recv,
693            Transport::Udp,
694            "1.1.1.1:5060",
695            content2,
696        ));
697        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
698            .collect::<Result<Vec<_>, _>>()
699            .unwrap();
700        assert_eq!(msgs.len(), 2, "UDP frames should not be reassembled");
701        assert_eq!(msgs[0].frame_count, 1);
702        assert_eq!(msgs[1].frame_count, 1);
703    }
704
705    #[test]
706    fn aggregated_messages_split_by_content_length() {
707        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
708        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
709        let mut combined = Vec::new();
710        combined.extend_from_slice(msg1);
711        combined.extend_from_slice(msg2);
712        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", &combined);
713        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
714            .collect::<Result<Vec<_>, _>>()
715            .unwrap();
716        assert_eq!(msgs.len(), 2);
717        assert_eq!(msgs[0].content, msg1);
718        assert_eq!(msgs[1].content, msg2);
719    }
720
721    #[test]
722    fn find_content_length_standard() {
723        let data = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 42\r\n\r\n";
724        assert_eq!(find_content_length(data), Some(42));
725    }
726
727    #[test]
728    fn find_content_length_compact() {
729        let data = b"NOTIFY sip:a SIP/2.0\r\nl: 42\r\n\r\n";
730        assert_eq!(find_content_length(data), Some(42));
731    }
732
733    #[test]
734    fn find_content_length_missing() {
735        let data = b"NOTIFY sip:a SIP/2.0\r\nCSeq: 1 NOTIFY\r\n\r\n";
736        assert_eq!(find_content_length(data), None);
737    }
738
739    #[test]
740    fn is_sip_start_request() {
741        assert!(is_sip_start(b"INVITE sip:user@host SIP/2.0\r\n"));
742        assert!(is_sip_start(b"OPTIONS sip:user@host SIP/2.0\r\n"));
743        assert!(is_sip_start(b"NOTIFY sip:user@host SIP/2.0\r\n"));
744        assert!(is_sip_start(b"ACK sip:user@host SIP/2.0\r\n"));
745    }
746
747    #[test]
748    fn is_sip_start_response() {
749        assert!(is_sip_start(b"SIP/2.0 200 OK\r\n"));
750        assert!(is_sip_start(b"SIP/2.0 100 Trying\r\n"));
751    }
752
753    #[test]
754    fn is_sip_start_not_sip() {
755        assert!(!is_sip_start(b"some random data"));
756        assert!(!is_sip_start(b"HTTP/1.1 200 OK\r\n"));
757    }
758
759    #[test]
760    fn find_sip_start_at_beginning() {
761        let data = b"INVITE sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
762        assert_eq!(find_sip_start(data), Some(0));
763    }
764
765    #[test]
766    fn find_sip_start_after_prefix() {
767        let data = b"</xml>\r\nNOTIFY sip:user@host SIP/2.0\r\n";
768        assert_eq!(find_sip_start(data), Some(8));
769    }
770
771    #[test]
772    fn find_sip_start_none() {
773        let data = b"no SIP here at all";
774        assert_eq!(find_sip_start(data), None);
775    }
776
777    #[test]
778    fn message_preserves_metadata() {
779        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
780        let data = make_frame(
781            Direction::Sent,
782            Transport::Tls,
783            "[2001:db8::1]:5061",
784            content,
785        );
786        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
787            .collect::<Result<Vec<_>, _>>()
788            .unwrap();
789        assert_eq!(msgs.len(), 1);
790        assert_eq!(msgs[0].direction, Direction::Sent);
791        assert_eq!(msgs[0].transport, Transport::Tls);
792        assert_eq!(msgs[0].address, "[2001:db8::1]:5061");
793        assert_eq!(
794            msgs[0].timestamp,
795            Timestamp::TimeOnly {
796                hour: 0,
797                min: 0,
798                sec: 0,
799                usec: 0
800            }
801        );
802    }
803
804    #[test]
805    fn extract_handles_crlf_between_messages() {
806        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
807        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
808        let mut content = Vec::new();
809        content.extend_from_slice(msg1);
810        content.extend_from_slice(b"\r\n");
811        content.extend_from_slice(msg2);
812
813        let key = (Direction::Recv, "[::1]:5060".to_string());
814        let mut buf = ConnectionBuffer {
815            transport: Transport::Tcp,
816            timestamp: Timestamp::TimeOnly {
817                hour: 0,
818                min: 0,
819                sec: 0,
820                usec: 0,
821            },
822            content,
823            frame_count: 1,
824            last_seen_day: 0,
825            last_seen_time_secs: 0,
826        };
827        let msgs = extract_complete(&mut buf, &key);
828        assert_eq!(msgs.len(), 2);
829        assert_eq!(msgs[0].content, msg1);
830        assert_eq!(msgs[1].content, msg2);
831    }
832
833    #[test]
834    fn extract_skips_non_sip_prefix() {
835        let prefix = b"</conference-info>\r\n";
836        let msg = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
837        let mut content = Vec::new();
838        content.extend_from_slice(prefix);
839        content.extend_from_slice(msg);
840
841        let key = (Direction::Recv, "[::1]:5060".to_string());
842        let mut buf = ConnectionBuffer {
843            transport: Transport::Tcp,
844            timestamp: Timestamp::TimeOnly {
845                hour: 0,
846                min: 0,
847                sec: 0,
848                usec: 0,
849            },
850            content,
851            frame_count: 1,
852            last_seen_day: 0,
853            last_seen_time_secs: 0,
854        };
855        let msgs = extract_complete(&mut buf, &key);
856        assert_eq!(msgs.len(), 1);
857        assert_eq!(msgs[0].content, msg);
858    }
859
860    #[test]
861    fn extract_waits_for_incomplete_body() {
862        // Headers complete but body is missing
863        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 100\r\n\r\npartial".to_vec();
864
865        let key = (Direction::Recv, "[::1]:5060".to_string());
866        let mut buf = ConnectionBuffer {
867            transport: Transport::Tcp,
868            timestamp: Timestamp::TimeOnly {
869                hour: 0,
870                min: 0,
871                sec: 0,
872                usec: 0,
873            },
874            content,
875            frame_count: 1,
876            last_seen_day: 0,
877            last_seen_time_secs: 0,
878        };
879        let msgs = extract_complete(&mut buf, &key);
880        assert!(msgs.is_empty(), "should wait for body to complete");
881        assert!(!buf.content.is_empty(), "buffer should retain data");
882    }
883
884    #[test]
885    fn extract_waits_for_incomplete_headers() {
886        // Headers not complete (no \r\n\r\n)
887        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 0\r\n".to_vec();
888
889        let key = (Direction::Recv, "[::1]:5060".to_string());
890        let mut buf = ConnectionBuffer {
891            transport: Transport::Tcp,
892            timestamp: Timestamp::TimeOnly {
893                hour: 0,
894                min: 0,
895                sec: 0,
896                usec: 0,
897            },
898            content,
899            frame_count: 1,
900            last_seen_day: 0,
901            last_seen_time_secs: 0,
902        };
903        let msgs = extract_complete(&mut buf, &key);
904        assert!(msgs.is_empty(), "should wait for headers to complete");
905    }
906
907    #[test]
908    fn tcp_body_split_across_five_frames() {
909        // Simulate REQUEST.md scenario: NOTIFY with Content-Length: 6424
910        // body split across 5 TCP frames (like NG9-1-1 abandoned call JSON)
911        let body_len: usize = 6424;
912        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();
913
914        let mut headers = Vec::new();
915        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
916        headers
917            .extend_from_slice(b"Via: SIP/2.0/TCP [2001:4958:10:11::6]:45538;branch=z9hG4bK-1\r\n");
918        headers.extend_from_slice(b"Call-ID: fragmented-notify@host\r\n");
919        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
920        headers.extend_from_slice(
921            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
922        );
923        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
924        headers.extend_from_slice(b"\r\n");
925
926        let mut full_content = headers.clone();
927        full_content.extend_from_slice(&body);
928
929        // Split into 5 frames like real TCP segments
930        let frame1_len = 1500.min(full_content.len());
931        let remaining = &full_content[frame1_len..];
932        let frame2_len = 1428.min(remaining.len());
933        let remaining = &remaining[frame2_len..];
934        let frame3_len = 1428.min(remaining.len());
935        let remaining = &remaining[frame3_len..];
936        let frame4_len = 1428.min(remaining.len());
937        let remaining = &remaining[frame4_len..];
938        let frame5_len = remaining.len();
939
940        let addr = "[2001:4958:10:11::6]:45538";
941        let mut data = make_frame(
942            Direction::Recv,
943            Transport::Tcp,
944            addr,
945            &full_content[..frame1_len],
946        );
947        let mut offset = frame1_len;
948        for len in [frame2_len, frame3_len, frame4_len, frame5_len] {
949            data.extend_from_slice(&make_frame(
950                Direction::Recv,
951                Transport::Tcp,
952                addr,
953                &full_content[offset..offset + len],
954            ));
955            offset += len;
956        }
957        assert_eq!(offset, full_content.len());
958
959        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
960            .collect::<Result<Vec<_>, _>>()
961            .unwrap();
962
963        assert_eq!(
964            msgs.len(),
965            1,
966            "should produce exactly one reassembled message"
967        );
968        assert_eq!(msgs[0].frame_count, 5, "should track all 5 frames");
969        assert_eq!(
970            msgs[0].content, full_content,
971            "content should be fully reassembled"
972        );
973        assert_eq!(msgs[0].direction, Direction::Recv);
974        assert_eq!(msgs[0].address, addr);
975    }
976
977    #[test]
978    fn parse_stats_delegates() {
979        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
980        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
981        let mut iter = MessageIterator::new(&data[..]);
982        let msgs: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
983        assert_eq!(msgs.len(), 1);
984        let stats = iter.parse_stats();
985        assert_eq!(stats.bytes_read, data.len() as u64);
986        assert_eq!(stats.bytes_skipped, 0);
987    }
988
989    #[test]
990    fn tcp_body_split_parsed_message() {
991        // Same scenario but verified through Level 3 (ParsedSipMessage)
992        let body_len: usize = 6424;
993        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();
994
995        let mut headers = Vec::new();
996        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
997        headers.extend_from_slice(b"Call-ID: fragmented-parsed@host\r\n");
998        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
999        headers.extend_from_slice(
1000            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
1001        );
1002        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
1003        headers.extend_from_slice(b"\r\n");
1004
1005        let mut full_content = headers.clone();
1006        full_content.extend_from_slice(&body);
1007
1008        // Split into 3 frames
1009        let split1 = 1500.min(full_content.len());
1010        let split2 = (split1 + 3000).min(full_content.len());
1011
1012        let addr = "[2001:db8::1]:5060";
1013        let mut data = make_frame(
1014            Direction::Recv,
1015            Transport::Tcp,
1016            addr,
1017            &full_content[..split1],
1018        );
1019        data.extend_from_slice(&make_frame(
1020            Direction::Recv,
1021            Transport::Tcp,
1022            addr,
1023            &full_content[split1..split2],
1024        ));
1025        data.extend_from_slice(&make_frame(
1026            Direction::Recv,
1027            Transport::Tcp,
1028            addr,
1029            &full_content[split2..],
1030        ));
1031
1032        let parsed: Vec<crate::types::ParsedSipMessage> =
1033            crate::sip::ParsedMessageIterator::new(&data[..])
1034                .collect::<Result<Vec<_>, _>>()
1035                .unwrap();
1036
1037        assert_eq!(parsed.len(), 1, "should produce one parsed message");
1038        assert_eq!(parsed[0].content_length(), Some(body_len));
1039        assert_eq!(parsed[0].body.len(), body_len, "body should be complete");
1040        assert_eq!(parsed[0].body, body, "body content should match");
1041        assert_eq!(parsed[0].frame_count, 3);
1042        assert_eq!(parsed[0].method(), Some("NOTIFY"));
1043    }
1044
1045    #[test]
1046    fn tls_keepalive_single_lf_drained() {
1047        let data = make_frame(Direction::Recv, Transport::Tls, "[10.0.0.1]:5061", b"\n");
1048        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1049            .collect::<Result<Vec<_>, _>>()
1050            .unwrap();
1051        assert_eq!(msgs.len(), 0, "keep-alive \\n should produce no messages");
1052    }
1053
1054    #[test]
1055    fn tls_keepalive_multiple_lf_drained() {
1056        let addr = "[10.0.0.1]:5061";
1057        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
1058        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
1059        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
1060        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1061            .collect::<Result<Vec<_>, _>>()
1062            .unwrap();
1063        assert_eq!(
1064            msgs.len(),
1065            0,
1066            "multiple keep-alive \\n should produce no messages"
1067        );
1068    }
1069
1070    #[test]
1071    fn tls_keepalive_interleaved_with_sip() {
1072        let addr = "[10.0.0.1]:5061";
1073        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1074        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
1075        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, sip));
1076        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
1077        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1078            .collect::<Result<Vec<_>, _>>()
1079            .unwrap();
1080        assert_eq!(msgs.len(), 1, "only the SIP message should be emitted");
1081        assert_eq!(msgs[0].content, sip);
1082    }
1083
1084    #[test]
1085    fn tls_bare_lf_before_sip_start() {
1086        let addr = "[10.0.0.1]:5061";
1087        let sip_part1 = b"\nOPTIONS sip:host SIP/2.0\r\n";
1088        let sip_part2 = b"Content-Length: 0\r\n\r\n";
1089        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, sip_part1);
1090        data.extend_from_slice(&make_frame(
1091            Direction::Recv,
1092            Transport::Tls,
1093            addr,
1094            sip_part2,
1095        ));
1096        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1097            .collect::<Result<Vec<_>, _>>()
1098            .unwrap();
1099        assert_eq!(msgs.len(), 1, "SIP after bare LF should be extracted");
1100        assert!(
1101            msgs[0].content.starts_with(b"OPTIONS"),
1102            "message should start with SIP method, not \\n"
1103        );
1104    }
1105
1106    fn make_frame_at(
1107        direction: Direction,
1108        transport: Transport,
1109        addr: &str,
1110        content: &[u8],
1111        timestamp: &str,
1112    ) -> Vec<u8> {
1113        let dir_str = match direction {
1114            Direction::Recv => "recv",
1115            Direction::Sent => "sent",
1116        };
1117        let prep = match direction {
1118            Direction::Recv => "from",
1119            Direction::Sent => "to",
1120        };
1121        let transport_str = match transport {
1122            Transport::Tcp => "tcp",
1123            Transport::Udp => "udp",
1124            Transport::Tls => "tls",
1125            Transport::Wss => "wss",
1126        };
1127        let header = format!(
1128            "{dir_str} {} bytes {prep} {transport_str}/{addr} at {timestamp}:\n",
1129            content.len()
1130        );
1131        let mut data = header.into_bytes();
1132        data.extend_from_slice(content);
1133        data.extend_from_slice(b"\x0B\n");
1134        data
1135    }
1136
1137    #[test]
1138    fn empty_buffer_removed_after_complete_message() {
1139        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1140        let addr_a = "[::1]:5060";
1141        let addr_b = "[::2]:5060";
1142
1143        let mut data = make_frame(Direction::Recv, Transport::Tcp, addr_a, sip);
1144        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tcp, addr_b, sip));
1145
1146        let mut iter = MessageIterator::new(&data[..]);
1147        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1148        assert_eq!(msgs.len(), 2);
1149        assert!(
1150            iter.buffers.is_empty(),
1151            "all buffers should be removed after complete messages are extracted"
1152        );
1153    }
1154
1155    #[test]
1156    fn stale_buffer_evicted_after_timeout() {
1157        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1158        let partial = b"INVITE sip:host SIP/2.0\r\n";
1159
1160        let mut data = Vec::new();
1161        // Partial frame from stale addr at 10:00:00
1162        data.extend_from_slice(&make_frame_at(
1163            Direction::Recv,
1164            Transport::Tls,
1165            "[::99]:44444",
1166            partial,
1167            "2026-02-16 10:00:00.000000",
1168        ));
1169        // Complete msg from another addr at 12:00:01 (>2h later, triggers sweep)
1170        data.extend_from_slice(&make_frame_at(
1171            Direction::Recv,
1172            Transport::Tls,
1173            "[::1]:5060",
1174            sip,
1175            "2026-02-16 12:00:01.000000",
1176        ));
1177
1178        let mut iter = MessageIterator::new(&data[..]);
1179        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1180
1181        assert_eq!(msgs.len(), 1, "should produce the complete message");
1182        assert_eq!(msgs[0].address, "[::1]:5060");
1183        assert!(
1184            iter.buffers.is_empty(),
1185            "stale buffer for [::99]:44444 should have been evicted"
1186        );
1187    }
1188
1189    #[test]
1190    fn day_rollover_detection_with_time_only() {
1191        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1192        let partial = b"INVITE sip:host SIP/2.0\r\n";
1193
1194        let mut data = Vec::new();
1195        // Partial frame at 23:59:00
1196        data.extend_from_slice(&make_frame_at(
1197            Direction::Recv,
1198            Transport::Tcp,
1199            "[::99]:44444",
1200            partial,
1201            "23:59:00.000000",
1202        ));
1203        // Complete msg at 02:00:01 (next day — rollover detected, >2h from 23:59)
1204        data.extend_from_slice(&make_frame_at(
1205            Direction::Recv,
1206            Transport::Tcp,
1207            "[::1]:5060",
1208            sip,
1209            "02:00:01.000000",
1210        ));
1211
1212        let mut iter = MessageIterator::new(&data[..]);
1213        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1214
1215        assert_eq!(msgs.len(), 1);
1216        assert_eq!(msgs[0].address, "[::1]:5060");
1217        assert_eq!(iter.current_day, 1, "should have detected one day rollover");
1218        assert!(
1219            iter.buffers.is_empty(),
1220            "stale buffer should have been evicted after day rollover"
1221        );
1222    }
1223
1224    #[test]
1225    fn flush_all_clears_buffers() {
1226        let partial = b"INVITE sip:host SIP/2.0\r\n";
1227        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", partial);
1228
1229        let mut iter = MessageIterator::new(&data[..]);
1230        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1231        assert_eq!(msgs.len(), 1, "partial should be flushed at EOF");
1232        assert!(
1233            iter.buffers.is_empty(),
1234            "flush_all should clear the HashMap"
1235        );
1236    }
1237}