Skip to main content

freeswitch_sofia_trace_parser/
message.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::LazyLock;
3
4use memchr::memmem;
5use tracing::{debug, trace, warn};
6
7use crate::frame::{FrameIterator, ParseError};
8use crate::types::{
9    Direction, ParseStats, SipMessage, SkipTracking, Timestamp, Transport, UnparsedRegion,
10};
11
/// Precompiled substring searcher for the CRLF (`\r\n`) line terminator.
static CRLF: LazyLock<memmem::Finder<'static>> = LazyLock::new(|| memmem::Finder::new(b"\r\n"));
/// Precompiled substring searcher for the blank line (`\r\n\r\n`) that this
/// module treats as the boundary between message headers and body.
static CRLFCRLF: LazyLock<memmem::Finder<'static>> =
    LazyLock::new(|| memmem::Finder::new(b"\r\n\r\n"));
15
/// Default TCP keep-alive idle interval: two hours (RFC 1122, section 4.2.3.6).
/// Connection buffers inactive for longer than this are considered stale
/// and evicted to prevent unbounded memory growth when processing
/// multi-day dump file streams with ephemeral TLS source ports.
20const STALE_TIMEOUT_SECS: u64 = 7200;
21
/// Iterator that turns the frames produced by a [`FrameIterator`] into
/// [`SipMessage`]s.
///
/// UDP frames are passed through one-to-one. Frames of every other transport
/// are accumulated per `(direction, address)` and split into messages using
/// the header terminator and the `Content-Length` header.
pub struct MessageIterator<R> {
    /// Underlying source of frames.
    frames: FrameIterator<R>,
    /// Reassembly buffers for stream transports, keyed by direction and address.
    buffers: HashMap<(Direction, String), ConnectionBuffer>,
    /// Messages that have been extracted but not yet handed to the caller.
    ready: VecDeque<SipMessage>,
    /// Set once the frame source has returned `None` and all buffers were flushed.
    exhausted: bool,
    /// Number of day rollovers detected so far from the frame timestamps.
    current_day: u32,
    /// Time of day, in seconds, of the most recent buffered (non-UDP) frame.
    last_time_secs: u32,
    /// Absolute time (`current_day * 86400 + seconds`) of the last stale-buffer sweep.
    last_sweep_abs_secs: u64,
}
31
/// Reassembly state for one `(direction, address)` byte stream.
struct ConnectionBuffer {
    /// Transport of the frame that created this buffer.
    transport: Transport,
    /// Timestamp of the frame that was appended while `content` was empty,
    /// i.e. the first frame of the data currently buffered.
    timestamp: Timestamp,
    /// Bytes received so far that have not yet been emitted as a message.
    content: Vec<u8>,
    /// Frames appended since the last message was extracted from this buffer.
    frame_count: usize,
    /// Day counter (see `MessageIterator::current_day`) when a frame was last appended.
    last_seen_day: u32,
    /// Time of day, in seconds, when a frame was last appended.
    last_seen_time_secs: u32,
}
40
41impl<R: std::io::Read> MessageIterator<R> {
42    pub fn new(reader: R) -> Self {
43        MessageIterator {
44            frames: FrameIterator::new(reader),
45            buffers: HashMap::new(),
46            ready: VecDeque::new(),
47            exhausted: false,
48            current_day: 0,
49            last_time_secs: 0,
50            last_sweep_abs_secs: 0,
51        }
52    }
53
54    pub fn capture_skipped(mut self, enable: bool) -> Self {
55        self.frames = self.frames.capture_skipped(enable);
56        self
57    }
58
59    pub fn skip_tracking(mut self, tracking: SkipTracking) -> Self {
60        self.frames = self.frames.skip_tracking(tracking);
61        self
62    }
63
64    pub fn parse_stats(&self) -> &ParseStats {
65        self.frames.stats()
66    }
67
68    pub fn parse_stats_mut(&mut self) -> &mut ParseStats {
69        self.frames.stats_mut()
70    }
71
72    pub fn drain_unparsed(&mut self) -> Vec<UnparsedRegion> {
73        self.frames.drain_unparsed()
74    }
75
76    fn update_time_tracking(&mut self, time_secs: u32) {
77        if time_secs < self.last_time_secs && self.last_time_secs - time_secs > 43200 {
78            self.current_day += 1;
79            debug!(
80                day = self.current_day,
81                prev_secs = self.last_time_secs,
82                curr_secs = time_secs,
83                "detected day rollover"
84            );
85        }
86        self.last_time_secs = time_secs;
87    }
88
89    fn current_abs_secs(&self) -> u64 {
90        self.current_day as u64 * 86400 + self.last_time_secs as u64
91    }
92
93    fn sweep_stale_buffers(&mut self) {
94        let current_abs = self.current_abs_secs();
95        self.buffers.retain(|key, buf| {
96            let buf_abs = buf.last_seen_day as u64 * 86400 + buf.last_seen_time_secs as u64;
97            let elapsed = current_abs.saturating_sub(buf_abs);
98            if elapsed > STALE_TIMEOUT_SECS {
99                if buf.content.is_empty() {
100                    trace!(
101                        address = %key.1,
102                        direction = %key.0,
103                        elapsed_secs = elapsed,
104                        "evicted empty stale connection buffer"
105                    );
106                } else {
107                    warn!(
108                        address = %key.1,
109                        direction = %key.0,
110                        elapsed_secs = elapsed,
111                        pending_bytes = buf.content.len(),
112                        "evicted stale connection buffer with incomplete data"
113                    );
114                }
115                return false;
116            }
117            true
118        });
119    }
120
121    fn flush_all(&mut self) {
122        let keys: Vec<_> = self.buffers.keys().cloned().collect();
123        for key in keys {
124            if let Some(buf) = self.buffers.get_mut(&key) {
125                let msgs = extract_complete(buf, &key);
126                self.ready.extend(msgs);
127
128                if !buf.content.is_empty() {
129                    let content = std::mem::take(&mut buf.content);
130                    self.ready.push_back(SipMessage {
131                        direction: key.0,
132                        transport: buf.transport,
133                        address: key.1.clone(),
134                        timestamp: buf.timestamp,
135                        content,
136                        frame_count: buf.frame_count,
137                    });
138                    buf.frame_count = 0;
139                }
140            }
141        }
142        self.buffers.clear();
143    }
144}
145
146impl<R: std::io::Read> Iterator for MessageIterator<R> {
147    type Item = Result<SipMessage, ParseError>;
148
149    fn next(&mut self) -> Option<Self::Item> {
150        if let Some(msg) = self.ready.pop_front() {
151            return Some(Ok(msg));
152        }
153
154        if self.exhausted {
155            return None;
156        }
157
158        loop {
159            match self.frames.next() {
160                Some(Ok(frame)) => {
161                    if frame.transport == Transport::Udp {
162                        return Some(Ok(SipMessage {
163                            direction: frame.direction,
164                            transport: frame.transport,
165                            address: frame.address,
166                            timestamp: frame.timestamp,
167                            content: frame.content,
168                            frame_count: 1,
169                        }));
170                    }
171
172                    let time_secs = frame.timestamp.time_of_day_secs();
173                    self.update_time_tracking(time_secs);
174
175                    let current_abs = self.current_abs_secs();
176                    if current_abs.saturating_sub(self.last_sweep_abs_secs) >= STALE_TIMEOUT_SECS {
177                        self.sweep_stale_buffers();
178                        self.last_sweep_abs_secs = current_abs;
179                    }
180
181                    let key = (frame.direction, frame.address.clone());
182
183                    let buf = self
184                        .buffers
185                        .entry(key.clone())
186                        .or_insert_with(|| ConnectionBuffer {
187                            transport: frame.transport,
188                            timestamp: frame.timestamp,
189                            content: Vec::new(),
190                            frame_count: 0,
191                            last_seen_day: self.current_day,
192                            last_seen_time_secs: time_secs,
193                        });
194
195                    buf.last_seen_day = self.current_day;
196                    buf.last_seen_time_secs = time_secs;
197
198                    if buf.content.is_empty() {
199                        buf.timestamp = frame.timestamp;
200                    }
201
202                    trace!(
203                        frame = buf.frame_count + 1,
204                        bytes = frame.content.len(),
205                        address = %key.1,
206                        "buffering TCP frame"
207                    );
208
209                    buf.content.extend_from_slice(&frame.content);
210                    buf.frame_count += 1;
211
212                    let msgs = extract_complete(buf, &key);
213                    self.ready.extend(msgs);
214
215                    if buf.frame_count == 0 && buf.content.is_empty() {
216                        self.buffers.remove(&key);
217                    }
218
219                    if let Some(msg) = self.ready.pop_front() {
220                        return Some(Ok(msg));
221                    }
222                }
223                Some(Err(e)) => return Some(Err(e)),
224                None => {
225                    self.exhausted = true;
226                    self.flush_all();
227                    return self.ready.pop_front().map(Ok);
228                }
229            }
230        }
231    }
232}
233
234/// Extract complete SIP messages from a connection buffer.
235/// Messages are complete when we find headers (\r\n\r\n) and have
236/// Content-Length bytes of body available.
237fn extract_complete(buf: &mut ConnectionBuffer, key: &(Direction, String)) -> Vec<SipMessage> {
238    let mut messages = Vec::new();
239
240    loop {
241        if buf.content.is_empty() {
242            break;
243        }
244
245        // Skip non-SIP prefix (body fragments from incomplete prior messages)
246        if !is_sip_start(&buf.content) {
247            // Drain leading whitespace (CRLF padding, bare LF keep-alives, etc.)
248            let ws_len = buf
249                .content
250                .iter()
251                .position(|&b| !matches!(b, b'\r' | b'\n' | b' ' | b'\t'))
252                .unwrap_or(buf.content.len());
253
254            if ws_len > 0 {
255                if ws_len == buf.content.len() {
256                    trace!(
257                        bytes = ws_len,
258                        address = %key.1,
259                        "drained transport whitespace"
260                    );
261                    buf.content.clear();
262                    break;
263                }
264                if is_sip_start(&buf.content[ws_len..]) {
265                    trace!(bytes = ws_len, "drained inter-message whitespace padding");
266                    buf.content.drain(..ws_len);
267                    continue;
268                }
269            }
270
271            match find_sip_start(&buf.content) {
272                Some(offset) if offset > 0 => {
273                    warn!(
274                        skipped_bytes = offset,
275                        address = %key.1,
276                        "skipped non-SIP prefix in TCP buffer"
277                    );
278                    buf.content.drain(..offset);
279                    continue;
280                }
281                _ => break, // No SIP start found, wait for more data
282            }
283        }
284
285        // Find header/body boundary
286        let header_end = match CRLFCRLF.find(&buf.content) {
287            Some(offset) => offset,
288            None => break, // Headers incomplete, wait for more data
289        };
290        let body_start = header_end + 4;
291
292        let msg_end = match find_content_length(&buf.content) {
293            Some(cl) => {
294                let end = body_start + cl;
295                if end > buf.content.len() {
296                    break; // Body incomplete, wait for more data
297                }
298                end
299            }
300            None => body_start, // No CL = no body (RFC 3261 Section 18.3)
301        };
302
303        let msg_content: Vec<u8> = buf.content.drain(..msg_end).collect();
304
305        // Skip trailing CRLF between messages
306        while buf.content.len() >= 2 && buf.content[0] == b'\r' && buf.content[1] == b'\n' {
307            buf.content.drain(..2);
308        }
309
310        let frame_count = if messages.is_empty() {
311            buf.frame_count
312        } else {
313            0
314        };
315
316        if frame_count > 1 {
317            debug!(
318                frame_count,
319                bytes = msg_content.len(),
320                address = %key.1,
321                "extracted reassembled TCP message"
322            );
323        }
324
325        messages.push(SipMessage {
326            direction: key.0,
327            transport: buf.transport,
328            address: key.1.clone(),
329            timestamp: buf.timestamp,
330            content: msg_content,
331            frame_count,
332        });
333
334        buf.frame_count = 0;
335    }
336
337    messages
338}
339
340/// Find Content-Length header value in SIP message bytes.
341/// Returns the value as usize if found.
342fn find_content_length(data: &[u8]) -> Option<usize> {
343    let header_end = CRLFCRLF.find(data)?;
344    let headers = &data[..header_end];
345
346    let mut pos = 0;
347    while pos < headers.len() {
348        let line_end = CRLF.find(&headers[pos..]).unwrap_or(headers.len() - pos);
349        let line = &headers[pos..pos + line_end];
350
351        if let Some(value) = extract_header_value(line, b"Content-Length") {
352            return parse_content_length(value);
353        }
354        if let Some(value) = extract_compact_header_value(line, b'l') {
355            return parse_content_length(value);
356        }
357
358        pos += line_end + 2; // skip \r\n
359    }
360    None
361}
362
/// Matches `name` (ASCII case-insensitively) at the start of `line`, followed
/// by optional spaces/tabs and a colon, and returns the raw bytes after the
/// colon. Returns `None` when `line` is not a `name` header.
fn strip_header_name<'a>(line: &'a [u8], name: &[u8]) -> Option<&'a [u8]> {
    let candidate = line.get(..name.len())?;
    if !candidate.eq_ignore_ascii_case(name) {
        return None;
    }
    let after_name = &line[name.len()..];
    // RFC 3261 section 7.3.1 permits whitespace between the field name and
    // the colon, so skip it before insisting on ':'.
    let colon = after_name
        .iter()
        .position(|&c| c != b' ' && c != b'\t')?;
    if after_name[colon] != b':' {
        return None;
    }
    Some(&after_name[colon + 1..])
}

