1use crate::attached::AttachedLines;
2use crate::level::LogLevel;
3use crate::line::{is_date_at, is_log_header_at, is_uuid_at, parse_line, LineKind};
4use crate::message::{classify_message, MessageKind, SdpDirection};
5
/// Structured payload reconstructed from the lines that follow a
/// block-opening log message.
///
/// A block is attached to a [`LogEntry`] when the entry is finalized; see
/// `LogStream::finalize_block`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum Block {
    /// Contents accumulated while the stream was in the channel-data state.
    ChannelData {
        /// `(name, value)` pairs taken from channel-field lines, in input order.
        fields: Vec<(String, String)>,
        /// `(name, value)` pairs taken from variable lines, in input order.
        /// Multi-line values are joined with `\n`.
        variables: Vec<(String, String)>,
    },
    /// An SDP body that followed an SDP marker message.
    Sdp {
        /// Direction carried by the SDP marker message.
        direction: SdpDirection,
        /// One element per continuation line, in input order.
        body: Vec<String>,
    },
    /// Result of a run of codec-comparison messages.
    CodecNegotiation {
        /// `(offered, local)` codec pairs, one per comparison line.
        comparisons: Vec<(String, String)>,
        /// Codecs reported as "is saved as a match".
        selected: Vec<String>,
    },
}
30
/// How much detail `LogStream` keeps about lines it could not classify.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnclassifiedTracking {
    /// Only increment `ParseStats::lines_unclassified`; store nothing per line.
    CountOnly,
    /// Additionally record the line number and reason of each line.
    TrackLines,
    /// Additionally keep a copy of the line text.
    CaptureData,
}
43
/// Why a line was counted as unclassified.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum UnclassifiedReason {
    /// A bare continuation line arrived while no entry was pending.
    OrphanContinuation,
    /// NOTE(review): not constructed anywhere in this file's visible code;
    /// presumably a message whose format was not recognized — confirm.
    UnknownMessageFormat,
    /// NOTE(review): not constructed anywhere in this file's visible code;
    /// presumably a field cut short by truncation — confirm.
    TruncatedField,
}
55
/// Record of a single unclassified line, kept when tracking is
/// `TrackLines` or `CaptureData`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnclassifiedLine {
    /// Value of the stream's line counter when the line was recorded.
    pub line_number: u64,
    /// Why the line was unclassified.
    pub reason: UnclassifiedReason,
    /// Copy of the line text; populated only under `CaptureData`.
    pub data: Option<String>,
}
64
/// Running counters maintained by `LogStream` while it consumes input.
#[derive(Debug, Clone, Default)]
pub struct ParseStats {
    /// Lines pulled from the input iterator (lines starting with a NUL byte
    /// are not counted).
    pub lines_processed: u64,
    /// Lines passed to `record_unclassified`.
    pub lines_unclassified: u64,
    /// For every finalized entry: one for its primary line plus one per
    /// attached line.
    pub lines_in_entries: u64,
    /// Empty lines seen while no entry was pending.
    pub lines_empty_orphan: u64,
    /// Extra logical lines produced by splitting a physical line in
    /// `detect_collision`.
    pub lines_split: u64,
    /// Per-line records; stays empty under `UnclassifiedTracking::CountOnly`.
    pub unclassified_lines: Vec<UnclassifiedLine>,
}
80
81impl ParseStats {
82 pub fn unaccounted_lines(&self) -> u64 {
89 let expected = self.lines_in_entries + self.lines_empty_orphan;
90 let actual = self.lines_processed + self.lines_split;
91 actual.saturating_sub(expected)
92 }
93}
94
/// One logical log entry: a primary line plus every continuation line that
/// `LogStream` attached to it.
#[derive(Debug)]
pub struct LogEntry {
    /// UUID parsed from the primary line; empty when the line carried none.
    /// Orphan bare continuations get the last UUID the stream saw.
    pub uuid: String,
    /// Timestamp parsed from the primary line, or the last timestamp the
    /// stream saw when the line carried none.
    pub timestamp: String,
    /// Log level; only filled for full, system and truncated lines.
    pub level: Option<LogLevel>,
    /// Idle-percentage column; only filled for full, system and truncated lines.
    pub idle_pct: Option<String>,
    /// Source-location column; only filled for full, system and truncated lines.
    pub source: Option<String>,
    /// Message text of the primary line.
    pub message: String,
    /// Line kind reported by `parse_line` for the primary line.
    pub kind: LineKind,
    /// Classification of `message` as returned by `classify_message`.
    pub message_kind: MessageKind,
    /// Structured block assembled from the continuation lines, if any.
    pub block: Option<Block>,
    /// Raw text of every line attached to this entry, in input order.
    pub attached: AttachedLines,
    /// Value of the stream's line counter when the entry was created.
    pub line_number: u64,
    /// Diagnostics collected while assembling this entry.
    pub warnings: Vec<String>,
}
127
/// Splits a `name: value` line into its two parts.
///
/// The split happens at the first `": "`. Returns `None` when there is no
/// such separator, or when the name is empty or contains a space. A value
/// that starts with `[` has that bracket removed, together with a trailing
/// `]` if one is present.
fn parse_field_line(msg: &str) -> Option<(String, String)> {
    let (name, raw_value) = msg.split_once(": ")?;
    if name.is_empty() || name.contains(' ') {
        return None;
    }
    let value = match raw_value.strip_prefix('[') {
        // An unterminated bracket keeps everything after the opening `[`.
        Some(inner) => inner.strip_suffix(']').unwrap_or(inner),
        None => raw_value,
    };
    Some((name.to_owned(), value.to_owned()))
}
142
/// Block currently being accumulated for the pending entry.
enum StreamState {
    /// No block is open.
    Idle,
    /// Collecting channel-data lines.
    InChannelData {
        /// Field pairs collected so far.
        fields: Vec<(String, String)>,
        /// Completed variable pairs collected so far.
        variables: Vec<(String, String)>,
        /// Name of a multi-line variable whose closing `]` has not been seen.
        /// Set and cleared together with `open_var_value`.
        open_var_name: Option<String>,
        /// Value text gathered so far for the open multi-line variable.
        open_var_value: Option<String>,
    },
    /// Collecting the body lines of an SDP block.
    InSdp {
        /// Direction taken from the SDP marker message.
        direction: SdpDirection,
        /// Body lines collected so far.
        body: Vec<String>,
    },
    /// Collecting codec-comparison messages.
    InCodecNegotiation {
        /// `(offered, local)` pairs collected so far.
        comparisons: Vec<(String, String)>,
        /// Codecs reported as a match so far.
        selected: Vec<String>,
    },
}
160
161impl StreamState {
162 fn take_idle(&mut self) -> StreamState {
163 std::mem::replace(self, StreamState::Idle)
164 }
165}
166
/// Iterator adaptor that turns raw log lines into [`LogEntry`] values,
/// attaching continuation lines to the entry they belong to.
pub struct LogStream<I> {
    /// Source of raw lines.
    lines: I,
    /// Most recent non-empty UUID seen; given to orphan bare continuations.
    last_uuid: String,
    /// Most recent timestamp parsed from a line; inherited by entries whose
    /// primary line has none.
    last_timestamp: String,
    /// Entry currently being accumulated; yielded when the next entry starts
    /// or the input ends.
    pending: Option<LogEntry>,
    /// Block being accumulated for `pending`.
    state: StreamState,
    /// Running counters.
    stats: ParseStats,
    /// How much detail to keep about unclassified lines.
    tracking: UnclassifiedTracking,
    /// Number of lines read so far (lines starting with a NUL byte excluded).
    line_number: u64,
    /// Tail of a physical line that `detect_collision` split off; processed
    /// before the next line is read from `lines`.
    split_pending: Option<String>,
    /// Warning produced while no entry was pending; moved into the next entry
    /// created by `new_entry`.
    deferred_warning: Option<String>,
}
188
189impl<I: Iterator<Item = String>> LogStream<I> {
190 pub fn new(lines: I) -> Self {
192 LogStream {
193 lines,
194 last_uuid: String::new(),
195 last_timestamp: String::new(),
196 pending: None,
197 state: StreamState::Idle,
198 stats: ParseStats::default(),
199 tracking: UnclassifiedTracking::CountOnly,
200 line_number: 0,
201 split_pending: None,
202 deferred_warning: None,
203 }
204 }
205
206 pub fn unclassified_tracking(mut self, level: UnclassifiedTracking) -> Self {
208 self.tracking = level;
209 self
210 }
211
    /// Borrows the counters accumulated so far.
    pub fn stats(&self) -> &ParseStats {
        &self.stats
    }
216
    /// Removes and returns the per-line unclassified records collected so
    /// far, leaving an empty list behind. The `lines_unclassified` counter is
    /// not reset.
    pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
        std::mem::take(&mut self.stats.unclassified_lines)
    }
