1use crate::level::LogLevel;
2use crate::line::{is_date_at, is_log_header_at, is_uuid_at, parse_line, LineKind};
3use crate::message::{classify_message, MessageKind, SdpDirection};
4
5#[derive(Debug, Clone, PartialEq, Eq)]
10#[non_exhaustive]
11pub enum Block {
12 ChannelData {
15 fields: Vec<(String, String)>,
16 variables: Vec<(String, String)>,
17 },
18 Sdp {
20 direction: SdpDirection,
21 body: Vec<String>,
22 },
23 CodecNegotiation {
25 comparisons: Vec<(String, String)>,
26 selected: Vec<String>,
27 },
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub enum UnclassifiedTracking {
35 CountOnly,
37 TrackLines,
39 CaptureData,
41}
42
43#[derive(Debug, Clone, PartialEq, Eq)]
45#[non_exhaustive]
46pub enum UnclassifiedReason {
47 OrphanContinuation,
49 UnknownMessageFormat,
51 TruncatedField,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub struct UnclassifiedLine {
58 pub line_number: u64,
59 pub reason: UnclassifiedReason,
60 pub data: Option<String>,
62}
63
64#[derive(Debug, Clone, Default)]
66pub struct ParseStats {
67 pub lines_processed: u64,
68 pub lines_unclassified: u64,
69 pub lines_in_entries: u64,
71 pub lines_empty_orphan: u64,
73 pub lines_split: u64,
76 pub unclassified_lines: Vec<UnclassifiedLine>,
78}
79
80impl ParseStats {
81 pub fn unaccounted_lines(&self) -> u64 {
88 let expected = self.lines_in_entries + self.lines_empty_orphan;
89 let actual = self.lines_processed + self.lines_split;
90 actual.saturating_sub(expected)
91 }
92}
93
94#[derive(Debug)]
100pub struct LogEntry {
101 pub uuid: String,
103 pub timestamp: String,
105 pub level: Option<LogLevel>,
107 pub idle_pct: Option<String>,
109 pub source: Option<String>,
111 pub message: String,
113 pub kind: LineKind,
115 pub message_kind: MessageKind,
117 pub block: Option<Block>,
119 pub attached: Vec<String>,
121 pub line_number: u64,
123 pub warnings: Vec<String>,
125}
126
127fn parse_field_line(msg: &str) -> Option<(String, String)> {
128 let colon = msg.find(": ")?;
129 let name = &msg[..colon];
130 if name.contains(' ') || name.is_empty() {
131 return None;
132 }
133 let value_part = &msg[colon + 2..];
134 let value = if let Some(inner) = value_part.strip_prefix('[') {
135 inner.strip_suffix(']').unwrap_or(inner)
136 } else {
137 value_part
138 };
139 Some((name.to_string(), value.to_string()))
140}
141
142enum StreamState {
143 Idle,
144 InChannelData {
145 fields: Vec<(String, String)>,
146 variables: Vec<(String, String)>,
147 open_var_name: Option<String>,
148 open_var_value: Option<String>,
149 },
150 InSdp {
151 direction: SdpDirection,
152 body: Vec<String>,
153 },
154 InCodecNegotiation {
155 comparisons: Vec<(String, String)>,
156 selected: Vec<String>,
157 },
158}
159
160impl StreamState {
161 fn take_idle(&mut self) -> StreamState {
162 std::mem::replace(self, StreamState::Idle)
163 }
164}
165
166pub struct LogStream<I> {
176 lines: I,
177 last_uuid: String,
178 last_timestamp: String,
179 pending: Option<LogEntry>,
180 state: StreamState,
181 stats: ParseStats,
182 tracking: UnclassifiedTracking,
183 line_number: u64,
184 split_pending: Option<String>,
185 deferred_warning: Option<String>,
186}
187
188impl<I: Iterator<Item = String>> LogStream<I> {
189 pub fn new(lines: I) -> Self {
191 LogStream {
192 lines,
193 last_uuid: String::new(),
194 last_timestamp: String::new(),
195 pending: None,
196 state: StreamState::Idle,
197 stats: ParseStats::default(),
198 tracking: UnclassifiedTracking::CountOnly,
199 line_number: 0,
200 split_pending: None,
201 deferred_warning: None,
202 }
203 }
204
205 pub fn unclassified_tracking(mut self, level: UnclassifiedTracking) -> Self {
207 self.tracking = level;
208 self
209 }
210
211 pub fn stats(&self) -> &ParseStats {
213 &self.stats
214 }
215
216 pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
220 std::mem::take(&mut self.stats.unclassified_lines)
221 }
222
223 fn record_unclassified(&mut self, reason: UnclassifiedReason, data: Option<&str>) {
224 self.stats.lines_unclassified += 1;
225 match self.tracking {
226 UnclassifiedTracking::CountOnly => {}
227 UnclassifiedTracking::TrackLines => {
228 self.stats.unclassified_lines.push(UnclassifiedLine {
229 line_number: self.line_number,
230 reason,
231 data: None,
232 });
233 }
234 UnclassifiedTracking::CaptureData => {
235 self.stats.unclassified_lines.push(UnclassifiedLine {
236 line_number: self.line_number,
237 reason,
238 data: data.map(|s| s.to_string()),
239 });
240 }
241 }
242 }
243
244 fn finalize_block(&mut self) -> (Option<Block>, Vec<String>) {
245 let mut warnings = Vec::new();
246 match self.state.take_idle() {
247 StreamState::Idle => (None, warnings),
248 StreamState::InChannelData {
249 fields,
250 mut variables,
251 open_var_name,
252 open_var_value,
253 } => {
254 if let (Some(ref name), Some(value)) = (&open_var_name, open_var_value) {
255 warnings.push(format!("unclosed multi-line variable: {name}"));
256 variables.push((name.clone(), value));
257 }
258 (Some(Block::ChannelData { fields, variables }), warnings)
259 }
260 StreamState::InSdp { direction, body } => {
261 (Some(Block::Sdp { direction, body }), warnings)
262 }
263 StreamState::InCodecNegotiation {
264 comparisons,
265 selected,
266 } => (
267 Some(Block::CodecNegotiation {
268 comparisons,
269 selected,
270 }),
271 warnings,
272 ),
273 }
274 }
275
276 fn finalize_pending(&mut self) -> Option<LogEntry> {
277 let (block, warnings) = self.finalize_block();
278 if let Some(ref mut p) = self.pending {
279 p.block = block;
280 p.warnings.extend(warnings);
281 self.stats.lines_in_entries += 1 + p.attached.len() as u64;
282 }
283 self.pending.take()
284 }
285
286 fn start_block_for_message(&mut self, message_kind: &MessageKind) {
287 self.state = match message_kind {
288 MessageKind::ChannelData => StreamState::InChannelData {
289 fields: Vec::new(),
290 variables: Vec::new(),
291 open_var_name: None,
292 open_var_value: None,
293 },
294 MessageKind::SdpMarker { direction } => StreamState::InSdp {
295 direction: direction.clone(),
296 body: Vec::new(),
297 },
298 MessageKind::CodecNegotiation => StreamState::InCodecNegotiation {
299 comparisons: Vec::new(),
300 selected: Vec::new(),
301 },
302 _ => StreamState::Idle,
303 };
304 }
305
306 fn accumulate_codec_entry(&mut self, msg: &str) {
307 let mut warning = None;
308 if let StreamState::InCodecNegotiation {
309 comparisons,
310 selected,
311 } = &mut self.state
312 {
313 let rest = msg.strip_prefix("Audio Codec Compare ").unwrap_or(msg);
314 if rest.contains("is saved as a match") {
315 let codec = rest.find(']').map(|end| &rest[1..end]).unwrap_or(rest);
316 selected.push(codec.to_string());
317 } else if let Some(slash) = rest.find("]/[") {
318 let offered = &rest[1..slash];
319 let local = &rest[slash + 3..rest.len().saturating_sub(1)];
320 comparisons.push((offered.to_string(), local.to_string()));
321 } else {
322 warning = Some(format!(
323 "unrecognized codec negotiation line: {}",
324 if msg.len() > 80 { &msg[..80] } else { msg }
325 ));
326 }
327 }
328 if let (Some(w), Some(ref mut pending)) = (warning, &mut self.pending) {
329 pending.warnings.push(w);
330 }
331 }
332
333 fn accumulate_continuation(&mut self, msg: &str, line: &str) {
334 let msg_kind = classify_message(msg);
335 let mut warning = None;
336 match &mut self.state {
337 StreamState::InChannelData {
338 fields,
339 variables,
340 open_var_name,
341 open_var_value,
342 } => {
343 if let Some(ref mut val) = open_var_value {
344 val.push('\n');
345 val.push_str(msg);
346 if msg.ends_with(']') {
347 let trimmed = val.trim_end_matches(']').to_string();
348 let name = open_var_name.take().unwrap();
349 *open_var_value = None;
350 variables.push((name, trimmed));
351 }
352 } else {
353 match &msg_kind {
354 MessageKind::ChannelField { name, value } => {
355 fields.push((name.clone(), value.clone()));
356 }
357 MessageKind::Variable { name, value } => {
358 if !msg.ends_with(']') && msg.contains(": [") {
359 *open_var_name = Some(name.clone());
360 *open_var_value = Some(value.clone());
361 } else {
362 variables.push((name.clone(), value.clone()));
363 }
364 }
365 _ => {
366 if let Some((name, value)) = parse_field_line(msg) {
367 fields.push((name, value));
368 } else {
369 warning = Some(format!(
370 "unparseable CHANNEL_DATA line: {}",
371 if msg.len() > 80 { &msg[..80] } else { msg }
372 ));
373 }
374 }
375 }
376 }
377 }
378 StreamState::InSdp { body, .. } => {
379 body.push(msg.to_string());
380 }
381 StreamState::InCodecNegotiation { .. } => {
382 warning = Some(format!(
383 "unexpected codec negotiation continuation: {}",
384 if msg.len() > 80 { &msg[..80] } else { msg }
385 ));
386 }
387 StreamState::Idle => {}
388 }
389 if let Some(ref mut pending) = self.pending {
390 if let Some(w) = warning {
391 pending.warnings.push(w);
392 }
393 pending.attached.push(line.to_string());
394 }
395 }
396
397 fn new_entry(
398 &mut self,
399 uuid: String,
400 timestamp: String,
401 message: String,
402 kind: LineKind,
403 message_kind: MessageKind,
404 ) -> LogEntry {
405 let mut warnings = Vec::new();
406 if let Some(w) = self.deferred_warning.take() {
407 warnings.push(w);
408 }
409 LogEntry {
410 uuid,
411 timestamp,
412 message,
413 kind,
414 message_kind,
415 level: None,
416 idle_pct: None,
417 source: None,
418 block: None,
419 attached: Vec::new(),
420 line_number: self.line_number,
421 warnings,
422 }
423 }
424}
425
426const MOD_LOGFILE_BUF_SIZE: usize = 2048;
430
431const MAX_LINE_PAYLOAD: usize = MOD_LOGFILE_BUF_SIZE - 36 - 1 - 1;
433
434impl<I: Iterator<Item = String>> LogStream<I> {
435 fn detect_collision(&mut self, line: String) -> String {
453 if line.len() > MAX_LINE_PAYLOAD {
454 let warning = format!(
455 "line exceeds mod_logfile 2048-byte buffer ({} bytes), data may be truncated",
456 line.len() + 38,
457 );
458 if let Some(ref mut pending) = self.pending {
459 pending.warnings.push(warning);
460 } else {
461 self.deferred_warning = Some(warning);
462 }
463 }
464
465 let bytes = line.as_bytes();
467 let min_scan = if is_uuid_at(bytes, 0) {
468 if bytes.len() > 37 && bytes[37].is_ascii_digit() {
469 64 } else {
471 37 }
473 } else if is_date_at(bytes, 0) {
474 27 } else {
476 0
477 };
478
479 let end = bytes.len().saturating_sub(28);
480 for offset in min_scan..=end {
481 if is_log_header_at(bytes, offset) {
483 let split_at = if offset >= 37 && is_uuid_at(bytes, offset - 37) {
485 offset - 37
486 } else {
487 offset
488 };
489 self.split_pending = Some(line[split_at..].to_string());
490 return line[..split_at].to_string();
491 }
492 if is_uuid_at(bytes, offset) && bytes.len() > MAX_LINE_PAYLOAD {
494 self.split_pending = Some(line[offset..].to_string());
495 return line[..offset].to_string();
496 }
497 }
498
499 line
500 }
501}
502
503impl<I: Iterator<Item = String>> Iterator for LogStream<I> {
504 type Item = LogEntry;
505
506 fn next(&mut self) -> Option<LogEntry> {
507 loop {
508 let line = if let Some(split) = self.split_pending.take() {
509 self.stats.lines_split += 1;
510 split
511 } else {
512 let Some(line) = self.lines.next() else {
513 return self.finalize_pending();
514 };
515
516 if line.starts_with('\x00') {
517 let yielded = self.finalize_pending();
518 self.last_uuid.clear();
519 self.last_timestamp.clear();
520 if yielded.is_some() {
521 return yielded;
522 }
523 continue;
524 }
525
526 self.line_number += 1;
527 self.stats.lines_processed += 1;
528 line
529 };
530
531 let line = self.detect_collision(line);
532
533 let parsed = parse_line(&line);
534
535 match parsed.kind {
536 LineKind::Full | LineKind::System | LineKind::Truncated => {
537 let uuid = parsed.uuid.unwrap_or("").to_string();
538 let message_kind = classify_message(parsed.message);
539
540 if message_kind == MessageKind::CodecNegotiation {
542 if let (Some(ref pending), StreamState::InCodecNegotiation { .. }) =
543 (&self.pending, &self.state)
544 {
545 if uuid == pending.uuid {
546 self.accumulate_codec_entry(parsed.message);
547 if let Some(ref mut p) = self.pending {
548 p.attached.push(line);
549 }
550 continue;
551 }
552 }
553 }
554
555 let yielded = self.finalize_pending();
556
557 let timestamp = parsed
558 .timestamp
559 .map(|t| t.to_string())
560 .unwrap_or_else(|| self.last_timestamp.clone());
561
562 if !uuid.is_empty() {
563 self.last_uuid = uuid.clone();
564 }
565 if parsed.timestamp.is_some() {
566 self.last_timestamp = timestamp.clone();
567 }
568
569 self.start_block_for_message(&message_kind);
570 if message_kind == MessageKind::CodecNegotiation {
571 self.accumulate_codec_entry(parsed.message);
572 }
573
574 let mut entry = self.new_entry(
575 uuid,
576 timestamp,
577 parsed.message.to_string(),
578 parsed.kind,
579 message_kind,
580 );
581 entry.level = parsed.level;
582 entry.idle_pct = parsed.idle_pct.map(|s| s.to_string());
583 entry.source = parsed.source.map(|s| s.to_string());
584 self.pending = Some(entry);
585
586 if yielded.is_some() {
587 return yielded;
588 }
589 }
590
591 LineKind::UuidContinuation => {
592 let uuid = parsed.uuid.unwrap_or("").to_string();
593 let is_primary = parsed.message.starts_with("EXECUTE ");
594
595 if let Some(ref pending) = self.pending {
596 if !is_primary && uuid == pending.uuid {
597 self.accumulate_continuation(parsed.message, &line);
598 } else {
599 let yielded = self.finalize_pending();
600 let message_kind = classify_message(parsed.message);
601
602 if !uuid.is_empty() {
603 self.last_uuid = uuid.clone();
604 }
605
606 self.start_block_for_message(&message_kind);
607 self.pending = Some(self.new_entry(
608 uuid,
609 self.last_timestamp.clone(),
610 parsed.message.to_string(),
611 parsed.kind,
612 message_kind,
613 ));
614
615 return yielded;
616 }
617 } else {
618 let message_kind = classify_message(parsed.message);
619
620 if !uuid.is_empty() {
621 self.last_uuid = uuid.clone();
622 }
623
624 self.start_block_for_message(&message_kind);
625 self.pending = Some(self.new_entry(
626 uuid,
627 self.last_timestamp.clone(),
628 parsed.message.to_string(),
629 parsed.kind,
630 message_kind,
631 ));
632 }
633 }
634
635 LineKind::BareContinuation => {
636 if self.pending.is_some() {
637 self.accumulate_continuation(parsed.message, &line);
638 } else {
639 self.record_unclassified(
640 UnclassifiedReason::OrphanContinuation,
641 Some(&line),
642 );
643 let message_kind = classify_message(parsed.message);
644 self.pending = Some(self.new_entry(
645 self.last_uuid.clone(),
646 self.last_timestamp.clone(),
647 parsed.message.to_string(),
648 parsed.kind,
649 message_kind,
650 ));
651 }
652 }
653
654 LineKind::Empty => {
655 if let Some(ref mut pending) = self.pending {
656 pending.attached.push(line);
657 } else {
658 self.stats.lines_empty_orphan += 1;
659 }
660 }
661 }
662 }
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669
670 const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
671 const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
672
673 fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
674 format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
675 }
676
677 const TS1: &str = "2025-01-15 10:30:45.123456";
678 const TS2: &str = "2025-01-15 10:30:46.234567";
679
680 #[test]
683 fn inherits_uuid_for_bare_continuation() {
684 let lines = vec![
685 full_line(UUID1, TS1, "CHANNEL_DATA:"),
686 "variable_foo: [bar]".to_string(),
687 "variable_baz: [qux]".to_string(),
688 ];
689 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
690 assert_eq!(entries.len(), 1);
691 assert_eq!(entries[0].uuid, UUID1);
692 assert_eq!(entries[0].attached.len(), 2);
693 assert_eq!(entries[0].attached[0], "variable_foo: [bar]");
694 assert_eq!(entries[0].attached[1], "variable_baz: [qux]");
695 }
696
697 #[test]
698 fn inherits_timestamp_for_uuid_continuation() {
699 let lines = vec![
700 full_line(UUID1, TS1, "First"),
701 format!("{UUID2} Channel-State: [CS_EXECUTE]"),
702 ];
703 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
704 assert_eq!(entries.len(), 2);
705 assert_eq!(entries[0].timestamp, TS1);
706 assert_eq!(entries[1].uuid, UUID2);
707 assert_eq!(entries[1].timestamp, TS1);
708 }
709
710 #[test]
711 fn new_full_line_yields_previous() {
712 let lines = vec![
713 full_line(UUID1, TS1, "First"),
714 full_line(UUID2, TS2, "Second"),
715 ];
716 let mut stream = LogStream::new(lines.into_iter());
717 let first = stream.next().unwrap();
718 assert_eq!(first.uuid, UUID1);
719 assert_eq!(first.message, "First");
720 let second = stream.next().unwrap();
721 assert_eq!(second.uuid, UUID2);
722 assert_eq!(second.message, "Second");
723 assert!(stream.next().is_none());
724 }
725
726 #[test]
727 fn channel_data_collected_as_attached() {
728 let lines = vec![
729 full_line(UUID1, TS1, "CHANNEL_DATA:"),
730 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
731 format!("{UUID1} Unique-ID: [{UUID1}]"),
732 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
733 ];
734 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
735 assert_eq!(entries.len(), 1);
736 assert_eq!(entries[0].message, "CHANNEL_DATA:");
737 assert_eq!(entries[0].attached.len(), 3);
738 }
739
740 #[test]
741 fn sdp_body_collected_as_attached() {
742 let lines = vec![
743 full_line(UUID1, TS1, "Local SDP:"),
744 "v=0".to_string(),
745 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
746 "s=-".to_string(),
747 "c=IN IP4 192.0.2.1".to_string(),
748 "m=audio 10000 RTP/AVP 0".to_string(),
749 ];
750 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
751 assert_eq!(entries.len(), 1);
752 assert_eq!(entries[0].attached.len(), 5);
753 }
754
755 #[test]
756 fn truncated_starts_new_entry() {
757 let lines = vec![
758 full_line(UUID1, TS1, "First"),
759 format!(
760 "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
761 ),
762 ];
763 let mut stream = LogStream::new(lines.into_iter());
764 let first = stream.next().unwrap();
765 assert_eq!(first.uuid, UUID1);
766 assert_eq!(first.message, "First");
767 let second = stream.next().unwrap();
768 assert_eq!(second.uuid, UUID2);
769 assert_eq!(second.kind, LineKind::Truncated);
770 }
771
772 #[test]
773 fn empty_lines_in_attached() {
774 let lines = vec![
775 full_line(UUID1, TS1, "First"),
776 String::new(),
777 "continuation".to_string(),
778 ];
779 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
780 assert_eq!(entries.len(), 1);
781 assert_eq!(entries[0].attached.len(), 2);
782 assert_eq!(entries[0].attached[0], "");
783 assert_eq!(entries[0].attached[1], "continuation");
784 }
785
786 #[test]
787 fn system_line_no_uuid() {
788 let lines = vec![format!(
789 "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
790 )];
791 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
792 assert_eq!(entries.len(), 1);
793 assert_eq!(entries[0].uuid, "");
794 assert_eq!(entries[0].kind, LineKind::System);
795 }
796
797 #[test]
798 fn final_entry_on_exhaustion() {
799 let lines = vec![full_line(UUID1, TS1, "Only entry")];
800 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
801 assert_eq!(entries.len(), 1);
802 assert_eq!(entries[0].message, "Only entry");
803 }
804
805 #[test]
806 fn consecutive_full_lines() {
807 let lines = vec![
808 full_line(UUID1, TS1, "First"),
809 full_line(UUID1, TS2, "Second"),
810 full_line(UUID2, TS1, "Third"),
811 ];
812 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
813 assert_eq!(entries.len(), 3);
814 for entry in &entries {
815 assert!(entry.attached.is_empty());
816 }
817 }
818
819 #[test]
820 fn execute_after_channel_data_same_uuid() {
821 let lines = vec![
822 full_line(UUID1, TS1, "CHANNEL_DATA:"),
823 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
824 format!("{UUID1} variable_sip_call_id: [test@192.0.2.1]"),
825 "variable_foo: [bar]".to_string(),
826 String::new(),
827 String::new(),
828 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
829 full_line(UUID1, TS2, "EXPORT (export_vars) [originate_timeout]=[3600]"),
830 ];
831 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
832 assert_eq!(entries.len(), 3);
833 assert_eq!(entries[0].message, "CHANNEL_DATA:");
834 assert_eq!(entries[0].attached.len(), 5);
835 assert_eq!(entries[1].message, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)");
836 assert_eq!(entries[1].kind, LineKind::UuidContinuation);
837 assert_eq!(
838 entries[2].message,
839 "EXPORT (export_vars) [originate_timeout]=[3600]"
840 );
841 }
842
843 #[test]
844 fn execute_between_full_lines_same_uuid() {
845 let lines = vec![
846 full_line(UUID1, TS1, "CoreSession::setVariable(X-C911P-City, ST GEORGES)"),
847 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/city/ST GEORGES)"),
848 full_line(UUID1, TS2, "CoreSession::setVariable(X-C911P-Region, SGS)"),
849 ];
850 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
851 assert_eq!(entries.len(), 3);
852 assert_eq!(
853 entries[0].message,
854 "CoreSession::setVariable(X-C911P-City, ST GEORGES)"
855 );
856 assert!(entries[0].attached.is_empty());
857 assert!(entries[1].message.starts_with("EXECUTE "));
858 assert_eq!(entries[1].kind, LineKind::UuidContinuation);
859 assert_eq!(
860 entries[2].message,
861 "CoreSession::setVariable(X-C911P-Region, SGS)"
862 );
863 }
864
865 #[test]
866 fn multiple_execute_between_full_lines() {
867 let lines = vec![
868 full_line(UUID1, TS1, "CoreSession::setVariable(ngcs_call_id, urn:emergency:uid:callid:test)"),
869 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/call_id/urn:emergency:uid:callid:test)"),
870 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/callid_codecs/urn:emergency:uid:callid:test/PCMU@8000h)"),
871 full_line(UUID1, TS2, "CoreSession::setVariable(ngcs_short_call_id, test)"),
872 ];
873 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
874 assert_eq!(entries.len(), 4);
875 assert!(entries[0].attached.is_empty());
876 assert!(entries[1].message.contains("call_id"));
877 assert!(entries[2].message.contains("callid_codecs"));
878 assert_eq!(
879 entries[3].message,
880 "CoreSession::setVariable(ngcs_short_call_id, test)"
881 );
882 }
883
884 #[test]
885 fn uuid_continuation_different_uuid_yields() {
886 let lines = vec![
887 full_line(UUID1, TS1, "First"),
888 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
889 format!("{UUID2} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"),
890 ];
891 let mut stream = LogStream::new(lines.into_iter());
892 let first = stream.next().unwrap();
893 assert_eq!(first.uuid, UUID1);
894 assert_eq!(first.attached.len(), 1);
895 let second = stream.next().unwrap();
896 assert_eq!(second.uuid, UUID2);
897 assert_eq!(
898 second.message,
899 "Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"
900 );
901 }
902
903 #[test]
906 fn channel_data_block_fields_and_variables() {
907 let lines = vec![
908 full_line(UUID1, TS1, "CHANNEL_DATA:"),
909 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
910 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
911 format!("{UUID1} Unique-ID: [{UUID1}]"),
912 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
913 "variable_direction: [inbound]".to_string(),
914 ];
915 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
916 assert_eq!(entries.len(), 1);
917 assert_eq!(entries[0].message_kind, MessageKind::ChannelData);
918 let block = entries[0].block.as_ref().expect("should have block");
919 match block {
920 Block::ChannelData { fields, variables } => {
921 assert_eq!(fields.len(), 3);
922 assert_eq!(
923 fields[0],
924 (
925 "Channel-Name".to_string(),
926 "sofia/internal/+15550001234@192.0.2.1".to_string()
927 )
928 );
929 assert_eq!(
930 fields[1],
931 ("Channel-State".to_string(), "CS_EXECUTE".to_string())
932 );
933 assert_eq!(fields[2], ("Unique-ID".to_string(), UUID1.to_string()));
934 assert_eq!(variables.len(), 2);
935 assert_eq!(
936 variables[0],
937 (
938 "variable_sip_call_id".to_string(),
939 "test123@192.0.2.1".to_string()
940 )
941 );
942 assert_eq!(
943 variables[1],
944 ("variable_direction".to_string(), "inbound".to_string())
945 );
946 }
947 other => panic!("expected ChannelData block, got {other:?}"),
948 }
949 }
950
951 #[test]
952 fn channel_data_multiline_variable_reassembly() {
953 let lines = vec![
954 full_line(UUID1, TS1, "CHANNEL_DATA:"),
955 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
956 "variable_switch_r_sdp: [v=0".to_string(),
957 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
958 "s=-".to_string(),
959 "c=IN IP4 192.0.2.1".to_string(),
960 "m=audio 47758 RTP/AVP 0 101".to_string(),
961 "a=rtpmap:0 PCMU/8000".to_string(),
962 "]".to_string(),
963 "variable_direction: [inbound]".to_string(),
964 ];
965 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
966 assert_eq!(entries.len(), 1);
967 let block = entries[0].block.as_ref().expect("should have block");
968 match block {
969 Block::ChannelData { fields, variables } => {
970 assert_eq!(fields.len(), 1);
971 assert_eq!(variables.len(), 2);
972 assert_eq!(variables[0].0, "variable_switch_r_sdp");
973 assert!(variables[0].1.starts_with("v=0\n"));
974 assert!(variables[0].1.contains("m=audio 47758 RTP/AVP 0 101"));
975 assert!(!variables[0].1.ends_with(']'));
976 assert_eq!(
977 variables[1],
978 ("variable_direction".to_string(), "inbound".to_string())
979 );
980 }
981 other => panic!("expected ChannelData block, got {other:?}"),
982 }
983 assert_eq!(entries[0].attached.len(), 9);
984 }
985
986 #[test]
987 fn sdp_block_detection() {
988 let lines = vec![
989 full_line(UUID1, TS1, "Local SDP:"),
990 "v=0".to_string(),
991 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
992 "s=-".to_string(),
993 "c=IN IP4 192.0.2.1".to_string(),
994 "m=audio 10000 RTP/AVP 0".to_string(),
995 "a=rtpmap:0 PCMU/8000".to_string(),
996 ];
997 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
998 assert_eq!(entries.len(), 1);
999 match &entries[0].message_kind {
1000 MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1001 other => panic!("expected SdpMarker, got {other:?}"),
1002 }
1003 let block = entries[0].block.as_ref().expect("should have block");
1004 match block {
1005 Block::Sdp { direction, body } => {
1006 assert_eq!(*direction, SdpDirection::Local);
1007 assert_eq!(body.len(), 6);
1008 assert_eq!(body[0], "v=0");
1009 assert_eq!(body[5], "a=rtpmap:0 PCMU/8000");
1010 }
1011 other => panic!("expected Sdp block, got {other:?}"),
1012 }
1013 }
1014
1015 #[test]
1016 fn sdp_block_terminated_by_primary_line() {
1017 let lines = vec![
1018 full_line(UUID1, TS1, "Remote SDP:"),
1019 "v=0".to_string(),
1020 "m=audio 10000 RTP/AVP 0".to_string(),
1021 full_line(UUID1, TS2, "Next event"),
1022 ];
1023 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1024 assert_eq!(entries.len(), 2);
1025 let block = entries[0].block.as_ref().expect("should have block");
1026 match block {
1027 Block::Sdp { direction, body } => {
1028 assert_eq!(*direction, SdpDirection::Remote);
1029 assert_eq!(body.len(), 2);
1030 }
1031 other => panic!("expected Sdp block, got {other:?}"),
1032 }
1033 assert!(entries[1].block.is_none());
1034 }
1035
1036 #[test]
1037 fn sdp_from_uuid_continuation() {
1038 let lines = vec![
1039 format!("{UUID1} Local SDP:"),
1040 format!("{UUID1} v=0"),
1041 format!("{UUID1} o=FreeSWITCH 1234 5678 IN IP4 192.0.2.1"),
1042 format!("{UUID1} s=FreeSWITCH"),
1043 format!("{UUID1} c=IN IP4 192.0.2.1"),
1044 ];
1045 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1046 assert_eq!(entries.len(), 1);
1047 let block = entries[0].block.as_ref().expect("should have block");
1048 match block {
1049 Block::Sdp { direction, body } => {
1050 assert_eq!(*direction, SdpDirection::Local);
1051 assert_eq!(body.len(), 4);
1052 assert_eq!(body[0], "v=0");
1053 }
1054 other => panic!("expected Sdp block, got {other:?}"),
1055 }
1056 }
1057
1058 #[test]
1059 fn channel_data_interrupted_by_different_uuid() {
1060 let lines = vec![
1061 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1062 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1063 format!("{UUID2} Dialplan: sofia/internal/+15559999999@192.0.2.1 parsing [public]"),
1064 ];
1065 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1066 assert_eq!(entries.len(), 2);
1067 let block = entries[0].block.as_ref().expect("should have block");
1068 match block {
1069 Block::ChannelData { fields, .. } => {
1070 assert_eq!(fields.len(), 1);
1071 }
1072 other => panic!("expected ChannelData, got {other:?}"),
1073 }
1074 }
1075
1076 #[test]
1077 fn no_block_for_non_block_message() {
1078 let lines = vec![full_line(UUID1, TS1, "some random freeswitch log message")];
1079 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1080 assert_eq!(entries.len(), 1);
1081 assert!(entries[0].block.is_none());
1082 assert_eq!(entries[0].message_kind, MessageKind::General);
1083 }
1084
1085 #[test]
1086 fn message_kind_on_execute() {
1087 let lines = vec![
1088 full_line(UUID1, TS1, "First"),
1089 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
1090 ];
1091 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1092 assert_eq!(entries.len(), 2);
1093 match &entries[1].message_kind {
1094 MessageKind::Execute {
1095 application,
1096 arguments,
1097 ..
1098 } => {
1099 assert_eq!(application, "set");
1100 assert_eq!(arguments, "foo=bar");
1101 }
1102 other => panic!("expected Execute, got {other:?}"),
1103 }
1104 }
1105
1106 #[test]
1109 fn stats_lines_processed() {
1110 let lines = vec![
1111 full_line(UUID1, TS1, "First"),
1112 full_line(UUID1, TS2, "Second"),
1113 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1114 ];
1115 let mut stream = LogStream::new(lines.into_iter());
1116 let _: Vec<_> = stream.by_ref().collect();
1117 assert_eq!(stream.stats().lines_processed, 3);
1118 }
1119
1120 #[test]
1121 fn stats_unclassified_orphan() {
1122 let lines = vec![
1123 "variable_foo: [bar]".to_string(),
1124 full_line(UUID1, TS1, "After orphan"),
1125 ];
1126 let mut stream = LogStream::new(lines.into_iter())
1127 .unclassified_tracking(UnclassifiedTracking::TrackLines);
1128 let _: Vec<_> = stream.by_ref().collect();
1129 assert_eq!(stream.stats().lines_unclassified, 1);
1130 assert_eq!(stream.stats().unclassified_lines.len(), 1);
1131 assert_eq!(
1132 stream.stats().unclassified_lines[0].reason,
1133 UnclassifiedReason::OrphanContinuation,
1134 );
1135 }
1136
1137 #[test]
1138 fn stats_capture_data() {
1139 let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1140 let mut stream = LogStream::new(lines.into_iter())
1141 .unclassified_tracking(UnclassifiedTracking::CaptureData);
1142 let _: Vec<_> = stream.by_ref().collect();
1143 assert_eq!(stream.stats().unclassified_lines.len(), 1);
1144 assert_eq!(
1145 stream.stats().unclassified_lines[0].data.as_deref(),
1146 Some("orphan line"),
1147 );
1148 }
1149
1150 #[test]
1151 fn stats_count_only_no_allocation() {
1152 let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1153 let mut stream = LogStream::new(lines.into_iter());
1154 let _: Vec<_> = stream.by_ref().collect();
1155 assert_eq!(stream.stats().lines_unclassified, 1);
1156 assert!(stream.stats().unclassified_lines.is_empty());
1157 }
1158
1159 #[test]
1160 fn line_number_tracking() {
1161 let lines = vec![
1162 full_line(UUID1, TS1, "First"),
1163 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1164 full_line(UUID2, TS2, "Third"),
1165 ];
1166 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1167 assert_eq!(entries[0].line_number, 1);
1168 assert_eq!(entries[1].line_number, 3);
1169 }
1170
1171 #[test]
1172 fn drain_unclassified() {
1173 let lines = vec![
1174 "orphan1".to_string(),
1175 "orphan2".to_string(),
1176 full_line(UUID1, TS1, "After"),
1177 ];
1178 let mut stream = LogStream::new(lines.into_iter())
1179 .unclassified_tracking(UnclassifiedTracking::TrackLines);
1180 let _: Vec<_> = stream.by_ref().collect();
1181 let drained = stream.drain_unclassified();
1182 assert_eq!(drained.len(), 1);
1183 assert!(stream.stats().unclassified_lines.is_empty());
1184 assert_eq!(stream.stats().lines_unclassified, 1);
1185 }
1186
1187 #[test]
1195 fn continuation_lines_at_file_boundary_must_not_inherit_previous_timestamp() {
1196 use crate::TrackedChain;
1197
1198 let uuid_a = "aaaaaaaa-1111-2222-3333-444444444444";
1199 let uuid_b = "bbbbbbbb-1111-2222-3333-444444444444";
1200 let ts_old = "2025-01-15 23:58:03.000000";
1201 let ts_new = "2025-01-16 08:37:12.000000";
1202
1203 let seg1: Vec<String> = vec![format!(
1204 "{uuid_a} {ts_old} 95.00% [DEBUG] test.c:1 Last line in rotated file"
1205 )];
1206
1207 let seg2: Vec<String> = vec![
1210 format!("{uuid_b} CHANNEL_DATA:"),
1211 format!("{uuid_b} Channel-State: [CS_EXECUTE]"),
1212 format!("{uuid_b} {ts_new} 95.00% [DEBUG] test.c:1 First timestamped line in new file"),
1213 ];
1214
1215 let segments: Vec<(String, Box<dyn Iterator<Item = String>>)> = vec![
1216 ("rotated.log".to_string(), Box::new(seg1.into_iter())),
1217 ("freeswitch.log".to_string(), Box::new(seg2.into_iter())),
1218 ];
1219
1220 let (chain, _) = TrackedChain::new(segments);
1221 let entries: Vec<_> = LogStream::new(chain).collect();
1222
1223 let b_entry = entries
1224 .iter()
1225 .find(|e| e.uuid == uuid_b)
1226 .expect("should find entry for uuid_b");
1227
1228 assert_ne!(
1232 b_entry.timestamp, ts_old,
1233 "continuation lines in a new file segment inherited timestamp \
1234 '{ts_old}' from the previous segment — timestamps must not bleed \
1235 across file boundaries"
1236 );
1237 }
1238
1239 fn assert_accounting(stream: &LogStream<impl Iterator<Item = String>>) {
1242 let stats = stream.stats();
1243 assert_eq!(
1244 stats.unaccounted_lines(),
1245 0,
1246 "line accounting invariant violated: \
1247 processed={} + split={} != in_entries={} + empty_orphan={}",
1248 stats.lines_processed,
1249 stats.lines_split,
1250 stats.lines_in_entries,
1251 stats.lines_empty_orphan,
1252 );
1253 }
1254
1255 #[test]
1256 fn accounting_full_lines() {
1257 let lines = vec![
1258 full_line(UUID1, TS1, "First"),
1259 full_line(UUID2, TS2, "Second"),
1260 ];
1261 let mut stream = LogStream::new(lines.into_iter());
1262 let entries: Vec<_> = stream.by_ref().collect();
1263 assert_eq!(entries.len(), 2);
1264 assert_eq!(stream.stats().lines_in_entries, 2);
1265 assert_accounting(&stream);
1266 }
1267
1268 #[test]
1269 fn accounting_with_attached() {
1270 let lines = vec![
1271 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1272 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1273 "variable_foo: [bar]".to_string(),
1274 full_line(UUID2, TS2, "Next"),
1275 ];
1276 let mut stream = LogStream::new(lines.into_iter());
1277 let entries: Vec<_> = stream.by_ref().collect();
1278 assert_eq!(entries.len(), 2);
1279 assert_eq!(stream.stats().lines_in_entries, 4);
1282 assert_accounting(&stream);
1283 }
1284
1285 #[test]
1286 fn accounting_system_line() {
1287 let lines = vec![format!(
1288 "{TS1} 95.97% [NOTICE] mod_logfile.c:217 New log started."
1289 )];
1290 let mut stream = LogStream::new(lines.into_iter());
1291 let _: Vec<_> = stream.by_ref().collect();
1292 assert_eq!(stream.stats().lines_in_entries, 1);
1293 assert_accounting(&stream);
1294 }
1295
1296 #[test]
1297 fn accounting_empty_orphan() {
1298 let lines = vec![
1299 String::new(),
1300 " ".to_string(),
1301 full_line(UUID1, TS1, "After"),
1302 ];
1303 let mut stream = LogStream::new(lines.into_iter());
1304 let entries: Vec<_> = stream.by_ref().collect();
1305 assert_eq!(entries.len(), 1);
1306 assert_eq!(stream.stats().lines_empty_orphan, 2);
1307 assert_accounting(&stream);
1308 }
1309
1310 #[test]
1311 fn accounting_empty_attached() {
1312 let lines = vec![
1313 full_line(UUID1, TS1, "First"),
1314 String::new(),
1315 "continuation".to_string(),
1316 ];
1317 let mut stream = LogStream::new(lines.into_iter());
1318 let entries: Vec<_> = stream.by_ref().collect();
1319 assert_eq!(entries.len(), 1);
1320 assert_eq!(entries[0].attached.len(), 2);
1321 assert_eq!(stream.stats().lines_empty_orphan, 0);
1322 assert_eq!(stream.stats().lines_in_entries, 3);
1323 assert_accounting(&stream);
1324 }
1325
1326 #[test]
1327 fn accounting_orphan_continuation() {
1328 let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1329 let mut stream = LogStream::new(lines.into_iter());
1330 let _: Vec<_> = stream.by_ref().collect();
1331 assert_accounting(&stream);
1332 }
1333
1334 #[test]
1335 fn accounting_codec_merging() {
1336 let lines = vec![
1337 full_line(
1338 UUID1,
1339 TS1,
1340 "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1341 ),
1342 full_line(
1343 UUID1,
1344 TS1,
1345 "Audio Codec Compare [PCMU:0:8000:20:64000:1] is saved as a match",
1346 ),
1347 full_line(UUID2, TS2, "Next"),
1348 ];
1349 let mut stream = LogStream::new(lines.into_iter());
1350 let _: Vec<_> = stream.by_ref().collect();
1351 assert_accounting(&stream);
1352 }
1353
1354 #[test]
1355 fn accounting_truncated_line() {
1356 let lines = vec![
1357 full_line(UUID1, TS1, "First"),
1358 format!(
1359 "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
1360 ),
1361 ];
1362 let mut stream = LogStream::new(lines.into_iter());
1363 let _: Vec<_> = stream.by_ref().collect();
1364 assert_accounting(&stream);
1365 }
1366
1367 #[test]
1368 fn accounting_long_line_collision_split() {
1369 let long_value = "x".repeat(MAX_LINE_PAYLOAD + 10);
1372 let line = format!(
1373 "variable_sip_multipart: [{long_value}]{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"
1374 );
1375 let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1376 let mut stream = LogStream::new(lines.into_iter());
1377 let entries: Vec<_> = stream.by_ref().collect();
1378
1379 assert_eq!(entries[0].message, "CHANNEL_DATA:");
1381
1382 let split_entry = entries.iter().find(|e| e.uuid == UUID2);
1384 assert!(
1385 split_entry.is_some(),
1386 "collision UUID should produce a separate entry"
1387 );
1388
1389 assert_eq!(stream.stats().lines_split, 1);
1390 assert_accounting(&stream);
1391 }
1392
1393 #[test]
1394 fn no_split_on_short_lines() {
1395 let line = format!("variable_call_uuid: [{UUID2}]");
1398 let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1399 let mut stream = LogStream::new(lines.into_iter());
1400 let entries: Vec<_> = stream.by_ref().collect();
1401 assert_eq!(entries.len(), 1);
1402 assert_eq!(stream.stats().lines_split, 0);
1403 assert_accounting(&stream);
1404 }
1405
1406 #[test]
1407 fn timestamp_collision_splits_system_lines() {
1408 let line = format!(
1409 "{TS1} 98.03% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42864: api sofia jsonstatus{TS2} 97.93% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42898: api fsctl pause_check"
1410 );
1411 let mut stream = LogStream::new(std::iter::once(line));
1412 let entries: Vec<_> = stream.by_ref().collect();
1413 assert_eq!(entries.len(), 2);
1414 assert_eq!(
1415 entries[0].message,
1416 "Event Socket Command from ::1:42864: api sofia jsonstatus"
1417 );
1418 assert_eq!(
1419 entries[1].message,
1420 "Event Socket Command from ::1:42898: api fsctl pause_check"
1421 );
1422 assert_eq!(stream.stats().lines_split, 1);
1423 assert_accounting(&stream);
1424 }
1425
1426 #[test]
1427 fn timestamp_collision_splits_three_entries() {
1428 let ts3 = "2025-01-15 10:30:47.345678";
1429 let line = format!(
1430 "{TS1} 95.00% [INFO] mod.c:1 first{TS2} 96.00% [INFO] mod.c:1 second{ts3} 97.00% [INFO] mod.c:1 third"
1431 );
1432 let mut stream = LogStream::new(std::iter::once(line));
1433 let entries: Vec<_> = stream.by_ref().collect();
1434 assert_eq!(entries.len(), 3);
1435 assert_eq!(entries[0].message, "first");
1436 assert_eq!(entries[1].message, "second");
1437 assert_eq!(entries[2].message, "third");
1438 assert_eq!(stream.stats().lines_split, 2);
1439 assert_accounting(&stream);
1440 }
1441
1442 #[test]
1443 fn timestamp_collision_with_uuid_prefix() {
1444 let line = format!(
1446 "{TS1} 95.00% [INFO] mod.c:1 first{UUID1} {TS2} 96.00% [DEBUG] sofia.c:100 second"
1447 );
1448 let mut stream = LogStream::new(std::iter::once(line));
1449 let entries: Vec<_> = stream.by_ref().collect();
1450 assert_eq!(entries.len(), 2);
1451 assert_eq!(entries[0].message, "first");
1452 assert_eq!(entries[1].uuid, UUID1);
1453 assert_eq!(entries[1].message, "second");
1454 assert_eq!(stream.stats().lines_split, 1);
1455 assert_accounting(&stream);
1456 }
1457
1458 #[test]
1459 fn channel_data_multiline_variable_spans_many_lines() {
1460 let lines = vec![
1461 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1462 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1463 "variable_switch_r_sdp: [v=0".to_string(),
1464 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1465 "s=-".to_string(),
1466 "c=IN IP4 192.0.2.1".to_string(),
1467 "t=0 0".to_string(),
1468 "m=audio 47758 RTP/AVP 0 8 101".to_string(),
1469 "a=rtpmap:0 PCMU/8000".to_string(),
1470 "a=rtpmap:8 PCMA/8000".to_string(),
1471 "a=rtpmap:101 telephone-event/8000".to_string(),
1472 "a=fmtp:101 0-16".to_string(),
1473 "]".to_string(),
1474 "variable_direction: [inbound]".to_string(),
1475 ];
1476 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1477 assert_eq!(entries.len(), 1);
1478 let block = entries[0].block.as_ref().expect("should have block");
1479 match block {
1480 Block::ChannelData { fields, variables } => {
1481 assert_eq!(fields.len(), 1);
1482 assert_eq!(variables.len(), 2);
1483 assert_eq!(variables[0].0, "variable_switch_r_sdp");
1484 let sdp = &variables[0].1;
1485 assert!(sdp.starts_with("v=0\n"));
1486 assert!(sdp.contains("a=fmtp:101 0-16"));
1487 assert!(!sdp.ends_with(']'));
1488 assert_eq!(variables[1].0, "variable_direction");
1489 }
1490 other => panic!("expected ChannelData block, got {other:?}"),
1491 }
1492 }
1493
1494 #[test]
1495 fn sdp_from_verto_update_media() {
1496 let lines = vec![
1497 full_line(UUID1, TS1, "updateMedia: Local SDP"),
1498 "v=0".to_string(),
1499 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1500 "m=audio 10000 RTP/AVP 0".to_string(),
1501 ];
1502 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1503 assert_eq!(entries.len(), 1);
1504 match &entries[0].message_kind {
1505 MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1506 other => panic!("expected SdpMarker, got {other:?}"),
1507 }
1508 let block = entries[0].block.as_ref().expect("should have block");
1509 match block {
1510 Block::Sdp { direction, body } => {
1511 assert_eq!(*direction, SdpDirection::Local);
1512 assert_eq!(body.len(), 3);
1513 }
1514 other => panic!("expected Sdp block, got {other:?}"),
1515 }
1516 }
1517
1518 #[test]
1519 fn duplicate_sdp_marker() {
1520 let lines = vec![
1521 full_line(UUID1, TS1, "Duplicate SDP"),
1522 "v=0".to_string(),
1523 "m=audio 10000 RTP/AVP 0".to_string(),
1524 ];
1525 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1526 assert_eq!(entries.len(), 1);
1527 match &entries[0].message_kind {
1528 MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Unknown),
1529 other => panic!("expected SdpMarker, got {other:?}"),
1530 }
1531 assert!(entries[0].block.is_some());
1532 }
1533
1534 #[test]
1535 fn warning_on_unclosed_multiline_variable() {
1536 let lines = vec![
1537 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1538 "variable_switch_r_sdp: [v=0".to_string(),
1539 "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1540 full_line(UUID2, TS2, "Next entry"),
1541 ];
1542 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1543 assert_eq!(entries.len(), 2);
1544 assert!(
1545 entries[0]
1546 .warnings
1547 .iter()
1548 .any(|w| w.contains("unclosed multi-line variable")),
1549 "expected unclosed variable warning, got: {:?}",
1550 entries[0].warnings
1551 );
1552 }
1553
1554 #[test]
1555 fn warning_on_unparseable_channel_data_line() {
1556 let lines = vec![
1557 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1558 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1559 format!("{UUID1} this is not a valid field line"),
1560 ];
1561 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1562 assert_eq!(entries.len(), 1);
1563 assert!(
1564 entries[0]
1565 .warnings
1566 .iter()
1567 .any(|w| w.contains("unparseable CHANNEL_DATA")),
1568 "expected unparseable warning, got: {:?}",
1569 entries[0].warnings
1570 );
1571 }
1572
1573 #[test]
1574 fn warning_on_unexpected_codec_continuation() {
1575 let lines = vec![
1576 full_line(
1577 UUID1,
1578 TS1,
1579 "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1580 ),
1581 format!("{UUID1} some unexpected continuation line"),
1582 ];
1583 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1584 assert_eq!(entries.len(), 1);
1585 assert!(
1586 entries[0]
1587 .warnings
1588 .iter()
1589 .any(|w| w.contains("unexpected codec negotiation")),
1590 "expected codec warning, got: {:?}",
1591 entries[0].warnings
1592 );
1593 }
1594
1595 #[test]
1596 fn system_line_uuid_continuation_not_absorbed() {
1597 let lines = vec![
1600 format!("{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"),
1601 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1602 ];
1603 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1604 assert_eq!(
1605 entries.len(),
1606 2,
1607 "UUID continuation should not be absorbed by system entry"
1608 );
1609 assert_eq!(entries[0].uuid, "");
1610 assert_eq!(entries[1].uuid, UUID1);
1611 }
1612
1613 #[test]
1614 fn truncated_collision_in_channel_data_variable() {
1615 let padding = "x".repeat(2000);
1621 let collision_line = format!(
1622 "{UUID1} variable_long_xml: [{padding}{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(foo=bar)"
1623 );
1624 assert!(
1625 collision_line.len() > super::MAX_LINE_PAYLOAD,
1626 "test line must exceed buffer limit, got {}",
1627 collision_line.len()
1628 );
1629
1630 let lines = vec![
1631 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1632 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1633 format!("{UUID1} variable_direction: [inbound]"),
1634 collision_line,
1635 full_line(UUID1, TS2, "Next log entry"),
1636 ];
1637
1638 let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1639
1640 assert_eq!(entries[0].message, "CHANNEL_DATA:");
1642 let block = entries[0].block.as_ref().expect("should have block");
1643 match block {
1644 Block::ChannelData { fields, variables } => {
1645 assert_eq!(fields.len(), 1, "should have Channel-Name field");
1646 assert_eq!(fields[0].0, "Channel-Name");
1647 assert_eq!(
1648 variables.len(),
1649 2,
1650 "should have direction + unclosed long_xml"
1651 );
1652 assert_eq!(variables[0].0, "variable_direction");
1653 assert_eq!(variables[0].1, "inbound");
1654 assert_eq!(variables[1].0, "variable_long_xml");
1655 }
1656 other => panic!("expected ChannelData block, got {other:?}"),
1657 }
1658 assert!(
1659 entries[0]
1660 .warnings
1661 .iter()
1662 .any(|w| w.contains("line exceeds mod_logfile 2048-byte buffer")),
1663 "expected buffer overflow warning, got: {:?}",
1664 entries[0].warnings
1665 );
1666 assert!(
1667 entries[0]
1668 .warnings
1669 .iter()
1670 .any(|w| w.contains("unclosed multi-line variable")),
1671 "expected unclosed variable warning, got: {:?}",
1672 entries[0].warnings
1673 );
1674
1675 assert_eq!(entries[1].uuid, UUID1);
1677 assert!(
1678 entries[1].message.starts_with("EXECUTE "),
1679 "split entry should be EXECUTE, got: {}",
1680 entries[1].message
1681 );
1682
1683 assert_eq!(entries.len(), 3);
1685 assert_eq!(entries[2].message, "Next log entry");
1686 }
1687
1688 #[test]
1689 fn channel_data_uuid_drops_mid_block() {
1690 let lines = vec![
1695 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1696 format!("{UUID1} variable_max_forwards: [69]"),
1697 format!("{UUID1} variable_presence_id: [1251@[2001:db8::10]]"),
1698 format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1699 "variable_sip_h_X-Call-Info: [<urn:test:callid:20260316>;purpose=emergency-CallId]"
1701 .to_string(),
1702 "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1703 "variable_remote_media_ip: [2001:db8::10]".to_string(),
1704 "variable_remote_media_port: [9952]".to_string(),
1705 "variable_rtp_use_codec_name: [opus]".to_string(),
1706 full_line(UUID1, TS2, "Next entry"),
1707 ];
1708
1709 let mut stream = LogStream::new(lines.into_iter());
1710 let entries: Vec<_> = stream.by_ref().collect();
1711
1712 assert_eq!(entries.len(), 2);
1713 assert_eq!(entries[0].message, "CHANNEL_DATA:");
1714 let block = entries[0].block.as_ref().expect("should have block");
1715 match block {
1716 Block::ChannelData { fields, variables } => {
1717 assert_eq!(fields.len(), 0);
1718 assert_eq!(variables.len(), 8);
1719 assert_eq!(variables[0].0, "variable_max_forwards");
1721 assert_eq!(variables[0].1, "69");
1722 assert_eq!(variables[1].0, "variable_presence_id");
1723 assert_eq!(variables[1].1, "1251@[2001:db8::10]");
1724 assert_eq!(variables[2].0, "variable_sip_h_X-Custom-ID");
1725 assert_eq!(variables[3].0, "variable_sip_h_X-Call-Info");
1727 assert!(variables[3].1.contains("emergency-CallId"));
1728 assert_eq!(variables[4].0, "variable_ep_codec_string");
1729 assert_eq!(variables[7].0, "variable_rtp_use_codec_name");
1730 assert_eq!(variables[7].1, "opus");
1731 }
1732 other => panic!("expected ChannelData block, got {other:?}"),
1733 }
1734 assert_eq!(entries[0].attached.len(), 8);
1735 assert_eq!(entries[1].message, "Next entry");
1736 assert_accounting(&stream);
1737 }
1738
1739 #[test]
1740 fn channel_data_uuid_drops_with_multiline_variable() {
1741 let lines = vec![
1746 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1747 format!("{UUID1} variable_max_forwards: [69]"),
1748 format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1749 "variable_switch_r_sdp: [v=0\r".to_string(),
1751 "o=FreeSWITCH 1773663549 1773663550 IN IP6 2001:db8::10\r".to_string(),
1752 "s=FreeSWITCH\r".to_string(),
1753 "c=IN IP6 2001:db8::10\r".to_string(),
1754 "t=0 0\r".to_string(),
1755 "m=audio 9952 RTP/AVP 102 101 13\r".to_string(),
1756 "a=rtpmap:102 opus/48000/2\r".to_string(),
1757 "a=ptime:20\r".to_string(),
1758 "]".to_string(),
1759 "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1760 "variable_direction: [inbound]".to_string(),
1761 full_line(UUID1, TS2, "Next entry"),
1762 ];
1763
1764 let mut stream = LogStream::new(lines.into_iter());
1765 let entries: Vec<_> = stream.by_ref().collect();
1766
1767 assert_eq!(entries.len(), 2);
1768 let block = entries[0].block.as_ref().expect("should have block");
1769 match block {
1770 Block::ChannelData { fields, variables } => {
1771 assert_eq!(fields.len(), 0);
1772 assert_eq!(variables.len(), 5);
1773 assert_eq!(variables[0].0, "variable_max_forwards");
1774 assert_eq!(variables[1].0, "variable_sip_h_X-Custom-ID");
1775 assert_eq!(variables[2].0, "variable_switch_r_sdp");
1777 let sdp = &variables[2].1;
1778 assert!(
1779 sdp.starts_with("v=0\r\n"),
1780 "SDP should start with v=0\\r\\n, got: {sdp:?}"
1781 );
1782 assert!(sdp.contains("m=audio 9952 RTP/AVP 102 101 13\r"));
1783 assert!(sdp.contains("a=ptime:20\r"));
1784 assert!(!sdp.ends_with(']'), "closing bracket should be stripped");
1785 assert_eq!(variables[3].0, "variable_ep_codec_string");
1787 assert_eq!(variables[4].0, "variable_direction");
1788 assert_eq!(variables[4].1, "inbound");
1789 }
1790 other => panic!("expected ChannelData block, got {other:?}"),
1791 }
1792 assert_eq!(entries[0].attached.len(), 13);
1794 assert_accounting(&stream);
1795 }
1796
1797 #[test]
1798 fn channel_data_bare_variable_collision_with_execute() {
1799 let collision = format!(
1806 "variable_call_uuid: {UUID1} EXECUTE [depth=0] \
1807 sofia/internal-v6/1251@[2001:db8::10] export(nolocal:test_var=value)"
1808 );
1809
1810 let lines = vec![
1811 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1812 format!("{UUID1} variable_max_forwards: [69]"),
1813 "variable_DP_MATCH: [ARRAY::create_conference|:create_conference]".to_string(),
1815 collision,
1816 full_line(
1818 UUID1,
1819 TS2,
1820 "EXPORT (export_vars) (REMOTE ONLY) [test_var]=[value]",
1821 ),
1822 ];
1823
1824 let mut stream = LogStream::new(lines.into_iter());
1825 let entries: Vec<_> = stream.by_ref().collect();
1826
1827 assert_eq!(entries.len(), 3);
1829 let block = entries[0].block.as_ref().expect("should have block");
1830 match block {
1831 Block::ChannelData { fields, variables } => {
1832 assert_eq!(fields.len(), 0);
1833 assert_eq!(variables.len(), 2);
1834 assert_eq!(variables[0].0, "variable_max_forwards");
1835 assert_eq!(variables[1].0, "variable_DP_MATCH");
1836 }
1837 other => panic!("expected ChannelData block, got {other:?}"),
1838 }
1839
1840 assert_eq!(entries[1].uuid, UUID1);
1842 assert_eq!(entries[1].kind, LineKind::Truncated);
1843 assert!(
1844 entries[1].message.starts_with("EXECUTE "),
1845 "truncated line should yield EXECUTE, got: {}",
1846 entries[1].message
1847 );
1848
1849 assert_eq!(entries[2].message_kind.label(), "variable");
1851 assert_accounting(&stream);
1852 }
1853
1854 #[test]
1855 fn system_line_with_embedded_uuid_gets_entry_uuid() {
1856 let lines = vec![
1859 format!(
1860 "{TS1} 95.97% [DEBUG] switch_cpp.cpp:1466 {UUID1} DAA-LOG WaveManager originate"
1861 ),
1862 format!(
1863 "{TS1} 95.97% [WARNING] switch_cpp.cpp:1466 {UUID1} DAA-LOG Failed to create session"
1864 ),
1865 full_line(UUID1, TS2, "State Change CS_EXECUTE -> CS_HIBERNATE"),
1866 ];
1867
1868 let mut stream = LogStream::new(lines.into_iter());
1869 let entries: Vec<_> = stream.by_ref().collect();
1870
1871 assert_eq!(entries.len(), 3);
1872 assert_eq!(entries[0].uuid, UUID1);
1874 assert_eq!(entries[0].kind, LineKind::System);
1875 assert_eq!(entries[0].message, "DAA-LOG WaveManager originate");
1876
1877 assert_eq!(entries[1].uuid, UUID1);
1878 assert_eq!(entries[1].kind, LineKind::System);
1879 assert_eq!(entries[1].message, "DAA-LOG Failed to create session");
1880
1881 assert_eq!(entries[2].uuid, UUID1);
1883 assert_eq!(entries[2].kind, LineKind::Full);
1884 assert_accounting(&stream);
1885 }
1886}