1use crate::attached::AttachedLines;
2use crate::level::LogLevel;
3use crate::line::{
4 is_date_at, is_log_header_at, is_uuid_at, parse_line, LineKind, UUID_PREFIX_LEN,
5};
6use crate::message::{classify_message, MessageKind, SdpDirection};
7use std::collections::VecDeque;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
14#[non_exhaustive]
15pub enum Block {
16 ChannelData {
19 fields: Vec<(String, String)>,
20 variables: Vec<(String, String)>,
21 },
22 Sdp {
24 direction: SdpDirection,
25 body: Vec<String>,
26 },
27 CodecNegotiation {
29 comparisons: Vec<(String, String)>,
30 selected: Vec<String>,
31 },
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum UnclassifiedTracking {
39 CountOnly,
41 TrackLines,
43 CaptureData,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
49#[non_exhaustive]
50pub enum UnclassifiedReason {
51 OrphanContinuation,
53 UnknownMessageFormat,
55 TruncatedField,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq)]
61pub struct UnclassifiedLine {
62 pub line_number: u64,
63 pub reason: UnclassifiedReason,
64 pub data: Option<String>,
66}
67
68#[derive(Debug, Clone, Default)]
70pub struct ParseStats {
71 pub lines_processed: u64,
72 pub lines_unclassified: u64,
73 pub lines_in_entries: u64,
75 pub lines_empty_orphan: u64,
77 pub lines_split: u64,
80 pub unclassified_lines: Vec<UnclassifiedLine>,
82}
83
84impl ParseStats {
85 pub fn unaccounted_lines(&self) -> u64 {
92 let expected = self.lines_in_entries + self.lines_empty_orphan;
93 let actual = self.lines_processed + self.lines_split;
94 actual.saturating_sub(expected)
95 }
96}
97
98#[derive(Debug)]
104pub struct LogEntry {
105 pub uuid: String,
107 pub timestamp: String,
109 pub level: Option<LogLevel>,
111 pub idle_pct: Option<String>,
113 pub source: Option<String>,
115 pub message: String,
117 pub kind: LineKind,
119 pub message_kind: MessageKind,
121 pub block: Option<Block>,
123 pub attached: AttachedLines,
125 pub line_number: u64,
127 pub warnings: Vec<String>,
129}
130
131fn parse_field_line(msg: &str) -> Option<(String, String)> {
132 let colon = msg.find(": ")?;
133 let name = &msg[..colon];
134 if name.contains(' ') || name.is_empty() {
135 return None;
136 }
137 let value_part = &msg[colon + 2..];
138 let value = if let Some(inner) = value_part.strip_prefix('[') {
139 inner.strip_suffix(']').unwrap_or(inner)
140 } else {
141 value_part
142 };
143 Some((name.to_string(), value.to_string()))
144}
145
146enum StreamState {
147 Idle,
148 InChannelData {
149 fields: Vec<(String, String)>,
150 variables: Vec<(String, String)>,
151 open_var_name: Option<String>,
152 open_var_value: Option<String>,
153 },
154 InSdp {
155 direction: SdpDirection,
156 body: Vec<String>,
157 },
158 InCodecNegotiation {
159 comparisons: Vec<(String, String)>,
160 selected: Vec<String>,
161 },
162}
163
164impl StreamState {
165 fn take_idle(&mut self) -> StreamState {
166 std::mem::replace(self, StreamState::Idle)
167 }
168}
169
170pub struct LogStream<I> {
180 lines: I,
181 last_uuid: String,
182 last_timestamp: String,
183 pending: Option<LogEntry>,
184 state: StreamState,
185 stats: ParseStats,
186 tracking: UnclassifiedTracking,
187 line_number: u64,
188 split_pending: VecDeque<String>,
189 deferred_warning: Option<String>,
190}
191
192impl<I: Iterator<Item = String>> LogStream<I> {
193 pub fn new(lines: I) -> Self {
195 LogStream {
196 lines,
197 last_uuid: String::new(),
198 last_timestamp: String::new(),
199 pending: None,
200 state: StreamState::Idle,
201 stats: ParseStats::default(),
202 tracking: UnclassifiedTracking::CountOnly,
203 line_number: 0,
204 split_pending: VecDeque::new(),
205 deferred_warning: None,
206 }
207 }
208
209 pub fn unclassified_tracking(mut self, level: UnclassifiedTracking) -> Self {
211 self.tracking = level;
212 self
213 }
214
215 pub fn stats(&self) -> &ParseStats {
217 &self.stats
218 }
219
220 pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
224 std::mem::take(&mut self.stats.unclassified_lines)
225 }
226
227 fn record_unclassified(&mut self, reason: UnclassifiedReason, data: Option<&str>) {
228 self.stats.lines_unclassified += 1;
229 match self.tracking {
230 UnclassifiedTracking::CountOnly => {}
231 UnclassifiedTracking::TrackLines => {
232 self.stats.unclassified_lines.push(UnclassifiedLine {
233 line_number: self.line_number,
234 reason,
235 data: None,
236 });
237 }
238 UnclassifiedTracking::CaptureData => {
239 self.stats.unclassified_lines.push(UnclassifiedLine {
240 line_number: self.line_number,
241 reason,
242 data: data.map(|s| s.to_string()),
243 });
244 }
245 }
246 }
247
248 fn finalize_block(&mut self) -> (Option<Block>, Vec<String>) {
249 let mut warnings = Vec::new();
250 match self.state.take_idle() {
251 StreamState::Idle => (None, warnings),
252 StreamState::InChannelData {
253 fields,
254 mut variables,
255 open_var_name,
256 open_var_value,
257 } => {
258 if let (Some(ref name), Some(value)) = (&open_var_name, open_var_value) {
259 warnings.push(format!("unclosed multi-line variable: {name}"));
260 variables.push((name.clone(), value));
261 }
262 (Some(Block::ChannelData { fields, variables }), warnings)
263 }
264 StreamState::InSdp { direction, body } => {
265 (Some(Block::Sdp { direction, body }), warnings)
266 }
267 StreamState::InCodecNegotiation {
268 comparisons,
269 selected,
270 } => (
271 Some(Block::CodecNegotiation {
272 comparisons,
273 selected,
274 }),
275 warnings,
276 ),
277 }
278 }
279
280 fn finalize_pending(&mut self) -> Option<LogEntry> {
281 let (block, warnings) = self.finalize_block();
282 if let Some(ref mut p) = self.pending {
283 p.block = block;
284 p.warnings.extend(warnings);
285 self.stats.lines_in_entries += 1 + p.attached.len() as u64;
286 }
287 self.pending.take()
288 }
289
290 fn start_block_for_message(&mut self, message_kind: &MessageKind) {
291 self.state = match message_kind {
292 MessageKind::ChannelData => StreamState::InChannelData {
293 fields: Vec::new(),
294 variables: Vec::new(),
295 open_var_name: None,
296 open_var_value: None,
297 },
298 MessageKind::SdpMarker { direction } => StreamState::InSdp {
299 direction: direction.clone(),
300 body: Vec::new(),
301 },
302 MessageKind::CodecNegotiation => StreamState::InCodecNegotiation {
303 comparisons: Vec::new(),
304 selected: Vec::new(),
305 },
306 _ => StreamState::Idle,
307 };
308 }
309
310 fn accumulate_codec_entry(&mut self, msg: &str) {
311 let mut warning = None;
312 if let StreamState::InCodecNegotiation {
313 comparisons,
314 selected,
315 } = &mut self.state
316 {
317 let rest = msg.strip_prefix("Audio Codec Compare ").unwrap_or(msg);
318 if rest.contains("is saved as a match") {
319 let codec = rest.find(']').map(|end| &rest[1..end]).unwrap_or(rest);
320 selected.push(codec.to_string());
321 } else if let Some(slash) = rest.find("]/[") {
322 let offered = &rest[1..slash];
323 let local = &rest[slash + 3..rest.len().saturating_sub(1)];
324 comparisons.push((offered.to_string(), local.to_string()));
325 } else {
326 warning = Some(format!(
327 "unrecognized codec negotiation line: {}",
328 if msg.len() > 80 { &msg[..80] } else { msg }
329 ));
330 }
331 }
332 if let (Some(w), Some(ref mut pending)) = (warning, &mut self.pending) {
333 pending.warnings.push(w);
334 }
335 }
336
337 fn accumulate_continuation(&mut self, msg: &str, line: &str) {
338 let msg_kind = classify_message(msg);
339 let mut warning = None;
340 match &mut self.state {
341 StreamState::InChannelData {
342 fields,
343 variables,
344 open_var_name,
345 open_var_value,
346 } => {
347 if let Some(ref mut val) = open_var_value {
348 val.push('\n');
349 val.push_str(msg);
350 if msg.ends_with(']') {
351 let trimmed = val.trim_end_matches(']').to_string();
352 let name = open_var_name.take().unwrap();
353 *open_var_value = None;
354 variables.push((name, trimmed));
355 }
356 } else {
357 match &msg_kind {
358 MessageKind::ChannelField { name, value } => {
359 fields.push((name.clone(), value.clone()));
360 }
361 MessageKind::Variable { name, value } => {
362 if !msg.ends_with(']') && msg.contains(": [") {
363 *open_var_name = Some(name.clone());
364 *open_var_value = Some(value.clone());
365 } else {
366 variables.push((name.clone(), value.clone()));
367 }
368 }
369 _ => {
370 if let Some((name, value)) = parse_field_line(msg) {
371 fields.push((name, value));
372 } else {
373 warning = Some(format!(
374 "unparseable CHANNEL_DATA line: {}",
375 if msg.len() > 80 { &msg[..80] } else { msg }
376 ));
377 }
378 }
379 }
380 }
381 }
382 StreamState::InSdp { body, .. } => {
383 body.push(msg.to_string());
384 }
385 StreamState::InCodecNegotiation { .. } => {
386 warning = Some(format!(
387 "unexpected codec negotiation continuation: {}",
388 if msg.len() > 80 { &msg[..80] } else { msg }
389 ));
390 }
391 StreamState::Idle => {}
392 }
393 if let Some(ref mut pending) = self.pending {
394 if let Some(w) = warning {
395 pending.warnings.push(w);
396 }
397 pending.attached.push(line);
398 }
399 }
400
401 fn new_entry(
402 &mut self,
403 uuid: String,
404 timestamp: String,
405 message: String,
406 kind: LineKind,
407 message_kind: MessageKind,
408 ) -> LogEntry {
409 let mut warnings = Vec::new();
410 if let Some(w) = self.deferred_warning.take() {
411 warnings.push(w);
412 }
413 LogEntry {
414 uuid,
415 timestamp,
416 message,
417 kind,
418 message_kind,
419 level: None,
420 idle_pct: None,
421 source: None,
422 block: None,
423 attached: AttachedLines::new(),
424 line_number: self.line_number,
425 warnings,
426 }
427 }
428}
429
430const MOD_LOGFILE_BUF_SIZE: usize = 2048;
434
435const MAX_LINE_PAYLOAD: usize = MOD_LOGFILE_BUF_SIZE - UUID_PREFIX_LEN - 1;
438
439const COLLISION_SCAN_SLACK: usize = 64;
445
446impl<I: Iterator<Item = String>> LogStream<I> {
447 fn detect_collision(&mut self, line: String) -> String {
465 if line.len() > MAX_LINE_PAYLOAD {
466 let warning = format!(
467 "line exceeds mod_logfile 2048-byte buffer ({} bytes), data may be truncated",
468 line.len() + 38,
469 );
470 if let Some(ref mut pending) = self.pending {
471 pending.warnings.push(warning);
472 } else {
473 self.deferred_warning = Some(warning);
474 }
475 }
476
477 let bytes = line.as_bytes();
479 let min_scan = if is_uuid_at(bytes, 0) {
480 if bytes.len() > UUID_PREFIX_LEN && bytes[UUID_PREFIX_LEN].is_ascii_digit() {
481 64 } else {
483 UUID_PREFIX_LEN }
485 } else if is_date_at(bytes, 0) {
486 27 } else {
488 0
489 };
490
491 let end = bytes.len().saturating_sub(28);
492 let oversize = bytes.len() > MAX_LINE_PAYLOAD;
493
494 let mut splits: Vec<usize> = Vec::new();
514 let mut chunk_start = 0usize;
515 let mut offset = min_scan;
516 while offset <= end {
517 if is_log_header_at(bytes, offset) {
518 let split_at = if offset >= chunk_start + UUID_PREFIX_LEN
519 && is_uuid_at(bytes, offset - UUID_PREFIX_LEN)
520 {
521 offset - UUID_PREFIX_LEN
522 } else {
523 offset
524 };
525 if split_at > chunk_start {
526 splits.push(split_at);
527 chunk_start = split_at;
528 offset += 27;
529 } else {
530 offset = (offset + 27).max(offset + 1);
535 }
536 continue;
537 }
538 if oversize {
539 let boundary = chunk_start + MAX_LINE_PAYLOAD;
540 if offset + COLLISION_SCAN_SLACK >= boundary
541 && offset <= boundary + COLLISION_SCAN_SLACK
542 && is_uuid_at(bytes, offset)
543 {
544 splits.push(offset);
545 chunk_start = offset;
546 offset += UUID_PREFIX_LEN;
547 continue;
548 }
549 }
550 offset += 1;
551 }
552
553 if splits.is_empty() {
554 return line;
555 }
556
557 let mut tail = line;
560 let mut chunks: Vec<String> = Vec::with_capacity(splits.len());
561 for &at in splits.iter().rev() {
562 chunks.push(tail.split_off(at));
563 }
564 chunks.reverse();
565 self.split_pending.extend(chunks);
566 tail
567 }
568}
569
570impl<I: Iterator<Item = String>> Iterator for LogStream<I> {
571 type Item = LogEntry;
572
573 fn next(&mut self) -> Option<LogEntry> {
574 loop {
575 let line = if let Some(split) = self.split_pending.pop_front() {
576 self.stats.lines_split += 1;
577 split
581 } else {
582 let Some(line) = self.lines.next() else {
583 return self.finalize_pending();
584 };
585
586 if line.starts_with('\x00') {
587 let yielded = self.finalize_pending();
588 self.last_uuid.clear();
589 self.last_timestamp.clear();
590 if yielded.is_some() {
591 return yielded;
592 }
593 continue;
594 }
595
596 self.line_number += 1;
597 self.stats.lines_processed += 1;
598 self.detect_collision(line)
599 };
600
601 let parsed = parse_line(&line);
602
603 match parsed.kind {
604 LineKind::Full | LineKind::System | LineKind::Truncated => {
605 let uuid = parsed.uuid.unwrap_or("").to_string();
606 let message_kind = classify_message(parsed.message);
607
608 if message_kind == MessageKind::CodecNegotiation {
610 if let (Some(ref pending), StreamState::InCodecNegotiation { .. }) =
611 (&self.pending, &self.state)
612 {
613 if uuid == pending.uuid {
614 self.accumulate_codec_entry(parsed.message);
615 if let Some(ref mut p) = self.pending {
616 p.attached.push(&line);
617 }
618 continue;
619 }
620 }
621 }
622
623 let yielded = self.finalize_pending();
624
625 let timestamp = parsed
626 .timestamp
627 .map(|t| t.to_string())
628 .unwrap_or_else(|| self.last_timestamp.clone());
629
630 if !uuid.is_empty() {
631 self.last_uuid = uuid.clone();
632 }
633 if parsed.timestamp.is_some() {
634 self.last_timestamp = timestamp.clone();
635 }
636
637 self.start_block_for_message(&message_kind);
638 if message_kind == MessageKind::CodecNegotiation {
639 self.accumulate_codec_entry(parsed.message);
640 }
641
642 let mut entry = self.new_entry(
643 uuid,
644 timestamp,
645 parsed.message.to_string(),
646 parsed.kind,
647 message_kind,
648 );
649 entry.level = parsed.level;
650 entry.idle_pct = parsed.idle_pct.map(|s| s.to_string());
651 entry.source = parsed.source.map(|s| s.to_string());
652 self.pending = Some(entry);
653
654 if yielded.is_some() {
655 return yielded;
656 }
657 }
658
659 LineKind::UuidContinuation => {
660 let uuid = parsed.uuid.unwrap_or("").to_string();
661 let is_primary = parsed.message.starts_with("EXECUTE ");
662
663 if let Some(ref pending) = self.pending {
664 if !is_primary && uuid == pending.uuid {
665 self.accumulate_continuation(parsed.message, &line);
666 } else {
667 let yielded = self.finalize_pending();
668 let message_kind = classify_message(parsed.message);
669
670 if !uuid.is_empty() {
671 self.last_uuid = uuid.clone();
672 }
673
674 self.start_block_for_message(&message_kind);
675 self.pending = Some(self.new_entry(
676 uuid,
677 self.last_timestamp.clone(),
678 parsed.message.to_string(),
679 parsed.kind,
680 message_kind,
681 ));
682
683 return yielded;
684 }
685 } else {
686 let message_kind = classify_message(parsed.message);
687
688 if !uuid.is_empty() {
689 self.last_uuid = uuid.clone();
690 }
691
692 self.start_block_for_message(&message_kind);
693 self.pending = Some(self.new_entry(
694 uuid,
695 self.last_timestamp.clone(),
696 parsed.message.to_string(),
697 parsed.kind,
698 message_kind,
699 ));
700 }
701 }
702
703 LineKind::BareContinuation => {
704 if self.pending.is_some() {
705 self.accumulate_continuation(parsed.message, &line);
706 } else {
707 self.record_unclassified(
708 UnclassifiedReason::OrphanContinuation,
709 Some(&line),
710 );
711 let message_kind = classify_message(parsed.message);
712 self.pending = Some(self.new_entry(
713 self.last_uuid.clone(),
714 self.last_timestamp.clone(),
715 parsed.message.to_string(),
716 parsed.kind,
717 message_kind,
718 ));
719 }
720 }
721
722 LineKind::Empty => {
723 if let Some(ref mut pending) = self.pending {
724 pending.attached.push(&line);
725 } else {
726 self.stats.lines_empty_orphan += 1;
727 }
728 }
729 }
730 }
731 }
732}
733
734#[cfg(test)]
735mod tests {
736 use super::*;
737
738 const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
739 const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
740
741 fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
742 format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
743 }
744
745 const TS1: &str = "2025-01-15 10:30:45.123456";
746 const TS2: &str = "2025-01-15 10:30:46.234567";
747
748 #[test]
751 fn inherits_uuid_for_bare_continuation() {
752 let lines = vec![
753 full_line(UUID1, TS1, "CHANNEL_DATA:"),
754 "variable_foo: [bar]".to_string(),
755 "variable_baz: [qux]".to_string(),
756 ];
757 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
758 assert_eq!(entries.len(), 1);
759 assert_eq!(entries[0].uuid, UUID1);
760 assert_eq!(entries[0].attached.len(), 2);
761 assert_eq!(entries[0].attached.get(0), Some("variable_foo: [bar]"));
762 assert_eq!(entries[0].attached.get(1), Some("variable_baz: [qux]"));
763 }
764
765 #[test]
766 fn inherits_timestamp_for_uuid_continuation() {
767 let lines = vec![
768 full_line(UUID1, TS1, "First"),
769 format!("{UUID2} Channel-State: [CS_EXECUTE]"),
770 ];
771 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
772 assert_eq!(entries.len(), 2);
773 assert_eq!(entries[0].timestamp, TS1);
774 assert_eq!(entries[1].uuid, UUID2);
775 assert_eq!(entries[1].timestamp, TS1);
776 }
777
778 #[test]
779 fn new_full_line_yields_previous() {
780 let lines = vec![
781 full_line(UUID1, TS1, "First"),
782 full_line(UUID2, TS2, "Second"),
783 ];
784 let mut stream = LogStream::new(lines.into_iter());
785 let first = stream.next().unwrap();
786 assert_eq!(first.uuid, UUID1);
787 assert_eq!(first.message, "First");
788 let second = stream.next().unwrap();
789 assert_eq!(second.uuid, UUID2);
790 assert_eq!(second.message, "Second");
791 assert!(stream.next().is_none());
792 }
793
794 #[test]
795 fn channel_data_collected_as_attached() {
796 let lines = vec![
797 full_line(UUID1, TS1, "CHANNEL_DATA:"),
798 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
799 format!("{UUID1} Unique-ID: [{UUID1}]"),
800 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
801 ];
802 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
803 assert_eq!(entries.len(), 1);
804 assert_eq!(entries[0].message, "CHANNEL_DATA:");
805 assert_eq!(entries[0].attached.len(), 3);
806 }
807
808 #[test]
809 fn sdp_body_collected_as_attached() {
810 let lines = vec![
811 full_line(UUID1, TS1, "Local SDP:"),
812 "v=0".to_string(),
813 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
814 "s=-".to_string(),
815 "c=IN IP4 192.0.2.1".to_string(),
816 "m=audio 10000 RTP/AVP 0".to_string(),
817 ];
818 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
819 assert_eq!(entries.len(), 1);
820 assert_eq!(entries[0].attached.len(), 5);
821 }
822
823 #[test]
824 fn truncated_starts_new_entry() {
825 let lines = vec![
826 full_line(UUID1, TS1, "First"),
827 format!(
828 "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
829 ),
830 ];
831 let mut stream = LogStream::new(lines.into_iter());
832 let first = stream.next().unwrap();
833 assert_eq!(first.uuid, UUID1);
834 assert_eq!(first.message, "First");
835 let second = stream.next().unwrap();
836 assert_eq!(second.uuid, UUID2);
837 assert_eq!(second.kind, LineKind::Truncated);
838 }
839
840 #[test]
841 fn empty_lines_in_attached() {
842 let lines = vec![
843 full_line(UUID1, TS1, "First"),
844 String::new(),
845 "continuation".to_string(),
846 ];
847 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
848 assert_eq!(entries.len(), 1);
849 assert_eq!(entries[0].attached.len(), 2);
850 assert_eq!(entries[0].attached.get(0), Some(""));
851 assert_eq!(entries[0].attached.get(1), Some("continuation"));
852 }
853
854 #[test]
855 fn system_line_no_uuid() {
856 let lines = vec![format!(
857 "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
858 )];
859 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
860 assert_eq!(entries.len(), 1);
861 assert_eq!(entries[0].uuid, "");
862 assert_eq!(entries[0].kind, LineKind::System);
863 }
864
865 #[test]
866 fn final_entry_on_exhaustion() {
867 let lines = vec![full_line(UUID1, TS1, "Only entry")];
868 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
869 assert_eq!(entries.len(), 1);
870 assert_eq!(entries[0].message, "Only entry");
871 }
872
873 #[test]
874 fn consecutive_full_lines() {
875 let lines = vec![
876 full_line(UUID1, TS1, "First"),
877 full_line(UUID1, TS2, "Second"),
878 full_line(UUID2, TS1, "Third"),
879 ];
880 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
881 assert_eq!(entries.len(), 3);
882 for entry in &entries {
883 assert!(entry.attached.is_empty());
884 }
885 }
886
887 #[test]
888 fn execute_after_channel_data_same_uuid() {
889 let lines = vec![
890 full_line(UUID1, TS1, "CHANNEL_DATA:"),
891 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
892 format!("{UUID1} variable_sip_call_id: [test@192.0.2.1]"),
893 "variable_foo: [bar]".to_string(),
894 String::new(),
895 String::new(),
896 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
897 full_line(UUID1, TS2, "EXPORT (export_vars) [originate_timeout]=[3600]"),
898 ];
899 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
900 assert_eq!(entries.len(), 3);
901 assert_eq!(entries[0].message, "CHANNEL_DATA:");
902 assert_eq!(entries[0].attached.len(), 5);
903 assert_eq!(entries[1].message, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)");
904 assert_eq!(entries[1].kind, LineKind::UuidContinuation);
905 assert_eq!(
906 entries[2].message,
907 "EXPORT (export_vars) [originate_timeout]=[3600]"
908 );
909 }
910
911 #[test]
912 fn execute_between_full_lines_same_uuid() {
913 let lines = vec![
914 full_line(UUID1, TS1, "CoreSession::setVariable(X-C911P-City, ST GEORGES)"),
915 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/city/ST GEORGES)"),
916 full_line(UUID1, TS2, "CoreSession::setVariable(X-C911P-Region, SGS)"),
917 ];
918 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
919 assert_eq!(entries.len(), 3);
920 assert_eq!(
921 entries[0].message,
922 "CoreSession::setVariable(X-C911P-City, ST GEORGES)"
923 );
924 assert!(entries[0].attached.is_empty());
925 assert!(entries[1].message.starts_with("EXECUTE "));
926 assert_eq!(entries[1].kind, LineKind::UuidContinuation);
927 assert_eq!(
928 entries[2].message,
929 "CoreSession::setVariable(X-C911P-Region, SGS)"
930 );
931 }
932
933 #[test]
934 fn multiple_execute_between_full_lines() {
935 let lines = vec![
936 full_line(UUID1, TS1, "CoreSession::setVariable(ngcs_call_id, urn:emergency:uid:callid:test)"),
937 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/call_id/urn:emergency:uid:callid:test)"),
938 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/callid_codecs/urn:emergency:uid:callid:test/PCMU@8000h)"),
939 full_line(UUID1, TS2, "CoreSession::setVariable(ngcs_short_call_id, test)"),
940 ];
941 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
942 assert_eq!(entries.len(), 4);
943 assert!(entries[0].attached.is_empty());
944 assert!(entries[1].message.contains("call_id"));
945 assert!(entries[2].message.contains("callid_codecs"));
946 assert_eq!(
947 entries[3].message,
948 "CoreSession::setVariable(ngcs_short_call_id, test)"
949 );
950 }
951
952 #[test]
953 fn uuid_continuation_different_uuid_yields() {
954 let lines = vec![
955 full_line(UUID1, TS1, "First"),
956 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
957 format!("{UUID2} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"),
958 ];
959 let mut stream = LogStream::new(lines.into_iter());
960 let first = stream.next().unwrap();
961 assert_eq!(first.uuid, UUID1);
962 assert_eq!(first.attached.len(), 1);
963 let second = stream.next().unwrap();
964 assert_eq!(second.uuid, UUID2);
965 assert_eq!(
966 second.message,
967 "Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"
968 );
969 }
970
971 #[test]
974 fn channel_data_block_fields_and_variables() {
975 let lines = vec![
976 full_line(UUID1, TS1, "CHANNEL_DATA:"),
977 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
978 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
979 format!("{UUID1} Unique-ID: [{UUID1}]"),
980 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
981 "variable_direction: [inbound]".to_string(),
982 ];
983 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
984 assert_eq!(entries.len(), 1);
985 assert_eq!(entries[0].message_kind, MessageKind::ChannelData);
986 let block = entries[0].block.as_ref().expect("should have block");
987 match block {
988 Block::ChannelData { fields, variables } => {
989 assert_eq!(fields.len(), 3);
990 assert_eq!(
991 fields[0],
992 (
993 "Channel-Name".to_string(),
994 "sofia/internal/+15550001234@192.0.2.1".to_string()
995 )
996 );
997 assert_eq!(
998 fields[1],
999 ("Channel-State".to_string(), "CS_EXECUTE".to_string())
1000 );
1001 assert_eq!(fields[2], ("Unique-ID".to_string(), UUID1.to_string()));
1002 assert_eq!(variables.len(), 2);
1003 assert_eq!(
1004 variables[0],
1005 (
1006 "variable_sip_call_id".to_string(),
1007 "test123@192.0.2.1".to_string()
1008 )
1009 );
1010 assert_eq!(
1011 variables[1],
1012 ("variable_direction".to_string(), "inbound".to_string())
1013 );
1014 }
1015 other => panic!("expected ChannelData block, got {other:?}"),
1016 }
1017 }
1018
1019 #[test]
1020 fn channel_data_multiline_variable_reassembly() {
1021 let lines = vec![
1022 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1023 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1024 "variable_switch_r_sdp: [v=0".to_string(),
1025 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1026 "s=-".to_string(),
1027 "c=IN IP4 192.0.2.1".to_string(),
1028 "m=audio 47758 RTP/AVP 0 101".to_string(),
1029 "a=rtpmap:0 PCMU/8000".to_string(),
1030 "]".to_string(),
1031 "variable_direction: [inbound]".to_string(),
1032 ];
1033 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1034 assert_eq!(entries.len(), 1);
1035 let block = entries[0].block.as_ref().expect("should have block");
1036 match block {
1037 Block::ChannelData { fields, variables } => {
1038 assert_eq!(fields.len(), 1);
1039 assert_eq!(variables.len(), 2);
1040 assert_eq!(variables[0].0, "variable_switch_r_sdp");
1041 assert!(variables[0].1.starts_with("v=0\n"));
1042 assert!(variables[0].1.contains("m=audio 47758 RTP/AVP 0 101"));
1043 assert!(!variables[0].1.ends_with(']'));
1044 assert_eq!(
1045 variables[1],
1046 ("variable_direction".to_string(), "inbound".to_string())
1047 );
1048 }
1049 other => panic!("expected ChannelData block, got {other:?}"),
1050 }
1051 assert_eq!(entries[0].attached.len(), 9);
1052 }
1053
1054 #[test]
1055 fn sdp_block_detection() {
1056 let lines = vec![
1057 full_line(UUID1, TS1, "Local SDP:"),
1058 "v=0".to_string(),
1059 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1060 "s=-".to_string(),
1061 "c=IN IP4 192.0.2.1".to_string(),
1062 "m=audio 10000 RTP/AVP 0".to_string(),
1063 "a=rtpmap:0 PCMU/8000".to_string(),
1064 ];
1065 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1066 assert_eq!(entries.len(), 1);
1067 match &entries[0].message_kind {
1068 MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1069 other => panic!("expected SdpMarker, got {other:?}"),
1070 }
1071 let block = entries[0].block.as_ref().expect("should have block");
1072 match block {
1073 Block::Sdp { direction, body } => {
1074 assert_eq!(*direction, SdpDirection::Local);
1075 assert_eq!(body.len(), 6);
1076 assert_eq!(body[0], "v=0");
1077 assert_eq!(body[5], "a=rtpmap:0 PCMU/8000");
1078 }
1079 other => panic!("expected Sdp block, got {other:?}"),
1080 }
1081 }
1082
1083 #[test]
1084 fn sdp_block_terminated_by_primary_line() {
1085 let lines = vec![
1086 full_line(UUID1, TS1, "Remote SDP:"),
1087 "v=0".to_string(),
1088 "m=audio 10000 RTP/AVP 0".to_string(),
1089 full_line(UUID1, TS2, "Next event"),
1090 ];
1091 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1092 assert_eq!(entries.len(), 2);
1093 let block = entries[0].block.as_ref().expect("should have block");
1094 match block {
1095 Block::Sdp { direction, body } => {
1096 assert_eq!(*direction, SdpDirection::Remote);
1097 assert_eq!(body.len(), 2);
1098 }
1099 other => panic!("expected Sdp block, got {other:?}"),
1100 }
1101 assert!(entries[1].block.is_none());
1102 }
1103
1104 #[test]
1105 fn sdp_from_uuid_continuation() {
1106 let lines = vec![
1107 format!("{UUID1} Local SDP:"),
1108 format!("{UUID1} v=0"),
1109 format!("{UUID1} o=FreeSWITCH 1234 5678 IN IP4 192.0.2.1"),
1110 format!("{UUID1} s=FreeSWITCH"),
1111 format!("{UUID1} c=IN IP4 192.0.2.1"),
1112 ];
1113 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1114 assert_eq!(entries.len(), 1);
1115 let block = entries[0].block.as_ref().expect("should have block");
1116 match block {
1117 Block::Sdp { direction, body } => {
1118 assert_eq!(*direction, SdpDirection::Local);
1119 assert_eq!(body.len(), 4);
1120 assert_eq!(body[0], "v=0");
1121 }
1122 other => panic!("expected Sdp block, got {other:?}"),
1123 }
1124 }
1125
1126 #[test]
1127 fn channel_data_interrupted_by_different_uuid() {
1128 let lines = vec![
1129 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1130 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1131 format!("{UUID2} Dialplan: sofia/internal/+15559999999@192.0.2.1 parsing [public]"),
1132 ];
1133 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1134 assert_eq!(entries.len(), 2);
1135 let block = entries[0].block.as_ref().expect("should have block");
1136 match block {
1137 Block::ChannelData { fields, .. } => {
1138 assert_eq!(fields.len(), 1);
1139 }
1140 other => panic!("expected ChannelData, got {other:?}"),
1141 }
1142 }
1143
1144 #[test]
1145 fn no_block_for_non_block_message() {
1146 let lines = vec![full_line(UUID1, TS1, "some random freeswitch log message")];
1147 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1148 assert_eq!(entries.len(), 1);
1149 assert!(entries[0].block.is_none());
1150 assert_eq!(entries[0].message_kind, MessageKind::General);
1151 }
1152
1153 #[test]
1154 fn message_kind_on_execute() {
1155 let lines = vec![
1156 full_line(UUID1, TS1, "First"),
1157 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
1158 ];
1159 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1160 assert_eq!(entries.len(), 2);
1161 match &entries[1].message_kind {
1162 MessageKind::Execute {
1163 application,
1164 arguments,
1165 ..
1166 } => {
1167 assert_eq!(application, "set");
1168 assert_eq!(arguments, "foo=bar");
1169 }
1170 other => panic!("expected Execute, got {other:?}"),
1171 }
1172 }
1173
1174 #[test]
1177 fn stats_lines_processed() {
1178 let lines = vec![
1179 full_line(UUID1, TS1, "First"),
1180 full_line(UUID1, TS2, "Second"),
1181 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1182 ];
1183 let mut stream = LogStream::new(lines.into_iter());
1184 let _: Vec<_> = stream.by_ref().collect();
1185 assert_eq!(stream.stats().lines_processed, 3);
1186 }
1187
1188 #[test]
1189 fn stats_unclassified_orphan() {
1190 let lines = vec![
1191 "variable_foo: [bar]".to_string(),
1192 full_line(UUID1, TS1, "After orphan"),
1193 ];
1194 let mut stream = LogStream::new(lines.into_iter())
1195 .unclassified_tracking(UnclassifiedTracking::TrackLines);
1196 let _: Vec<_> = stream.by_ref().collect();
1197 assert_eq!(stream.stats().lines_unclassified, 1);
1198 assert_eq!(stream.stats().unclassified_lines.len(), 1);
1199 assert_eq!(
1200 stream.stats().unclassified_lines[0].reason,
1201 UnclassifiedReason::OrphanContinuation,
1202 );
1203 }
1204
1205 #[test]
1206 fn stats_capture_data() {
1207 let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1208 let mut stream = LogStream::new(lines.into_iter())
1209 .unclassified_tracking(UnclassifiedTracking::CaptureData);
1210 let _: Vec<_> = stream.by_ref().collect();
1211 assert_eq!(stream.stats().unclassified_lines.len(), 1);
1212 assert_eq!(
1213 stream.stats().unclassified_lines[0].data.as_deref(),
1214 Some("orphan line"),
1215 );
1216 }
1217
1218 #[test]
1219 fn stats_count_only_no_allocation() {
1220 let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1221 let mut stream = LogStream::new(lines.into_iter());
1222 let _: Vec<_> = stream.by_ref().collect();
1223 assert_eq!(stream.stats().lines_unclassified, 1);
1224 assert!(stream.stats().unclassified_lines.is_empty());
1225 }
1226
1227 #[test]
1228 fn line_number_tracking() {
1229 let lines = vec![
1230 full_line(UUID1, TS1, "First"),
1231 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1232 full_line(UUID2, TS2, "Third"),
1233 ];
1234 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1235 assert_eq!(entries[0].line_number, 1);
1236 assert_eq!(entries[1].line_number, 3);
1237 }
1238
1239 #[test]
1240 fn drain_unclassified() {
1241 let lines = vec![
1242 "orphan1".to_string(),
1243 "orphan2".to_string(),
1244 full_line(UUID1, TS1, "After"),
1245 ];
1246 let mut stream = LogStream::new(lines.into_iter())
1247 .unclassified_tracking(UnclassifiedTracking::TrackLines);
1248 let _: Vec<_> = stream.by_ref().collect();
1249 let drained = stream.drain_unclassified();
1250 assert_eq!(drained.len(), 1);
1251 assert!(stream.stats().unclassified_lines.is_empty());
1252 assert_eq!(stream.stats().lines_unclassified, 1);
1253 }
1254
1255 #[test]
1263 fn continuation_lines_at_file_boundary_must_not_inherit_previous_timestamp() {
1264 use crate::TrackedChain;
1265
1266 let uuid_a = "aaaaaaaa-1111-2222-3333-444444444444";
1267 let uuid_b = "bbbbbbbb-1111-2222-3333-444444444444";
1268 let ts_old = "2025-01-15 23:58:03.000000";
1269 let ts_new = "2025-01-16 08:37:12.000000";
1270
1271 let seg1: Vec<String> = vec![format!(
1272 "{uuid_a} {ts_old} 95.00% [DEBUG] test.c:1 Last line in rotated file"
1273 )];
1274
1275 let seg2: Vec<String> = vec![
1278 format!("{uuid_b} CHANNEL_DATA:"),
1279 format!("{uuid_b} Channel-State: [CS_EXECUTE]"),
1280 format!("{uuid_b} {ts_new} 95.00% [DEBUG] test.c:1 First timestamped line in new file"),
1281 ];
1282
1283 let segments: Vec<(String, Box<dyn Iterator<Item = String>>)> = vec![
1284 ("rotated.log".to_string(), Box::new(seg1.into_iter())),
1285 ("freeswitch.log".to_string(), Box::new(seg2.into_iter())),
1286 ];
1287
1288 let (chain, _) = TrackedChain::new(segments);
1289 let entries: Vec<_> = LogStream::new(chain).collect();
1290
1291 let b_entry = entries
1292 .iter()
1293 .find(|e| e.uuid == uuid_b)
1294 .expect("should find entry for uuid_b");
1295
1296 assert_ne!(
1300 b_entry.timestamp, ts_old,
1301 "continuation lines in a new file segment inherited timestamp \
1302 '{ts_old}' from the previous segment — timestamps must not bleed \
1303 across file boundaries"
1304 );
1305 }
1306
1307 fn assert_accounting(stream: &LogStream<impl Iterator<Item = String>>) {
1310 let stats = stream.stats();
1311 assert_eq!(
1312 stats.unaccounted_lines(),
1313 0,
1314 "line accounting invariant violated: \
1315 processed={} + split={} != in_entries={} + empty_orphan={}",
1316 stats.lines_processed,
1317 stats.lines_split,
1318 stats.lines_in_entries,
1319 stats.lines_empty_orphan,
1320 );
1321 }
1322
1323 #[test]
1324 fn accounting_full_lines() {
1325 let lines = vec![
1326 full_line(UUID1, TS1, "First"),
1327 full_line(UUID2, TS2, "Second"),
1328 ];
1329 let mut stream = LogStream::new(lines.into_iter());
1330 let entries: Vec<_> = stream.by_ref().collect();
1331 assert_eq!(entries.len(), 2);
1332 assert_eq!(stream.stats().lines_in_entries, 2);
1333 assert_accounting(&stream);
1334 }
1335
1336 #[test]
1337 fn accounting_with_attached() {
1338 let lines = vec![
1339 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1340 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1341 "variable_foo: [bar]".to_string(),
1342 full_line(UUID2, TS2, "Next"),
1343 ];
1344 let mut stream = LogStream::new(lines.into_iter());
1345 let entries: Vec<_> = stream.by_ref().collect();
1346 assert_eq!(entries.len(), 2);
1347 assert_eq!(stream.stats().lines_in_entries, 4);
1350 assert_accounting(&stream);
1351 }
1352
1353 #[test]
1354 fn accounting_system_line() {
1355 let lines = vec![format!(
1356 "{TS1} 95.97% [NOTICE] mod_logfile.c:217 New log started."
1357 )];
1358 let mut stream = LogStream::new(lines.into_iter());
1359 let _: Vec<_> = stream.by_ref().collect();
1360 assert_eq!(stream.stats().lines_in_entries, 1);
1361 assert_accounting(&stream);
1362 }
1363
1364 #[test]
1365 fn accounting_empty_orphan() {
1366 let lines = vec![
1367 String::new(),
1368 " ".to_string(),
1369 full_line(UUID1, TS1, "After"),
1370 ];
1371 let mut stream = LogStream::new(lines.into_iter());
1372 let entries: Vec<_> = stream.by_ref().collect();
1373 assert_eq!(entries.len(), 1);
1374 assert_eq!(stream.stats().lines_empty_orphan, 2);
1375 assert_accounting(&stream);
1376 }
1377
1378 #[test]
1379 fn accounting_empty_attached() {
1380 let lines = vec![
1381 full_line(UUID1, TS1, "First"),
1382 String::new(),
1383 "continuation".to_string(),
1384 ];
1385 let mut stream = LogStream::new(lines.into_iter());
1386 let entries: Vec<_> = stream.by_ref().collect();
1387 assert_eq!(entries.len(), 1);
1388 assert_eq!(entries[0].attached.len(), 2);
1389 assert_eq!(stream.stats().lines_empty_orphan, 0);
1390 assert_eq!(stream.stats().lines_in_entries, 3);
1391 assert_accounting(&stream);
1392 }
1393
1394 #[test]
1395 fn accounting_orphan_continuation() {
1396 let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1397 let mut stream = LogStream::new(lines.into_iter());
1398 let _: Vec<_> = stream.by_ref().collect();
1399 assert_accounting(&stream);
1400 }
1401
1402 #[test]
1403 fn accounting_codec_merging() {
1404 let lines = vec![
1405 full_line(
1406 UUID1,
1407 TS1,
1408 "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1409 ),
1410 full_line(
1411 UUID1,
1412 TS1,
1413 "Audio Codec Compare [PCMU:0:8000:20:64000:1] is saved as a match",
1414 ),
1415 full_line(UUID2, TS2, "Next"),
1416 ];
1417 let mut stream = LogStream::new(lines.into_iter());
1418 let _: Vec<_> = stream.by_ref().collect();
1419 assert_accounting(&stream);
1420 }
1421
1422 #[test]
1423 fn accounting_truncated_line() {
1424 let lines = vec![
1425 full_line(UUID1, TS1, "First"),
1426 format!(
1427 "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
1428 ),
1429 ];
1430 let mut stream = LogStream::new(lines.into_iter());
1431 let _: Vec<_> = stream.by_ref().collect();
1432 assert_accounting(&stream);
1433 }
1434
1435 #[test]
1436 fn accounting_long_line_collision_split() {
1437 let long_value = "x".repeat(MAX_LINE_PAYLOAD + 10);
1440 let line = format!(
1441 "variable_sip_multipart: [{long_value}]{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"
1442 );
1443 let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1444 let mut stream = LogStream::new(lines.into_iter());
1445 let entries: Vec<_> = stream.by_ref().collect();
1446
1447 assert_eq!(entries[0].message, "CHANNEL_DATA:");
1449
1450 let split_entry = entries.iter().find(|e| e.uuid == UUID2);
1452 assert!(
1453 split_entry.is_some(),
1454 "collision UUID should produce a separate entry"
1455 );
1456
1457 assert_eq!(stream.stats().lines_split, 1);
1458 assert_accounting(&stream);
1459 }
1460
1461 #[test]
1462 fn no_split_on_short_lines() {
1463 let line = format!("variable_call_uuid: [{UUID2}]");
1466 let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1467 let mut stream = LogStream::new(lines.into_iter());
1468 let entries: Vec<_> = stream.by_ref().collect();
1469 assert_eq!(entries.len(), 1);
1470 assert_eq!(stream.stats().lines_split, 0);
1471 assert_accounting(&stream);
1472 }
1473
1474 #[test]
1475 fn timestamp_collision_splits_system_lines() {
1476 let line = format!(
1477 "{TS1} 98.03% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42864: api sofia jsonstatus{TS2} 97.93% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42898: api fsctl pause_check"
1478 );
1479 let mut stream = LogStream::new(std::iter::once(line));
1480 let entries: Vec<_> = stream.by_ref().collect();
1481 assert_eq!(entries.len(), 2);
1482 assert_eq!(
1483 entries[0].message,
1484 "Event Socket Command from ::1:42864: api sofia jsonstatus"
1485 );
1486 assert_eq!(
1487 entries[1].message,
1488 "Event Socket Command from ::1:42898: api fsctl pause_check"
1489 );
1490 assert_eq!(stream.stats().lines_split, 1);
1491 assert_accounting(&stream);
1492 }
1493
1494 #[test]
1495 fn timestamp_collision_splits_three_entries() {
1496 let ts3 = "2025-01-15 10:30:47.345678";
1497 let line = format!(
1498 "{TS1} 95.00% [INFO] mod.c:1 first{TS2} 96.00% [INFO] mod.c:1 second{ts3} 97.00% [INFO] mod.c:1 third"
1499 );
1500 let mut stream = LogStream::new(std::iter::once(line));
1501 let entries: Vec<_> = stream.by_ref().collect();
1502 assert_eq!(entries.len(), 3);
1503 assert_eq!(entries[0].message, "first");
1504 assert_eq!(entries[1].message, "second");
1505 assert_eq!(entries[2].message, "third");
1506 assert_eq!(stream.stats().lines_split, 2);
1507 assert_accounting(&stream);
1508 }
1509
1510 #[test]
1511 fn timestamp_collision_oversize_write_contention() {
1512 let entry = |n: usize| {
1518 format!(
1519 "{TS1} 98.77% [INFO] mod_event_socket.c:1754 Event Socket Command from ::1:42864: api db select/ngcs_sip_call_id/entry-{n:04}"
1520 )
1521 };
1522 let count: u64 = 20;
1523 let line: String = (0..count).map(|n| entry(n as usize)).collect();
1524 assert!(
1525 line.len() > super::MAX_LINE_PAYLOAD,
1526 "test fixture should exceed MAX_LINE_PAYLOAD, got {}",
1527 line.len()
1528 );
1529
1530 let mut stream = LogStream::new(std::iter::once(line));
1531 let entries: Vec<_> = stream.by_ref().collect();
1532 assert_eq!(entries.len() as u64, count);
1533 for (i, e) in entries.iter().enumerate() {
1534 assert_eq!(
1535 e.message,
1536 format!(
1537 "Event Socket Command from ::1:42864: api db select/ngcs_sip_call_id/entry-{i:04}"
1538 )
1539 );
1540 }
1541 assert_eq!(stream.stats().lines_split, count - 1);
1542 assert_accounting(&stream);
1543 }
1544
1545 #[test]
1546 fn timestamp_collision_with_uuid_prefix() {
1547 let line = format!(
1549 "{TS1} 95.00% [INFO] mod.c:1 first{UUID1} {TS2} 96.00% [DEBUG] sofia.c:100 second"
1550 );
1551 let mut stream = LogStream::new(std::iter::once(line));
1552 let entries: Vec<_> = stream.by_ref().collect();
1553 assert_eq!(entries.len(), 2);
1554 assert_eq!(entries[0].message, "first");
1555 assert_eq!(entries[1].uuid, UUID1);
1556 assert_eq!(entries[1].message, "second");
1557 assert_eq!(stream.stats().lines_split, 1);
1558 assert_accounting(&stream);
1559 }
1560
1561 #[test]
1562 fn channel_data_multiline_variable_spans_many_lines() {
1563 let lines = vec![
1564 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1565 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1566 "variable_switch_r_sdp: [v=0".to_string(),
1567 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1568 "s=-".to_string(),
1569 "c=IN IP4 192.0.2.1".to_string(),
1570 "t=0 0".to_string(),
1571 "m=audio 47758 RTP/AVP 0 8 101".to_string(),
1572 "a=rtpmap:0 PCMU/8000".to_string(),
1573 "a=rtpmap:8 PCMA/8000".to_string(),
1574 "a=rtpmap:101 telephone-event/8000".to_string(),
1575 "a=fmtp:101 0-16".to_string(),
1576 "]".to_string(),
1577 "variable_direction: [inbound]".to_string(),
1578 ];
1579 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1580 assert_eq!(entries.len(), 1);
1581 let block = entries[0].block.as_ref().expect("should have block");
1582 match block {
1583 Block::ChannelData { fields, variables } => {
1584 assert_eq!(fields.len(), 1);
1585 assert_eq!(variables.len(), 2);
1586 assert_eq!(variables[0].0, "variable_switch_r_sdp");
1587 let sdp = &variables[0].1;
1588 assert!(sdp.starts_with("v=0\n"));
1589 assert!(sdp.contains("a=fmtp:101 0-16"));
1590 assert!(!sdp.ends_with(']'));
1591 assert_eq!(variables[1].0, "variable_direction");
1592 }
1593 other => panic!("expected ChannelData block, got {other:?}"),
1594 }
1595 }
1596
1597 #[test]
1598 fn sdp_from_verto_update_media() {
1599 let lines = vec![
1600 full_line(UUID1, TS1, "updateMedia: Local SDP"),
1601 "v=0".to_string(),
1602 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1603 "m=audio 10000 RTP/AVP 0".to_string(),
1604 ];
1605 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1606 assert_eq!(entries.len(), 1);
1607 match &entries[0].message_kind {
1608 MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1609 other => panic!("expected SdpMarker, got {other:?}"),
1610 }
1611 let block = entries[0].block.as_ref().expect("should have block");
1612 match block {
1613 Block::Sdp { direction, body } => {
1614 assert_eq!(*direction, SdpDirection::Local);
1615 assert_eq!(body.len(), 3);
1616 }
1617 other => panic!("expected Sdp block, got {other:?}"),
1618 }
1619 }
1620
1621 #[test]
1622 fn duplicate_sdp_marker() {
1623 let lines = vec![
1624 full_line(UUID1, TS1, "Duplicate SDP"),
1625 "v=0".to_string(),
1626 "m=audio 10000 RTP/AVP 0".to_string(),
1627 ];
1628 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1629 assert_eq!(entries.len(), 1);
1630 match &entries[0].message_kind {
1631 MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Unknown),
1632 other => panic!("expected SdpMarker, got {other:?}"),
1633 }
1634 assert!(entries[0].block.is_some());
1635 }
1636
1637 #[test]
1638 fn warning_on_unclosed_multiline_variable() {
1639 let lines = vec![
1640 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1641 "variable_switch_r_sdp: [v=0".to_string(),
1642 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1643 full_line(UUID2, TS2, "Next entry"),
1644 ];
1645 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1646 assert_eq!(entries.len(), 2);
1647 assert!(
1648 entries[0]
1649 .warnings
1650 .iter()
1651 .any(|w| w.contains("unclosed multi-line variable")),
1652 "expected unclosed variable warning, got: {:?}",
1653 entries[0].warnings
1654 );
1655 }
1656
1657 #[test]
1658 fn warning_on_unparseable_channel_data_line() {
1659 let lines = vec![
1660 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1661 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1662 format!("{UUID1} this is not a valid field line"),
1663 ];
1664 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1665 assert_eq!(entries.len(), 1);
1666 assert!(
1667 entries[0]
1668 .warnings
1669 .iter()
1670 .any(|w| w.contains("unparseable CHANNEL_DATA")),
1671 "expected unparseable warning, got: {:?}",
1672 entries[0].warnings
1673 );
1674 }
1675
1676 #[test]
1677 fn warning_on_unexpected_codec_continuation() {
1678 let lines = vec![
1679 full_line(
1680 UUID1,
1681 TS1,
1682 "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1683 ),
1684 format!("{UUID1} some unexpected continuation line"),
1685 ];
1686 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1687 assert_eq!(entries.len(), 1);
1688 assert!(
1689 entries[0]
1690 .warnings
1691 .iter()
1692 .any(|w| w.contains("unexpected codec negotiation")),
1693 "expected codec warning, got: {:?}",
1694 entries[0].warnings
1695 );
1696 }
1697
1698 #[test]
1699 fn system_line_uuid_continuation_not_absorbed() {
1700 let lines = vec![
1703 format!("{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"),
1704 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1705 ];
1706 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1707 assert_eq!(
1708 entries.len(),
1709 2,
1710 "UUID continuation should not be absorbed by system entry"
1711 );
1712 assert_eq!(entries[0].uuid, "");
1713 assert_eq!(entries[1].uuid, UUID1);
1714 }
1715
1716 #[test]
1717 fn truncated_collision_in_channel_data_variable() {
1718 let padding = "x".repeat(2000);
1724 let collision_line = format!(
1725 "{UUID1} variable_long_xml: [{padding}{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(foo=bar)"
1726 );
1727 assert!(
1728 collision_line.len() > super::MAX_LINE_PAYLOAD,
1729 "test line must exceed buffer limit, got {}",
1730 collision_line.len()
1731 );
1732
1733 let lines = vec![
1734 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1735 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1736 format!("{UUID1} variable_direction: [inbound]"),
1737 collision_line,
1738 full_line(UUID1, TS2, "Next log entry"),
1739 ];
1740
1741 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1742
1743 assert_eq!(entries[0].message, "CHANNEL_DATA:");
1745 let block = entries[0].block.as_ref().expect("should have block");
1746 match block {
1747 Block::ChannelData { fields, variables } => {
1748 assert_eq!(fields.len(), 1, "should have Channel-Name field");
1749 assert_eq!(fields[0].0, "Channel-Name");
1750 assert_eq!(
1751 variables.len(),
1752 2,
1753 "should have direction + unclosed long_xml"
1754 );
1755 assert_eq!(variables[0].0, "variable_direction");
1756 assert_eq!(variables[0].1, "inbound");
1757 assert_eq!(variables[1].0, "variable_long_xml");
1758 }
1759 other => panic!("expected ChannelData block, got {other:?}"),
1760 }
1761 assert!(
1762 entries[0]
1763 .warnings
1764 .iter()
1765 .any(|w| w.contains("line exceeds mod_logfile 2048-byte buffer")),
1766 "expected buffer overflow warning, got: {:?}",
1767 entries[0].warnings
1768 );
1769 assert!(
1770 entries[0]
1771 .warnings
1772 .iter()
1773 .any(|w| w.contains("unclosed multi-line variable")),
1774 "expected unclosed variable warning, got: {:?}",
1775 entries[0].warnings
1776 );
1777
1778 assert_eq!(entries[1].uuid, UUID1);
1780 assert!(
1781 entries[1].message.starts_with("EXECUTE "),
1782 "split entry should be EXECUTE, got: {}",
1783 entries[1].message
1784 );
1785
1786 assert_eq!(entries.len(), 3);
1788 assert_eq!(entries[2].message, "Next log entry");
1789 }
1790
1791 #[test]
1792 fn channel_data_uuid_drops_mid_block() {
1793 let lines = vec![
1798 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1799 format!("{UUID1} variable_max_forwards: [69]"),
1800 format!("{UUID1} variable_presence_id: [1251@[2001:db8::10]]"),
1801 format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1802 "variable_sip_h_X-Call-Info: [<urn:test:callid:20260316>;purpose=emergency-CallId]"
1804 .to_string(),
1805 "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1806 "variable_remote_media_ip: [2001:db8::10]".to_string(),
1807 "variable_remote_media_port: [9952]".to_string(),
1808 "variable_rtp_use_codec_name: [opus]".to_string(),
1809 full_line(UUID1, TS2, "Next entry"),
1810 ];
1811
1812 let mut stream = LogStream::new(lines.into_iter());
1813 let entries: Vec<_> = stream.by_ref().collect();
1814
1815 assert_eq!(entries.len(), 2);
1816 assert_eq!(entries[0].message, "CHANNEL_DATA:");
1817 let block = entries[0].block.as_ref().expect("should have block");
1818 match block {
1819 Block::ChannelData { fields, variables } => {
1820 assert_eq!(fields.len(), 0);
1821 assert_eq!(variables.len(), 8);
1822 assert_eq!(variables[0].0, "variable_max_forwards");
1824 assert_eq!(variables[0].1, "69");
1825 assert_eq!(variables[1].0, "variable_presence_id");
1826 assert_eq!(variables[1].1, "1251@[2001:db8::10]");
1827 assert_eq!(variables[2].0, "variable_sip_h_X-Custom-ID");
1828 assert_eq!(variables[3].0, "variable_sip_h_X-Call-Info");
1830 assert!(variables[3].1.contains("emergency-CallId"));
1831 assert_eq!(variables[4].0, "variable_ep_codec_string");
1832 assert_eq!(variables[7].0, "variable_rtp_use_codec_name");
1833 assert_eq!(variables[7].1, "opus");
1834 }
1835 other => panic!("expected ChannelData block, got {other:?}"),
1836 }
1837 assert_eq!(entries[0].attached.len(), 8);
1838 assert_eq!(entries[1].message, "Next entry");
1839 assert_accounting(&stream);
1840 }
1841
1842 #[test]
1843 fn channel_data_uuid_drops_with_multiline_variable() {
1844 let lines = vec![
1849 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1850 format!("{UUID1} variable_max_forwards: [69]"),
1851 format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1852 "variable_switch_r_sdp: [v=0\r".to_string(),
1854 "o=FreeSWITCH 1773663549 1773663550 IN IP6 2001:db8::10\r".to_string(),
1855 "s=FreeSWITCH\r".to_string(),
1856 "c=IN IP6 2001:db8::10\r".to_string(),
1857 "t=0 0\r".to_string(),
1858 "m=audio 9952 RTP/AVP 102 101 13\r".to_string(),
1859 "a=rtpmap:102 opus/48000/2\r".to_string(),
1860 "a=ptime:20\r".to_string(),
1861 "]".to_string(),
1862 "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1863 "variable_direction: [inbound]".to_string(),
1864 full_line(UUID1, TS2, "Next entry"),
1865 ];
1866
1867 let mut stream = LogStream::new(lines.into_iter());
1868 let entries: Vec<_> = stream.by_ref().collect();
1869
1870 assert_eq!(entries.len(), 2);
1871 let block = entries[0].block.as_ref().expect("should have block");
1872 match block {
1873 Block::ChannelData { fields, variables } => {
1874 assert_eq!(fields.len(), 0);
1875 assert_eq!(variables.len(), 5);
1876 assert_eq!(variables[0].0, "variable_max_forwards");
1877 assert_eq!(variables[1].0, "variable_sip_h_X-Custom-ID");
1878 assert_eq!(variables[2].0, "variable_switch_r_sdp");
1880 let sdp = &variables[2].1;
1881 assert!(
1882 sdp.starts_with("v=0\r\n"),
1883 "SDP should start with v=0\\r\\n, got: {sdp:?}"
1884 );
1885 assert!(sdp.contains("m=audio 9952 RTP/AVP 102 101 13\r"));
1886 assert!(sdp.contains("a=ptime:20\r"));
1887 assert!(!sdp.ends_with(']'), "closing bracket should be stripped");
1888 assert_eq!(variables[3].0, "variable_ep_codec_string");
1890 assert_eq!(variables[4].0, "variable_direction");
1891 assert_eq!(variables[4].1, "inbound");
1892 }
1893 other => panic!("expected ChannelData block, got {other:?}"),
1894 }
1895 assert_eq!(entries[0].attached.len(), 13);
1897 assert_accounting(&stream);
1898 }
1899
1900 #[test]
1901 fn channel_data_bare_variable_collision_with_execute() {
1902 let collision = format!(
1909 "variable_call_uuid: {UUID1} EXECUTE [depth=0] \
1910 sofia/internal-v6/1251@[2001:db8::10] export(nolocal:test_var=value)"
1911 );
1912
1913 let lines = vec![
1914 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1915 format!("{UUID1} variable_max_forwards: [69]"),
1916 "variable_DP_MATCH: [ARRAY::create_conference|:create_conference]".to_string(),
1918 collision,
1919 full_line(
1921 UUID1,
1922 TS2,
1923 "EXPORT (export_vars) (REMOTE ONLY) [test_var]=[value]",
1924 ),
1925 ];
1926
1927 let mut stream = LogStream::new(lines.into_iter());
1928 let entries: Vec<_> = stream.by_ref().collect();
1929
1930 assert_eq!(entries.len(), 3);
1932 let block = entries[0].block.as_ref().expect("should have block");
1933 match block {
1934 Block::ChannelData { fields, variables } => {
1935 assert_eq!(fields.len(), 0);
1936 assert_eq!(variables.len(), 2);
1937 assert_eq!(variables[0].0, "variable_max_forwards");
1938 assert_eq!(variables[1].0, "variable_DP_MATCH");
1939 }
1940 other => panic!("expected ChannelData block, got {other:?}"),
1941 }
1942
1943 assert_eq!(entries[1].uuid, UUID1);
1945 assert_eq!(entries[1].kind, LineKind::Truncated);
1946 assert!(
1947 entries[1].message.starts_with("EXECUTE "),
1948 "truncated line should yield EXECUTE, got: {}",
1949 entries[1].message
1950 );
1951
1952 assert_eq!(entries[2].message_kind.label(), "variable");
1954 assert_accounting(&stream);
1955 }
1956
1957 #[test]
1958 fn system_line_with_embedded_uuid_gets_entry_uuid() {
1959 let lines = vec![
1962 format!(
1963 "{TS1} 95.97% [DEBUG] switch_cpp.cpp:1466 {UUID1} DAA-LOG WaveManager originate"
1964 ),
1965 format!(
1966 "{TS1} 95.97% [WARNING] switch_cpp.cpp:1466 {UUID1} DAA-LOG Failed to create session"
1967 ),
1968 full_line(UUID1, TS2, "State Change CS_EXECUTE -> CS_HIBERNATE"),
1969 ];
1970
1971 let mut stream = LogStream::new(lines.into_iter());
1972 let entries: Vec<_> = stream.by_ref().collect();
1973
1974 assert_eq!(entries.len(), 3);
1975 assert_eq!(entries[0].uuid, UUID1);
1977 assert_eq!(entries[0].kind, LineKind::System);
1978 assert_eq!(entries[0].message, "DAA-LOG WaveManager originate");
1979
1980 assert_eq!(entries[1].uuid, UUID1);
1981 assert_eq!(entries[1].kind, LineKind::System);
1982 assert_eq!(entries[1].message, "DAA-LOG Failed to create session");
1983
1984 assert_eq!(entries[2].uuid, UUID1);
1986 assert_eq!(entries[2].kind, LineKind::Full);
1987 assert_accounting(&stream);
1988 }
1989}