223
224 fn record_unclassified(&mut self, reason: UnclassifiedReason, data: Option<&str>) {
225 self.stats.lines_unclassified += 1;
226 match self.tracking {
227 UnclassifiedTracking::CountOnly => {}
228 UnclassifiedTracking::TrackLines => {
229 self.stats.unclassified_lines.push(UnclassifiedLine {
230 line_number: self.line_number,
231 reason,
232 data: None,
233 });
234 }
235 UnclassifiedTracking::CaptureData => {
236 self.stats.unclassified_lines.push(UnclassifiedLine {
237 line_number: self.line_number,
238 reason,
239 data: data.map(|s| s.to_string()),
240 });
241 }
242 }
243 }
244
245 fn finalize_block(&mut self) -> (Option<Block>, Vec<String>) {
246 let mut warnings = Vec::new();
247 match self.state.take_idle() {
248 StreamState::Idle => (None, warnings),
249 StreamState::InChannelData {
250 fields,
251 mut variables,
252 open_var_name,
253 open_var_value,
254 } => {
255 if let (Some(ref name), Some(value)) = (&open_var_name, open_var_value) {
256 warnings.push(format!("unclosed multi-line variable: {name}"));
257 variables.push((name.clone(), value));
258 }
259 (Some(Block::ChannelData { fields, variables }), warnings)
260 }
261 StreamState::InSdp { direction, body } => {
262 (Some(Block::Sdp { direction, body }), warnings)
263 }
264 StreamState::InCodecNegotiation {
265 comparisons,
266 selected,
267 } => (
268 Some(Block::CodecNegotiation {
269 comparisons,
270 selected,
271 }),
272 warnings,
273 ),
274 }
275 }
276
277 fn finalize_pending(&mut self) -> Option<LogEntry> {
278 let (block, warnings) = self.finalize_block();
279 if let Some(ref mut p) = self.pending {
280 p.block = block;
281 p.warnings.extend(warnings);
282 self.stats.lines_in_entries += 1 + p.attached.len() as u64;
283 }
284 self.pending.take()
285 }
286
287 fn start_block_for_message(&mut self, message_kind: &MessageKind) {
288 self.state = match message_kind {
289 MessageKind::ChannelData => StreamState::InChannelData {
290 fields: Vec::new(),
291 variables: Vec::new(),
292 open_var_name: None,
293 open_var_value: None,
294 },
295 MessageKind::SdpMarker { direction } => StreamState::InSdp {
296 direction: direction.clone(),
297 body: Vec::new(),
298 },
299 MessageKind::CodecNegotiation => StreamState::InCodecNegotiation {
300 comparisons: Vec::new(),
301 selected: Vec::new(),
302 },
303 _ => StreamState::Idle,
304 };
305 }
306
307 fn accumulate_codec_entry(&mut self, msg: &str) {
308 let mut warning = None;
309 if let StreamState::InCodecNegotiation {
310 comparisons,
311 selected,
312 } = &mut self.state
313 {
314 let rest = msg.strip_prefix("Audio Codec Compare ").unwrap_or(msg);
315 if rest.contains("is saved as a match") {
316 let codec = rest.find(']').map(|end| &rest[1..end]).unwrap_or(rest);
317 selected.push(codec.to_string());
318 } else if let Some(slash) = rest.find("]/[") {
319 let offered = &rest[1..slash];
320 let local = &rest[slash + 3..rest.len().saturating_sub(1)];
321 comparisons.push((offered.to_string(), local.to_string()));
322 } else {
323 warning = Some(format!(
324 "unrecognized codec negotiation line: {}",
325 if msg.len() > 80 { &msg[..80] } else { msg }
326 ));
327 }
328 }
329 if let (Some(w), Some(ref mut pending)) = (warning, &mut self.pending) {
330 pending.warnings.push(w);
331 }
332 }
333
334 fn accumulate_continuation(&mut self, msg: &str, line: &str) {
335 let msg_kind = classify_message(msg);
336 let mut warning = None;
337 match &mut self.state {
338 StreamState::InChannelData {
339 fields,
340 variables,
341 open_var_name,
342 open_var_value,
343 } => {
344 if let Some(ref mut val) = open_var_value {
345 val.push('\n');
346 val.push_str(msg);
347 if msg.ends_with(']') {
348 let trimmed = val.trim_end_matches(']').to_string();
349 let name = open_var_name.take().unwrap();
350 *open_var_value = None;
351 variables.push((name, trimmed));
352 }
353 } else {
354 match &msg_kind {
355 MessageKind::ChannelField { name, value } => {
356 fields.push((name.clone(), value.clone()));
357 }
358 MessageKind::Variable { name, value } => {
359 if !msg.ends_with(']') && msg.contains(": [") {
360 *open_var_name = Some(name.clone());
361 *open_var_value = Some(value.clone());
362 } else {
363 variables.push((name.clone(), value.clone()));
364 }
365 }
366 _ => {
367 if let Some((name, value)) = parse_field_line(msg) {
368 fields.push((name, value));
369 } else {
370 warning = Some(format!(
371 "unparseable CHANNEL_DATA line: {}",
372 if msg.len() > 80 { &msg[..80] } else { msg }
373 ));
374 }
375 }
376 }
377 }
378 }
379 StreamState::InSdp { body, .. } => {
380 body.push(msg.to_string());
381 }
382 StreamState::InCodecNegotiation { .. } => {
383 warning = Some(format!(
384 "unexpected codec negotiation continuation: {}",
385 if msg.len() > 80 { &msg[..80] } else { msg }
386 ));
387 }
388 StreamState::Idle => {}
389 }
390 if let Some(ref mut pending) = self.pending {
391 if let Some(w) = warning {
392 pending.warnings.push(w);
393 }
394 pending.attached.push(line);
395 }
396 }
397
398 fn new_entry(
399 &mut self,
400 uuid: String,
401 timestamp: String,
402 message: String,
403 kind: LineKind,
404 message_kind: MessageKind,
405 ) -> LogEntry {
406 let mut warnings = Vec::new();
407 if let Some(w) = self.deferred_warning.take() {
408 warnings.push(w);
409 }
410 LogEntry {
411 uuid,
412 timestamp,
413 message,
414 kind,
415 message_kind,
416 level: None,
417 idle_pct: None,
418 source: None,
419 block: None,
420 attached: AttachedLines::new(),
421 line_number: self.line_number,
422 warnings,
423 }
424 }
425}
426
/// Size in bytes of the buffer that the oversize warning in
/// `detect_collision` calls the "mod_logfile 2048-byte buffer".
const MOD_LOGFILE_BUF_SIZE: usize = 2048;

/// Longest line, in bytes, that is not treated as oversized.
// NOTE(review): the subtracted 36 + 1 + 1 presumably stand for the UUID, the
// separating space and the line terminator — confirm.
const MAX_LINE_PAYLOAD: usize = MOD_LOGFILE_BUF_SIZE - 36 - 1 - 1;

