1use std::collections::{HashMap, VecDeque};
2use std::sync::LazyLock;
3
4use memchr::memmem;
5use tracing::{debug, trace, warn};
6
7use crate::frame::{FrameIterator, ParseError};
8use crate::types::{
9 Direction, ParseStats, SipMessage, SkipTracking, Timestamp, Transport, UnparsedRegion,
10};
11
12static CRLF: LazyLock<memmem::Finder<'static>> = LazyLock::new(|| memmem::Finder::new(b"\r\n"));
13static CRLFCRLF: LazyLock<memmem::Finder<'static>> =
14 LazyLock::new(|| memmem::Finder::new(b"\r\n\r\n"));
15
16const STALE_TIMEOUT_SECS: u64 = 7200;
21
22pub struct MessageIterator<R> {
23 frames: FrameIterator<R>,
24 buffers: HashMap<(Direction, String), ConnectionBuffer>,
25 ready: VecDeque<SipMessage>,
26 exhausted: bool,
27 current_day: u32,
28 last_time_secs: u32,
29 last_sweep_abs_secs: u64,
30}
31
32struct ConnectionBuffer {
33 transport: Transport,
34 timestamp: Timestamp,
35 content: Vec<u8>,
36 frame_count: usize,
37 last_seen_day: u32,
38 last_seen_time_secs: u32,
39}
40
41impl<R: std::io::Read> MessageIterator<R> {
42 pub fn new(reader: R) -> Self {
43 MessageIterator {
44 frames: FrameIterator::new(reader),
45 buffers: HashMap::new(),
46 ready: VecDeque::new(),
47 exhausted: false,
48 current_day: 0,
49 last_time_secs: 0,
50 last_sweep_abs_secs: 0,
51 }
52 }
53
54 pub fn capture_skipped(mut self, enable: bool) -> Self {
55 self.frames = self.frames.capture_skipped(enable);
56 self
57 }
58
59 pub fn skip_tracking(mut self, tracking: SkipTracking) -> Self {
60 self.frames = self.frames.skip_tracking(tracking);
61 self
62 }
63
64 pub fn parse_stats(&self) -> &ParseStats {
65 self.frames.stats()
66 }
67
68 pub fn parse_stats_mut(&mut self) -> &mut ParseStats {
69 self.frames.stats_mut()
70 }
71
72 pub fn drain_unparsed(&mut self) -> Vec<UnparsedRegion> {
73 self.frames.drain_unparsed()
74 }
75
76 fn update_time_tracking(&mut self, time_secs: u32) {
77 if time_secs < self.last_time_secs && self.last_time_secs - time_secs > 43200 {
78 self.current_day += 1;
79 debug!(
80 day = self.current_day,
81 prev_secs = self.last_time_secs,
82 curr_secs = time_secs,
83 "detected day rollover"
84 );
85 }
86 self.last_time_secs = time_secs;
87 }
88
89 fn current_abs_secs(&self) -> u64 {
90 self.current_day as u64 * 86400 + self.last_time_secs as u64
91 }
92
93 fn sweep_stale_buffers(&mut self) {
94 let current_abs = self.current_abs_secs();
95 self.buffers.retain(|key, buf| {
96 let buf_abs = buf.last_seen_day as u64 * 86400 + buf.last_seen_time_secs as u64;
97 let elapsed = current_abs.saturating_sub(buf_abs);
98 if elapsed > STALE_TIMEOUT_SECS {
99 if buf.content.is_empty() {
100 trace!(
101 address = %key.1,
102 direction = %key.0,
103 elapsed_secs = elapsed,
104 "evicted empty stale connection buffer"
105 );
106 } else {
107 warn!(
108 address = %key.1,
109 direction = %key.0,
110 elapsed_secs = elapsed,
111 pending_bytes = buf.content.len(),
112 "evicted stale connection buffer with incomplete data"
113 );
114 }
115 return false;
116 }
117 true
118 });
119 }
120
121 fn flush_all(&mut self) {
122 let keys: Vec<_> = self.buffers.keys().cloned().collect();
123 for key in keys {
124 if let Some(buf) = self.buffers.get_mut(&key) {
125 let msgs = extract_complete(buf, &key);
126 self.ready.extend(msgs);
127
128 if !buf.content.is_empty() {
129 let content = std::mem::take(&mut buf.content);
130 self.ready.push_back(SipMessage {
131 direction: key.0,
132 transport: buf.transport,
133 address: key.1.clone(),
134 timestamp: buf.timestamp,
135 content,
136 frame_count: buf.frame_count,
137 });
138 buf.frame_count = 0;
139 }
140 }
141 }
142 self.buffers.clear();
143 }
144}
145
146impl<R: std::io::Read> Iterator for MessageIterator<R> {
147 type Item = Result<SipMessage, ParseError>;
148
149 fn next(&mut self) -> Option<Self::Item> {
150 if let Some(msg) = self.ready.pop_front() {
151 return Some(Ok(msg));
152 }
153
154 if self.exhausted {
155 return None;
156 }
157
158 loop {
159 match self.frames.next() {
160 Some(Ok(frame)) => {
161 if frame.transport == Transport::Udp {
162 return Some(Ok(SipMessage {
163 direction: frame.direction,
164 transport: frame.transport,
165 address: frame.address,
166 timestamp: frame.timestamp,
167 content: frame.content,
168 frame_count: 1,
169 }));
170 }
171
172 let time_secs = frame.timestamp.time_of_day_secs();
173 self.update_time_tracking(time_secs);
174
175 let current_abs = self.current_abs_secs();
176 if current_abs.saturating_sub(self.last_sweep_abs_secs) >= STALE_TIMEOUT_SECS {
177 self.sweep_stale_buffers();
178 self.last_sweep_abs_secs = current_abs;
179 }
180
181 let key = (frame.direction, frame.address.clone());
182
183 let buf = self
184 .buffers
185 .entry(key.clone())
186 .or_insert_with(|| ConnectionBuffer {
187 transport: frame.transport,
188 timestamp: frame.timestamp,
189 content: Vec::new(),
190 frame_count: 0,
191 last_seen_day: self.current_day,
192 last_seen_time_secs: time_secs,
193 });
194
195 buf.last_seen_day = self.current_day;
196 buf.last_seen_time_secs = time_secs;
197
198 if buf.content.is_empty() {
199 buf.timestamp = frame.timestamp;
200 }
201
202 trace!(
203 frame = buf.frame_count + 1,
204 bytes = frame.content.len(),
205 address = %key.1,
206 "buffering TCP frame"
207 );
208
209 buf.content.extend_from_slice(&frame.content);
210 buf.frame_count += 1;
211
212 let msgs = extract_complete(buf, &key);
213 self.ready.extend(msgs);
214
215 if buf.frame_count == 0 && buf.content.is_empty() {
216 self.buffers.remove(&key);
217 }
218
219 if let Some(msg) = self.ready.pop_front() {
220 return Some(Ok(msg));
221 }
222 }
223 Some(Err(e)) => return Some(Err(e)),
224 None => {
225 self.exhausted = true;
226 self.flush_all();
227 return self.ready.pop_front().map(Ok);
228 }
229 }
230 }
231 }
232}
233
234fn extract_complete(buf: &mut ConnectionBuffer, key: &(Direction, String)) -> Vec<SipMessage> {
238 let mut messages = Vec::new();
239
240 loop {
241 if buf.content.is_empty() {
242 break;
243 }
244
245 if !is_sip_start(&buf.content) {
247 let ws_len = buf
249 .content
250 .iter()
251 .position(|&b| !matches!(b, b'\r' | b'\n' | b' ' | b'\t'))
252 .unwrap_or(buf.content.len());
253
254 if ws_len > 0 {
255 if ws_len == buf.content.len() {
256 trace!(
257 bytes = ws_len,
258 address = %key.1,
259 "drained transport whitespace"
260 );
261 buf.content.clear();
262 break;
263 }
264 if is_sip_start(&buf.content[ws_len..]) {
265 trace!(bytes = ws_len, "drained inter-message whitespace padding");
266 buf.content.drain(..ws_len);
267 continue;
268 }
269 }
270
271 match find_sip_start(&buf.content) {
272 Some(offset) if offset > 0 => {
273 warn!(
274 skipped_bytes = offset,
275 address = %key.1,
276 "skipped non-SIP prefix in TCP buffer"
277 );
278 buf.content.drain(..offset);
279 continue;
280 }
281 _ => break, }
283 }
284
285 let header_end = match CRLFCRLF.find(&buf.content) {
287 Some(offset) => offset,
288 None => break, };
290 let body_start = header_end + 4;
291
292 let msg_end = match find_content_length(&buf.content) {
293 Some(cl) => {
294 let end = body_start + cl;
295 if end > buf.content.len() {
296 break; }
298 end
299 }
300 None => body_start, };
302
303 let msg_content: Vec<u8> = buf.content.drain(..msg_end).collect();
304
305 while buf.content.len() >= 2 && buf.content[0] == b'\r' && buf.content[1] == b'\n' {
307 buf.content.drain(..2);
308 }
309
310 let frame_count = if messages.is_empty() {
311 buf.frame_count
312 } else {
313 0
314 };
315
316 if frame_count > 1 {
317 debug!(
318 frame_count,
319 bytes = msg_content.len(),
320 address = %key.1,
321 "extracted reassembled TCP message"
322 );
323 }
324
325 messages.push(SipMessage {
326 direction: key.0,
327 transport: buf.transport,
328 address: key.1.clone(),
329 timestamp: buf.timestamp,
330 content: msg_content,
331 frame_count,
332 });
333
334 buf.frame_count = 0;
335 }
336
337 messages
338}
339
340fn find_content_length(data: &[u8]) -> Option<usize> {
343 let header_end = CRLFCRLF.find(data)?;
344 let headers = &data[..header_end];
345
346 let mut pos = 0;
347 while pos < headers.len() {
348 let line_end = CRLF.find(&headers[pos..]).unwrap_or(headers.len() - pos);
349 let line = &headers[pos..pos + line_end];
350
351 if let Some(value) = extract_header_value(line, b"Content-Length") {
352 return parse_content_length(value);
353 }
354 if let Some(value) = extract_compact_header_value(line, b'l') {
355 return parse_content_length(value);
356 }
357
358 pos += line_end + 2; }
360 None
361}
362
363fn extract_header_value<'a>(line: &'a [u8], name: &[u8]) -> Option<&'a [u8]> {
364 if line.len() <= name.len() + 1 {
365 return None;
366 }
367 if !line[..name.len()].eq_ignore_ascii_case(name) {
368 return None;
369 }
370 if line[name.len()] != b':' {
371 return None;
372 }
373 Some(trim_bytes(&line[name.len() + 1..]))
374}
375
376fn extract_compact_header_value(line: &[u8], compact: u8) -> Option<&[u8]> {
377 if line.len() < 2 {
378 return None;
379 }
380 if line[0] != compact || line[1] != b':' {
381 return None;
382 }
383 Some(trim_bytes(&line[2..]))
384}
385
/// Strips leading and trailing spaces and tabs from `b`. Other whitespace, such as `\r`
/// or `\n`, is kept.
fn trim_bytes(b: &[u8]) -> &[u8] {
    let is_blank = |c: &u8| *c == b' ' || *c == b'\t';
    let mut trimmed = b;
    while let [first, rest @ ..] = trimmed {
        if !is_blank(first) {
            break;
        }
        trimmed = rest;
    }
    while let [rest @ .., last] = trimmed {
        if !is_blank(last) {
            break;
        }
        trimmed = rest;
    }
    trimmed
}
397
/// Parses a header value as a decimal `usize`. Returns `None` for non-UTF-8 or
/// non-numeric input.
fn parse_content_length(value: &[u8]) -> Option<usize> {
    match std::str::from_utf8(value) {
        Ok(text) => text.parse::<usize>().ok(),
        Err(_) => None,
    }
}
402
/// Returns `true` when `data` begins with a SIP start line.
///
/// A start line is either a status line (`SIP/2.0 `) or one of the listed request methods
/// followed by a space. Matching is case-sensitive.
fn is_sip_start(data: &[u8]) -> bool {
    const START_LINE_PREFIXES: [&[u8]; 15] = [
        b"SIP/2.0 ",
        b"INVITE ",
        b"ACK ",
        b"BYE ",
        b"CANCEL ",
        b"OPTIONS ",
        b"REGISTER ",
        b"PRACK ",
        b"SUBSCRIBE ",
        b"NOTIFY ",
        b"PUBLISH ",
        b"INFO ",
        b"REFER ",
        b"MESSAGE ",
        b"UPDATE ",
    ];
    START_LINE_PREFIXES
        .iter()
        .any(|prefix| data.starts_with(prefix))
}
431
432fn find_sip_start(data: &[u8]) -> Option<usize> {
434 if is_sip_start(data) {
435 return Some(0);
436 }
437 let mut pos = 0;
438 while let Some(offset) = CRLF.find(&data[pos..]) {
439 let candidate = pos + offset + 2;
440 if candidate >= data.len() {
441 break;
442 }
443 if is_sip_start(&data[candidate..]) {
444 return Some(candidate);
445 }
446 pos = candidate;
447 }
448 None
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Direction;

    /// Timestamp used by `make_frame` for tests that do not care about time.
    const DEFAULT_TIMESTAMP: &str = "00:00:00.000000";

    /// Renders one log frame stamped with `DEFAULT_TIMESTAMP`.
    fn make_frame(
        direction: Direction,
        transport: Transport,
        addr: &str,
        content: &[u8],
    ) -> Vec<u8> {
        make_frame_at(direction, transport, addr, content, DEFAULT_TIMESTAMP)
    }

    /// Renders one log frame in the textual format these tests feed to `MessageIterator`.
    ///
    /// The frame is a `"<recv|sent> N bytes <from|to> <transport>/<addr> at <timestamp>:\n"`
    /// header, then the raw content, then a `\x0B\n` terminator.
    fn make_frame_at(
        direction: Direction,
        transport: Transport,
        addr: &str,
        content: &[u8],
        timestamp: &str,
    ) -> Vec<u8> {
        let (dir_str, prep) = match direction {
            Direction::Recv => ("recv", "from"),
            Direction::Sent => ("sent", "to"),
        };
        let transport_str = match transport {
            Transport::Tcp => "tcp",
            Transport::Udp => "udp",
            Transport::Tls => "tls",
            Transport::Wss => "wss",
        };
        let header = format!(
            "{dir_str} {} bytes {prep} {transport_str}/{addr} at {timestamp}:\n",
            content.len()
        );
        let mut data = header.into_bytes();
        data.extend_from_slice(content);
        data.extend_from_slice(b"\x0B\n");
        data
    }

    /// Runs a `MessageIterator` over `data` and collects every message, panicking on a
    /// parse error.
    fn collect_messages(data: &[u8]) -> Vec<SipMessage> {
        MessageIterator::new(data)
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    /// Connection key shared by the direct `extract_complete` tests.
    fn test_key() -> (Direction, String) {
        (Direction::Recv, "[::1]:5060".to_string())
    }

    /// A TCP buffer holding `content` that was delivered in one frame at midnight.
    fn tcp_buffer(content: Vec<u8>) -> ConnectionBuffer {
        ConnectionBuffer {
            transport: Transport::Tcp,
            timestamp: Timestamp::TimeOnly {
                hour: 0,
                min: 0,
                sec: 0,
                usec: 0,
            },
            content,
            frame_count: 1,
            last_seen_day: 0,
            last_seen_time_secs: 0,
        }
    }

    #[test]
    fn single_udp_message() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, content);
        assert_eq!(msgs[0].frame_count, 1);
        assert_eq!(msgs[0].transport, Transport::Udp);
    }

    #[test]
    fn tcp_reassembly_two_frames() {
        let part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
        let part2 = b"Content-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", part1);
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::1]:5060",
            part2,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].frame_count, 2);
        let expected = [&part1[..], &part2[..]].concat();
        assert_eq!(msgs[0].content, expected);
    }

    #[test]
    fn tcp_reassembly_across_interleaved_frames() {
        let part1 = b"INVITE sip:user@host SIP/2.0\r\n";
        let part2 = b"Content-Length: 3\r\n\r\nSDP";
        let response = b"SIP/2.0 100 Trying\r\nContent-Length: 0\r\n\r\n";

        let mut data = make_frame(Direction::Recv, Transport::Tcp, "10.0.0.1:5060", part1);
        data.extend_from_slice(&make_frame(
            Direction::Sent,
            Transport::Tcp,
            "10.0.0.1:5060",
            response,
        ));
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Tcp,
            "10.0.0.1:5060",
            part2,
        ));

        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);

        let trying = &msgs[0];
        assert_eq!(trying.direction, Direction::Sent);
        assert_eq!(trying.content, response);

        let invite = &msgs[1];
        assert_eq!(invite.direction, Direction::Recv);
        let expected_invite = [&part1[..], &part2[..]].concat();
        assert_eq!(invite.content, expected_invite);
    }

    #[test]
    fn tcp_reassembly_interleaved_different_addresses() {
        let a_part1 = b"INVITE sip:user@host SIP/2.0\r\n";
        let a_part2 = b"Content-Length: 3\r\n\r\nSDP";
        let b_part1 = b"NOTIFY sip:user@host SIP/2.0\r\n";
        let b_part2 = b"Content-Length: 4\r\n\r\nBODY";

        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", a_part1);
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::2]:5060",
            b_part1,
        ));
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::1]:5060",
            a_part2,
        ));
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::2]:5060",
            b_part2,
        ));

        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);

        assert_eq!(msgs[0].address, "[::1]:5060");
        assert_eq!(msgs[0].frame_count, 2);
        let expected_a = [&a_part1[..], &a_part2[..]].concat();
        assert_eq!(msgs[0].content, expected_a);

        assert_eq!(msgs[1].address, "[::2]:5060");
        assert_eq!(msgs[1].frame_count, 2);
        let expected_b = [&b_part1[..], &b_part2[..]].concat();
        assert_eq!(msgs[1].content, expected_b);
    }

    #[test]
    fn direction_change_splits_messages() {
        let recv_content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let sent_content = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", recv_content);
        data.extend_from_slice(&make_frame(
            Direction::Sent,
            Transport::Tcp,
            "[::1]:5060",
            sent_content,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].direction, Direction::Recv);
        assert_eq!(msgs[1].direction, Direction::Sent);
    }

    #[test]
    fn address_change_splits_messages() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", content);
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Tcp,
            "[::2]:5060",
            content,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].address, "[::1]:5060");
        assert_eq!(msgs[1].address, "[::2]:5060");
    }

    #[test]
    fn udp_no_reassembly() {
        let content1 = b"OPTIONS sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let content2 = b"OPTIONS sip:b SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content1);
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Udp,
            "1.1.1.1:5060",
            content2,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2, "UDP frames should not be reassembled");
        assert_eq!(msgs[0].frame_count, 1);
        assert_eq!(msgs[1].frame_count, 1);
    }

    #[test]
    fn aggregated_messages_split_by_content_length() {
        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
        let combined = [&msg1[..], &msg2[..]].concat();
        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", &combined);
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, msg1);
        assert_eq!(msgs[1].content, msg2);
    }

    #[test]
    fn find_content_length_standard() {
        let data = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 42\r\n\r\n";
        assert_eq!(find_content_length(data), Some(42));
    }

    #[test]
    fn find_content_length_compact() {
        let data = b"NOTIFY sip:a SIP/2.0\r\nl: 42\r\n\r\n";
        assert_eq!(find_content_length(data), Some(42));
    }

    #[test]
    fn find_content_length_missing() {
        let data = b"NOTIFY sip:a SIP/2.0\r\nCSeq: 1 NOTIFY\r\n\r\n";
        assert_eq!(find_content_length(data), None);
    }

    #[test]
    fn is_sip_start_request() {
        assert!(is_sip_start(b"INVITE sip:user@host SIP/2.0\r\n"));
        assert!(is_sip_start(b"OPTIONS sip:user@host SIP/2.0\r\n"));
        assert!(is_sip_start(b"NOTIFY sip:user@host SIP/2.0\r\n"));
        assert!(is_sip_start(b"ACK sip:user@host SIP/2.0\r\n"));
    }

    #[test]
    fn is_sip_start_response() {
        assert!(is_sip_start(b"SIP/2.0 200 OK\r\n"));
        assert!(is_sip_start(b"SIP/2.0 100 Trying\r\n"));
    }

    #[test]
    fn is_sip_start_not_sip() {
        assert!(!is_sip_start(b"some random data"));
        assert!(!is_sip_start(b"HTTP/1.1 200 OK\r\n"));
    }

    #[test]
    fn find_sip_start_at_beginning() {
        let data = b"INVITE sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        assert_eq!(find_sip_start(data), Some(0));
    }

    #[test]
    fn find_sip_start_after_prefix() {
        let data = b"</xml>\r\nNOTIFY sip:user@host SIP/2.0\r\n";
        assert_eq!(find_sip_start(data), Some(8));
    }

    #[test]
    fn find_sip_start_none() {
        let data = b"no SIP here at all";
        assert_eq!(find_sip_start(data), None);
    }

    #[test]
    fn message_preserves_metadata() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let data = make_frame(
            Direction::Sent,
            Transport::Tls,
            "[2001:db8::1]:5061",
            content,
        );
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].direction, Direction::Sent);
        assert_eq!(msgs[0].transport, Transport::Tls);
        assert_eq!(msgs[0].address, "[2001:db8::1]:5061");
        assert_eq!(
            msgs[0].timestamp,
            Timestamp::TimeOnly {
                hour: 0,
                min: 0,
                sec: 0,
                usec: 0
            }
        );
    }

    #[test]
    fn extract_handles_crlf_between_messages() {
        let msg1 = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 5\r\n\r\nhello";
        let msg2 = b"SIP/2.0 200 OK\r\nContent-Length: 0\r\n\r\n";
        let content = [&msg1[..], &b"\r\n"[..], &msg2[..]].concat();

        let key = test_key();
        let mut buf = tcp_buffer(content);
        let msgs = extract_complete(&mut buf, &key);
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].content, msg1);
        assert_eq!(msgs[1].content, msg2);
    }

    #[test]
    fn extract_skips_non_sip_prefix() {
        let prefix = b"</conference-info>\r\n";
        let msg = b"NOTIFY sip:a SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let content = [&prefix[..], &msg[..]].concat();

        let key = test_key();
        let mut buf = tcp_buffer(content);
        let msgs = extract_complete(&mut buf, &key);
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].content, msg);
    }

    #[test]
    fn extract_waits_for_incomplete_body() {
        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 100\r\n\r\npartial".to_vec();

        let key = test_key();
        let mut buf = tcp_buffer(content);
        let msgs = extract_complete(&mut buf, &key);
        assert!(msgs.is_empty(), "should wait for body to complete");
        assert!(!buf.content.is_empty(), "buffer should retain data");
    }

    #[test]
    fn extract_waits_for_incomplete_headers() {
        let content = b"INVITE sip:a SIP/2.0\r\nContent-Length: 0\r\n".to_vec();

        let key = test_key();
        let mut buf = tcp_buffer(content);
        let msgs = extract_complete(&mut buf, &key);
        assert!(msgs.is_empty(), "should wait for headers to complete");
    }

    #[test]
    fn tcp_body_split_across_five_frames() {
        let body_len: usize = 6424;
        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();

        let mut headers = Vec::new();
        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
        headers
            .extend_from_slice(b"Via: SIP/2.0/TCP [2001:4958:10:11::6]:45538;branch=z9hG4bK-1\r\n");
        headers.extend_from_slice(b"Call-ID: fragmented-notify@host\r\n");
        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
        headers.extend_from_slice(
            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
        );
        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
        headers.extend_from_slice(b"\r\n");

        let mut full_content = headers;
        full_content.extend_from_slice(&body);

        // A 1500-byte first segment, three 1428-byte segments, then the remainder.
        let frame1_len = 1500.min(full_content.len());
        let remaining = &full_content[frame1_len..];
        let frame2_len = 1428.min(remaining.len());
        let remaining = &remaining[frame2_len..];
        let frame3_len = 1428.min(remaining.len());
        let remaining = &remaining[frame3_len..];
        let frame4_len = 1428.min(remaining.len());
        let remaining = &remaining[frame4_len..];
        let frame5_len = remaining.len();

        let addr = "[2001:4958:10:11::6]:45538";
        let mut data = Vec::new();
        let mut offset = 0;
        for len in [frame1_len, frame2_len, frame3_len, frame4_len, frame5_len] {
            data.extend_from_slice(&make_frame(
                Direction::Recv,
                Transport::Tcp,
                addr,
                &full_content[offset..offset + len],
            ));
            offset += len;
        }
        assert_eq!(offset, full_content.len());

        let msgs = collect_messages(&data);

        assert_eq!(
            msgs.len(),
            1,
            "should produce exactly one reassembled message"
        );
        assert_eq!(msgs[0].frame_count, 5, "should track all 5 frames");
        assert_eq!(
            msgs[0].content, full_content,
            "content should be fully reassembled"
        );
        assert_eq!(msgs[0].direction, Direction::Recv);
        assert_eq!(msgs[0].address, addr);
    }

    #[test]
    fn parse_stats_delegates() {
        let content = b"OPTIONS sip:user@host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let data = make_frame(Direction::Recv, Transport::Udp, "1.1.1.1:5060", content);
        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<_> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(msgs.len(), 1);
        let stats = iter.parse_stats();
        assert_eq!(stats.bytes_read, data.len() as u64);
        assert_eq!(stats.bytes_skipped, 0);
    }

    #[test]
    fn tcp_body_split_parsed_message() {
        let body_len: usize = 6424;
        let body: Vec<u8> = (0..body_len).map(|i| b'A' + (i % 26) as u8).collect();

        let mut headers = Vec::new();
        headers.extend_from_slice(b"NOTIFY sip:user@host SIP/2.0\r\n");
        headers.extend_from_slice(b"Call-ID: fragmented-parsed@host\r\n");
        headers.extend_from_slice(b"CSeq: 1 NOTIFY\r\n");
        headers.extend_from_slice(
            b"Content-Type: application/emergencyCallData.AbandonedCall+json\r\n",
        );
        headers.extend_from_slice(format!("Content-Length: {body_len}\r\n").as_bytes());
        headers.extend_from_slice(b"\r\n");

        let mut full_content = headers;
        full_content.extend_from_slice(&body);

        let split1 = 1500.min(full_content.len());
        let split2 = (split1 + 3000).min(full_content.len());

        let addr = "[2001:db8::1]:5060";
        let mut data = Vec::new();
        for (start, end) in [(0, split1), (split1, split2), (split2, full_content.len())] {
            data.extend_from_slice(&make_frame(
                Direction::Recv,
                Transport::Tcp,
                addr,
                &full_content[start..end],
            ));
        }

        let parsed: Vec<crate::types::ParsedSipMessage> =
            crate::sip::ParsedMessageIterator::new(&data[..])
                .collect::<Result<Vec<_>, _>>()
                .unwrap();

        assert_eq!(parsed.len(), 1, "should produce one parsed message");
        assert_eq!(parsed[0].content_length(), Some(body_len));
        assert_eq!(parsed[0].body.len(), body_len, "body should be complete");
        assert_eq!(parsed[0].body, body, "body content should match");
        assert_eq!(parsed[0].frame_count, 3);
        assert_eq!(parsed[0].method(), Some("NOTIFY"));
    }

    #[test]
    fn tls_keepalive_single_lf_drained() {
        let data = make_frame(Direction::Recv, Transport::Tls, "[10.0.0.1]:5061", b"\n");
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 0, "keep-alive \\n should produce no messages");
    }

    #[test]
    fn tls_keepalive_multiple_lf_drained() {
        let addr = "[10.0.0.1]:5061";
        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
        let msgs = collect_messages(&data);
        assert_eq!(
            msgs.len(),
            0,
            "multiple keep-alive \\n should produce no messages"
        );
    }

    #[test]
    fn tls_keepalive_interleaved_with_sip() {
        let addr = "[10.0.0.1]:5061";
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, b"\n");
        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, sip));
        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tls, addr, b"\n"));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1, "only the SIP message should be emitted");
        assert_eq!(msgs[0].content, sip);
    }

    #[test]
    fn tls_bare_lf_before_sip_start() {
        let addr = "[10.0.0.1]:5061";
        let sip_part1 = b"\nOPTIONS sip:host SIP/2.0\r\n";
        let sip_part2 = b"Content-Length: 0\r\n\r\n";
        let mut data = make_frame(Direction::Recv, Transport::Tls, addr, sip_part1);
        data.extend_from_slice(&make_frame(
            Direction::Recv,
            Transport::Tls,
            addr,
            sip_part2,
        ));
        let msgs = collect_messages(&data);
        assert_eq!(msgs.len(), 1, "SIP after bare LF should be extracted");
        assert!(
            msgs[0].content.starts_with(b"OPTIONS"),
            "message should start with SIP method, not \\n"
        );
    }

    #[test]
    fn empty_buffer_removed_after_complete_message() {
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let addr_a = "[::1]:5060";
        let addr_b = "[::2]:5060";

        let mut data = make_frame(Direction::Recv, Transport::Tcp, addr_a, sip);
        data.extend_from_slice(&make_frame(Direction::Recv, Transport::Tcp, addr_b, sip));

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(msgs.len(), 2);
        assert!(
            iter.buffers.is_empty(),
            "all buffers should be removed after complete messages are extracted"
        );
    }

    #[test]
    fn stale_buffer_evicted_after_timeout() {
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let partial = b"INVITE sip:host SIP/2.0\r\n";

        let mut data = Vec::new();
        data.extend_from_slice(&make_frame_at(
            Direction::Recv,
            Transport::Tls,
            "[::99]:44444",
            partial,
            "2026-02-16 10:00:00.000000",
        ));
        data.extend_from_slice(&make_frame_at(
            Direction::Recv,
            Transport::Tls,
            "[::1]:5060",
            sip,
            "2026-02-16 12:00:01.000000",
        ));

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(msgs.len(), 1, "should produce the complete message");
        assert_eq!(msgs[0].address, "[::1]:5060");
        assert!(
            iter.buffers.is_empty(),
            "stale buffer for [::99]:44444 should have been evicted"
        );
    }

    #[test]
    fn day_rollover_detection_with_time_only() {
        let sip = b"OPTIONS sip:host SIP/2.0\r\nContent-Length: 0\r\n\r\n";
        let partial = b"INVITE sip:host SIP/2.0\r\n";

        let mut data = Vec::new();
        data.extend_from_slice(&make_frame_at(
            Direction::Recv,
            Transport::Tcp,
            "[::99]:44444",
            partial,
            "23:59:00.000000",
        ));
        data.extend_from_slice(&make_frame_at(
            Direction::Recv,
            Transport::Tcp,
            "[::1]:5060",
            sip,
            "02:00:01.000000",
        ));

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].address, "[::1]:5060");
        assert_eq!(iter.current_day, 1, "should have detected one day rollover");
        assert!(
            iter.buffers.is_empty(),
            "stale buffer should have been evicted after day rollover"
        );
    }

    #[test]
    fn flush_all_clears_buffers() {
        let partial = b"INVITE sip:host SIP/2.0\r\n";
        let data = make_frame(Direction::Recv, Transport::Tcp, "[::1]:5060", partial);

        let mut iter = MessageIterator::new(&data[..]);
        let msgs: Vec<SipMessage> = iter.by_ref().collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(msgs.len(), 1, "partial should be flushed at EOF");
        assert!(
            iter.buffers.is_empty(),
            "flush_all should clear the HashMap"
        );
    }
}