1use std::collections::{HashMap, VecDeque};
2use std::sync::LazyLock;
3
4use memchr::memmem;
5use tracing::{debug, trace, warn};
6
7use crate::frame::{FrameIterator, ParseError};
8use crate::types::{
9 Direction, ParseStats, SipMessage, SkipTracking, Timestamp, Transport, UnparsedRegion,
10};
11
/// Lazily built substring searcher for a single CRLF (`\r\n`) line terminator.
static CRLF: LazyLock<memmem::Finder<'static>> = LazyLock::new(|| memmem::Finder::new(b"\r\n"));
/// Lazily built substring searcher for a blank line (`\r\n\r\n`), which this
/// file treats as the end of a message's header section.
static CRLFCRLF: LazyLock<memmem::Finder<'static>> =
    LazyLock::new(|| memmem::Finder::new(b"\r\n\r\n"));

/// Idle time in seconds (two hours) after which a connection buffer is
/// evicted; the same value is used as the minimum spacing between sweeps.
const STALE_TIMEOUT_SECS: u64 = 7200;
21
/// Iterator adaptor that turns the frames produced by a [`FrameIterator`]
/// into [`SipMessage`]s.
///
/// UDP frames are passed through one-to-one; frames of every other transport
/// are appended to a per-connection buffer and emitted once a complete
/// message can be cut out of it.
pub struct MessageIterator<R> {
    /// Underlying frame source.
    frames: FrameIterator<R>,
    /// Reassembly buffers, keyed by `(direction, address)` of the frame.
    buffers: HashMap<(Direction, String), ConnectionBuffer>,
    /// Messages already extracted but not yet handed to the caller.
    ready: VecDeque<SipMessage>,
    /// Set once `frames` has returned `None` and all buffers were flushed.
    exhausted: bool,
    /// Number of day rollovers detected so far from time-of-day jumps.
    current_day: u32,
    /// Time of day (seconds) of the most recent non-UDP frame.
    last_time_secs: u32,
    /// Absolute time (`current_day * 86400 + last_time_secs`) of the last
    /// stale-buffer sweep.
    last_sweep_abs_secs: u64,
}
53
/// Pending bytes and bookkeeping for one `(direction, address)` stream.
struct ConnectionBuffer {
    /// Transport of the frame that created this buffer.
    transport: Transport,
    /// Timestamp of the frame that started the currently pending content
    /// (refreshed whenever a frame arrives while `content` is empty).
    timestamp: Timestamp,
    /// Bytes received but not yet emitted as a message.
    content: Vec<u8>,
    /// Frames appended since the last message was extracted.
    frame_count: usize,
    /// Value of the iterator's day counter when a frame was last appended.
    last_seen_day: u32,
    /// Time of day (seconds) of the last appended frame.
    last_seen_time_secs: u32,
}
62
63impl<R: std::io::Read> MessageIterator<R> {
64 pub fn new(reader: R) -> Self {
66 MessageIterator {
67 frames: FrameIterator::new(reader),
68 buffers: HashMap::new(),
69 ready: VecDeque::new(),
70 exhausted: false,
71 current_day: 0,
72 last_time_secs: 0,
73 last_sweep_abs_secs: 0,
74 }
75 }
76
77 pub fn capture_skipped(mut self, enable: bool) -> Self {
79 self.frames = self.frames.capture_skipped(enable);
80 self
81 }
82
83 pub fn skip_tracking(mut self, tracking: SkipTracking) -> Self {
85 self.frames = self.frames.skip_tracking(tracking);
86 self
87 }
88
89 pub fn parse_stats(&self) -> &ParseStats {
91 self.frames.stats()
92 }
93
94 pub fn parse_stats_mut(&mut self) -> &mut ParseStats {
96 self.frames.stats_mut()
97 }
98
99 pub fn drain_unparsed(&mut self) -> Vec<UnparsedRegion> {
101 self.frames.drain_unparsed()
102 }
103
104 fn update_time_tracking(&mut self, time_secs: u32) {
105 if time_secs < self.last_time_secs && self.last_time_secs - time_secs > 43200 {
106 self.current_day += 1;
107 debug!(
108 day = self.current_day,
109 prev_secs = self.last_time_secs,
110 curr_secs = time_secs,
111 "detected day rollover"
112 );
113 }
114 self.last_time_secs = time_secs;
115 }
116
117 fn current_abs_secs(&self) -> u64 {
118 self.current_day as u64 * 86400 + self.last_time_secs as u64
119 }
120
121 fn sweep_stale_buffers(&mut self) {
122 let current_abs = self.current_abs_secs();
123 self.buffers.retain(|key, buf| {
124 let buf_abs = buf.last_seen_day as u64 * 86400 + buf.last_seen_time_secs as u64;
125 let elapsed = current_abs.saturating_sub(buf_abs);
126 if elapsed > STALE_TIMEOUT_SECS {
127 if buf.content.is_empty() {
128 trace!(
129 address = %key.1,
130 direction = %key.0,
131 elapsed_secs = elapsed,
132 "evicted empty stale connection buffer"
133 );
134 } else {
135 warn!(
136 address = %key.1,
137 direction = %key.0,
138 elapsed_secs = elapsed,
139 pending_bytes = buf.content.len(),
140 "evicted stale connection buffer with incomplete data"
141 );
142 }
143 return false;
144 }
145 true
146 });
147 }
148
149 fn flush_all(&mut self) {
150 let keys: Vec<_> = self.buffers.keys().cloned().collect();
151 for key in keys {
152 if let Some(buf) = self.buffers.get_mut(&key) {
153 let msgs = extract_complete(buf, &key);
154 self.ready.extend(msgs);
155
156 if !buf.content.is_empty() {
157 let content = std::mem::take(&mut buf.content);
158 self.ready.push_back(SipMessage {
159 direction: key.0,
160 transport: buf.transport,
161 address: key.1.clone(),
162 timestamp: buf.timestamp,
163 content,
164 frame_count: buf.frame_count,
165 });
166 buf.frame_count = 0;
167 }
168 }
169 }
170 self.buffers.clear();
171 }
172}
173
174impl<R: std::io::Read> Iterator for MessageIterator<R> {
175 type Item = Result<SipMessage, ParseError>;
176
177 fn next(&mut self) -> Option<Self::Item> {
178 if let Some(msg) = self.ready.pop_front() {
179 return Some(Ok(msg));
180 }
181
182 if self.exhausted {
183 return None;
184 }
185
186 loop {
187 match self.frames.next() {
188 Some(Ok(frame)) => {
189 if frame.transport == Transport::Udp {
190 return Some(Ok(SipMessage {
191 direction: frame.direction,
192 transport: frame.transport,
193 address: frame.address,
194 timestamp: frame.timestamp,
195 content: frame.content,
196 frame_count: 1,
197 }));
198 }
199
200 let time_secs = frame.timestamp.time_of_day_secs();
201 self.update_time_tracking(time_secs);
202
203 let current_abs = self.current_abs_secs();
204 if current_abs.saturating_sub(self.last_sweep_abs_secs) >= STALE_TIMEOUT_SECS {
205 self.sweep_stale_buffers();
206 self.last_sweep_abs_secs = current_abs;
207 }
208
209 let key = (frame.direction, frame.address);
210
211 let buf = match self.buffers.get_mut(&key) {
212 Some(buf) => buf,
213 None => self.buffers.entry(key.clone()).or_insert(ConnectionBuffer {
214 transport: frame.transport,
215 timestamp: frame.timestamp,
216 content: Vec::new(),
217 frame_count: 0,
218 last_seen_day: self.current_day,
219 last_seen_time_secs: time_secs,
220 }),
221 };
222
223 buf.last_seen_day = self.current_day;
224 buf.last_seen_time_secs = time_secs;
225
226 if buf.content.is_empty() {
227 buf.timestamp = frame.timestamp;
228 }
229
230 trace!(
231 frame = buf.frame_count + 1,
232 bytes = frame.content.len(),
233 address = %key.1,
234 "buffering TCP frame"
235 );
236
237 buf.content.extend_from_slice(&frame.content);
238 buf.frame_count += 1;
239
240 let msgs = extract_complete(buf, &key);
241 self.ready.extend(msgs);
242
243 if buf.frame_count == 0 && buf.content.is_empty() {
244 self.buffers.remove(&key);
245 }
246
247 if let Some(msg) = self.ready.pop_front() {
248 return Some(Ok(msg));
249 }
250 }
251 Some(Err(e)) => return Some(Err(e)),
252 None => {
253 self.exhausted = true;
254 self.flush_all();
255 return self.ready.pop_front().map(Ok);
256 }
257 }
258 }
259 }
260}
261
262fn extract_complete(buf: &mut ConnectionBuffer, key: &(Direction, String)) -> Vec<SipMessage> {
266 let mut messages = Vec::new();
267
268 loop {
269 if buf.content.is_empty() {
270 break;
271 }
272
273 if !is_sip_start(&buf.content) {
275 let ws_len = buf
277 .content
278 .iter()
279 .position(|&b| !matches!(b, b'\r' | b'\n' | b' ' | b'\t'))
280 .unwrap_or(buf.content.len());
281
282 if ws_len > 0 {
283 if ws_len == buf.content.len() {
284 trace!(
285 bytes = ws_len,
286 address = %key.1,
287 "drained transport whitespace"
288 );
289 buf.content.clear();
290 break;
291 }
292 if is_sip_start(&buf.content[ws_len..]) {
293 trace!(bytes = ws_len, "drained inter-message whitespace padding");
294 buf.content.drain(..ws_len);
295 continue;
296 }
297 }
298
299 match find_sip_start(&buf.content) {
300 Some(offset) if offset > 0 => {
301 warn!(
302 skipped_bytes = offset,
303 address = %key.1,
304 "skipped non-SIP prefix in TCP buffer"
305 );
306 buf.content.drain(..offset);
307 continue;
308 }
309 _ => break, }
311 }
312
313 let header_end = match CRLFCRLF.find(&buf.content) {
315 Some(offset) => offset,
316 None => break, };
318 let body_start = header_end + 4;
319
320 let msg_end = match find_content_length(&buf.content) {
321 Some(cl) => {
322 let end = body_start + cl;
323 if end > buf.content.len() {
324 break; }
326 end
327 }
328 None => body_start, };
330
331 let remaining = buf.content.split_off(msg_end);
332 let msg_content = std::mem::replace(&mut buf.content, remaining);
333
334 while buf.content.len() >= 2 && buf.content[0] == b'\r' && buf.content[1] == b'\n' {
336 buf.content.drain(..2);
337 }
338
339 let frame_count = if messages.is_empty() {
340 buf.frame_count
341 } else {
342 0
343 };
344
345 if frame_count > 1 {
346 debug!(
347 frame_count,
348 bytes = msg_content.len(),
349 address = %key.1,
350 "extracted reassembled TCP message"
351 );
352 }
353
354 messages.push(SipMessage {
355 direction: key.0,
356 transport: buf.transport,
357 address: key.1.clone(),
358 timestamp: buf.timestamp,
359 content: msg_content,
360 frame_count,
361 });
362
363 buf.frame_count = 0;
364 }
365
366 messages
367}
368
369fn find_content_length(data: &[u8]) -> Option<usize> {
372 let header_end = CRLFCRLF.find(data)?;
373 let headers = &data[..header_end];
374
375 let mut pos = 0;
376 while pos < headers.len() {
377 let line_end = CRLF.find(&headers[pos..]).unwrap_or(headers.len() - pos);
378 let line = &headers[pos..pos + line_end];
379
380 if let Some(value) = extract_header_value(line, b"Content-Length") {
381 return parse_content_length(value);
382 }
383 if let Some(value) = extract_compact_header_value(line, b'l') {
384 return parse_content_length(value);
385 }
386
387 pos += line_end + 2; }
389 None
390}
391
392fn extract_header_value<'a>(line: &'a [u8], name: &[u8]) -> Option<&'a [u8]> {
393 if line.len() <= name.len() + 1 {
394 return None;
395 }
396 if !line[..name.len()].eq_ignore_ascii_case(name) {
397 return None;
398 }
399 if line[name.len()] != b':' {
400 return None;
401 }
402 Some(trim_bytes(&line[name.len() + 1..]))
403}
404
405fn extract_compact_header_value(line: &[u8], compact: u8) -> Option<&[u8]> {
406 if line.len() < 2 {
407 return None;
408 }
409 if line[0] != compact || line[1] != b':' {
410 return None;
411 }
412 Some(trim_bytes(&line[2..]))
413}
414
/// Strips leading and trailing spaces and horizontal tabs from `b`.
///
/// Other bytes, including CR and LF, are left in place. An input made up
/// only of spaces and tabs yields an empty slice.
fn trim_bytes(b: &[u8]) -> &[u8] {
    let mut trimmed = b;
    // Peel blanks off the front, then off the back.
    while let [b' ' | b'\t', rest @ ..] = trimmed {
        trimmed = rest;
    }
    while let [rest @ .., b' ' | b'\t'] = trimmed {
        trimmed = rest;
    }
    trimmed
}
426
/// Parses a Content-Length header value as a `usize`.
///
/// Returns `None` when `value` is not valid UTF-8 or is not accepted by
/// `str::parse::<usize>` (empty, negative, containing spaces, too large).
fn parse_content_length(value: &[u8]) -> Option<usize> {
    std::str::from_utf8(value)
        .ok()
        .and_then(|text| text.parse().ok())
}
431
/// Reports whether `data` begins with a SIP start line.
///
/// That is either the response prefix `SIP/2.0 ` or one of the request
/// method names listed below followed by a single space. The comparison is
/// case-sensitive.
fn is_sip_start(data: &[u8]) -> bool {
    const RESPONSE_PREFIX: &[u8] = b"SIP/2.0 ";
    const METHODS: &[&[u8]] = &[
        b"INVITE ",
        b"ACK ",
        b"BYE ",
        b"CANCEL ",
        b"OPTIONS ",
        b"REGISTER ",
        b"PRACK ",
        b"SUBSCRIBE ",
        b"NOTIFY ",
        b"PUBLISH ",
        b"INFO ",
        b"REFER ",
        b"MESSAGE ",
        b"UPDATE ",
    ];

    data.starts_with(RESPONSE_PREFIX) || METHODS.iter().any(|method| data.starts_with(method))
}
460
461fn find_sip_start(data: &[u8]) -> Option<usize> {
463 if is_sip_start(data) {
464 return Some(0);
465 }
466 let mut pos = 0;
467 while let Some(offset) = CRLF.find(&data[pos..]) {
468 let candidate = pos + offset + 2;
469 if candidate >= data.len() {
470 break;
471 }
472 if is_sip_start(&data[candidate..]) {
473 return Some(candidate);
474 }
475 pos = candidate;
476 }
477 None
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Direction;

    /// Builds one log frame stamped `00:00:00.000000`; see [`make_frame_at`].
    fn make_frame(
        direction: Direction,
        transport: Transport,
        addr: &str,
        content: &[u8],
    ) -> Vec<u8> {
        make_frame_at(direction, transport, addr, content, "00:00:00.000000")
    }

    /// Builds one log frame: a header line of the form
    /// `recv N bytes from tcp/ADDR at TIMESTAMP:\n` (or `sent … to …`),
    /// followed by `content` and the `\x0B\n` trailer.
    fn make_frame_at(
        direction: Direction,
        transport: Transport,
        addr: &str,
        content: &[u8],
        timestamp: &str,
    ) -> Vec<u8> {
        let (dir_str, prep) = match direction {
            Direction::Recv => ("recv", "from"),
            Direction::Sent => ("sent", "to"),
        };
        let transport_str = match transport {
            Transport::Tcp => "tcp",
            Transport::Udp => "udp",
            Transport::Tls => "tls",
            Transport::Wss => "wss",
        };
        let header = format!(
            "{dir_str} {} bytes {prep} {transport_str}/{addr} at {timestamp}:\n",
            content.len()
        );
        let mut data = header.into_bytes();
        data.extend_from_slice(content);
        data.extend_from_slice(b"\x0B\n");
        data
    }

    /// Runs a fresh `MessageIterator` over `data` and unwraps every message.
    fn collect_messages(data: &[u8]) -> Vec<SipMessage> {
        MessageIterator::new(data)
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    /// The timestamp `00:00:00.000000` in its time-only form.
    fn midnight() -> Timestamp {
        Timestamp::TimeOnly {
            hour: 0,
            min: 0,
            sec: 0,
            usec: 0,
        }
    }

    /// Key used by the tests that call `extract_complete` directly.
    fn loopback_key() -> (Direction, String) {
        (Direction::Recv, "[::1]:5060".to_string())
    }

    /// A TCP buffer holding `content` as if it arrived in a single frame.
    fn buffer_with(content: Vec<u8>) -> ConnectionBuffer {
        ConnectionBuffer {
            transport: Transport::Tcp,
            timestamp: midnight(),
            content,
            frame_count: 1,
            last_seen_day: 0,
            last_seen_time_secs: 0,
        }
    }

    /// A single UDP frame becomes a single message with `frame_count == 1`.
    #[test]
    fn single_udp_message() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, content);
        assert_eq!(msgs[0].frame_count, 1);
        assert_eq!(msgs[0].transport, Transport::Udp);
    }

    /// Headers split over two TCP frames are joined into one message.
    #[test]
    fn tcp_reassembly_two_frames() {
        let part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
        let part2 = b"Content-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", part1);
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::1]:5060",
            part2,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].frame_count, 2);
        let expected = [&part1[..], &part2[..]].concat();
        assert_eq!(msgs[0].content, expected);
    }

    /// A frame in the opposite direction does not break reassembly of a
    /// message split around it.
    #[test]
    fn tcp_reassembly_across_interleaved_frames() {
        let part1 = b"INVITE sip:user@host SIP/2.0\r\n";
        let part2 = b"Content-Length: 3\r\n\r\nSDP";
        let response = b"SIP/2.0 100 Trying\r\nContent-Length: 0\r\n\r\n";

        let mut data = make_frame(Direction::Recv, Transport::Tcp, "10.0.0.1:5060", part1);
        data.extend(make_frame(
            Direction::Sent,
            Transport::Tcp,
            "10.0.0.1:5060",
            response,
        ));
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            "10.0.0.1:5060",
            part2,
        ));

        let msgs = collect_messages(&data);

        assert_eq!(msgs.len(), 2);

        let trying = &msgs[0];
        assert_eq!(trying.direction, Direction::Sent);
        assert_eq!(trying.content, response);

        let invite = &msgs[1];
        assert_eq!(invite.direction, Direction::Recv);
        let expected_invite = [&part1[..], &part2[..]].concat();
        assert_eq!(invite.content, expected_invite);
    }

    /// Two connections with interleaved fragments are reassembled separately.
    #[test]
    fn tcp_reassembly_interleaved_different_addresses() {
        let a_part1 = b"INVITE sip:user@host SIP/2.0\r\n";
        let a_part2 = b"Content-Length: 3\r\n\r\nSDP";
        let b_part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
        let b_part2 = b"Content-Length: 4\r\n\r\nBODY";

        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", a_part1);
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::2]:5060",
            b_part1,
        ));
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::1]:5060",
            a_part2,
        ));
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::2]:5060",
            b_part2,
        ));

        let msgs = collect_messages(&data);

        assert_eq!(msgs.len(), 2);

        assert_eq!(msgs[0].address, "[::1]:5060");
        assert_eq!(msgs[0].frame_count, 2);
        let expected_a = [&a_part1[..], &a_part2[..]].concat();
        assert_eq!(msgs[0].content, expected_a);

        assert_eq!(msgs[1].address, "[::2]:5060");
        assert_eq!(msgs[1].frame_count, 2);
        let expected_b = [&b_part1[..], &b_part2[..]].concat();
        assert_eq!(msgs[1].content, expected_b);
    }

    /// Same address, different directions: two separate messages.
    #[test]
    fn direction_change_splits_messages() {
        let recv_content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let sent_content = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", recv_content);
        data.extend(make_frame(
            Direction::Sent,
            Transport::Tcp,
            "[::1]:5060",
            sent_content,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].direction, Direction::Recv);
        assert_eq!(msgs[1].direction, Direction::Sent);
    }

    /// Same direction, different addresses: two separate messages.
    #[test]
    fn address_change_splits_messages() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", content);
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::2]:5060",
            content,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].address, "[::1]:5060");
        assert_eq!(msgs[1].address, "[::2]:5060");
    }

    /// UDP frames from one address are never merged.
    #[test]
    fn udp_no_reassembly() {
        let content1 = b"OPTIONS sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let content2 = b"OPTIONS sip:b SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content1);
        data.extend(make_frame(
            Direction::Recv,
            Transport::Udp,
            "1.1.1.1:5060",
            content2,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2, "UDP frames should not be reassembled");
        assert_eq!(msgs[0].frame_count, 1);
        assert_eq!(msgs[1].frame_count, 1);
    }

    /// Two messages in one TCP frame are split using Content-Length.
    #[test]
    fn aggregated_messages_split_by_content_length() {
        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
        let combined = [&msg1[..], &msg2[..]].concat();
        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", &combined);
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, msg1);
        assert_eq!(msgs[1].content, msg2);
    }

    /// The long header form is recognised.
    #[test]
    fn find_content_length_standard() {
        let data = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 42\r\n\r\n";
        assert_eq!(find_content_length(data), Some(42));
    }

    /// The compact `l` header form is recognised.
    #[test]
    fn find_content_length_compact() {
        let data = b"NOTIFY sip:a SIP/2.0\r\nl: 42\r\n\r\n";
        assert_eq!(find_content_length(data), Some(42));
    }

    /// Headers without Content-Length yield `None`.
    #[test]
    fn find_content_length_missing() {
        let data = b"NOTIFY sip:a SIP/2.0\r\nCSeq: 1 NOTIFY\r\n\r\n";
        assert_eq!(find_content_length(data), None);
    }

    /// Request lines are detected by their method name.
    #[test]
    fn is_sip_start_request() {
        assert!(is_sip_start(b"INVITE sip:user@host SIP/2.0\r\n"));
        assert!(is_sip_start(b"OPTIONS sip:user@host SIP/2.0\r\n"));
        assert!(is_sip_start(b"NOTIFY sip:user@host SIP/2.0\r\n"));
        assert!(is_sip_start(b"ACK sip:user@host SIP/2.0\r\n"));
    }

    /// Status lines are detected by the `SIP/2.0 ` prefix.
    #[test]
    fn is_sip_start_response() {
        assert!(is_sip_start(b"SIP/2.0 200 OK\r\n"));
        assert!(is_sip_start(b"SIP/2.0 100 Trying\r\n"));
    }

    /// Non-SIP data is rejected.
    #[test]
    fn is_sip_start_not_sip() {
        assert!(!is_sip_start(b"some random data"));
        assert!(!is_sip_start(b"HTTP/1.1 200 OK\r\n"));
    }

    /// A start line at offset zero is reported as offset zero.
    #[test]
    fn find_sip_start_at_beginning() {
        let data = b"INVITE sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        assert_eq!(find_sip_start(data), Some(0));
    }

    /// A start line behind a CRLF-terminated prefix is located.
    #[test]
    fn find_sip_start_after_prefix() {
        let data = b"</xml>\r\nNOTIFY sip:user@host SIP/2.0\r\n";
        assert_eq!(find_sip_start(data), Some(8));
    }

    /// Data without any start line yields `None`.
    #[test]
    fn find_sip_start_none() {
        let data = b"no SIP here at all";
        assert_eq!(find_sip_start(data), None);
    }

    /// Direction, transport, address and timestamp come from the frame.
    #[test]
    fn message_preserves_metadata() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let data = make_frame(
            Direction::Sent,
            Transport::Tls,
            "[2001:db8::1]:5061",
            content,
        );
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].direction, Direction::Sent);
        assert_eq!(msgs[0].transport, Transport::Tls);
        assert_eq!(msgs[0].address, "[2001:db8::1]:5061");
        assert_eq!(msgs[0].timestamp, midnight());
    }

    /// A CRLF between two messages is dropped, not attached to either.
    #[test]
    fn extract_handles_crlf_between_messages() {
        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
        let content = [&msg1[..], &b"\r\n"[..], &msg2[..]].concat();

        let key = loopback_key();
        let mut buf = buffer_with(content);
        let msgs = extract_complete(&mut buf, &key);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, msg1);
        assert_eq!(msgs[1].content, msg2);
    }

    /// Non-SIP bytes ahead of a start line are skipped.
    #[test]
    fn extract_skips_non_sip_prefix() {
        let prefix = b"</conference-info>\r\n";
        let msg = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let content = [&prefix[..], &msg[..]].concat();

        let key = loopback_key();
        let mut buf = buffer_with(content);
        let msgs = extract_complete(&mut buf, &key);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, msg);
    }

    /// A body shorter than Content-Length keeps the data buffered.
    #[test]
    fn extract_waits_for_incomplete_body() {
        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 100\r\n\r\npartial".to_vec();

        let key = loopback_key();
        let mut buf = buffer_with(content);
        let msgs = extract_complete(&mut buf, &key);
        assert!(msgs.is_empty(), "should wait for body to complete");
        assert!(!buf.content.is_empty(), "buffer should retain data");
    }

    /// Headers without the blank-line terminator produce nothing yet.
    #[test]
    fn extract_waits_for_incomplete_headers() {
        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 0\r\n".to_vec();

        let key = loopback_key();
        let mut buf = buffer_with(content);
        let msgs = extract_complete(&mut buf, &key);
        assert!(msgs.is_empty(), "should wait for headers to complete");
    }

    /// A large NOTIFY split over five frames is reassembled byte-for-byte.
    #[test]
    fn tcp_body_split_across_five_frames() {
        let body_len: usize = 6424;
        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();

        let mut headers = Vec::new();
        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
        headers
            .extend_from_slice(b"Via: SIP/2.0/TCP [2001:4958:10:11::6]:45538;branch=z9hG4bK-1\r\n");
        headers.extend_from_slice(b"Call-ID: fragmented-notify@host\r\n");
        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
        headers.extend_from_slice(
            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
        );
        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
        headers.extend_from_slice(b"\r\n");

        let mut full_content = headers.clone();
        full_content.extend_from_slice(&body);

        // One 1500-byte frame, three of 1428 bytes, then whatever is left.
        let frame1_len = 1500.min(full_content.len());
        let remaining = &full_content[frame1_len..];
        let frame2_len = 1428.min(remaining.len());
        let remaining = &remaining[frame2_len..];
        let frame3_len = 1428.min(remaining.len());
        let remaining = &remaining[frame3_len..];
        let frame4_len = 1428.min(remaining.len());
        let remaining = &remaining[frame4_len..];
        let frame5_len = remaining.len();

        let addr = "[2001:4958:10:11::6]:45538";
        let mut data = make_frame(
            Direction::Recv,
            Transport::Tcp,
            addr,
            &full_content[..frame1_len],
        );
        let mut offset = frame1_len;
        for len in [frame2_len, frame3_len, frame4_len, frame5_len] {
            data.extend(make_frame(
                Direction::Recv,
                Transport::Tcp,
                addr,
                &full_content[offset..offset + len],
            ));
            offset += len;
        }
        assert_eq!(offset, full_content.len());

        let msgs = collect_messages(&data);

        assert_eq!(
            msgs.len(),
            1,
            "should produce exactly one reassembled message"
        );
        assert_eq!(msgs[0].frame_count, 5, "should track all 5 frames");
        assert_eq!(
            msgs[0].content, full_content,
            "content should be fully reassembled"
        );
        assert_eq!(msgs[0].direction, Direction::Recv);
        assert_eq!(msgs[0].address, addr);
    }

    /// `parse_stats` exposes the frame iterator's byte counters.
    #[test]
    fn parse_stats_delegates() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(msgs.len(), 1);
        let stats = iter.parse_stats();
        assert_eq!(stats.bytes_read, data.len() as u64);
        assert_eq!(stats.bytes_skipped, 0);
    }

    /// A body split over three frames survives the parsed-message layer.
    #[test]
    fn tcp_body_split_parsed_message() {
        let body_len: usize = 6424;
        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();

        let mut headers = Vec::new();
        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
        headers.extend_from_slice(b"Call-ID: fragmented-parsed@host\r\n");
        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
        headers.extend_from_slice(
            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
        );
        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
        headers.extend_from_slice(b"\r\n");

        let mut full_content = headers.clone();
        full_content.extend_from_slice(&body);

        let split1 = 1500.min(full_content.len());
        let split2 = (split1 + 3000).min(full_content.len());

        let addr = "[2001:db8::1]:5060";
        let mut data = make_frame(
            Direction::Recv,
            Transport::Tcp,
            addr,
            &full_content[..split1],
        );
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            addr,
            &full_content[split1..split2],
        ));
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tcp,
            addr,
            &full_content[split2..],
        ));

        let parsed: Vec<crate::types::ParsedSipMessage> =
            crate::sip::ParsedMessageIterator::new(&data[..])
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

        assert_eq!(parsed.len(), 1, "should produce one parsed message");
        assert_eq!(parsed[0].content_length(), Some(body_len));
        assert_eq!(parsed[0].body.len(), body_len, "body should be complete");
        assert_eq!(parsed[0].body, body, "body content should match");
        assert_eq!(parsed[0].frame_count, 3);
        assert_eq!(parsed[0].method(), Some("NOTIFY"));
    }

    /// A lone `\n` keep-alive frame yields no message.
    #[test]
    fn tls_keepalive_single_lf_drained() {
        let data = make_frame(Direction::Recv, Transport::Tls, "[10.0.0.1]:5061", b"\n");
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 0, "keep-alive \\n should produce no messages");
    }

    /// Several `\n` keep-alive frames in a row yield no message.
    #[test]
    fn tls_keepalive_multiple_lf_drained() {
        let addr = "[10.0.0.1]:5061";
        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
        data.extend(make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
        data.extend(make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
        let msgs = collect_messages(&data);
        assert_eq!(
            msgs.len(),
            0,
            "multiple keep-alive \\n should produce no messages"
        );
    }

    /// Keep-alives around a SIP message do not disturb it.
    #[test]
    fn tls_keepalive_interleaved_with_sip() {
        let addr = "[10.0.0.1]:5061";
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
        data.extend(make_frame(Direction::Recv, Transport::Tls, addr, sip));
        data.extend(make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1, "only the SIP message should be emitted");
        assert_eq!(msgs[0].content, sip);
    }

    /// A bare `\n` directly before a start line is stripped from the message.
    #[test]
    fn tls_bare_lf_before_sip_start() {
        let addr = "[10.0.0.1]:5061";
        let sip_part1 = b"\nOPTIONS sip:host SIP/2.0\r\n";
        let sip_part2 = b"Content-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, sip_part1);
        data.extend(make_frame(
            Direction::Recv,
            Transport::Tls,
            addr,
            sip_part2,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1, "SIP after bare LF should be extracted");
        assert!(
            msgs[0].content.starts_with(b"OPTIONS"),
            "message should start with SIP method, not \\n"
        );
    }

    /// Buffers are dropped as soon as their message has been extracted.
    #[test]
    fn empty_buffer_removed_after_complete_message() {
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let addr_a = "[::1]:5060";
        let addr_b = "[::2]:5060";

        let mut data = make_frame(Direction::Recv, Transport::Tcp, addr_a, sip);
        data.extend(make_frame(Direction::Recv, Transport::Tcp, addr_b, sip));

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(msgs.len(), 2);
        assert!(
            iter.buffers.is_empty(),
            "all buffers should be removed after complete messages are extracted"
        );
    }

    /// A partial message idle for more than two hours is evicted, not emitted.
    #[test]
    fn stale_buffer_evicted_after_timeout() {
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let partial = b"INVITE sip:host SIP/2.0\r\n";

        let mut data = Vec::new();
        data.extend(make_frame_at(
            Direction::Recv,
            Transport::Tls,
            "[::99]:44444",
            partial,
            "2026-02-16 10:00:00.000000",
        ));
        data.extend(make_frame_at(
            Direction::Recv,
            Transport::Tls,
            "[::1]:5060",
            sip,
            "2026-02-16 12:00:01.000000",
        ));

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(msgs.len(), 1, "should produce the complete message");
        assert_eq!(msgs[0].address, "[::1]:5060");
        assert!(
            iter.buffers.is_empty(),
            "stale buffer for [::99]:44444 should have been evicted"
        );
    }

    /// A time-of-day jump from 23:59 to 02:00 counts as one day rollover.
    #[test]
    fn day_rollover_detection_with_time_only() {
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let partial = b"INVITE sip:host SIP/2.0\r\n";

        let mut data = Vec::new();
        data.extend(make_frame_at(
            Direction::Recv,
            Transport::Tcp,
            "[::99]:44444",
            partial,
            "23:59:00.000000",
        ));
        data.extend(make_frame_at(
            Direction::Recv,
            Transport::Tcp,
            "[::1]:5060",
            sip,
            "02:00:01.000000",
        ));

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].address, "[::1]:5060");
        assert_eq!(iter.current_day, 1, "should have detected one day rollover");
        assert!(
            iter.buffers.is_empty(),
            "stale buffer should have been evicted after day rollover"
        );
    }

    /// At end of input a partial message is emitted and the map is emptied.
    #[test]
    fn flush_all_clears_buffers() {
        let partial = b"INVITE sip:host SIP/2.0\r\n";
        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", partial);

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(msgs.len(), 1, "partial should be flushed at EOF");
        assert!(
            iter.buffers.is_empty(),
            "flush_all should clear the HashMap"
        );
    }
}