/// For oversized lines, `detect_collision` only scans this many bytes on
/// either side of `MAX_LINE_PAYLOAD`.
const COLLISION_SCAN_SLACK: usize = 64;
441
442impl<I: Iterator<Item = String>> LogStream<I> {
443 fn detect_collision(&mut self, line: String) -> String {
461 if line.len() > MAX_LINE_PAYLOAD {
462 let warning = format!(
463 "line exceeds mod_logfile 2048-byte buffer ({} bytes), data may be truncated",
464 line.len() + 38,
465 );
466 if let Some(ref mut pending) = self.pending {
467 pending.warnings.push(warning);
468 } else {
469 self.deferred_warning = Some(warning);
470 }
471 }
472
473 let bytes = line.as_bytes();
475 let min_scan = if is_uuid_at(bytes, 0) {
476 if bytes.len() > 37 && bytes[37].is_ascii_digit() {
477 64 } else {
479 37 }
481 } else if is_date_at(bytes, 0) {
482 27 } else {
484 0
485 };
486
487 let end = bytes.len().saturating_sub(28);
488 let (scan_lo, scan_hi) = if bytes.len() > MAX_LINE_PAYLOAD {
497 let lo = min_scan.max(MAX_LINE_PAYLOAD.saturating_sub(COLLISION_SCAN_SLACK));
498 let hi = end.min(MAX_LINE_PAYLOAD + COLLISION_SCAN_SLACK);
499 (lo, hi)
500 } else {
501 (min_scan, end)
502 };
503
504 if scan_lo <= scan_hi {
505 for offset in scan_lo..=scan_hi {
506 if is_log_header_at(bytes, offset) {
508 let split_at = if offset >= 37 && is_uuid_at(bytes, offset - 37) {
510 offset - 37
511 } else {
512 offset
513 };
514 self.split_pending = Some(line[split_at..].to_string());
515 return line[..split_at].to_string();
516 }
517 if is_uuid_at(bytes, offset) && bytes.len() > MAX_LINE_PAYLOAD {
519 self.split_pending = Some(line[offset..].to_string());
520 return line[..offset].to_string();
521 }
522 }
523 }
524
525 line
526 }
527}
528
/// Yields one [`LogEntry`] per logical record. An entry is held back as
/// `pending` until the line that starts the next entry (or the end of input)
/// shows that it is complete.
impl<I: Iterator<Item = String>> Iterator for LogStream<I> {
    type Item = LogEntry;