/// Returns the value of header `name` if `line` is that header.
///
/// The name comparison ignores ASCII case and tolerates spaces/tabs before
/// the colon. The returned value has surrounding spaces and tabs removed and
/// may be empty.
fn extract_header_value<'a>(line: &'a [u8], name: &[u8]) -> Option<&'a [u8]> {
    strip_header_name(line, name).map(trim_bytes)
}

/// Returns the value of the single-letter compact header `compact` if `line`
/// is that header.
///
/// Like the long form, the letter is compared ignoring ASCII case (so `L:`
/// matches `l`) and spaces/tabs before the colon are tolerated.
fn extract_compact_header_value(line: &[u8], compact: u8) -> Option<&[u8]> {
    strip_header_name(line, &[compact]).map(trim_bytes)
}

/// Strips leading and trailing spaces and tabs from `b`.
fn trim_bytes(b: &[u8]) -> &[u8] {
    let start = b
        .iter()
        .position(|&c| c != b' ' && c != b'\t')
        .unwrap_or(b.len());
    // When `b` is all whitespace `rposition` finds nothing; fall back to
    // `start` (== `b.len()`) so the result is an empty slice.
    let end = b
        .iter()
        .rposition(|&c| c != b' ' && c != b'\t')
        .map_or(start, |p| p + 1);
    &b[start..end]
}
397
/// Parses a header value as an unsigned length.
///
/// Returns `None` when `value` is not valid UTF-8 or is not accepted by
/// `usize`'s `FromStr` implementation.
fn parse_content_length(value: &[u8]) -> Option<usize> {
    std::str::from_utf8(value)
        .ok()
        .and_then(|text| text.parse::<usize>().ok())
}
402
/// Reports whether `data` begins with a SIP response status line prefix
/// (`SIP/2.0 `) or with one of the known request methods followed by a space.
fn is_sip_start(data: &[u8]) -> bool {
    // Every token includes its trailing space so that e.g. "INVITED" is not
    // mistaken for "INVITE".
    const START_TOKENS: &[&[u8]] = &[
        b"SIP/2.0 ",
        b"INVITE ",
        b"ACK ",
        b"BYE ",
        b"CANCEL ",
        b"OPTIONS ",
        b"REGISTER ",
        b"PRACK ",
        b"SUBSCRIBE ",
        b"NOTIFY ",
        b"PUBLISH ",
        b"INFO ",
        b"REFER ",
        b"MESSAGE ",
        b"UPDATE ",
    ];
    START_TOKENS.iter().any(|token| data.starts_with(token))
}
431
432/// Scan for the first SIP message start at a CRLF boundary within data.
433fn find_sip_start(data: &[u8]) -> Option<usize> {
434    if is_sip_start(data) {
435        return Some(0);
436    }
437    let mut pos = 0;
438    while let Some(offset) = CRLF.find(&data[pos..]) {
439        let candidate = pos + offset + 2;
440        if candidate >= data.len() {
441            break;
442        }
443        if is_sip_start(&data[candidate..]) {
444            return Some(candidate);
445        }
446        pos = candidate;
447    }
448    None
449}
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454    use crate::types::Direction;
455
456    fn make_frame(
457        direction: Direction,
458        transport: Transport,
459        addr: &str,
460        content: &[u8],
461    ) -> Vec<u8> {
462        let dir_str = match direction {
463            Direction::Recv => "recv",
464            Direction::Sent => "sent",
465        };
466        let prep = match direction {
467            Direction::Recv => "from",
468            Direction::Sent => "to",
469        };
470        let transport_str = match transport {
471            Transport::Tcp => "tcp",
472            Transport::Udp => "udp",
473            Transport::Tls => "tls",
474            Transport::Wss => "wss",
475        };
476        let header = format!(
477            "{dir_str} {} bytes {prep} {transport_str}/{addr} at 00:00:00.000000:\n",
478            content.len()
479        );
480        let mut data = header.into_bytes();
481        data.extend_from_slice(content);
482        data.extend_from_slice(b"\x0B\n");
483        data
484    }
485
486    #[test]
487    fn single_udp_message() {
488        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
489        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
490        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
491            .collect::<Result<Vec<_>, _>>()
492            .unwrap();
493        assert_eq!(msgs.len(), 1);
494        assert_eq!(msgs[0].content, content);
495        assert_eq!(msgs[0].frame_count, 1);
496        assert_eq!(msgs[0].transport, Transport::Udp);
497    }
498
499    #[test]
500    fn tcp_reassembly_two_frames() {
501        let part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
502        let part2 = b"Content-Length: 0\r\n\r\n";
503        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", part1);
504        data.extend_from_slice(&make_frame(
505            Direction::Recv,
506            Transport::Tcp,
507            "[::1]:5060",
508            part2,
509        ));
510        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
511            .collect::<Result<Vec<_>, _>>()
512            .unwrap();
513        assert_eq!(msgs.len(), 1);
514        assert_eq!(msgs[0].frame_count, 2);
515        let mut expected = Vec::new();
516        expected.extend_from_slice(part1);
517        expected.extend_from_slice(part2);
518        assert_eq!(msgs[0].content, expected);
519    }
520
521    #[test]
522    fn tcp_reassembly_across_interleaved_frames() {
523        // Frame 1: recv from A (partial INVITE)
524        // Frame 2: sent to A (response on same connection — interrupts)
525        // Frame 3: recv from A (rest of INVITE)
526        let part1 = b"INVITE sip:user@host SIP/2.0\r\n";
527        let part2 = b"Content-Length: 3\r\n\r\nSDP";
528        let response = b"SIP/2.0 100 Trying\r\nContent-Length: 0\r\n\r\n";
529
530        let mut data = make_frame(Direction::Recv, Transport::Tcp, "10.0.0.1:5060", part1);
531        data.extend_from_slice(&make_frame(
532            Direction::Sent,
533            Transport::Tcp,
534            "10.0.0.1:5060",
535            response,
536        ));
537        data.extend_from_slice(&make_frame(
538            Direction::Recv,
539            Transport::Tcp,
540            "10.0.0.1:5060",
541            part2,
542        ));
543
544        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
545            .collect::<Result<Vec<_>, _>>()
546            .unwrap();
547
548        assert_eq!(msgs.len(), 2);
549
550        // 100 Trying completes first (single frame)
551        let trying = &msgs[0];
552        assert_eq!(trying.direction, Direction::Sent);
553        assert_eq!(trying.content, response);
554
555        // INVITE completes when frame 3 arrives (reassembled from frames 1+3)
556        let invite = &msgs[1];
557        assert_eq!(invite.direction, Direction::Recv);
558        let mut expected_invite = Vec::new();
559        expected_invite.extend_from_slice(part1);
560        expected_invite.extend_from_slice(part2);
561        assert_eq!(invite.content, expected_invite);
562    }
563
564    #[test]
565    fn tcp_reassembly_interleaved_different_addresses() {
566        // Two different addresses both sending multi-frame messages,
567        // frames arriving interleaved:
568        //   Frame 1: recv from A (partial INVITE)
569        //   Frame 2: recv from B (partial NOTIFY)
570        //   Frame 3: recv from A (rest of INVITE — completes A)
571        //   Frame 4: recv from B (rest of NOTIFY — completes B)
572        let a_part1 = b"INVITE sip:user@host SIP/2.0\r\n";
573        let a_part2 = b"Content-Length: 3\r\n\r\nSDP";
574        let b_part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
575        let b_part2 = b"Content-Length: 4\r\n\r\nBODY";
576
577        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", a_part1);
578        data.extend_from_slice(&make_frame(
579            Direction::Recv,
580            Transport::Tcp,
581            "[::2]:5060",
582            b_part1,
583        ));
584        data.extend_from_slice(&make_frame(
585            Direction::Recv,
586            Transport::Tcp,
587            "[::1]:5060",
588            a_part2,
589        ));
590        data.extend_from_slice(&make_frame(
591            Direction::Recv,
592            Transport::Tcp,
593            "[::2]:5060",
594            b_part2,
595        ));
596
597        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
598            .collect::<Result<Vec<_>, _>>()
599            .unwrap();
600
601        assert_eq!(msgs.len(), 2);
602
603        // INVITE from [::1] completes first (frame 3 arrives before frame 4)
604        assert_eq!(msgs[0].address, "[::1]:5060");
605        assert_eq!(msgs[0].frame_count, 2);
606        let mut expected_a = Vec::new();
607        expected_a.extend_from_slice(a_part1);
608        expected_a.extend_from_slice(a_part2);
609        assert_eq!(msgs[0].content, expected_a);
610
611        // NOTIFY from [::2] completes second
612        assert_eq!(msgs[1].address, "[::2]:5060");
613        assert_eq!(msgs[1].frame_count, 2);
614        let mut expected_b = Vec::new();
615        expected_b.extend_from_slice(b_part1);
616        expected_b.extend_from_slice(b_part2);
617        assert_eq!(msgs[1].content, expected_b);
618    }
619
620    #[test]
621    fn direction_change_splits_messages() {
622        let recv_content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
623        let sent_content = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
624        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", recv_content);
625        data.extend_from_slice(&make_frame(
626            Direction::Sent,
627            Transport::Tcp,
628            "[::1]:5060",
629            sent_content,
630        ));
631        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
632            .collect::<Result<Vec<_>, _>>()
633            .unwrap();
634        assert_eq!(msgs.len(), 2);
635        assert_eq!(msgs[0].direction, Direction::Recv);
636        assert_eq!(msgs[1].direction, Direction::Sent);
637    }
638
639    #[test]
640    fn address_change_splits_messages() {
641        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
642        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", content);
643        data.extend_from_slice(&make_frame(
644            Direction::Recv,
645            Transport::Tcp,
646            "[::2]:5060",
647            content,
648        ));
649        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
650            .collect::<Result<Vec<_>, _>>()
651            .unwrap();
652        assert_eq!(msgs.len(), 2);
653        assert_eq!(msgs[0].address, "[::1]:5060");
654        assert_eq!(msgs[1].address, "[::2]:5060");
655    }
656
657    #[test]
658    fn udp_no_reassembly() {
659        let content1 = b"OPTIONS sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
660        let content2 = b"OPTIONS sip:b SIP/2.0\r\nContent-Length: 0\r\n\r\n";
661        let mut data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content1);
662        data.extend_from_slice(&make_frame(
663            Direction::Recv,
664            Transport::Udp,
665            "1.1.1.1:5060",
666            content2,
667        ));
668        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
669            .collect::<Result<Vec<_>, _>>()
670            .unwrap();
671        assert_eq!(msgs.len(), 2, "UDP frames should not be reassembled");
672        assert_eq!(msgs[0].frame_count, 1);
673        assert_eq!(msgs[1].frame_count, 1);
674    }
675
676    #[test]
677    fn aggregated_messages_split_by_content_length() {
678        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
679        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
680        let mut combined = Vec::new();
681        combined.extend_from_slice(msg1);
682        combined.extend_from_slice(msg2);
683        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", &combined);
684        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
685            .collect::<Result<Vec<_>, _>>()
686            .unwrap();
687        assert_eq!(msgs.len(), 2);
688        assert_eq!(msgs[0].content, msg1);
689        assert_eq!(msgs[1].content, msg2);
690    }
691
692    #[test]
693    fn find_content_length_standard() {
694        let data = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 42\r\n\r\n";
695        assert_eq!(find_content_length(data), Some(42));
696    }
697
698    #[test]
699    fn find_content_length_compact() {
700        let data = b"NOTIFY sip:a SIP/2.0\r\nl: 42\r\n\r\n";
701        assert_eq!(find_content_length(data), Some(42));
702    }
703
704    #[test]
705    fn find_content_length_missing() {
706        let data = b"NOTIFY sip:a SIP/2.0\r\nCSeq: 1 NOTIFY\r\n\r\n";
707        assert_eq!(find_content_length(data), None);
708    }
709
710    #[test]
711    fn is_sip_start_request() {
712        assert!(is_sip_start(b"INVITE sip:user@host SIP/2.0\r\n"));
713        assert!(is_sip_start(b"OPTIONS sip:user@host SIP/2.0\r\n"));
714        assert!(is_sip_start(b"NOTIFY sip:user@host SIP/2.0\r\n"));
715        assert!(is_sip_start(b"ACK sip:user@host SIP/2.0\r\n"));
716    }
717
718    #[test]
719    fn is_sip_start_response() {
720        assert!(is_sip_start(b"SIP/2.0 200 OK\r\n"));
721        assert!(is_sip_start(b"SIP/2.0 100 Trying\r\n"));
722    }
723
724    #[test]
725    fn is_sip_start_not_sip() {
726        assert!(!is_sip_start(b"some random data"));
727        assert!(!is_sip_start(b"HTTP/1.1 200 OK\r\n"));
728    }
729
730    #[test]
731    fn find_sip_start_at_beginning() {
732        let data = b"INVITE sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
733        assert_eq!(find_sip_start(data), Some(0));
734    }
735
736    #[test]
737    fn find_sip_start_after_prefix() {
738        let data = b"</xml>\r\nNOTIFY sip:user@host SIP/2.0\r\n";
739        assert_eq!(find_sip_start(data), Some(8));
740    }
741
742    #[test]
743    fn find_sip_start_none() {
744        let data = b"no SIP here at all";
745        assert_eq!(find_sip_start(data), None);
746    }
747
748    #[test]
749    fn message_preserves_metadata() {
750        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
751        let data = make_frame(
752            Direction::Sent,
753            Transport::Tls,
754            "[2001:db8::1]:5061",
755            content,
756        );
757        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
758            .collect::<Result<Vec<_>, _>>()
759            .unwrap();
760        assert_eq!(msgs.len(), 1);
761        assert_eq!(msgs[0].direction, Direction::Sent);
762        assert_eq!(msgs[0].transport, Transport::Tls);
763        assert_eq!(msgs[0].address, "[2001:db8::1]:5061");
764        assert_eq!(
765            msgs[0].timestamp,
766            Timestamp::TimeOnly {
767                hour: 0,
768                min: 0,
769                sec: 0,
770                usec: 0
771            }
772        );
773    }
774
775    #[test]
776    fn extract_handles_crlf_between_messages() {
777        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
778        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
779        let mut content = Vec::new();
780        content.extend_from_slice(msg1);
781        content.extend_from_slice(b"\r\n");
782        content.extend_from_slice(msg2);
783
784        let key = (Direction::Recv, "[::1]:5060".to_string());
785        let mut buf = ConnectionBuffer {
786            transport: Transport::Tcp,
787            timestamp: Timestamp::TimeOnly {
788                hour: 0,
789                min: 0,
790                sec: 0,
791                usec: 0,
792            },
793            content,
794            frame_count: 1,
795            last_seen_day: 0,
796            last_seen_time_secs: 0,
797        };
798        let msgs = extract_complete(&mut buf, &key);
799        assert_eq!(msgs.len(), 2);
800        assert_eq!(msgs[0].content, msg1);
801        assert_eq!(msgs[1].content, msg2);
802    }
803
804    #[test]
805    fn extract_skips_non_sip_prefix() {
806        let prefix = b"</conference-info>\r\n";
807        let msg = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
808        let mut content = Vec::new();
809        content.extend_from_slice(prefix);
810        content.extend_from_slice(msg);
811
812        let key = (Direction::Recv, "[::1]:5060".to_string());
813        let mut buf = ConnectionBuffer {
814            transport: Transport::Tcp,
815            timestamp: Timestamp::TimeOnly {
816                hour: 0,
817                min: 0,
818                sec: 0,
819                usec: 0,
820            },
821            content,
822            frame_count: 1,
823            last_seen_day: 0,
824            last_seen_time_secs: 0,
825        };
826        let msgs = extract_complete(&mut buf, &key);
827        assert_eq!(msgs.len(), 1);
828        assert_eq!(msgs[0].content, msg);
829    }
830
831    #[test]
832    fn extract_waits_for_incomplete_body() {
833        // Headers complete but body is missing
834        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 100\r\n\r\npartial".to_vec();
835
836        let key = (Direction::Recv, "[::1]:5060".to_string());
837        let mut buf = ConnectionBuffer {
838            transport: Transport::Tcp,
839            timestamp: Timestamp::TimeOnly {
840                hour: 0,
841                min: 0,
842                sec: 0,
843                usec: 0,
844            },
845            content,
846            frame_count: 1,
847            last_seen_day: 0,
848            last_seen_time_secs: 0,
849        };
850        let msgs = extract_complete(&mut buf, &key);
851        assert!(msgs.is_empty(), "should wait for body to complete");
852        assert!(!buf.content.is_empty(), "buffer should retain data");
853    }
854
855    #[test]
856    fn extract_waits_for_incomplete_headers() {
857        // Headers not complete (no \r\n\r\n)
858        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 0\r\n".to_vec();
859
860        let key = (Direction::Recv, "[::1]:5060".to_string());
861        let mut buf = ConnectionBuffer {
862            transport: Transport::Tcp,
863            timestamp: Timestamp::TimeOnly {
864                hour: 0,
865                min: 0,
866                sec: 0,
867                usec: 0,
868            },
869            content,
870            frame_count: 1,
871            last_seen_day: 0,
872            last_seen_time_secs: 0,
873        };
874        let msgs = extract_complete(&mut buf, &key);
875        assert!(msgs.is_empty(), "should wait for headers to complete");
876    }
877
878    #[test]
879    fn tcp_body_split_across_five_frames() {
880        // Simulate REQUEST.md scenario: NOTIFY with Content-Length: 6424
881        // body split across 5 TCP frames (like NG9-1-1 abandoned call JSON)
882        let body_len: usize = 6424;
883        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();
884
885        let mut headers = Vec::new();
886        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
887        headers
888            .extend_from_slice(b"Via: SIP/2.0/TCP [2001:4958:10:11::6]:45538;branch=z9hG4bK-1\r\n");
889        headers.extend_from_slice(b"Call-ID: fragmented-notify@host\r\n");
890        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
891        headers.extend_from_slice(
892            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
893        );
894        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
895        headers.extend_from_slice(b"\r\n");
896
897        let mut full_content = headers.clone();
898        full_content.extend_from_slice(&body);
899
900        // Split into 5 frames like real TCP segments
901        let frame1_len = 1500.min(full_content.len());
902        let remaining = &full_content[frame1_len..];
903        let frame2_len = 1428.min(remaining.len());
904        let remaining = &remaining[frame2_len..];
905        let frame3_len = 1428.min(remaining.len());
906        let remaining = &remaining[frame3_len..];
907        let frame4_len = 1428.min(remaining.len());
908        let remaining = &remaining[frame4_len..];
909        let frame5_len = remaining.len();
910
911        let addr = "[2001:4958:10:11::6]:45538";
912        let mut data = make_frame(
913            Direction::Recv,
914            Transport::Tcp,
915            addr,
916            &full_content[..frame1_len],
917        );
918        let mut offset = frame1_len;
919        for len in [frame2_len, frame3_len, frame4_len, frame5_len] {
920            data.extend_from_slice(&make_frame(
921                Direction::Recv,
922                Transport::Tcp,
923                addr,
924                &full_content[offset..offset + len],
925            ));
926            offset += len;
927        }
928        assert_eq!(offset, full_content.len());
929
930        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
931            .collect::<Result<Vec<_>, _>>()
932            .unwrap();
933
934        assert_eq!(
935            msgs.len(),
936            1,
937            "should produce exactly one reassembled message"
938        );
939        assert_eq!(msgs[0].frame_count, 5, "should track all 5 frames");
940        assert_eq!(
941            msgs[0].content, full_content,
942            "content should be fully reassembled"
943        );
944        assert_eq!(msgs[0].direction, Direction::Recv);
945        assert_eq!(msgs[0].address, addr);
946    }
947
948    #[test]
949    fn parse_stats_delegates() {
950        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
951        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
952        let mut iter = MessageIterator::new(&data[..]);
953        let msgs: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
954        assert_eq!(msgs.len(), 1);
955        let stats = iter.parse_stats();
956        assert_eq!(stats.bytes_read, data.len() as u64);
957        assert_eq!(stats.bytes_skipped, 0);
958    }
959
960    #[test]
961    fn tcp_body_split_parsed_message() {
962        // Same scenario but verified through Level 3 (ParsedSipMessage)
963        let body_len: usize = 6424;
964        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();
965
966        let mut headers = Vec::new();
967        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
968        headers.extend_from_slice(b"Call-ID: fragmented-parsed@host\r\n");
969        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
970        headers.extend_from_slice(
971            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
972        );
973        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
974        headers.extend_from_slice(b"\r\n");
975
976        let mut full_content = headers.clone();
977        full_content.extend_from_slice(&body);
978
979        // Split into 3 frames
980        let split1 = 1500.min(full_content.len());
981        let split2 = (split1 + 3000).min(full_content.len());
982
983        let addr = "[2001:db8::1]:5060";
984        let mut data = make_frame(
985            Direction::Recv,
986            Transport::Tcp,
987            addr,
988            &full_content[..split1],
989        );
990        data.extend_from_slice(&make_frame(
991            Direction::Recv,
992            Transport::Tcp,
993            addr,
994            &full_content[split1..split2],
995        ));
996        data.extend_from_slice(&make_frame(
997            Direction::Recv,
998            Transport::Tcp,
999            addr,
1000            &full_content[split2..],
1001        ));
1002
1003        let parsed: Vec<crate::types::ParsedSipMessage> =
1004            crate::sip::ParsedMessageIterator::new(&data[..])
1005                .collect::<Result<Vec<_>, _>>()
1006                .unwrap();
1007
1008        assert_eq!(parsed.len(), 1, "should produce one parsed message");
1009        assert_eq!(parsed[0].content_length(), Some(body_len));
1010        assert_eq!(parsed[0].body.len(), body_len, "body should be complete");
1011        assert_eq!(parsed[0].body, body, "body content should match");
1012        assert_eq!(parsed[0].frame_count, 3);
1013        assert_eq!(parsed[0].method(), Some("NOTIFY"));
1014    }
1015
1016    #[test]
1017    fn tls_keepalive_single_lf_drained() {
1018        let data = make_frame(Direction::Recv, Transport::Tls, "[10.0.0.1]:5061", b"\n");
1019        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1020            .collect::<Result<Vec<_>, _>>()
1021            .unwrap();
1022        assert_eq!(msgs.len(), 0, "keep-alive \\n should produce no messages");
1023    }
1024
1025    #[test]
1026    fn tls_keepalive_multiple_lf_drained() {
1027        let addr = "[10.0.0.1]:5061";
1028        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
1029        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
1030        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
1031        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1032            .collect::<Result<Vec<_>, _>>()
1033            .unwrap();
1034        assert_eq!(
1035            msgs.len(),
1036            0,
1037            "multiple keep-alive \\n should produce no messages"
1038        );
1039    }
1040
1041    #[test]
1042    fn tls_keepalive_interleaved_with_sip() {
1043        let addr = "[10.0.0.1]:5061";
1044        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1045        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
1046        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, sip));
1047        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
1048        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1049            .collect::<Result<Vec<_>, _>>()
1050            .unwrap();
1051        assert_eq!(msgs.len(), 1, "only the SIP message should be emitted");
1052        assert_eq!(msgs[0].content, sip);
1053    }
1054
1055    #[test]
1056    fn tls_bare_lf_before_sip_start() {
1057        let addr = "[10.0.0.1]:5061";
1058        let sip_part1 = b"\nOPTIONS sip:host SIP/2.0\r\n";
1059        let sip_part2 = b"Content-Length: 0\r\n\r\n";
1060        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, sip_part1);
1061        data.extend_from_slice(&make_frame(
1062            Direction::Recv,
1063            Transport::Tls,
1064            addr,
1065            sip_part2,
1066        ));
1067        let msgs: Vec<SipMessage> = MessageIterator::new(&data[..])
1068            .collect::<Result<Vec<_>, _>>()
1069            .unwrap();
1070        assert_eq!(msgs.len(), 1, "SIP after bare LF should be extracted");
1071        assert!(
1072            msgs[0].content.starts_with(b"OPTIONS"),
1073            "message should start with SIP method, not \\n"
1074        );
1075    }
1076
1077    fn make_frame_at(
1078        direction: Direction,
1079        transport: Transport,
1080        addr: &str,
1081        content: &[u8],
1082        timestamp: &str,
1083    ) -> Vec<u8> {
1084        let dir_str = match direction {
1085            Direction::Recv => "recv",
1086            Direction::Sent => "sent",
1087        };
1088        let prep = match direction {
1089            Direction::Recv => "from",
1090            Direction::Sent => "to",
1091        };
1092        let transport_str = match transport {
1093            Transport::Tcp => "tcp",
1094            Transport::Udp => "udp",
1095            Transport::Tls => "tls",
1096            Transport::Wss => "wss",
1097        };
1098        let header = format!(
1099            "{dir_str} {} bytes {prep} {transport_str}/{addr} at {timestamp}:\n",
1100            content.len()
1101        );
1102        let mut data = header.into_bytes();
1103        data.extend_from_slice(content);
1104        data.extend_from_slice(b"\x0B\n");
1105        data
1106    }
1107
1108    #[test]
1109    fn empty_buffer_removed_after_complete_message() {
1110        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1111        let addr_a = "[::1]:5060";
1112        let addr_b = "[::2]:5060";
1113
1114        let mut data = make_frame(Direction::Recv, Transport::Tcp, addr_a, sip);
1115        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tcp, addr_b, sip));
1116
1117        let mut iter = MessageIterator::new(&data[..]);
1118        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1119        assert_eq!(msgs.len(), 2);
1120        assert!(
1121            iter.buffers.is_empty(),
1122            "all buffers should be removed after complete messages are extracted"
1123        );
1124    }
1125
1126    #[test]
1127    fn stale_buffer_evicted_after_timeout() {
1128        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1129        let partial = b"INVITE sip:host SIP/2.0\r\n";
1130
1131        let mut data = Vec::new();
1132        // Partial frame from stale addr at 10:00:00
1133        data.extend_from_slice(&make_frame_at(
1134            Direction::Recv,
1135            Transport::Tls,
1136            "[::99]:44444",
1137            partial,
1138            "2026-02-16 10:00:00.000000",
1139        ));
1140        // Complete msg from another addr at 12:00:01 (>2h later, triggers sweep)
1141        data.extend_from_slice(&make_frame_at(
1142            Direction::Recv,
1143            Transport::Tls,
1144            "[::1]:5060",
1145            sip,
1146            "2026-02-16 12:00:01.000000",
1147        ));
1148
1149        let mut iter = MessageIterator::new(&data[..]);
1150        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1151
1152        assert_eq!(msgs.len(), 1, "should produce the complete message");
1153        assert_eq!(msgs[0].address, "[::1]:5060");
1154        assert!(
1155            iter.buffers.is_empty(),
1156            "stale buffer for [::99]:44444 should have been evicted"
1157        );
1158    }
1159
1160    #[test]
1161    fn day_rollover_detection_with_time_only() {
1162        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
1163        let partial = b"INVITE sip:host SIP/2.0\r\n";
1164
1165        let mut data = Vec::new();
1166        // Partial frame at 23:59:00
1167        data.extend_from_slice(&make_frame_at(
1168            Direction::Recv,
1169            Transport::Tcp,
1170            "[::99]:44444",
1171            partial,
1172            "23:59:00.000000",
1173        ));
1174        // Complete msg at 02:00:01 (next day — rollover detected, >2h from 23:59)
1175        data.extend_from_slice(&make_frame_at(
1176            Direction::Recv,
1177            Transport::Tcp,
1178            "[::1]:5060",
1179            sip,
1180            "02:00:01.000000",
1181        ));
1182
1183        let mut iter = MessageIterator::new(&data[..]);
1184        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1185
1186        assert_eq!(msgs.len(), 1);
1187        assert_eq!(msgs[0].address, "[::1]:5060");
1188        assert_eq!(iter.current_day, 1, "should have detected one day rollover");
1189        assert!(
1190            iter.buffers.is_empty(),
1191            "stale buffer should have been evicted after day rollover"
1192        );
1193    }
1194
1195    #[test]
1196    fn flush_all_clears_buffers() {
1197        let partial = b"INVITE sip:host SIP/2.0\r\n";
1198        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", partial);
1199
1200        let mut iter = MessageIterator::new(&data[..]);
1201        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
1202        assert_eq!(msgs.len(), 1, "partial should be flushed at EOF");
1203        assert!(
1204            iter.buffers.is_empty(),
1205            "flush_all should clear the HashMap"
1206        );
1207    }
1208}