Skip to main content

freeswitch_log_parser/
stream.rs

1use crate::attached::AttachedLines;
2use crate::level::LogLevel;
3use crate::line::{is_date_at, is_log_header_at, is_uuid_at, parse_line, LineKind};
4use crate::message::{classify_message, MessageKind, SdpDirection};
5
6/// Structured data extracted from a multi-line dump that follows a primary log entry.
7///
8/// Each variant corresponds to a block type that the stream state machine
9/// recognizes and reassembles from continuation lines.
10#[derive(Debug, Clone, PartialEq, Eq)]
11#[non_exhaustive]
12pub enum Block {
13    /// Channel variable dump — `Channel-*` fields and `variable_*` key-value pairs.
14    /// Multi-line variable values (e.g. embedded SDP) are reassembled with `\n` separators.
15    ChannelData {
16        fields: Vec<(String, String)>,
17        variables: Vec<(String, String)>,
18    },
19    /// SDP session description body, collected line by line.
20    Sdp {
21        direction: SdpDirection,
22        body: Vec<String>,
23    },
24    /// Codec negotiation sequence — offered/local comparisons and selected matches.
25    CodecNegotiation {
26        comparisons: Vec<(String, String)>,
27        selected: Vec<String>,
28    },
29}
30
31/// Controls how much detail is recorded for lines that couldn't be fully classified.
32///
33/// Higher fidelity levels allocate more memory. The default is `CountOnly`.
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum UnclassifiedTracking {
36    /// Increment the counter only — zero allocation.
37    CountOnly,
38    /// Record line number and reason for each unclassified line.
39    TrackLines,
40    /// Like `TrackLines` plus the full line content.
41    CaptureData,
42}
43
44/// Why a line was marked as unclassified.
45#[derive(Debug, Clone, PartialEq, Eq)]
46#[non_exhaustive]
47pub enum UnclassifiedReason {
48    /// Bare continuation line arrived with no pending entry to attach to.
49    OrphanContinuation,
50    /// Line was parsed but the message didn't match any known pattern.
51    UnknownMessageFormat,
52    /// EXECUTE or variable line was only partially readable.
53    TruncatedField,
54}
55
56/// Record of a single unclassified line, captured when tracking is enabled.
57#[derive(Debug, Clone, PartialEq, Eq)]
58pub struct UnclassifiedLine {
59    pub line_number: u64,
60    pub reason: UnclassifiedReason,
61    /// The full line content; only populated under [`UnclassifiedTracking::CaptureData`].
62    pub data: Option<String>,
63}
64
65/// Cumulative parsing statistics, updated as lines flow through the stream.
66#[derive(Debug, Clone, Default)]
67pub struct ParseStats {
68    pub lines_processed: u64,
69    pub lines_unclassified: u64,
70    /// Lines that became part of entries (primary line + attached lines per entry).
71    pub lines_in_entries: u64,
72    /// Empty lines that arrived with no pending entry to attach to.
73    pub lines_empty_orphan: u64,
74    /// Physical lines that were split into multiple logical entries due to
75    /// mod_logfile's 2048-byte snprintf truncation causing same-line collisions.
76    pub lines_split: u64,
77    /// Populated only when tracking is `TrackLines` or `CaptureData`.
78    pub unclassified_lines: Vec<UnclassifiedLine>,
79}
80
81impl ParseStats {
82    /// Lines that were processed but not accounted for by any tracking category.
83    ///
84    /// Returns 0 when the parser correctly accounts for every input line.
85    /// A non-zero value indicates a parser bug — lines were silently lost.
86    ///
87    /// Invariant: `lines_processed + lines_split == lines_in_entries + lines_empty_orphan`
88    pub fn unaccounted_lines(&self) -> u64 {
89        let expected = self.lines_in_entries + self.lines_empty_orphan;
90        let actual = self.lines_processed + self.lines_split;
91        actual.saturating_sub(expected)
92    }
93}
94
95/// A complete parsed log entry with all context resolved.
96///
97/// Produced by [`LogStream`]. Continuation lines have been grouped,
98/// UUID/timestamp inherited from context where needed, and multi-line blocks
99/// reassembled.
100#[derive(Debug)]
101pub struct LogEntry {
102    /// Session UUID, or empty string for system lines.
103    pub uuid: String,
104    /// Timestamp with microsecond precision; inherited from the previous entry for continuations.
105    pub timestamp: String,
106    /// `None` for continuation and truncated lines.
107    pub level: Option<LogLevel>,
108    /// Core scheduler idle percentage; `None` for continuations.
109    pub idle_pct: Option<String>,
110    /// Source file:line; `None` for continuations.
111    pub source: Option<String>,
112    /// The primary message text.
113    pub message: String,
114    /// Which line format originated this entry.
115    pub kind: LineKind,
116    /// Semantic classification of the message content.
117    pub message_kind: MessageKind,
118    /// Typed, parsed multi-line block; `None` for entries without a trailing block.
119    pub block: Option<Block>,
120    /// Raw continuation lines that followed the primary line.
121    pub attached: AttachedLines,
122    /// 1-based line number in the input stream.
123    pub line_number: u64,
124    /// Per-entry warnings about parsing anomalies.
125    pub warnings: Vec<String>,
126}
127
128fn parse_field_line(msg: &str) -> Option<(String, String)> {
129    let colon = msg.find(": ")?;
130    let name = &msg[..colon];
131    if name.contains(' ') || name.is_empty() {
132        return None;
133    }
134    let value_part = &msg[colon + 2..];
135    let value = if let Some(inner) = value_part.strip_prefix('[') {
136        inner.strip_suffix(']').unwrap_or(inner)
137    } else {
138        value_part
139    };
140    Some((name.to_string(), value.to_string()))
141}
142
143enum StreamState {
144    Idle,
145    InChannelData {
146        fields: Vec<(String, String)>,
147        variables: Vec<(String, String)>,
148        open_var_name: Option<String>,
149        open_var_value: Option<String>,
150    },
151    InSdp {
152        direction: SdpDirection,
153        body: Vec<String>,
154    },
155    InCodecNegotiation {
156        comparisons: Vec<(String, String)>,
157        selected: Vec<String>,
158    },
159}
160
161impl StreamState {
162    fn take_idle(&mut self) -> StreamState {
163        std::mem::replace(self, StreamState::Idle)
164    }
165}
166
167/// Layer 2 structural state machine — groups continuation lines, classifies
168/// messages, and detects multi-line blocks (CHANNEL_DATA, SDP, codec negotiation).
169///
170/// Wraps any `Iterator<Item = String>` and yields [`LogEntry`] values.
171/// Maintains `last_uuid` and `last_timestamp` to fill in context for
172/// continuation lines that lack their own.
173///
174/// Use the builder method [`unclassified_tracking()`](LogStream::unclassified_tracking)
175/// to control diagnostic detail before iterating.
176pub struct LogStream<I> {
177    lines: I,
178    last_uuid: String,
179    last_timestamp: String,
180    pending: Option<LogEntry>,
181    state: StreamState,
182    stats: ParseStats,
183    tracking: UnclassifiedTracking,
184    line_number: u64,
185    split_pending: Option<String>,
186    deferred_warning: Option<String>,
187}
188
189impl<I: Iterator<Item = String>> LogStream<I> {
190    /// Create a new stream from any line iterator.
191    pub fn new(lines: I) -> Self {
192        LogStream {
193            lines,
194            last_uuid: String::new(),
195            last_timestamp: String::new(),
196            pending: None,
197            state: StreamState::Idle,
198            stats: ParseStats::default(),
199            tracking: UnclassifiedTracking::CountOnly,
200            line_number: 0,
201            split_pending: None,
202            deferred_warning: None,
203        }
204    }
205
206    /// Set the unclassified line tracking level (builder pattern). Defaults to `CountOnly`.
207    pub fn unclassified_tracking(mut self, level: UnclassifiedTracking) -> Self {
208        self.tracking = level;
209        self
210    }
211
212    /// Cumulative parsing statistics up to the current position.
213    pub fn stats(&self) -> &ParseStats {
214        &self.stats
215    }
216
217    /// Take all accumulated unclassified line records, leaving the internal vec empty.
218    ///
219    /// The `lines_unclassified` counter is not reset.
220    pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
221        std::mem::take(&mut self.stats.unclassified_lines)
222    }
223
224    fn record_unclassified(&mut self, reason: UnclassifiedReason, data: Option<&str>) {
225        self.stats.lines_unclassified += 1;
226        match self.tracking {
227            UnclassifiedTracking::CountOnly => {}
228            UnclassifiedTracking::TrackLines => {
229                self.stats.unclassified_lines.push(UnclassifiedLine {
230                    line_number: self.line_number,
231                    reason,
232                    data: None,
233                });
234            }
235            UnclassifiedTracking::CaptureData => {
236                self.stats.unclassified_lines.push(UnclassifiedLine {
237                    line_number: self.line_number,
238                    reason,
239                    data: data.map(|s| s.to_string()),
240                });
241            }
242        }
243    }
244
245    fn finalize_block(&mut self) -> (Option<Block>, Vec<String>) {
246        let mut warnings = Vec::new();
247        match self.state.take_idle() {
248            StreamState::Idle => (None, warnings),
249            StreamState::InChannelData {
250                fields,
251                mut variables,
252                open_var_name,
253                open_var_value,
254            } => {
255                if let (Some(ref name), Some(value)) = (&open_var_name, open_var_value) {
256                    warnings.push(format!("unclosed multi-line variable: {name}"));
257                    variables.push((name.clone(), value));
258                }
259                (Some(Block::ChannelData { fields, variables }), warnings)
260            }
261            StreamState::InSdp { direction, body } => {
262                (Some(Block::Sdp { direction, body }), warnings)
263            }
264            StreamState::InCodecNegotiation {
265                comparisons,
266                selected,
267            } => (
268                Some(Block::CodecNegotiation {
269                    comparisons,
270                    selected,
271                }),
272                warnings,
273            ),
274        }
275    }
276
277    fn finalize_pending(&mut self) -> Option<LogEntry> {
278        let (block, warnings) = self.finalize_block();
279        if let Some(ref mut p) = self.pending {
280            p.block = block;
281            p.warnings.extend(warnings);
282            self.stats.lines_in_entries += 1 + p.attached.len() as u64;
283        }
284        self.pending.take()
285    }
286
287    fn start_block_for_message(&mut self, message_kind: &MessageKind) {
288        self.state = match message_kind {
289            MessageKind::ChannelData => StreamState::InChannelData {
290                fields: Vec::new(),
291                variables: Vec::new(),
292                open_var_name: None,
293                open_var_value: None,
294            },
295            MessageKind::SdpMarker { direction } => StreamState::InSdp {
296                direction: direction.clone(),
297                body: Vec::new(),
298            },
299            MessageKind::CodecNegotiation => StreamState::InCodecNegotiation {
300                comparisons: Vec::new(),
301                selected: Vec::new(),
302            },
303            _ => StreamState::Idle,
304        };
305    }
306
307    fn accumulate_codec_entry(&mut self, msg: &str) {
308        let mut warning = None;
309        if let StreamState::InCodecNegotiation {
310            comparisons,
311            selected,
312        } = &mut self.state
313        {
314            let rest = msg.strip_prefix("Audio Codec Compare ").unwrap_or(msg);
315            if rest.contains("is saved as a match") {
316                let codec = rest.find(']').map(|end| &rest[1..end]).unwrap_or(rest);
317                selected.push(codec.to_string());
318            } else if let Some(slash) = rest.find("]/[") {
319                let offered = &rest[1..slash];
320                let local = &rest[slash + 3..rest.len().saturating_sub(1)];
321                comparisons.push((offered.to_string(), local.to_string()));
322            } else {
323                warning = Some(format!(
324                    "unrecognized codec negotiation line: {}",
325                    if msg.len() > 80 { &msg[..80] } else { msg }
326                ));
327            }
328        }
329        if let (Some(w), Some(ref mut pending)) = (warning, &mut self.pending) {
330            pending.warnings.push(w);
331        }
332    }
333
334    fn accumulate_continuation(&mut self, msg: &str, line: &str) {
335        let msg_kind = classify_message(msg);
336        let mut warning = None;
337        match &mut self.state {
338            StreamState::InChannelData {
339                fields,
340                variables,
341                open_var_name,
342                open_var_value,
343            } => {
344                if let Some(ref mut val) = open_var_value {
345                    val.push('\n');
346                    val.push_str(msg);
347                    if msg.ends_with(']') {
348                        let trimmed = val.trim_end_matches(']').to_string();
349                        let name = open_var_name.take().unwrap();
350                        *open_var_value = None;
351                        variables.push((name, trimmed));
352                    }
353                } else {
354                    match &msg_kind {
355                        MessageKind::ChannelField { name, value } => {
356                            fields.push((name.clone(), value.clone()));
357                        }
358                        MessageKind::Variable { name, value } => {
359                            if !msg.ends_with(']') && msg.contains(": [") {
360                                *open_var_name = Some(name.clone());
361                                *open_var_value = Some(value.clone());
362                            } else {
363                                variables.push((name.clone(), value.clone()));
364                            }
365                        }
366                        _ => {
367                            if let Some((name, value)) = parse_field_line(msg) {
368                                fields.push((name, value));
369                            } else {
370                                warning = Some(format!(
371                                    "unparseable CHANNEL_DATA line: {}",
372                                    if msg.len() > 80 { &msg[..80] } else { msg }
373                                ));
374                            }
375                        }
376                    }
377                }
378            }
379            StreamState::InSdp { body, .. } => {
380                body.push(msg.to_string());
381            }
382            StreamState::InCodecNegotiation { .. } => {
383                warning = Some(format!(
384                    "unexpected codec negotiation continuation: {}",
385                    if msg.len() > 80 { &msg[..80] } else { msg }
386                ));
387            }
388            StreamState::Idle => {}
389        }
390        if let Some(ref mut pending) = self.pending {
391            if let Some(w) = warning {
392                pending.warnings.push(w);
393            }
394            pending.attached.push(line);
395        }
396    }
397
398    fn new_entry(
399        &mut self,
400        uuid: String,
401        timestamp: String,
402        message: String,
403        kind: LineKind,
404        message_kind: MessageKind,
405    ) -> LogEntry {
406        let mut warnings = Vec::new();
407        if let Some(w) = self.deferred_warning.take() {
408            warnings.push(w);
409        }
410        LogEntry {
411            uuid,
412            timestamp,
413            message,
414            kind,
415            message_kind,
416            level: None,
417            idle_pct: None,
418            source: None,
419            block: None,
420            attached: AttachedLines::new(),
421            line_number: self.line_number,
422            warnings,
423        }
424    }
425}
426
427/// mod_logfile's `snprintf` buffer size for UUID-prefixed lines.
428/// Lines exceeding this in the formatted output lose their trailing newline,
429/// causing the next queue entry to collide on the same physical line.
430const MOD_LOGFILE_BUF_SIZE: usize = 2048;
431
432/// Effective maximum payload per line (buffer minus UUID, space, newline).
433const MAX_LINE_PAYLOAD: usize = MOD_LOGFILE_BUF_SIZE - 36 - 1 - 1;
434
435/// Tolerance around the expected truncation boundary when scanning oversize
436/// lines for Format E collisions. The boundary is deterministic
437/// (`MAX_LINE_PAYLOAD` from the start of the truncated chunk) but a few bytes
438/// of slack covers minor variation in `snprintf` accounting and any future
439/// drift in the prepend format.
440const COLLISION_SCAN_SLACK: usize = 64;
441
442impl<I: Iterator<Item = String>> LogStream<I> {
443    /// Detect same-line collisions where multiple log entries were concatenated
444    /// without a newline separator.
445    ///
446    /// Two collision mechanisms exist in production:
447    ///
448    /// 1. **Buffer truncation** (Format E): `mod_logfile`'s 2048-byte `snprintf`
449    ///    buffer truncates a long line, losing the trailing `\n`. The next entry
450    ///    from the log queue collides on the same physical line. These lines
451    ///    always exceed `MAX_LINE_PAYLOAD`.
452    ///
453    /// 2. **Write contention**: multiple threads writing to the log file can
454    ///    interleave output, producing concatenated entries at any line length.
455    ///    Common with system lines (Format B) that lack UUID prefixes.
456    ///
457    /// Returns the (possibly truncated) line. If a collision is detected,
458    /// the suffix is stored in `split_pending` for processing in the next
459    /// iteration. Recursive: split suffixes pass through this function again.
460    fn detect_collision(&mut self, line: String) -> String {
461        if line.len() > MAX_LINE_PAYLOAD {
462            let warning = format!(
463                "line exceeds mod_logfile 2048-byte buffer ({} bytes), data may be truncated",
464                line.len() + 38,
465            );
466            if let Some(ref mut pending) = self.pending {
467                pending.warnings.push(warning);
468            } else {
469                self.deferred_warning = Some(warning);
470            }
471        }
472
473        // Skip past the line's own header to avoid matching itself.
474        let bytes = line.as_bytes();
475        let min_scan = if is_uuid_at(bytes, 0) {
476            if bytes.len() > 37 && bytes[37].is_ascii_digit() {
477                64 // Full line: UUID + timestamp
478            } else {
479                37 // UUID continuation
480            }
481        } else if is_date_at(bytes, 0) {
482            27 // System line: skip own timestamp
483        } else {
484            0
485        };
486
487        let end = bytes.len().saturating_sub(28);
488        // For oversize lines (Format E), the collision UUID lands at a
489        // deterministic offset of ~MAX_LINE_PAYLOAD from the chunk start —
490        // mod_logfile's snprintf truncation point. Bounding the scan to a
491        // small window avoids O(n²) behavior on lines that split many times,
492        // where the previous full-suffix scan re-walked tens of KB per split.
493        // Format B (write-contention timestamp collisions) at arbitrary
494        // offsets within an oversize line are not detected; that combination
495        // is not observed in production logs.
496        let (scan_lo, scan_hi) = if bytes.len() > MAX_LINE_PAYLOAD {
497            let lo = min_scan.max(MAX_LINE_PAYLOAD.saturating_sub(COLLISION_SCAN_SLACK));
498            let hi = end.min(MAX_LINE_PAYLOAD + COLLISION_SCAN_SLACK);
499            (lo, hi)
500        } else {
501            (min_scan, end)
502        };
503
504        if scan_lo <= scan_hi {
505            for offset in scan_lo..=scan_hi {
506                // Timestamp collision (System or Full line header)
507                if is_log_header_at(bytes, offset) {
508                    // Check if a UUID precedes the timestamp (Full line collision)
509                    let split_at = if offset >= 37 && is_uuid_at(bytes, offset - 37) {
510                        offset - 37
511                    } else {
512                        offset
513                    };
514                    self.split_pending = Some(line[split_at..].to_string());
515                    return line[..split_at].to_string();
516                }
517                // UUID collision without timestamp (Format E — truncated buffer)
518                if is_uuid_at(bytes, offset) && bytes.len() > MAX_LINE_PAYLOAD {
519                    self.split_pending = Some(line[offset..].to_string());
520                    return line[..offset].to_string();
521                }
522            }
523        }
524
525        line
526    }
527}
528
529impl<I: Iterator<Item = String>> Iterator for LogStream<I> {
530    type Item = LogEntry;
531
532    fn next(&mut self) -> Option<LogEntry> {
533        loop {
534            let line = if let Some(split) = self.split_pending.take() {
535                self.stats.lines_split += 1;
536                split
537            } else {
538                let Some(line) = self.lines.next() else {
539                    return self.finalize_pending();
540                };
541
542                if line.starts_with('\x00') {
543                    let yielded = self.finalize_pending();
544                    self.last_uuid.clear();
545                    self.last_timestamp.clear();
546                    if yielded.is_some() {
547                        return yielded;
548                    }
549                    continue;
550                }
551
552                self.line_number += 1;
553                self.stats.lines_processed += 1;
554                line
555            };
556
557            let line = self.detect_collision(line);
558
559            let parsed = parse_line(&line);
560
561            match parsed.kind {
562                LineKind::Full | LineKind::System | LineKind::Truncated => {
563                    let uuid = parsed.uuid.unwrap_or("").to_string();
564                    let message_kind = classify_message(parsed.message);
565
566                    // Merge consecutive codec negotiation entries with same UUID
567                    if message_kind == MessageKind::CodecNegotiation {
568                        if let (Some(ref pending), StreamState::InCodecNegotiation { .. }) =
569                            (&self.pending, &self.state)
570                        {
571                            if uuid == pending.uuid {
572                                self.accumulate_codec_entry(parsed.message);
573                                if let Some(ref mut p) = self.pending {
574                                    p.attached.push(&line);
575                                }
576                                continue;
577                            }
578                        }
579                    }
580
581                    let yielded = self.finalize_pending();
582
583                    let timestamp = parsed
584                        .timestamp
585                        .map(|t| t.to_string())
586                        .unwrap_or_else(|| self.last_timestamp.clone());
587
588                    if !uuid.is_empty() {
589                        self.last_uuid = uuid.clone();
590                    }
591                    if parsed.timestamp.is_some() {
592                        self.last_timestamp = timestamp.clone();
593                    }
594
595                    self.start_block_for_message(&message_kind);
596                    if message_kind == MessageKind::CodecNegotiation {
597                        self.accumulate_codec_entry(parsed.message);
598                    }
599
600                    let mut entry = self.new_entry(
601                        uuid,
602                        timestamp,
603                        parsed.message.to_string(),
604                        parsed.kind,
605                        message_kind,
606                    );
607                    entry.level = parsed.level;
608                    entry.idle_pct = parsed.idle_pct.map(|s| s.to_string());
609                    entry.source = parsed.source.map(|s| s.to_string());
610                    self.pending = Some(entry);
611
612                    if yielded.is_some() {
613                        return yielded;
614                    }
615                }
616
617                LineKind::UuidContinuation => {
618                    let uuid = parsed.uuid.unwrap_or("").to_string();
619                    let is_primary = parsed.message.starts_with("EXECUTE ");
620
621                    if let Some(ref pending) = self.pending {
622                        if !is_primary && uuid == pending.uuid {
623                            self.accumulate_continuation(parsed.message, &line);
624                        } else {
625                            let yielded = self.finalize_pending();
626                            let message_kind = classify_message(parsed.message);
627
628                            if !uuid.is_empty() {
629                                self.last_uuid = uuid.clone();
630                            }
631
632                            self.start_block_for_message(&message_kind);
633                            self.pending = Some(self.new_entry(
634                                uuid,
635                                self.last_timestamp.clone(),
636                                parsed.message.to_string(),
637                                parsed.kind,
638                                message_kind,
639                            ));
640
641                            return yielded;
642                        }
643                    } else {
644                        let message_kind = classify_message(parsed.message);
645
646                        if !uuid.is_empty() {
647                            self.last_uuid = uuid.clone();
648                        }
649
650                        self.start_block_for_message(&message_kind);
651                        self.pending = Some(self.new_entry(
652                            uuid,
653                            self.last_timestamp.clone(),
654                            parsed.message.to_string(),
655                            parsed.kind,
656                            message_kind,
657                        ));
658                    }
659                }
660
661                LineKind::BareContinuation => {
662                    if self.pending.is_some() {
663                        self.accumulate_continuation(parsed.message, &line);
664                    } else {
665                        self.record_unclassified(
666                            UnclassifiedReason::OrphanContinuation,
667                            Some(&line),
668                        );
669                        let message_kind = classify_message(parsed.message);
670                        self.pending = Some(self.new_entry(
671                            self.last_uuid.clone(),
672                            self.last_timestamp.clone(),
673                            parsed.message.to_string(),
674                            parsed.kind,
675                            message_kind,
676                        ));
677                    }
678                }
679
680                LineKind::Empty => {
681                    if let Some(ref mut pending) = self.pending {
682                        pending.attached.push(&line);
683                    } else {
684                        self.stats.lines_empty_orphan += 1;
685                    }
686                }
687            }
688        }
689    }
690}
691
692#[cfg(test)]
693mod tests {
694    use super::*;
695
696    const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
697    const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
698
699    fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
700        format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
701    }
702
703    const TS1: &str = "2025-01-15 10:30:45.123456";
704    const TS2: &str = "2025-01-15 10:30:46.234567";
705
706    // --- Existing behavior tests (preserved) ---
707
708    #[test]
709    fn inherits_uuid_for_bare_continuation() {
710        let lines = vec![
711            full_line(UUID1, TS1, "CHANNEL_DATA:"),
712            "variable_foo: [bar]".to_string(),
713            "variable_baz: [qux]".to_string(),
714        ];
715        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
716        assert_eq!(entries.len(), 1);
717        assert_eq!(entries[0].uuid, UUID1);
718        assert_eq!(entries[0].attached.len(), 2);
719        assert_eq!(entries[0].attached.get(0), Some("variable_foo: [bar]"));
720        assert_eq!(entries[0].attached.get(1), Some("variable_baz: [qux]"));
721    }
722
723    #[test]
724    fn inherits_timestamp_for_uuid_continuation() {
725        let lines = vec![
726            full_line(UUID1, TS1, "First"),
727            format!("{UUID2} Channel-State: [CS_EXECUTE]"),
728        ];
729        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
730        assert_eq!(entries.len(), 2);
731        assert_eq!(entries[0].timestamp, TS1);
732        assert_eq!(entries[1].uuid, UUID2);
733        assert_eq!(entries[1].timestamp, TS1);
734    }
735
736    #[test]
737    fn new_full_line_yields_previous() {
738        let lines = vec![
739            full_line(UUID1, TS1, "First"),
740            full_line(UUID2, TS2, "Second"),
741        ];
742        let mut stream = LogStream::new(lines.into_iter());
743        let first = stream.next().unwrap();
744        assert_eq!(first.uuid, UUID1);
745        assert_eq!(first.message, "First");
746        let second = stream.next().unwrap();
747        assert_eq!(second.uuid, UUID2);
748        assert_eq!(second.message, "Second");
749        assert!(stream.next().is_none());
750    }
751
752    #[test]
753    fn channel_data_collected_as_attached() {
754        let lines = vec![
755            full_line(UUID1, TS1, "CHANNEL_DATA:"),
756            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
757            format!("{UUID1} Unique-ID: [{UUID1}]"),
758            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
759        ];
760        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
761        assert_eq!(entries.len(), 1);
762        assert_eq!(entries[0].message, "CHANNEL_DATA:");
763        assert_eq!(entries[0].attached.len(), 3);
764    }
765
766    #[test]
767    fn sdp_body_collected_as_attached() {
768        let lines = vec![
769            full_line(UUID1, TS1, "Local SDP:"),
770            "v=0".to_string(),
771            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
772            "s=-".to_string(),
773            "c=IN IP4 192.0.2.1".to_string(),
774            "m=audio 10000 RTP/AVP 0".to_string(),
775        ];
776        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
777        assert_eq!(entries.len(), 1);
778        assert_eq!(entries[0].attached.len(), 5);
779    }
780
781    #[test]
782    fn truncated_starts_new_entry() {
783        let lines = vec![
784            full_line(UUID1, TS1, "First"),
785            format!(
786                "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
787            ),
788        ];
789        let mut stream = LogStream::new(lines.into_iter());
790        let first = stream.next().unwrap();
791        assert_eq!(first.uuid, UUID1);
792        assert_eq!(first.message, "First");
793        let second = stream.next().unwrap();
794        assert_eq!(second.uuid, UUID2);
795        assert_eq!(second.kind, LineKind::Truncated);
796    }
797
798    #[test]
799    fn empty_lines_in_attached() {
800        let lines = vec![
801            full_line(UUID1, TS1, "First"),
802            String::new(),
803            "continuation".to_string(),
804        ];
805        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
806        assert_eq!(entries.len(), 1);
807        assert_eq!(entries[0].attached.len(), 2);
808        assert_eq!(entries[0].attached.get(0), Some(""));
809        assert_eq!(entries[0].attached.get(1), Some("continuation"));
810    }
811
812    #[test]
813    fn system_line_no_uuid() {
814        let lines = vec![format!(
815            "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
816        )];
817        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
818        assert_eq!(entries.len(), 1);
819        assert_eq!(entries[0].uuid, "");
820        assert_eq!(entries[0].kind, LineKind::System);
821    }
822
823    #[test]
824    fn final_entry_on_exhaustion() {
825        let lines = vec![full_line(UUID1, TS1, "Only entry")];
826        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
827        assert_eq!(entries.len(), 1);
828        assert_eq!(entries[0].message, "Only entry");
829    }
830
831    #[test]
832    fn consecutive_full_lines() {
833        let lines = vec![
834            full_line(UUID1, TS1, "First"),
835            full_line(UUID1, TS2, "Second"),
836            full_line(UUID2, TS1, "Third"),
837        ];
838        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
839        assert_eq!(entries.len(), 3);
840        for entry in &entries {
841            assert!(entry.attached.is_empty());
842        }
843    }
844
845    #[test]
846    fn execute_after_channel_data_same_uuid() {
847        let lines = vec![
848            full_line(UUID1, TS1, "CHANNEL_DATA:"),
849            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
850            format!("{UUID1} variable_sip_call_id: [test@192.0.2.1]"),
851            "variable_foo: [bar]".to_string(),
852            String::new(),
853            String::new(),
854            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
855            full_line(UUID1, TS2, "EXPORT (export_vars) [originate_timeout]=[3600]"),
856        ];
857        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
858        assert_eq!(entries.len(), 3);
859        assert_eq!(entries[0].message, "CHANNEL_DATA:");
860        assert_eq!(entries[0].attached.len(), 5);
861        assert_eq!(entries[1].message, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)");
862        assert_eq!(entries[1].kind, LineKind::UuidContinuation);
863        assert_eq!(
864            entries[2].message,
865            "EXPORT (export_vars) [originate_timeout]=[3600]"
866        );
867    }
868
869    #[test]
870    fn execute_between_full_lines_same_uuid() {
871        let lines = vec![
872            full_line(UUID1, TS1, "CoreSession::setVariable(X-C911P-City, ST GEORGES)"),
873            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/city/ST GEORGES)"),
874            full_line(UUID1, TS2, "CoreSession::setVariable(X-C911P-Region, SGS)"),
875        ];
876        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
877        assert_eq!(entries.len(), 3);
878        assert_eq!(
879            entries[0].message,
880            "CoreSession::setVariable(X-C911P-City, ST GEORGES)"
881        );
882        assert!(entries[0].attached.is_empty());
883        assert!(entries[1].message.starts_with("EXECUTE "));
884        assert_eq!(entries[1].kind, LineKind::UuidContinuation);
885        assert_eq!(
886            entries[2].message,
887            "CoreSession::setVariable(X-C911P-Region, SGS)"
888        );
889    }
890
891    #[test]
892    fn multiple_execute_between_full_lines() {
893        let lines = vec![
894            full_line(UUID1, TS1, "CoreSession::setVariable(ngcs_call_id, urn:emergency:uid:callid:test)"),
895            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/call_id/urn:emergency:uid:callid:test)"),
896            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/callid_codecs/urn:emergency:uid:callid:test/PCMU@8000h)"),
897            full_line(UUID1, TS2, "CoreSession::setVariable(ngcs_short_call_id, test)"),
898        ];
899        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
900        assert_eq!(entries.len(), 4);
901        assert!(entries[0].attached.is_empty());
902        assert!(entries[1].message.contains("call_id"));
903        assert!(entries[2].message.contains("callid_codecs"));
904        assert_eq!(
905            entries[3].message,
906            "CoreSession::setVariable(ngcs_short_call_id, test)"
907        );
908    }
909
910    #[test]
911    fn uuid_continuation_different_uuid_yields() {
912        let lines = vec![
913            full_line(UUID1, TS1, "First"),
914            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
915            format!("{UUID2} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"),
916        ];
917        let mut stream = LogStream::new(lines.into_iter());
918        let first = stream.next().unwrap();
919        assert_eq!(first.uuid, UUID1);
920        assert_eq!(first.attached.len(), 1);
921        let second = stream.next().unwrap();
922        assert_eq!(second.uuid, UUID2);
923        assert_eq!(
924            second.message,
925            "Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"
926        );
927    }
928
929    // --- New: Block detection tests ---
930
931    #[test]
932    fn channel_data_block_fields_and_variables() {
933        let lines = vec![
934            full_line(UUID1, TS1, "CHANNEL_DATA:"),
935            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
936            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
937            format!("{UUID1} Unique-ID: [{UUID1}]"),
938            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
939            "variable_direction: [inbound]".to_string(),
940        ];
941        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
942        assert_eq!(entries.len(), 1);
943        assert_eq!(entries[0].message_kind, MessageKind::ChannelData);
944        let block = entries[0].block.as_ref().expect("should have block");
945        match block {
946            Block::ChannelData { fields, variables } => {
947                assert_eq!(fields.len(), 3);
948                assert_eq!(
949                    fields[0],
950                    (
951                        "Channel-Name".to_string(),
952                        "sofia/internal/+15550001234@192.0.2.1".to_string()
953                    )
954                );
955                assert_eq!(
956                    fields[1],
957                    ("Channel-State".to_string(), "CS_EXECUTE".to_string())
958                );
959                assert_eq!(fields[2], ("Unique-ID".to_string(), UUID1.to_string()));
960                assert_eq!(variables.len(), 2);
961                assert_eq!(
962                    variables[0],
963                    (
964                        "variable_sip_call_id".to_string(),
965                        "test123@192.0.2.1".to_string()
966                    )
967                );
968                assert_eq!(
969                    variables[1],
970                    ("variable_direction".to_string(), "inbound".to_string())
971                );
972            }
973            other => panic!("expected ChannelData block, got {other:?}"),
974        }
975    }
976
977    #[test]
978    fn channel_data_multiline_variable_reassembly() {
979        let lines = vec![
980            full_line(UUID1, TS1, "CHANNEL_DATA:"),
981            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
982            "variable_switch_r_sdp: [v=0".to_string(),
983            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
984            "s=-".to_string(),
985            "c=IN IP4 192.0.2.1".to_string(),
986            "m=audio 47758 RTP/AVP 0 101".to_string(),
987            "a=rtpmap:0 PCMU/8000".to_string(),
988            "]".to_string(),
989            "variable_direction: [inbound]".to_string(),
990        ];
991        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
992        assert_eq!(entries.len(), 1);
993        let block = entries[0].block.as_ref().expect("should have block");
994        match block {
995            Block::ChannelData { fields, variables } => {
996                assert_eq!(fields.len(), 1);
997                assert_eq!(variables.len(), 2);
998                assert_eq!(variables[0].0, "variable_switch_r_sdp");
999                assert!(variables[0].1.starts_with("v=0\n"));
1000                assert!(variables[0].1.contains("m=audio 47758 RTP/AVP 0 101"));
1001                assert!(!variables[0].1.ends_with(']'));
1002                assert_eq!(
1003                    variables[1],
1004                    ("variable_direction".to_string(), "inbound".to_string())
1005                );
1006            }
1007            other => panic!("expected ChannelData block, got {other:?}"),
1008        }
1009        assert_eq!(entries[0].attached.len(), 9);
1010    }
1011
1012    #[test]
1013    fn sdp_block_detection() {
1014        let lines = vec![
1015            full_line(UUID1, TS1, "Local SDP:"),
1016            "v=0".to_string(),
1017            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1018            "s=-".to_string(),
1019            "c=IN IP4 192.0.2.1".to_string(),
1020            "m=audio 10000 RTP/AVP 0".to_string(),
1021            "a=rtpmap:0 PCMU/8000".to_string(),
1022        ];
1023        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1024        assert_eq!(entries.len(), 1);
1025        match &entries[0].message_kind {
1026            MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1027            other => panic!("expected SdpMarker, got {other:?}"),
1028        }
1029        let block = entries[0].block.as_ref().expect("should have block");
1030        match block {
1031            Block::Sdp { direction, body } => {
1032                assert_eq!(*direction, SdpDirection::Local);
1033                assert_eq!(body.len(), 6);
1034                assert_eq!(body[0], "v=0");
1035                assert_eq!(body[5], "a=rtpmap:0 PCMU/8000");
1036            }
1037            other => panic!("expected Sdp block, got {other:?}"),
1038        }
1039    }
1040
1041    #[test]
1042    fn sdp_block_terminated_by_primary_line() {
1043        let lines = vec![
1044            full_line(UUID1, TS1, "Remote SDP:"),
1045            "v=0".to_string(),
1046            "m=audio 10000 RTP/AVP 0".to_string(),
1047            full_line(UUID1, TS2, "Next event"),
1048        ];
1049        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1050        assert_eq!(entries.len(), 2);
1051        let block = entries[0].block.as_ref().expect("should have block");
1052        match block {
1053            Block::Sdp { direction, body } => {
1054                assert_eq!(*direction, SdpDirection::Remote);
1055                assert_eq!(body.len(), 2);
1056            }
1057            other => panic!("expected Sdp block, got {other:?}"),
1058        }
1059        assert!(entries[1].block.is_none());
1060    }
1061
1062    #[test]
1063    fn sdp_from_uuid_continuation() {
1064        let lines = vec![
1065            format!("{UUID1} Local SDP:"),
1066            format!("{UUID1} v=0"),
1067            format!("{UUID1} o=FreeSWITCH 1234 5678 IN IP4 192.0.2.1"),
1068            format!("{UUID1} s=FreeSWITCH"),
1069            format!("{UUID1} c=IN IP4 192.0.2.1"),
1070        ];
1071        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1072        assert_eq!(entries.len(), 1);
1073        let block = entries[0].block.as_ref().expect("should have block");
1074        match block {
1075            Block::Sdp { direction, body } => {
1076                assert_eq!(*direction, SdpDirection::Local);
1077                assert_eq!(body.len(), 4);
1078                assert_eq!(body[0], "v=0");
1079            }
1080            other => panic!("expected Sdp block, got {other:?}"),
1081        }
1082    }
1083
1084    #[test]
1085    fn channel_data_interrupted_by_different_uuid() {
1086        let lines = vec![
1087            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1088            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1089            format!("{UUID2} Dialplan: sofia/internal/+15559999999@192.0.2.1 parsing [public]"),
1090        ];
1091        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1092        assert_eq!(entries.len(), 2);
1093        let block = entries[0].block.as_ref().expect("should have block");
1094        match block {
1095            Block::ChannelData { fields, .. } => {
1096                assert_eq!(fields.len(), 1);
1097            }
1098            other => panic!("expected ChannelData, got {other:?}"),
1099        }
1100    }
1101
1102    #[test]
1103    fn no_block_for_non_block_message() {
1104        let lines = vec![full_line(UUID1, TS1, "some random freeswitch log message")];
1105        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1106        assert_eq!(entries.len(), 1);
1107        assert!(entries[0].block.is_none());
1108        assert_eq!(entries[0].message_kind, MessageKind::General);
1109    }
1110
1111    #[test]
1112    fn message_kind_on_execute() {
1113        let lines = vec![
1114            full_line(UUID1, TS1, "First"),
1115            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
1116        ];
1117        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1118        assert_eq!(entries.len(), 2);
1119        match &entries[1].message_kind {
1120            MessageKind::Execute {
1121                application,
1122                arguments,
1123                ..
1124            } => {
1125                assert_eq!(application, "set");
1126                assert_eq!(arguments, "foo=bar");
1127            }
1128            other => panic!("expected Execute, got {other:?}"),
1129        }
1130    }
1131
1132    // --- New: ParseStats tests ---
1133
1134    #[test]
1135    fn stats_lines_processed() {
1136        let lines = vec![
1137            full_line(UUID1, TS1, "First"),
1138            full_line(UUID1, TS2, "Second"),
1139            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1140        ];
1141        let mut stream = LogStream::new(lines.into_iter());
1142        let _: Vec<_> = stream.by_ref().collect();
1143        assert_eq!(stream.stats().lines_processed, 3);
1144    }
1145
1146    #[test]
1147    fn stats_unclassified_orphan() {
1148        let lines = vec![
1149            "variable_foo: [bar]".to_string(),
1150            full_line(UUID1, TS1, "After orphan"),
1151        ];
1152        let mut stream = LogStream::new(lines.into_iter())
1153            .unclassified_tracking(UnclassifiedTracking::TrackLines);
1154        let _: Vec<_> = stream.by_ref().collect();
1155        assert_eq!(stream.stats().lines_unclassified, 1);
1156        assert_eq!(stream.stats().unclassified_lines.len(), 1);
1157        assert_eq!(
1158            stream.stats().unclassified_lines[0].reason,
1159            UnclassifiedReason::OrphanContinuation,
1160        );
1161    }
1162
1163    #[test]
1164    fn stats_capture_data() {
1165        let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1166        let mut stream = LogStream::new(lines.into_iter())
1167            .unclassified_tracking(UnclassifiedTracking::CaptureData);
1168        let _: Vec<_> = stream.by_ref().collect();
1169        assert_eq!(stream.stats().unclassified_lines.len(), 1);
1170        assert_eq!(
1171            stream.stats().unclassified_lines[0].data.as_deref(),
1172            Some("orphan line"),
1173        );
1174    }
1175
1176    #[test]
1177    fn stats_count_only_no_allocation() {
1178        let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1179        let mut stream = LogStream::new(lines.into_iter());
1180        let _: Vec<_> = stream.by_ref().collect();
1181        assert_eq!(stream.stats().lines_unclassified, 1);
1182        assert!(stream.stats().unclassified_lines.is_empty());
1183    }
1184
1185    #[test]
1186    fn line_number_tracking() {
1187        let lines = vec![
1188            full_line(UUID1, TS1, "First"),
1189            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1190            full_line(UUID2, TS2, "Third"),
1191        ];
1192        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1193        assert_eq!(entries[0].line_number, 1);
1194        assert_eq!(entries[1].line_number, 3);
1195    }
1196
1197    #[test]
1198    fn drain_unclassified() {
1199        let lines = vec![
1200            "orphan1".to_string(),
1201            "orphan2".to_string(),
1202            full_line(UUID1, TS1, "After"),
1203        ];
1204        let mut stream = LogStream::new(lines.into_iter())
1205            .unclassified_tracking(UnclassifiedTracking::TrackLines);
1206        let _: Vec<_> = stream.by_ref().collect();
1207        let drained = stream.drain_unclassified();
1208        assert_eq!(drained.len(), 1);
1209        assert!(stream.stats().unclassified_lines.is_empty());
1210        assert_eq!(stream.stats().lines_unclassified, 1);
1211    }
1212
1213    // BUG 1: When LogStream processes a TrackedChain of multiple file segments,
1214    // last_timestamp from the previous segment bleeds into continuation lines
1215    // at the start of the next segment. This causes entries to get timestamps
1216    // from a completely different file (potentially hours earlier).
1217    //
1218    // Reproduces: f2cb66d4 getting timestamp 23:58:03 from the rotated file
1219    // when freeswitch.log starts with its continuation lines.
1220    #[test]
1221    fn continuation_lines_at_file_boundary_must_not_inherit_previous_timestamp() {
1222        use crate::TrackedChain;
1223
1224        let uuid_a = "aaaaaaaa-1111-2222-3333-444444444444";
1225        let uuid_b = "bbbbbbbb-1111-2222-3333-444444444444";
1226        let ts_old = "2025-01-15 23:58:03.000000";
1227        let ts_new = "2025-01-16 08:37:12.000000";
1228
1229        let seg1: Vec<String> = vec![format!(
1230            "{uuid_a} {ts_old} 95.00% [DEBUG] test.c:1 Last line in rotated file"
1231        )];
1232
1233        // Segment 2 starts with UUID-continuation lines (Format C: UUID + message, no timestamp)
1234        // followed by a real timestamped line
1235        let seg2: Vec<String> = vec![
1236            format!("{uuid_b} CHANNEL_DATA:"),
1237            format!("{uuid_b} Channel-State: [CS_EXECUTE]"),
1238            format!("{uuid_b} {ts_new} 95.00% [DEBUG] test.c:1 First timestamped line in new file"),
1239        ];
1240
1241        let segments: Vec<(String, Box<dyn Iterator<Item = String>>)> = vec![
1242            ("rotated.log".to_string(), Box::new(seg1.into_iter())),
1243            ("freeswitch.log".to_string(), Box::new(seg2.into_iter())),
1244        ];
1245
1246        let (chain, _) = TrackedChain::new(segments);
1247        let entries: Vec<_> = LogStream::new(chain).collect();
1248
1249        let b_entry = entries
1250            .iter()
1251            .find(|e| e.uuid == uuid_b)
1252            .expect("should find entry for uuid_b");
1253
1254        // The CHANNEL_DATA entry for uuid_b must NOT have the timestamp from
1255        // segment 1 — it should either have the new file's first real timestamp
1256        // or be empty (indicating unknown).
1257        assert_ne!(
1258            b_entry.timestamp, ts_old,
1259            "continuation lines in a new file segment inherited timestamp \
1260             '{ts_old}' from the previous segment — timestamps must not bleed \
1261             across file boundaries"
1262        );
1263    }
1264
1265    // --- Line accounting tests ---
1266
1267    fn assert_accounting(stream: &LogStream<impl Iterator<Item = String>>) {
1268        let stats = stream.stats();
1269        assert_eq!(
1270            stats.unaccounted_lines(),
1271            0,
1272            "line accounting invariant violated: \
1273             processed={} + split={} != in_entries={} + empty_orphan={}",
1274            stats.lines_processed,
1275            stats.lines_split,
1276            stats.lines_in_entries,
1277            stats.lines_empty_orphan,
1278        );
1279    }
1280
1281    #[test]
1282    fn accounting_full_lines() {
1283        let lines = vec![
1284            full_line(UUID1, TS1, "First"),
1285            full_line(UUID2, TS2, "Second"),
1286        ];
1287        let mut stream = LogStream::new(lines.into_iter());
1288        let entries: Vec<_> = stream.by_ref().collect();
1289        assert_eq!(entries.len(), 2);
1290        assert_eq!(stream.stats().lines_in_entries, 2);
1291        assert_accounting(&stream);
1292    }
1293
1294    #[test]
1295    fn accounting_with_attached() {
1296        let lines = vec![
1297            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1298            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1299            "variable_foo: [bar]".to_string(),
1300            full_line(UUID2, TS2, "Next"),
1301        ];
1302        let mut stream = LogStream::new(lines.into_iter());
1303        let entries: Vec<_> = stream.by_ref().collect();
1304        assert_eq!(entries.len(), 2);
1305        // Entry 1: 1 primary + 2 attached = 3 lines
1306        // Entry 2: 1 primary = 1 line
1307        assert_eq!(stream.stats().lines_in_entries, 4);
1308        assert_accounting(&stream);
1309    }
1310
1311    #[test]
1312    fn accounting_system_line() {
1313        let lines = vec![format!(
1314            "{TS1} 95.97% [NOTICE] mod_logfile.c:217 New log started."
1315        )];
1316        let mut stream = LogStream::new(lines.into_iter());
1317        let _: Vec<_> = stream.by_ref().collect();
1318        assert_eq!(stream.stats().lines_in_entries, 1);
1319        assert_accounting(&stream);
1320    }
1321
1322    #[test]
1323    fn accounting_empty_orphan() {
1324        let lines = vec![
1325            String::new(),
1326            "   ".to_string(),
1327            full_line(UUID1, TS1, "After"),
1328        ];
1329        let mut stream = LogStream::new(lines.into_iter());
1330        let entries: Vec<_> = stream.by_ref().collect();
1331        assert_eq!(entries.len(), 1);
1332        assert_eq!(stream.stats().lines_empty_orphan, 2);
1333        assert_accounting(&stream);
1334    }
1335
1336    #[test]
1337    fn accounting_empty_attached() {
1338        let lines = vec![
1339            full_line(UUID1, TS1, "First"),
1340            String::new(),
1341            "continuation".to_string(),
1342        ];
1343        let mut stream = LogStream::new(lines.into_iter());
1344        let entries: Vec<_> = stream.by_ref().collect();
1345        assert_eq!(entries.len(), 1);
1346        assert_eq!(entries[0].attached.len(), 2);
1347        assert_eq!(stream.stats().lines_empty_orphan, 0);
1348        assert_eq!(stream.stats().lines_in_entries, 3);
1349        assert_accounting(&stream);
1350    }
1351
1352    #[test]
1353    fn accounting_orphan_continuation() {
1354        let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1355        let mut stream = LogStream::new(lines.into_iter());
1356        let _: Vec<_> = stream.by_ref().collect();
1357        assert_accounting(&stream);
1358    }
1359
1360    #[test]
1361    fn accounting_codec_merging() {
1362        let lines = vec![
1363            full_line(
1364                UUID1,
1365                TS1,
1366                "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1367            ),
1368            full_line(
1369                UUID1,
1370                TS1,
1371                "Audio Codec Compare [PCMU:0:8000:20:64000:1] is saved as a match",
1372            ),
1373            full_line(UUID2, TS2, "Next"),
1374        ];
1375        let mut stream = LogStream::new(lines.into_iter());
1376        let _: Vec<_> = stream.by_ref().collect();
1377        assert_accounting(&stream);
1378    }
1379
1380    #[test]
1381    fn accounting_truncated_line() {
1382        let lines = vec![
1383            full_line(UUID1, TS1, "First"),
1384            format!(
1385                "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
1386            ),
1387        ];
1388        let mut stream = LogStream::new(lines.into_iter());
1389        let _: Vec<_> = stream.by_ref().collect();
1390        assert_accounting(&stream);
1391    }
1392
1393    #[test]
1394    fn accounting_long_line_collision_split() {
1395        // Simulate a long variable value exceeding mod_logfile's 2048-byte buffer,
1396        // followed by a collision UUID on the same physical line.
1397        let long_value = "x".repeat(MAX_LINE_PAYLOAD + 10);
1398        let line = format!(
1399            "variable_sip_multipart: [{long_value}]{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"
1400        );
1401        let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1402        let mut stream = LogStream::new(lines.into_iter());
1403        let entries: Vec<_> = stream.by_ref().collect();
1404
1405        // The CHANNEL_DATA entry should have the truncated variable as attached
1406        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1407
1408        // The collision should have been split out as a separate entry
1409        let split_entry = entries.iter().find(|e| e.uuid == UUID2);
1410        assert!(
1411            split_entry.is_some(),
1412            "collision UUID should produce a separate entry"
1413        );
1414
1415        assert_eq!(stream.stats().lines_split, 1);
1416        assert_accounting(&stream);
1417    }
1418
1419    #[test]
1420    fn no_split_on_short_lines() {
1421        // Lines within the payload limit should never be split,
1422        // even if they happen to contain a UUID-like pattern.
1423        let line = format!("variable_call_uuid: [{UUID2}]");
1424        let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1425        let mut stream = LogStream::new(lines.into_iter());
1426        let entries: Vec<_> = stream.by_ref().collect();
1427        assert_eq!(entries.len(), 1);
1428        assert_eq!(stream.stats().lines_split, 0);
1429        assert_accounting(&stream);
1430    }
1431
1432    #[test]
1433    fn timestamp_collision_splits_system_lines() {
1434        let line = format!(
1435            "{TS1} 98.03% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42864: api sofia jsonstatus{TS2} 97.93% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42898: api fsctl pause_check"
1436        );
1437        let mut stream = LogStream::new(std::iter::once(line));
1438        let entries: Vec<_> = stream.by_ref().collect();
1439        assert_eq!(entries.len(), 2);
1440        assert_eq!(
1441            entries[0].message,
1442            "Event Socket Command from ::1:42864: api sofia jsonstatus"
1443        );
1444        assert_eq!(
1445            entries[1].message,
1446            "Event Socket Command from ::1:42898: api fsctl pause_check"
1447        );
1448        assert_eq!(stream.stats().lines_split, 1);
1449        assert_accounting(&stream);
1450    }
1451
1452    #[test]
1453    fn timestamp_collision_splits_three_entries() {
1454        let ts3 = "2025-01-15 10:30:47.345678";
1455        let line = format!(
1456            "{TS1} 95.00% [INFO] mod.c:1 first{TS2} 96.00% [INFO] mod.c:1 second{ts3} 97.00% [INFO] mod.c:1 third"
1457        );
1458        let mut stream = LogStream::new(std::iter::once(line));
1459        let entries: Vec<_> = stream.by_ref().collect();
1460        assert_eq!(entries.len(), 3);
1461        assert_eq!(entries[0].message, "first");
1462        assert_eq!(entries[1].message, "second");
1463        assert_eq!(entries[2].message, "third");
1464        assert_eq!(stream.stats().lines_split, 2);
1465        assert_accounting(&stream);
1466    }
1467
1468    #[test]
1469    fn timestamp_collision_with_uuid_prefix() {
1470        // System line collides with Full line (UUID + timestamp)
1471        let line = format!(
1472            "{TS1} 95.00% [INFO] mod.c:1 first{UUID1} {TS2} 96.00% [DEBUG] sofia.c:100 second"
1473        );
1474        let mut stream = LogStream::new(std::iter::once(line));
1475        let entries: Vec<_> = stream.by_ref().collect();
1476        assert_eq!(entries.len(), 2);
1477        assert_eq!(entries[0].message, "first");
1478        assert_eq!(entries[1].uuid, UUID1);
1479        assert_eq!(entries[1].message, "second");
1480        assert_eq!(stream.stats().lines_split, 1);
1481        assert_accounting(&stream);
1482    }
1483
1484    #[test]
1485    fn channel_data_multiline_variable_spans_many_lines() {
1486        let lines = vec![
1487            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1488            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1489            "variable_switch_r_sdp: [v=0".to_string(),
1490            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1491            "s=-".to_string(),
1492            "c=IN IP4 192.0.2.1".to_string(),
1493            "t=0 0".to_string(),
1494            "m=audio 47758 RTP/AVP 0 8 101".to_string(),
1495            "a=rtpmap:0 PCMU/8000".to_string(),
1496            "a=rtpmap:8 PCMA/8000".to_string(),
1497            "a=rtpmap:101 telephone-event/8000".to_string(),
1498            "a=fmtp:101 0-16".to_string(),
1499            "]".to_string(),
1500            "variable_direction: [inbound]".to_string(),
1501        ];
1502        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1503        assert_eq!(entries.len(), 1);
1504        let block = entries[0].block.as_ref().expect("should have block");
1505        match block {
1506            Block::ChannelData { fields, variables } => {
1507                assert_eq!(fields.len(), 1);
1508                assert_eq!(variables.len(), 2);
1509                assert_eq!(variables[0].0, "variable_switch_r_sdp");
1510                let sdp = &variables[0].1;
1511                assert!(sdp.starts_with("v=0\n"));
1512                assert!(sdp.contains("a=fmtp:101 0-16"));
1513                assert!(!sdp.ends_with(']'));
1514                assert_eq!(variables[1].0, "variable_direction");
1515            }
1516            other => panic!("expected ChannelData block, got {other:?}"),
1517        }
1518    }
1519
1520    #[test]
1521    fn sdp_from_verto_update_media() {
1522        let lines = vec![
1523            full_line(UUID1, TS1, "updateMedia: Local SDP"),
1524            "v=0".to_string(),
1525            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1526            "m=audio 10000 RTP/AVP 0".to_string(),
1527        ];
1528        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1529        assert_eq!(entries.len(), 1);
1530        match &entries[0].message_kind {
1531            MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1532            other => panic!("expected SdpMarker, got {other:?}"),
1533        }
1534        let block = entries[0].block.as_ref().expect("should have block");
1535        match block {
1536            Block::Sdp { direction, body } => {
1537                assert_eq!(*direction, SdpDirection::Local);
1538                assert_eq!(body.len(), 3);
1539            }
1540            other => panic!("expected Sdp block, got {other:?}"),
1541        }
1542    }
1543
1544    #[test]
1545    fn duplicate_sdp_marker() {
1546        let lines = vec![
1547            full_line(UUID1, TS1, "Duplicate SDP"),
1548            "v=0".to_string(),
1549            "m=audio 10000 RTP/AVP 0".to_string(),
1550        ];
1551        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1552        assert_eq!(entries.len(), 1);
1553        match &entries[0].message_kind {
1554            MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Unknown),
1555            other => panic!("expected SdpMarker, got {other:?}"),
1556        }
1557        assert!(entries[0].block.is_some());
1558    }
1559
1560    #[test]
1561    fn warning_on_unclosed_multiline_variable() {
1562        let lines = vec![
1563            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1564            "variable_switch_r_sdp: [v=0".to_string(),
1565            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1566            full_line(UUID2, TS2, "Next entry"),
1567        ];
1568        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1569        assert_eq!(entries.len(), 2);
1570        assert!(
1571            entries[0]
1572                .warnings
1573                .iter()
1574                .any(|w| w.contains("unclosed multi-line variable")),
1575            "expected unclosed variable warning, got: {:?}",
1576            entries[0].warnings
1577        );
1578    }
1579
1580    #[test]
1581    fn warning_on_unparseable_channel_data_line() {
1582        let lines = vec![
1583            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1584            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1585            format!("{UUID1} this is not a valid field line"),
1586        ];
1587        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1588        assert_eq!(entries.len(), 1);
1589        assert!(
1590            entries[0]
1591                .warnings
1592                .iter()
1593                .any(|w| w.contains("unparseable CHANNEL_DATA")),
1594            "expected unparseable warning, got: {:?}",
1595            entries[0].warnings
1596        );
1597    }
1598
1599    #[test]
1600    fn warning_on_unexpected_codec_continuation() {
1601        let lines = vec![
1602            full_line(
1603                UUID1,
1604                TS1,
1605                "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1606            ),
1607            format!("{UUID1} some unexpected continuation line"),
1608        ];
1609        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1610        assert_eq!(entries.len(), 1);
1611        assert!(
1612            entries[0]
1613                .warnings
1614                .iter()
1615                .any(|w| w.contains("unexpected codec negotiation")),
1616            "expected codec warning, got: {:?}",
1617            entries[0].warnings
1618        );
1619    }
1620
1621    #[test]
1622    fn system_line_uuid_continuation_not_absorbed() {
1623        // After the bug fix, a UUID continuation should NOT be absorbed
1624        // by a pending system line (empty UUID).
1625        let lines = vec![
1626            format!("{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"),
1627            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1628        ];
1629        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1630        assert_eq!(
1631            entries.len(),
1632            2,
1633            "UUID continuation should not be absorbed by system entry"
1634        );
1635        assert_eq!(entries[0].uuid, "");
1636        assert_eq!(entries[1].uuid, UUID1);
1637    }
1638
1639    #[test]
1640    fn truncated_collision_in_channel_data_variable() {
1641        // A CHANNEL_DATA block where a variable value exceeds the 2048-byte
1642        // mod_logfile buffer, causing a truncated collision (Format E).
1643        // The variable_long_xml value opens with [ but the buffer truncation
1644        // causes a UUID+EXECUTE to collide on the same physical line before
1645        // the closing ].
1646        let padding = "x".repeat(2000);
1647        let collision_line = format!(
1648            "{UUID1} variable_long_xml: [{padding}{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(foo=bar)"
1649        );
1650        assert!(
1651            collision_line.len() > super::MAX_LINE_PAYLOAD,
1652            "test line must exceed buffer limit, got {}",
1653            collision_line.len()
1654        );
1655
1656        let lines = vec![
1657            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1658            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1659            format!("{UUID1} variable_direction: [inbound]"),
1660            collision_line,
1661            full_line(UUID1, TS2, "Next log entry"),
1662        ];
1663
1664        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1665
1666        // Entry 0: CHANNEL_DATA with the variables
1667        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1668        let block = entries[0].block.as_ref().expect("should have block");
1669        match block {
1670            Block::ChannelData { fields, variables } => {
1671                assert_eq!(fields.len(), 1, "should have Channel-Name field");
1672                assert_eq!(fields[0].0, "Channel-Name");
1673                assert_eq!(
1674                    variables.len(),
1675                    2,
1676                    "should have direction + unclosed long_xml"
1677                );
1678                assert_eq!(variables[0].0, "variable_direction");
1679                assert_eq!(variables[0].1, "inbound");
1680                assert_eq!(variables[1].0, "variable_long_xml");
1681            }
1682            other => panic!("expected ChannelData block, got {other:?}"),
1683        }
1684        assert!(
1685            entries[0]
1686                .warnings
1687                .iter()
1688                .any(|w| w.contains("line exceeds mod_logfile 2048-byte buffer")),
1689            "expected buffer overflow warning, got: {:?}",
1690            entries[0].warnings
1691        );
1692        assert!(
1693            entries[0]
1694                .warnings
1695                .iter()
1696                .any(|w| w.contains("unclosed multi-line variable")),
1697            "expected unclosed variable warning, got: {:?}",
1698            entries[0].warnings
1699        );
1700
1701        // Entry 1: the split EXECUTE line
1702        assert_eq!(entries[1].uuid, UUID1);
1703        assert!(
1704            entries[1].message.starts_with("EXECUTE "),
1705            "split entry should be EXECUTE, got: {}",
1706            entries[1].message
1707        );
1708
1709        // Entry 2: the next full log line
1710        assert_eq!(entries.len(), 3);
1711        assert_eq!(entries[2].message, "Next log entry");
1712    }
1713
1714    #[test]
1715    fn channel_data_uuid_drops_mid_block() {
1716        // Production scenario: mod_logfile stops prepending the UUID mid-way
1717        // through a CHANNEL_DATA dump. The first few variable lines carry the
1718        // UUID prefix (UuidContinuation), then the remaining lines arrive as
1719        // bare continuations. All should be accumulated into the same block.
1720        let lines = vec![
1721            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1722            format!("{UUID1} variable_max_forwards: [69]"),
1723            format!("{UUID1} variable_presence_id: [1251@[2001:db8::10]]"),
1724            format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1725            // UUID drops — bare continuations for the rest
1726            "variable_sip_h_X-Call-Info: [<urn:test:callid:20260316>;purpose=emergency-CallId]"
1727                .to_string(),
1728            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1729            "variable_remote_media_ip: [2001:db8::10]".to_string(),
1730            "variable_remote_media_port: [9952]".to_string(),
1731            "variable_rtp_use_codec_name: [opus]".to_string(),
1732            full_line(UUID1, TS2, "Next entry"),
1733        ];
1734
1735        let mut stream = LogStream::new(lines.into_iter());
1736        let entries: Vec<_> = stream.by_ref().collect();
1737
1738        assert_eq!(entries.len(), 2);
1739        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1740        let block = entries[0].block.as_ref().expect("should have block");
1741        match block {
1742            Block::ChannelData { fields, variables } => {
1743                assert_eq!(fields.len(), 0);
1744                assert_eq!(variables.len(), 8);
1745                // UUID-prefixed variables
1746                assert_eq!(variables[0].0, "variable_max_forwards");
1747                assert_eq!(variables[0].1, "69");
1748                assert_eq!(variables[1].0, "variable_presence_id");
1749                assert_eq!(variables[1].1, "1251@[2001:db8::10]");
1750                assert_eq!(variables[2].0, "variable_sip_h_X-Custom-ID");
1751                // Bare variables (UUID dropped)
1752                assert_eq!(variables[3].0, "variable_sip_h_X-Call-Info");
1753                assert!(variables[3].1.contains("emergency-CallId"));
1754                assert_eq!(variables[4].0, "variable_ep_codec_string");
1755                assert_eq!(variables[7].0, "variable_rtp_use_codec_name");
1756                assert_eq!(variables[7].1, "opus");
1757            }
1758            other => panic!("expected ChannelData block, got {other:?}"),
1759        }
1760        assert_eq!(entries[0].attached.len(), 8);
1761        assert_eq!(entries[1].message, "Next entry");
1762        assert_accounting(&stream);
1763    }
1764
1765    #[test]
1766    fn channel_data_uuid_drops_with_multiline_variable() {
1767        // UUID drops mid-block AND a multi-line variable (SDP body embedded
1768        // in variable_switch_r_sdp) spans many bare continuation lines.
1769        // The \r characters are real — SDP uses \r\n per RFC 4566, and
1770        // mod_logfile splits on \n leaving \r in the content.
1771        let lines = vec![
1772            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1773            format!("{UUID1} variable_max_forwards: [69]"),
1774            format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1775            // UUID drops
1776            "variable_switch_r_sdp: [v=0\r".to_string(),
1777            "o=FreeSWITCH 1773663549 1773663550 IN IP6 2001:db8::10\r".to_string(),
1778            "s=FreeSWITCH\r".to_string(),
1779            "c=IN IP6 2001:db8::10\r".to_string(),
1780            "t=0 0\r".to_string(),
1781            "m=audio 9952 RTP/AVP 102 101 13\r".to_string(),
1782            "a=rtpmap:102 opus/48000/2\r".to_string(),
1783            "a=ptime:20\r".to_string(),
1784            "]".to_string(),
1785            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1786            "variable_direction: [inbound]".to_string(),
1787            full_line(UUID1, TS2, "Next entry"),
1788        ];
1789
1790        let mut stream = LogStream::new(lines.into_iter());
1791        let entries: Vec<_> = stream.by_ref().collect();
1792
1793        assert_eq!(entries.len(), 2);
1794        let block = entries[0].block.as_ref().expect("should have block");
1795        match block {
1796            Block::ChannelData { fields, variables } => {
1797                assert_eq!(fields.len(), 0);
1798                assert_eq!(variables.len(), 5);
1799                assert_eq!(variables[0].0, "variable_max_forwards");
1800                assert_eq!(variables[1].0, "variable_sip_h_X-Custom-ID");
1801                // Multi-line SDP variable reassembled from bare continuations
1802                assert_eq!(variables[2].0, "variable_switch_r_sdp");
1803                let sdp = &variables[2].1;
1804                assert!(
1805                    sdp.starts_with("v=0\r\n"),
1806                    "SDP should start with v=0\\r\\n, got: {sdp:?}"
1807                );
1808                assert!(sdp.contains("m=audio 9952 RTP/AVP 102 101 13\r"));
1809                assert!(sdp.contains("a=ptime:20\r"));
1810                assert!(!sdp.ends_with(']'), "closing bracket should be stripped");
1811                // Post-SDP bare variables
1812                assert_eq!(variables[3].0, "variable_ep_codec_string");
1813                assert_eq!(variables[4].0, "variable_direction");
1814                assert_eq!(variables[4].1, "inbound");
1815            }
1816            other => panic!("expected ChannelData block, got {other:?}"),
1817        }
1818        // 2 UUID continuations + 9 SDP lines (open + 7 content + close) + 2 bare = 13
1819        assert_eq!(entries[0].attached.len(), 13);
1820        assert_accounting(&stream);
1821    }
1822
1823    #[test]
1824    fn channel_data_bare_variable_collision_with_execute() {
1825        // Production collision: bare variable_call_uuid line on same physical
1826        // line as a UUID EXECUTE. The UUID appears at byte 20 ("variable_call_uuid: "
1827        // is 20 chars), within find_uuid_in's 50-byte scan window, so Layer 1
1828        // classifies it as Truncated — extracting the UUID and EXECUTE message.
1829        // The CHANNEL_DATA block loses variable_call_uuid (eaten as truncation
1830        // prefix) but correctly recovers the EXECUTE as a separate entry.
1831        let collision = format!(
1832            "variable_call_uuid: {UUID1} EXECUTE [depth=0] \
1833             sofia/internal-v6/1251@[2001:db8::10] export(nolocal:test_var=value)"
1834        );
1835
1836        let lines = vec![
1837            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1838            format!("{UUID1} variable_max_forwards: [69]"),
1839            // UUID drops — bare continuations
1840            "variable_DP_MATCH: [ARRAY::create_conference|:create_conference]".to_string(),
1841            collision,
1842            // Full line resumes normal logging
1843            full_line(
1844                UUID1,
1845                TS2,
1846                "EXPORT (export_vars) (REMOTE ONLY) [test_var]=[value]",
1847            ),
1848        ];
1849
1850        let mut stream = LogStream::new(lines.into_iter());
1851        let entries: Vec<_> = stream.by_ref().collect();
1852
1853        // Entry 0: CHANNEL_DATA — variable_call_uuid lost to truncation prefix
1854        assert_eq!(entries.len(), 3);
1855        let block = entries[0].block.as_ref().expect("should have block");
1856        match block {
1857            Block::ChannelData { fields, variables } => {
1858                assert_eq!(fields.len(), 0);
1859                assert_eq!(variables.len(), 2);
1860                assert_eq!(variables[0].0, "variable_max_forwards");
1861                assert_eq!(variables[1].0, "variable_DP_MATCH");
1862            }
1863            other => panic!("expected ChannelData block, got {other:?}"),
1864        }
1865
1866        // Entry 1: EXECUTE recovered from the Truncated classification
1867        assert_eq!(entries[1].uuid, UUID1);
1868        assert_eq!(entries[1].kind, LineKind::Truncated);
1869        assert!(
1870            entries[1].message.starts_with("EXECUTE "),
1871            "truncated line should yield EXECUTE, got: {}",
1872            entries[1].message
1873        );
1874
1875        // Entry 2: normal EXPORT line
1876        assert_eq!(entries[2].message_kind.label(), "variable");
1877        assert_accounting(&stream);
1878    }
1879
1880    #[test]
1881    fn system_line_with_embedded_uuid_gets_entry_uuid() {
1882        // System lines (Format B) where switch_cpp.cpp logs the UUID at the
1883        // start of the message body should produce entries with the correct UUID.
1884        let lines = vec![
1885            format!(
1886                "{TS1} 95.97% [DEBUG] switch_cpp.cpp:1466 {UUID1} DAA-LOG WaveManager originate"
1887            ),
1888            format!(
1889                "{TS1} 95.97% [WARNING] switch_cpp.cpp:1466 {UUID1} DAA-LOG Failed to create session"
1890            ),
1891            full_line(UUID1, TS2, "State Change CS_EXECUTE -> CS_HIBERNATE"),
1892        ];
1893
1894        let mut stream = LogStream::new(lines.into_iter());
1895        let entries: Vec<_> = stream.by_ref().collect();
1896
1897        assert_eq!(entries.len(), 3);
1898        // Both System lines should have the UUID extracted from the message
1899        assert_eq!(entries[0].uuid, UUID1);
1900        assert_eq!(entries[0].kind, LineKind::System);
1901        assert_eq!(entries[0].message, "DAA-LOG WaveManager originate");
1902
1903        assert_eq!(entries[1].uuid, UUID1);
1904        assert_eq!(entries[1].kind, LineKind::System);
1905        assert_eq!(entries[1].message, "DAA-LOG Failed to create session");
1906
1907        // Full line still works normally
1908        assert_eq!(entries[2].uuid, UUID1);
1909        assert_eq!(entries[2].kind, LineKind::Full);
1910        assert_accounting(&stream);
1911    }
1912}