    /// Consumes lines until an entry can be yielded.
    ///
    /// Returns `None` only when the input is exhausted and nothing is pending.
    fn next(&mut self) -> Option<LogEntry> {
        loop {
            // The tail of a previously split line takes priority over new input.
            let line = if let Some(split) = self.split_pending.take() {
                self.stats.lines_split += 1;
                split
            } else {
                let Some(line) = self.lines.next() else {
                    // End of input: flush whatever is still pending.
                    return self.finalize_pending();
                };

                // A line starting with NUL is not counted or parsed. It
                // flushes the pending entry and forgets the inherited UUID and
                // timestamp.
                // NOTE(review): presumably a boundary marker between
                // concatenated log segments — confirm against the producer.
                if line.starts_with('\x00') {
                    let yielded = self.finalize_pending();
                    self.last_uuid.clear();
                    self.last_timestamp.clear();
                    if yielded.is_some() {
                        return yielded;
                    }
                    continue;
                }

                self.line_number += 1;
                self.stats.lines_processed += 1;
                line
            };

            // May cut the line in two; the tail is picked up on the next pass.
            let line = self.detect_collision(line);

            let parsed = parse_line(&line);

            match parsed.kind {
                // Primary lines: these always start a new entry, except for
                // codec messages that extend an open codec block.
                LineKind::Full | LineKind::System | LineKind::Truncated => {
                    let uuid = parsed.uuid.unwrap_or("").to_string();
                    let message_kind = classify_message(parsed.message);

                    if message_kind == MessageKind::CodecNegotiation {
                        // Same UUID and a codec block already open: fold this
                        // line into the pending entry instead of starting a
                        // new one.
                        if let (Some(ref pending), StreamState::InCodecNegotiation { .. }) =
                            (&self.pending, &self.state)
                        {
                            if uuid == pending.uuid {
                                self.accumulate_codec_entry(parsed.message);
                                if let Some(ref mut p) = self.pending {
                                    p.attached.push(&line);
                                }
                                continue;
                            }
                        }
                    }

                    let yielded = self.finalize_pending();

                    // Lines without a timestamp inherit the last one seen.
                    let timestamp = parsed
                        .timestamp
                        .map(|t| t.to_string())
                        .unwrap_or_else(|| self.last_timestamp.clone());

                    if !uuid.is_empty() {
                        self.last_uuid = uuid.clone();
                    }
                    if parsed.timestamp.is_some() {
                        self.last_timestamp = timestamp.clone();
                    }

                    self.start_block_for_message(&message_kind);
                    // The line that opens a codec block is itself the first
                    // codec entry.
                    if message_kind == MessageKind::CodecNegotiation {
                        self.accumulate_codec_entry(parsed.message);
                    }

                    let mut entry = self.new_entry(
                        uuid,
                        timestamp,
                        parsed.message.to_string(),
                        parsed.kind,
                        message_kind,
                    );
                    entry.level = parsed.level;
                    entry.idle_pct = parsed.idle_pct.map(|s| s.to_string());
                    entry.source = parsed.source.map(|s| s.to_string());
                    self.pending = Some(entry);

                    if yielded.is_some() {
                        return yielded;
                    }
                }

                // A UUID followed directly by a message (no timestamp).
                LineKind::UuidContinuation => {
                    let uuid = parsed.uuid.unwrap_or("").to_string();
                    // EXECUTE lines always start their own entry.
                    let is_primary = parsed.message.starts_with("EXECUTE ");

                    if let Some(ref pending) = self.pending {
                        if !is_primary && uuid == pending.uuid {
                            self.accumulate_continuation(parsed.message, &line);
                        } else {
                            // Different UUID or an EXECUTE line: yield the
                            // pending entry and start a new one that inherits
                            // the last timestamp.
                            let yielded = self.finalize_pending();
                            let message_kind = classify_message(parsed.message);

                            if !uuid.is_empty() {
                                self.last_uuid = uuid.clone();
                            }

                            self.start_block_for_message(&message_kind);
                            self.pending = Some(self.new_entry(
                                uuid,
                                self.last_timestamp.clone(),
                                parsed.message.to_string(),
                                parsed.kind,
                                message_kind,
                            ));

                            return yielded;
                        }
                    } else {
                        // Nothing pending: the line becomes an entry of its own.
                        let message_kind = classify_message(parsed.message);

                        if !uuid.is_empty() {
                            self.last_uuid = uuid.clone();
                        }

                        self.start_block_for_message(&message_kind);
                        self.pending = Some(self.new_entry(
                            uuid,
                            self.last_timestamp.clone(),
                            parsed.message.to_string(),
                            parsed.kind,
                            message_kind,
                        ));
                    }
                }

                // A line with neither UUID nor timestamp.
                LineKind::BareContinuation => {
                    if self.pending.is_some() {
                        self.accumulate_continuation(parsed.message, &line);
                    } else {
                        // Orphan: record it, then start an entry that inherits
                        // the last UUID and timestamp. No block is opened here.
                        self.record_unclassified(
                            UnclassifiedReason::OrphanContinuation,
                            Some(&line),
                        );
                        let message_kind = classify_message(parsed.message);
                        self.pending = Some(self.new_entry(
                            self.last_uuid.clone(),
                            self.last_timestamp.clone(),
                            parsed.message.to_string(),
                            parsed.kind,
                            message_kind,
                        ));
                    }
                }

                // Empty lines are attached to the pending entry, or counted as
                // orphans when nothing is pending.
                LineKind::Empty => {
                    if let Some(ref mut pending) = self.pending {
                        pending.attached.push(&line);
                    } else {
                        self.stats.lines_empty_orphan += 1;
                    }
                }
            }
        }
    }
}
691
692#[cfg(test)]
693mod tests {
694 use super::*;
695
696 const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
697 const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
698
699 fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
700 format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
701 }
702
703 const TS1: &str = "2025-01-15 10:30:45.123456";
704 const TS2: &str = "2025-01-15 10:30:46.234567";
705
706 #[test]
709 fn inherits_uuid_for_bare_continuation() {
710 let lines = vec![
711 full_line(UUID1, TS1, "CHANNEL_DATA:"),
712 "variable_foo: [bar]".to_string(),
713 "variable_baz: [qux]".to_string(),
714 ];
715 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
716 assert_eq!(entries.len(), 1);
717 assert_eq!(entries[0].uuid, UUID1);
718 assert_eq!(entries[0].attached.len(), 2);
719 assert_eq!(entries[0].attached.get(0), Some("variable_foo: [bar]"));
720 assert_eq!(entries[0].attached.get(1), Some("variable_baz: [qux]"));
721 }
722
723 #[test]
724 fn inherits_timestamp_for_uuid_continuation() {
725 let lines = vec![
726 full_line(UUID1, TS1, "First"),
727 format!("{UUID2} Channel-State: [CS_EXECUTE]"),
728 ];
729 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
730 assert_eq!(entries.len(), 2);
731 assert_eq!(entries[0].timestamp, TS1);
732 assert_eq!(entries[1].uuid, UUID2);
733 assert_eq!(entries[1].timestamp, TS1);
734 }
735
736 #[test]
737 fn new_full_line_yields_previous() {
738 let lines = vec![
739 full_line(UUID1, TS1, "First"),
740 full_line(UUID2, TS2, "Second"),
741 ];
742 let mut stream = LogStream::new(lines.into_iter());
743 let first = stream.next().unwrap();
744 assert_eq!(first.uuid, UUID1);
745 assert_eq!(first.message, "First");
746 let second = stream.next().unwrap();
747 assert_eq!(second.uuid, UUID2);
748 assert_eq!(second.message, "Second");
749 assert!(stream.next().is_none());
750 }
751
752 #[test]
753 fn channel_data_collected_as_attached() {
754 let lines = vec![
755 full_line(UUID1, TS1, "CHANNEL_DATA:"),
756 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
757 format!("{UUID1} Unique-ID: [{UUID1}]"),
758 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
759 ];
760 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
761 assert_eq!(entries.len(), 1);
762 assert_eq!(entries[0].message, "CHANNEL_DATA:");
763 assert_eq!(entries[0].attached.len(), 3);
764 }
765
766 #[test]
767 fn sdp_body_collected_as_attached() {
768 let lines = vec![
769 full_line(UUID1, TS1, "Local SDP:"),
770 "v=0".to_string(),
771 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
772 "s=-".to_string(),
773 "c=IN IP4 192.0.2.1".to_string(),
774 "m=audio 10000 RTP/AVP 0".to_string(),
775 ];
776 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
777 assert_eq!(entries.len(), 1);
778 assert_eq!(entries[0].attached.len(), 5);
779 }
780
781 #[test]
782 fn truncated_starts_new_entry() {
783 let lines = vec![
784 full_line(UUID1, TS1, "First"),
785 format!(
786 "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
787 ),
788 ];
789 let mut stream = LogStream::new(lines.into_iter());
790 let first = stream.next().unwrap();
791 assert_eq!(first.uuid, UUID1);
792 assert_eq!(first.message, "First");
793 let second = stream.next().unwrap();
794 assert_eq!(second.uuid, UUID2);
795 assert_eq!(second.kind, LineKind::Truncated);
796 }
797
798 #[test]
799 fn empty_lines_in_attached() {
800 let lines = vec![
801 full_line(UUID1, TS1, "First"),
802 String::new(),
803 "continuation".to_string(),
804 ];
805 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
806 assert_eq!(entries.len(), 1);
807 assert_eq!(entries[0].attached.len(), 2);
808 assert_eq!(entries[0].attached.get(0), Some(""));
809 assert_eq!(entries[0].attached.get(1), Some("continuation"));
810 }
811
812 #[test]
813 fn system_line_no_uuid() {
814 let lines = vec![format!(
815 "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
816 )];
817 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
818 assert_eq!(entries.len(), 1);
819 assert_eq!(entries[0].uuid, "");
820 assert_eq!(entries[0].kind, LineKind::System);
821 }
822
823 #[test]
824 fn final_entry_on_exhaustion() {
825 let lines = vec![full_line(UUID1, TS1, "Only entry")];
826 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
827 assert_eq!(entries.len(), 1);
828 assert_eq!(entries[0].message, "Only entry");
829 }
830
831 #[test]
832 fn consecutive_full_lines() {
833 let lines = vec![
834 full_line(UUID1, TS1, "First"),
835 full_line(UUID1, TS2, "Second"),
836 full_line(UUID2, TS1, "Third"),
837 ];
838 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
839 assert_eq!(entries.len(), 3);
840 for entry in &entries {
841 assert!(entry.attached.is_empty());
842 }
843 }
844
845 #[test]
846 fn execute_after_channel_data_same_uuid() {
847 let lines = vec![
848 full_line(UUID1, TS1, "CHANNEL_DATA:"),
849 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
850 format!("{UUID1} variable_sip_call_id: [test@192.0.2.1]"),
851 "variable_foo: [bar]".to_string(),
852 String::new(),
853 String::new(),
854 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
855 full_line(UUID1, TS2, "EXPORT (export_vars) [originate_timeout]=[3600]"),
856 ];
857 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
858 assert_eq!(entries.len(), 3);
859 assert_eq!(entries[0].message, "CHANNEL_DATA:");
860 assert_eq!(entries[0].attached.len(), 5);
861 assert_eq!(entries[1].message, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)");
862 assert_eq!(entries[1].kind, LineKind::UuidContinuation);
863 assert_eq!(
864 entries[2].message,
865 "EXPORT (export_vars) [originate_timeout]=[3600]"
866 );
867 }
868
869 #[test]
870 fn execute_between_full_lines_same_uuid() {
871 let lines = vec![
872 full_line(UUID1, TS1, "CoreSession::setVariable(X-C911P-City, ST GEORGES)"),
873 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/city/ST GEORGES)"),
874 full_line(UUID1, TS2, "CoreSession::setVariable(X-C911P-Region, SGS)"),
875 ];
876 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
877 assert_eq!(entries.len(), 3);
878 assert_eq!(
879 entries[0].message,
880 "CoreSession::setVariable(X-C911P-City, ST GEORGES)"
881 );
882 assert!(entries[0].attached.is_empty());
883 assert!(entries[1].message.starts_with("EXECUTE "));
884 assert_eq!(entries[1].kind, LineKind::UuidContinuation);
885 assert_eq!(
886 entries[2].message,
887 "CoreSession::setVariable(X-C911P-Region, SGS)"
888 );
889 }
890
891 #[test]
892 fn multiple_execute_between_full_lines() {
893 let lines = vec![
894 full_line(UUID1, TS1, "CoreSession::setVariable(ngcs_call_id, urn:emergency:uid:callid:test)"),
895 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/call_id/urn:emergency:uid:callid:test)"),
896 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/callid_codecs/urn:emergency:uid:callid:test/PCMU@8000h)"),
897 full_line(UUID1, TS2, "CoreSession::setVariable(ngcs_short_call_id, test)"),
898 ];
899 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
900 assert_eq!(entries.len(), 4);
901 assert!(entries[0].attached.is_empty());
902 assert!(entries[1].message.contains("call_id"));
903 assert!(entries[2].message.contains("callid_codecs"));
904 assert_eq!(
905 entries[3].message,
906 "CoreSession::setVariable(ngcs_short_call_id, test)"
907 );
908 }
909
910 #[test]
911 fn uuid_continuation_different_uuid_yields() {
912 let lines = vec![
913 full_line(UUID1, TS1, "First"),
914 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
915 format!("{UUID2} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"),
916 ];
917 let mut stream = LogStream::new(lines.into_iter());
918 let first = stream.next().unwrap();
919 assert_eq!(first.uuid, UUID1);
920 assert_eq!(first.attached.len(), 1);
921 let second = stream.next().unwrap();
922 assert_eq!(second.uuid, UUID2);
923 assert_eq!(
924 second.message,
925 "Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"
926 );
927 }
928
929 #[test]
932 fn channel_data_block_fields_and_variables() {
933 let lines = vec![
934 full_line(UUID1, TS1, "CHANNEL_DATA:"),
935 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
936 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
937 format!("{UUID1} Unique-ID: [{UUID1}]"),
938 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
939 "variable_direction: [inbound]".to_string(),
940 ];
941 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
942 assert_eq!(entries.len(), 1);
943 assert_eq!(entries[0].message_kind, MessageKind::ChannelData);
944 let block = entries[0].block.as_ref().expect("should have block");
945 match block {
946 Block::ChannelData { fields, variables } => {
947 assert_eq!(fields.len(), 3);
948 assert_eq!(
949 fields[0],
950 (
951 "Channel-Name".to_string(),
952 "sofia/internal/+15550001234@192.0.2.1".to_string()
953 )
954 );
955 assert_eq!(
956 fields[1],
957 ("Channel-State".to_string(), "CS_EXECUTE".to_string())
958 );
959 assert_eq!(fields[2], ("Unique-ID".to_string(), UUID1.to_string()));
960 assert_eq!(variables.len(), 2);
961 assert_eq!(
962 variables[0],
963 (
964 "variable_sip_call_id".to_string(),
965 "test123@192.0.2.1".to_string()
966 )
967 );
968 assert_eq!(
969 variables[1],
970 ("variable_direction".to_string(), "inbound".to_string())
971 );
972 }
973 other => panic!("expected ChannelData block, got {other:?}"),
974 }
975 }
976
977 #[test]
978 fn channel_data_multiline_variable_reassembly() {
979 let lines = vec![
980 full_line(UUID1, TS1, "CHANNEL_DATA:"),
981 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
982 "variable_switch_r_sdp: [v=0".to_string(),
983 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
984 "s=-".to_string(),
985 "c=IN IP4 192.0.2.1".to_string(),
986 "m=audio 47758 RTP/AVP 0 101".to_string(),
987 "a=rtpmap:0 PCMU/8000".to_string(),
988 "]".to_string(),
989 "variable_direction: [inbound]".to_string(),
990 ];
991 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
992 assert_eq!(entries.len(), 1);
993 let block = entries[0].block.as_ref().expect("should have block");
994 match block {
995 Block::ChannelData { fields, variables } => {
996 assert_eq!(fields.len(), 1);
997 assert_eq!(variables.len(), 2);
998 assert_eq!(variables[0].0, "variable_switch_r_sdp");
999 assert!(variables[0].1.starts_with("v=0\n"));
1000 assert!(variables[0].1.contains("m=audio 47758 RTP/AVP 0 101"));
1001 assert!(!variables[0].1.ends_with(']'));
1002 assert_eq!(
1003 variables[1],
1004 ("variable_direction".to_string(), "inbound".to_string())
1005 );
1006 }
1007 other => panic!("expected ChannelData block, got {other:?}"),
1008 }
1009 assert_eq!(entries[0].attached.len(), 9);
1010 }
1011
1012 #[test]
1013 fn sdp_block_detection() {
1014 let lines = vec![
1015 full_line(UUID1, TS1, "Local SDP:"),
1016 "v=0".to_string(),
1017 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1018 "s=-".to_string(),
1019 "c=IN IP4 192.0.2.1".to_string(),
1020 "m=audio 10000 RTP/AVP 0".to_string(),
1021 "a=rtpmap:0 PCMU/8000".to_string(),
1022 ];
1023 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1024 assert_eq!(entries.len(), 1);
1025 match &entries[0].message_kind {
1026 MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1027 other => panic!("expected SdpMarker, got {other:?}"),
1028 }
1029 let block = entries[0].block.as_ref().expect("should have block");
1030 match block {
1031 Block::Sdp { direction, body } => {
1032 assert_eq!(*direction, SdpDirection::Local);
1033 assert_eq!(body.len(), 6);
1034 assert_eq!(body[0], "v=0");
1035 assert_eq!(body[5], "a=rtpmap:0 PCMU/8000");
1036 }
1037 other => panic!("expected Sdp block, got {other:?}"),
1038 }
1039 }
1040
1041 #[test]
1042 fn sdp_block_terminated_by_primary_line() {
1043 let lines = vec![
1044 full_line(UUID1, TS1, "Remote SDP:"),
1045 "v=0".to_string(),
1046 "m=audio 10000 RTP/AVP 0".to_string(),
1047 full_line(UUID1, TS2, "Next event"),
1048 ];
1049 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1050 assert_eq!(entries.len(), 2);
1051 let block = entries[0].block.as_ref().expect("should have block");
1052 match block {
1053 Block::Sdp { direction, body } => {
1054 assert_eq!(*direction, SdpDirection::Remote);
1055 assert_eq!(body.len(), 2);
1056 }
1057 other => panic!("expected Sdp block, got {other:?}"),
1058 }
1059 assert!(entries[1].block.is_none());
1060 }
1061
1062 #[test]
1063 fn sdp_from_uuid_continuation() {
1064 let lines = vec![
1065 format!("{UUID1} Local SDP:"),
1066 format!("{UUID1} v=0"),
1067 format!("{UUID1} o=FreeSWITCH 1234 5678 IN IP4 192.0.2.1"),
1068 format!("{UUID1} s=FreeSWITCH"),
1069 format!("{UUID1} c=IN IP4 192.0.2.1"),
1070 ];
1071 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1072 assert_eq!(entries.len(), 1);
1073 let block = entries[0].block.as_ref().expect("should have block");
1074 match block {
1075 Block::Sdp { direction, body } => {
1076 assert_eq!(*direction, SdpDirection::Local);
1077 assert_eq!(body.len(), 4);
1078 assert_eq!(body[0], "v=0");
1079 }
1080 other => panic!("expected Sdp block, got {other:?}"),
1081 }
1082 }
1083
1084 #[test]
1085 fn channel_data_interrupted_by_different_uuid() {
1086 let lines = vec![
1087 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1088 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1089 format!("{UUID2} Dialplan: sofia/internal/+15559999999@192.0.2.1 parsing [public]"),
1090 ];
1091 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1092 assert_eq!(entries.len(), 2);
1093 let block = entries[0].block.as_ref().expect("should have block");
1094 match block {
1095 Block::ChannelData { fields, .. } => {
1096 assert_eq!(fields.len(), 1);
1097 }
1098 other => panic!("expected ChannelData, got {other:?}"),
1099 }
1100 }
1101
1102 #[test]
1103 fn no_block_for_non_block_message() {
1104 let lines = vec![full_line(UUID1, TS1, "some random freeswitch log message")];
1105 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1106 assert_eq!(entries.len(), 1);
1107 assert!(entries[0].block.is_none());
1108 assert_eq!(entries[0].message_kind, MessageKind::General);
1109 }
1110
1111 #[test]
1112 fn message_kind_on_execute() {
1113 let lines = vec![
1114 full_line(UUID1, TS1, "First"),
1115 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
1116 ];
1117 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1118 assert_eq!(entries.len(), 2);
1119 match &entries[1].message_kind {
1120 MessageKind::Execute {
1121 application,
1122 arguments,
1123 ..
1124 } => {
1125 assert_eq!(application, "set");
1126 assert_eq!(arguments, "foo=bar");
1127 }
1128 other => panic!("expected Execute, got {other:?}"),
1129 }
1130 }
1131
1132 #[test]
1135 fn stats_lines_processed() {
1136 let lines = vec![
1137 full_line(UUID1, TS1, "First"),
1138 full_line(UUID1, TS2, "Second"),
1139 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1140 ];
1141 let mut stream = LogStream::new(lines.into_iter());
1142 let _: Vec<_> = stream.by_ref().collect();
1143 assert_eq!(stream.stats().lines_processed, 3);
1144 }
1145
1146 #[test]
1147 fn stats_unclassified_orphan() {
1148 let lines = vec![
1149 "variable_foo: [bar]".to_string(),
1150 full_line(UUID1, TS1, "After orphan"),
1151 ];
1152 let mut stream = LogStream::new(lines.into_iter())
1153 .unclassified_tracking(UnclassifiedTracking::TrackLines);
1154 let _: Vec<_> = stream.by_ref().collect();
1155 assert_eq!(stream.stats().lines_unclassified, 1);
1156 assert_eq!(stream.stats().unclassified_lines.len(), 1);
1157 assert_eq!(
1158 stream.stats().unclassified_lines[0].reason,
1159 UnclassifiedReason::OrphanContinuation,
1160 );
1161 }
1162
1163 #[test]
1164 fn stats_capture_data() {
1165 let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1166 let mut stream = LogStream::new(lines.into_iter())
1167 .unclassified_tracking(UnclassifiedTracking::CaptureData);
1168 let _: Vec<_> = stream.by_ref().collect();
1169 assert_eq!(stream.stats().unclassified_lines.len(), 1);
1170 assert_eq!(
1171 stream.stats().unclassified_lines[0].data.as_deref(),
1172 Some("orphan line"),
1173 );
1174 }
1175
#[test]
/// A stream built without calling `unclassified_tracking` still counts the
/// orphan line but stores no per-line records.
fn stats_count_only_no_allocation() {
    let input = vec![String::from("orphan line"), full_line(UUID1, TS1, "After")];
    let mut parser = LogStream::new(input.into_iter());
    for _entry in parser.by_ref() {}

    let stats = parser.stats();
    assert_eq!(stats.lines_unclassified, 1);
    assert!(stats.unclassified_lines.is_empty());
}
1184
#[test]
/// Entries report the 1-based number of the line that started them: the
/// continuation on line 2 does not start an entry, so the second entry is
/// numbered 3.
fn line_number_tracking() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
        full_line(UUID2, TS2, "Third"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();

    let first = &parsed[0];
    let second = &parsed[1];
    assert_eq!(first.line_number, 1);
    assert_eq!(second.line_number, 3);
}
1196
#[test]
/// `drain_unclassified` hands back the recorded lines and empties the stored
/// list, while the running `lines_unclassified` counter is left untouched.
/// For this input (two leading orphan lines) exactly one record is expected.
fn drain_unclassified() {
    let input = vec![
        String::from("orphan1"),
        String::from("orphan2"),
        full_line(UUID1, TS1, "After"),
    ];
    let mut parser =
        LogStream::new(input.into_iter()).unclassified_tracking(UnclassifiedTracking::TrackLines);
    for _entry in parser.by_ref() {}

    let taken = parser.drain_unclassified();
    assert_eq!(taken.len(), 1);

    // Stats are inspected only after draining so the post-drain state is observed.
    let stats = parser.stats();
    assert!(stats.unclassified_lines.is_empty());
    assert_eq!(stats.lines_unclassified, 1);
}
1212
#[test]
/// Regression guard for multi-segment input: continuation lines at the start
/// of a new segment must not pick up the timestamp of the last entry in the
/// preceding segment.
fn continuation_lines_at_file_boundary_must_not_inherit_previous_timestamp() {
    use crate::TrackedChain;

    let uuid_a = "aaaaaaaa-1111-2222-3333-444444444444";
    let uuid_b = "bbbbbbbb-1111-2222-3333-444444444444";
    let ts_old = "2025-01-15 23:58:03.000000";
    let ts_new = "2025-01-16 08:37:12.000000";

    // First segment: a single fully timestamped line.
    let rotated: Vec<String> = vec![format!(
        "{uuid_a} {ts_old} 95.00% [DEBUG] test.c:1 Last line in rotated file"
    )];

    // Second segment: opens with timestamp-less lines for a different UUID,
    // followed by that UUID's first timestamped line.
    let current: Vec<String> = vec![
        format!("{uuid_b} CHANNEL_DATA:"),
        format!("{uuid_b} Channel-State: [CS_EXECUTE]"),
        format!("{uuid_b} {ts_new} 95.00% [DEBUG] test.c:1 First timestamped line in new file"),
    ];

    let segments: Vec<(String, Box<dyn Iterator<Item = String>>)> = vec![
        (String::from("rotated.log"), Box::new(rotated.into_iter())),
        (String::from("freeswitch.log"), Box::new(current.into_iter())),
    ];

    let (chain, _) = TrackedChain::new(segments);
    let parsed = LogStream::new(chain).collect::<Vec<_>>();

    let b_entry = parsed
        .iter()
        .find(|entry| entry.uuid == uuid_b)
        .expect("should find entry for uuid_b");

    assert_ne!(
        b_entry.timestamp, ts_old,
        "continuation lines in a new file segment inherited timestamp \
         '{ts_old}' from the previous segment — timestamps must not bleed \
         across file boundaries"
    );
}
1264
1265 fn assert_accounting(stream: &LogStream<impl Iterator<Item = String>>) {
1268 let stats = stream.stats();
1269 assert_eq!(
1270 stats.unaccounted_lines(),
1271 0,
1272 "line accounting invariant violated: \
1273 processed={} + split={} != in_entries={} + empty_orphan={}",
1274 stats.lines_processed,
1275 stats.lines_split,
1276 stats.lines_in_entries,
1277 stats.lines_empty_orphan,
1278 );
1279 }
1280
#[test]
/// Two full log lines yield two entries and two lines attributed to entries.
fn accounting_full_lines() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        full_line(UUID2, TS2, "Second"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 2);
    assert_eq!(parser.stats().lines_in_entries, 2);
    assert_accounting(&parser);
}
1293
#[test]
/// Lines following a CHANNEL_DATA header (with or without a UUID prefix) are
/// counted as belonging to entries: four input lines, two entries, four
/// lines in entries.
fn accounting_with_attached() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
        String::from("variable_foo: [bar]"),
        full_line(UUID2, TS2, "Next"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 2);
    assert_eq!(parser.stats().lines_in_entries, 4);
    assert_accounting(&parser);
}
1310
#[test]
/// A timestamped line with no UUID prefix is still attributed to an entry.
fn accounting_system_line() {
    let system_line = format!("{TS1} 95.97% [NOTICE] mod_logfile.c:217 New log started.");
    let mut parser = LogStream::new(vec![system_line].into_iter());
    for _entry in parser.by_ref() {}

    assert_eq!(parser.stats().lines_in_entries, 1);
    assert_accounting(&parser);
}
1321
#[test]
/// Empty and whitespace-only lines that precede the first entry land in the
/// `lines_empty_orphan` bucket and do not produce entries.
fn accounting_empty_orphan() {
    let input = vec![
        String::new(),
        String::from(" "),
        full_line(UUID1, TS1, "After"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parser.stats().lines_empty_orphan, 2);
    assert_accounting(&parser);
}
1335
#[test]
/// An empty line that follows an entry is attached to that entry rather than
/// being counted as an empty orphan.
fn accounting_empty_attached() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        String::new(),
        String::from("continuation"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].attached.len(), 2);

    let stats = parser.stats();
    assert_eq!(stats.lines_empty_orphan, 0);
    assert_eq!(stats.lines_in_entries, 3);
    assert_accounting(&parser);
}
1351
#[test]
/// A non-empty orphan line ahead of the first full line must not break the
/// accounting invariant.
fn accounting_orphan_continuation() {
    let input = vec![String::from("orphan line"), full_line(UUID1, TS1, "After")];
    let mut parser = LogStream::new(input.into_iter());
    for _entry in parser.by_ref() {}
    assert_accounting(&parser);
}
1359
#[test]
/// Consecutive "Audio Codec Compare" lines for the same UUID followed by an
/// unrelated line must keep the accounting invariant intact.
fn accounting_codec_merging() {
    let compare = full_line(
        UUID1,
        TS1,
        "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
    );
    let saved = full_line(
        UUID1,
        TS1,
        "Audio Codec Compare [PCMU:0:8000:20:64000:1] is saved as a match",
    );
    let unrelated = full_line(UUID2, TS2, "Next");

    let mut parser = LogStream::new(vec![compare, saved, unrelated].into_iter());
    for _entry in parser.by_ref() {}
    assert_accounting(&parser);
}
1379
#[test]
/// A line whose UUID is preceded by a stray fragment ("varia") must still be
/// accounted for.
fn accounting_truncated_line() {
    let truncated = format!(
        "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
    );
    let input = vec![full_line(UUID1, TS1, "First"), truncated];

    let mut parser = LogStream::new(input.into_iter());
    for _entry in parser.by_ref() {}
    assert_accounting(&parser);
}
1392
#[test]
/// A variable line longer than `MAX_LINE_PAYLOAD` that runs straight into a
/// UUID-prefixed EXECUTE record is split: the EXECUTE part becomes its own
/// entry and the split is counted once.
fn accounting_long_line_collision_split() {
    let long_value = "x".repeat(MAX_LINE_PAYLOAD + 10);
    let line = format!(
        "variable_sip_multipart: [{long_value}]{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"
    );
    let input = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];

    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed[0].message, "CHANNEL_DATA:");

    let has_split_entry = parsed.iter().any(|entry| entry.uuid == UUID2);
    assert!(
        has_split_entry,
        "collision UUID should produce a separate entry"
    );

    assert_eq!(parser.stats().lines_split, 1);
    assert_accounting(&parser);
}
1418
#[test]
/// A short variable line whose value merely contains a UUID must not be
/// treated as a collision: one entry, zero splits.
fn no_split_on_short_lines() {
    let uuid_valued = format!("variable_call_uuid: [{UUID2}]");
    let input = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), uuid_valued];

    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parser.stats().lines_split, 0);
    assert_accounting(&parser);
}
1431
#[test]
/// Two UUID-less records fused onto one physical line are separated at the
/// second timestamp, giving two entries and one counted split.
fn timestamp_collision_splits_system_lines() {
    let fused = format!(
        "{TS1} 98.03% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42864: api sofia jsonstatus{TS2} 97.93% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42898: api fsctl pause_check"
    );
    let mut parser = LogStream::new(std::iter::once(fused));
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 2);

    let head = &parsed[0];
    let tail = &parsed[1];
    assert_eq!(
        head.message,
        "Event Socket Command from ::1:42864: api sofia jsonstatus"
    );
    assert_eq!(
        tail.message,
        "Event Socket Command from ::1:42898: api fsctl pause_check"
    );

    assert_eq!(parser.stats().lines_split, 1);
    assert_accounting(&parser);
}
1451
#[test]
/// Three records fused onto one physical line produce three entries in order
/// and two counted splits.
fn timestamp_collision_splits_three_entries() {
    let ts3 = "2025-01-15 10:30:47.345678";
    let fused = format!(
        "{TS1} 95.00% [INFO] mod.c:1 first{TS2} 96.00% [INFO] mod.c:1 second{ts3} 97.00% [INFO] mod.c:1 third"
    );
    let mut parser = LogStream::new(std::iter::once(fused));
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 3);
    let messages: Vec<&str> = parsed.iter().map(|entry| entry.message.as_str()).collect();
    assert_eq!(messages, ["first", "second", "third"]);

    assert_eq!(parser.stats().lines_split, 2);
    assert_accounting(&parser);
}
1467
#[test]
/// When the fused second record carries a UUID prefix, the split keeps that
/// UUID on the second entry and out of the first entry's message.
fn timestamp_collision_with_uuid_prefix() {
    let fused = format!(
        "{TS1} 95.00% [INFO] mod.c:1 first{UUID1} {TS2} 96.00% [DEBUG] sofia.c:100 second"
    );
    let mut parser = LogStream::new(std::iter::once(fused));
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 2);

    let head = &parsed[0];
    let tail = &parsed[1];
    assert_eq!(head.message, "first");
    assert_eq!(tail.uuid, UUID1);
    assert_eq!(tail.message, "second");

    assert_eq!(parser.stats().lines_split, 1);
    assert_accounting(&parser);
}
1483
#[test]
/// A bracketed variable whose value spans many lines is collected into one
/// variable: lines are joined with newlines, the lone closing `]` is dropped,
/// and parsing of subsequent variables resumes afterwards.
fn channel_data_multiline_variable_spans_many_lines() {
    let mut input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
    ];
    // The multi-line SDP variable, its closing bracket, and a trailing variable.
    input.extend(
        [
            "variable_switch_r_sdp: [v=0",
            "o=- 1234 5678 IN IP4 192.0.2.1",
            "s=-",
            "c=IN IP4 192.0.2.1",
            "t=0 0",
            "m=audio 47758 RTP/AVP 0 8 101",
            "a=rtpmap:0 PCMU/8000",
            "a=rtpmap:8 PCMA/8000",
            "a=rtpmap:101 telephone-event/8000",
            "a=fmtp:101 0-16",
            "]",
            "variable_direction: [inbound]",
        ]
        .iter()
        .map(|raw| raw.to_string()),
    );

    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let block = parsed[0].block.as_ref().expect("should have block");
    match block {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 1);
            assert_eq!(variables.len(), 2);

            assert_eq!(variables[0].0, "variable_switch_r_sdp");
            let sdp = &variables[0].1;
            assert!(sdp.starts_with("v=0\n"));
            assert!(sdp.contains("a=fmtp:101 0-16"));
            assert!(!sdp.ends_with(']'));

            assert_eq!(variables[1].0, "variable_direction");
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }
}
1519
#[test]
/// An "updateMedia: Local SDP" message is classified as a local SDP marker and
/// the bare lines that follow become the body of an SDP block.
fn sdp_from_verto_update_media() {
    let input = vec![
        full_line(UUID1, TS1, "updateMedia: Local SDP"),
        String::from("v=0"),
        String::from("o=- 1234 5678 IN IP4 192.0.2.1"),
        String::from("m=audio 10000 RTP/AVP 0"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let entry = &parsed[0];
    match &entry.message_kind {
        MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
        other => panic!("expected SdpMarker, got {other:?}"),
    }

    let block = entry.block.as_ref().expect("should have block");
    match block {
        Block::Sdp { direction, body } => {
            assert_eq!(*direction, SdpDirection::Local);
            assert_eq!(body.len(), 3);
        }
        other => panic!("expected Sdp block, got {other:?}"),
    }
}
1543
#[test]
/// A "Duplicate SDP" message is an SDP marker with unknown direction and still
/// gets a block built from the lines that follow.
fn duplicate_sdp_marker() {
    let input = vec![
        full_line(UUID1, TS1, "Duplicate SDP"),
        String::from("v=0"),
        String::from("m=audio 10000 RTP/AVP 0"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let entry = &parsed[0];
    match &entry.message_kind {
        MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Unknown),
        other => panic!("expected SdpMarker, got {other:?}"),
    }
    assert!(entry.block.is_some());
}
1559
#[test]
/// A bracketed variable that is still open when the next full entry begins
/// must leave an "unclosed multi-line variable" warning on the first entry.
fn warning_on_unclosed_multiline_variable() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        String::from("variable_switch_r_sdp: [v=0"),
        String::from("o=- 1234 5678 IN IP4 192.0.2.1"),
        full_line(UUID2, TS2, "Next entry"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 2);

    let warnings = &parsed[0].warnings;
    let flagged = warnings
        .iter()
        .any(|w| w.contains("unclosed multi-line variable"));
    assert!(
        flagged,
        "expected unclosed variable warning, got: {:?}",
        warnings
    );
}
1579
#[test]
/// A continuation inside a CHANNEL_DATA block that is not a field line must
/// leave an "unparseable CHANNEL_DATA" warning on the entry.
fn warning_on_unparseable_channel_data_line() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
        format!("{UUID1} this is not a valid field line"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let warnings = &parsed[0].warnings;
    let flagged = warnings
        .iter()
        .any(|w| w.contains("unparseable CHANNEL_DATA"));
    assert!(flagged, "expected unparseable warning, got: {:?}", warnings);
}
1598
#[test]
/// An unrelated continuation after an "Audio Codec Compare" line must leave an
/// "unexpected codec negotiation" warning on the entry.
fn warning_on_unexpected_codec_continuation() {
    let compare = full_line(
        UUID1,
        TS1,
        "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
    );
    let stray = format!("{UUID1} some unexpected continuation line");

    let parsed = LogStream::new(vec![compare, stray].into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let warnings = &parsed[0].warnings;
    let flagged = warnings
        .iter()
        .any(|w| w.contains("unexpected codec negotiation"));
    assert!(flagged, "expected codec warning, got: {:?}", warnings);
}
1620
#[test]
/// A UUID-prefixed continuation that follows a UUID-less entry must become its
/// own entry instead of being attached to the preceding one.
fn system_line_uuid_continuation_not_absorbed() {
    let system_line = format!("{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command");
    let continuation = format!("{UUID1} Channel-State: [CS_EXECUTE]");

    let parsed = LogStream::new(vec![system_line, continuation].into_iter()).collect::<Vec<_>>();
    assert_eq!(
        parsed.len(),
        2,
        "UUID continuation should not be absorbed by system entry"
    );

    // The first entry has no UUID; the second carries the continuation's UUID.
    assert_eq!(parsed[0].uuid, "");
    assert_eq!(parsed[1].uuid, UUID1);
}
1638
#[test]
/// An over-long CHANNEL_DATA variable line whose tail runs into a UUID-prefixed
/// EXECUTE record: the variable stays with the block (unclosed, with warnings),
/// the EXECUTE part becomes its own entry, and the following full line parses
/// normally.
fn truncated_collision_in_channel_data_variable() {
    let padding = "x".repeat(2000);
    let collision_line = format!(
        "{UUID1} variable_long_xml: [{padding}{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(foo=bar)"
    );
    // Sanity check on the fixture itself: it has to be longer than the limit.
    assert!(
        collision_line.len() > super::MAX_LINE_PAYLOAD,
        "test line must exceed buffer limit, got {}",
        collision_line.len()
    );

    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
        format!("{UUID1} variable_direction: [inbound]"),
        collision_line,
        full_line(UUID1, TS2, "Next log entry"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();

    // Entry 0: the CHANNEL_DATA block.
    let channel_data = &parsed[0];
    assert_eq!(channel_data.message, "CHANNEL_DATA:");
    let block = channel_data.block.as_ref().expect("should have block");
    match block {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 1, "should have Channel-Name field");
            assert_eq!(fields[0].0, "Channel-Name");
            assert_eq!(
                variables.len(),
                2,
                "should have direction + unclosed long_xml"
            );
            assert_eq!(variables[0].0, "variable_direction");
            assert_eq!(variables[0].1, "inbound");
            assert_eq!(variables[1].0, "variable_long_xml");
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }

    let warnings = &channel_data.warnings;
    let overflow_flagged = warnings
        .iter()
        .any(|w| w.contains("line exceeds mod_logfile 2048-byte buffer"));
    assert!(
        overflow_flagged,
        "expected buffer overflow warning, got: {:?}",
        warnings
    );
    let unclosed_flagged = warnings
        .iter()
        .any(|w| w.contains("unclosed multi-line variable"));
    assert!(
        unclosed_flagged,
        "expected unclosed variable warning, got: {:?}",
        warnings
    );

    // Entry 1: the EXECUTE record recovered from the tail of the long line.
    let split = &parsed[1];
    assert_eq!(split.uuid, UUID1);
    assert!(
        split.message.starts_with("EXECUTE "),
        "split entry should be EXECUTE, got: {}",
        split.message
    );

    // Entry 2: the ordinary line that follows.
    assert_eq!(parsed.len(), 3);
    assert_eq!(parsed[2].message, "Next log entry");
}
1713
#[test]
/// Inside a CHANNEL_DATA block the UUID prefix may disappear part-way through;
/// prefixed and bare variable lines must all end up in the same block.
fn channel_data_uuid_drops_mid_block() {
    let mut input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} variable_max_forwards: [69]"),
        format!("{UUID1} variable_presence_id: [1251@[2001:db8::10]]"),
        format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
    ];
    // From here on the lines carry no UUID prefix.
    input.extend(
        [
            "variable_sip_h_X-Call-Info: [<urn:test:callid:20260316>;purpose=emergency-CallId]",
            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]",
            "variable_remote_media_ip: [2001:db8::10]",
            "variable_remote_media_port: [9952]",
            "variable_rtp_use_codec_name: [opus]",
        ]
        .iter()
        .map(|raw| raw.to_string()),
    );
    input.push(full_line(UUID1, TS2, "Next entry"));

    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 2);
    let channel_data = &parsed[0];
    assert_eq!(channel_data.message, "CHANNEL_DATA:");

    let block = channel_data.block.as_ref().expect("should have block");
    match block {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 0);
            assert_eq!(variables.len(), 8);

            assert_eq!(variables[0].0, "variable_max_forwards");
            assert_eq!(variables[0].1, "69");
            assert_eq!(variables[1].0, "variable_presence_id");
            assert_eq!(variables[1].1, "1251@[2001:db8::10]");
            assert_eq!(variables[2].0, "variable_sip_h_X-Custom-ID");

            assert_eq!(variables[3].0, "variable_sip_h_X-Call-Info");
            assert!(variables[3].1.contains("emergency-CallId"));
            assert_eq!(variables[4].0, "variable_ep_codec_string");
            assert_eq!(variables[7].0, "variable_rtp_use_codec_name");
            assert_eq!(variables[7].1, "opus");
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }

    assert_eq!(channel_data.attached.len(), 8);
    assert_eq!(parsed[1].message, "Next entry");
    assert_accounting(&parser);
}
1764
#[test]
/// Same UUID-drop scenario, but the bare section contains a multi-line SDP
/// variable with CR line endings: the value keeps its `\r`, loses the closing
/// bracket, and the variables after it are still parsed.
fn channel_data_uuid_drops_with_multiline_variable() {
    let mut input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} variable_max_forwards: [69]"),
        format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
    ];
    // Bare (UUID-less) lines: the multi-line SDP value, then two more variables.
    input.extend(
        [
            "variable_switch_r_sdp: [v=0\r",
            "o=FreeSWITCH 1773663549 1773663550 IN IP6 2001:db8::10\r",
            "s=FreeSWITCH\r",
            "c=IN IP6 2001:db8::10\r",
            "t=0 0\r",
            "m=audio 9952 RTP/AVP 102 101 13\r",
            "a=rtpmap:102 opus/48000/2\r",
            "a=ptime:20\r",
            "]",
            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]",
            "variable_direction: [inbound]",
        ]
        .iter()
        .map(|raw| raw.to_string()),
    );
    input.push(full_line(UUID1, TS2, "Next entry"));

    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 2);
    let channel_data = &parsed[0];
    let block = channel_data.block.as_ref().expect("should have block");
    match block {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 0);
            assert_eq!(variables.len(), 5);
            assert_eq!(variables[0].0, "variable_max_forwards");
            assert_eq!(variables[1].0, "variable_sip_h_X-Custom-ID");

            assert_eq!(variables[2].0, "variable_switch_r_sdp");
            let sdp = &variables[2].1;
            assert!(
                sdp.starts_with("v=0\r\n"),
                "SDP should start with v=0\\r\\n, got: {sdp:?}"
            );
            assert!(sdp.contains("m=audio 9952 RTP/AVP 102 101 13\r"));
            assert!(sdp.contains("a=ptime:20\r"));
            assert!(!sdp.ends_with(']'), "closing bracket should be stripped");

            assert_eq!(variables[3].0, "variable_ep_codec_string");
            assert_eq!(variables[4].0, "variable_direction");
            assert_eq!(variables[4].1, "inbound");
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }

    assert_eq!(channel_data.attached.len(), 13);
    assert_accounting(&parser);
}
1822
#[test]
/// A bare variable line cut off right after its name and fused with a
/// UUID-prefixed EXECUTE record: the fragment does not become a variable, and
/// the EXECUTE part is emitted as a separate `Truncated` entry.
fn channel_data_bare_variable_collision_with_execute() {
    let collision = format!(
        "variable_call_uuid: {UUID1} EXECUTE [depth=0] \
         sofia/internal-v6/1251@[2001:db8::10] export(nolocal:test_var=value)"
    );
    let export_line = full_line(
        UUID1,
        TS2,
        "EXPORT (export_vars) (REMOTE ONLY) [test_var]=[value]",
    );

    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} variable_max_forwards: [69]"),
        String::from("variable_DP_MATCH: [ARRAY::create_conference|:create_conference]"),
        collision,
        export_line,
    ];

    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 3);

    // Entry 0: only the two well-formed variables survive in the block.
    let block = parsed[0].block.as_ref().expect("should have block");
    match block {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 0);
            assert_eq!(variables.len(), 2);
            assert_eq!(variables[0].0, "variable_max_forwards");
            assert_eq!(variables[1].0, "variable_DP_MATCH");
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }

    // Entry 1: the EXECUTE record recovered from the fused line.
    let recovered = &parsed[1];
    assert_eq!(recovered.uuid, UUID1);
    assert_eq!(recovered.kind, LineKind::Truncated);
    assert!(
        recovered.message.starts_with("EXECUTE "),
        "truncated line should yield EXECUTE, got: {}",
        recovered.message
    );

    // Entry 2: the EXPORT line that follows.
    assert_eq!(parsed[2].message_kind.label(), "variable");
    assert_accounting(&parser);
}
1879
#[test]
/// A UUID-less line whose message starts with a UUID keeps kind `System`, but
/// the UUID is lifted into `entry.uuid` and removed from the message.
fn system_line_with_embedded_uuid_gets_entry_uuid() {
    let originate = format!(
        "{TS1} 95.97% [DEBUG] switch_cpp.cpp:1466 {UUID1} DAA-LOG WaveManager originate"
    );
    let failure = format!(
        "{TS1} 95.97% [WARNING] switch_cpp.cpp:1466 {UUID1} DAA-LOG Failed to create session"
    );
    let state_change = full_line(UUID1, TS2, "State Change CS_EXECUTE -> CS_HIBERNATE");

    let mut parser = LogStream::new(vec![originate, failure, state_change].into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 3);

    let first = &parsed[0];
    assert_eq!(first.uuid, UUID1);
    assert_eq!(first.kind, LineKind::System);
    assert_eq!(first.message, "DAA-LOG WaveManager originate");

    let second = &parsed[1];
    assert_eq!(second.uuid, UUID1);
    assert_eq!(second.kind, LineKind::System);
    assert_eq!(second.message, "DAA-LOG Failed to create session");

    // The regular UUID-prefixed line is unaffected.
    let third = &parsed[2];
    assert_eq!(third.uuid, UUID1);
    assert_eq!(third.kind, LineKind::Full);
    assert_accounting(&parser);
}
1912}