Skip to main content

freeswitch_log_parser/
stream.rs

1use crate::attached::AttachedLines;
2use crate::level::LogLevel;
3use crate::line::{
4    is_date_at, is_log_header_at, is_uuid_at, parse_line, LineKind, UUID_PREFIX_LEN,
5};
6use crate::message::{classify_message, MessageKind, SdpDirection};
7use std::collections::VecDeque;
8
/// Structured data extracted from a multi-line dump that follows a primary log entry.
///
/// Each variant corresponds to a block type that the stream state machine
/// recognizes and reassembles from continuation lines. A block is attached to
/// the entry that opened it when that entry is finalized.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum Block {
    /// Channel variable dump — `Channel-*` fields and `variable_*` key-value pairs.
    /// Multi-line variable values (e.g. embedded SDP) are reassembled with `\n` separators.
    ChannelData {
        /// `(name, value)` pairs for channel fields, in arrival order. Lines that
        /// are not classified as a field or variable but still look like
        /// `name: value` also land here.
        fields: Vec<(String, String)>,
        /// `(name, value)` pairs for variables, in arrival order.
        variables: Vec<(String, String)>,
    },
    /// SDP session description body, collected line by line.
    Sdp {
        /// Direction taken from the marker message that opened the block.
        direction: SdpDirection,
        /// One element per continuation line, stored as the parsed message text.
        body: Vec<String>,
    },
    /// Codec negotiation sequence — offered/local comparisons and selected matches.
    CodecNegotiation {
        /// `(offered, local)` codec pairs, one per comparison line.
        comparisons: Vec<(String, String)>,
        /// Codecs reported as "saved as a match".
        selected: Vec<String>,
    },
}
33
/// Controls how much detail is recorded for lines that couldn't be fully classified.
///
/// Higher fidelity levels allocate more memory. The default is `CountOnly`,
/// both for [`Default::default`] and for a freshly constructed stream.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum UnclassifiedTracking {
    /// Increment the counter only — zero allocation.
    #[default]
    CountOnly,
    /// Record line number and reason for each unclassified line.
    TrackLines,
    /// Like `TrackLines` plus the full line content.
    CaptureData,
}
46
/// Why a line was marked as unclassified.
///
/// Marked `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
// NOTE(review): in the stream code visible here only `OrphanContinuation` is
// ever recorded; confirm where the other two reasons are produced.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum UnclassifiedReason {
    /// Bare continuation line arrived with no pending entry to attach to.
    OrphanContinuation,
    /// Line was parsed but the message didn't match any known pattern.
    UnknownMessageFormat,
    /// EXECUTE or variable line was only partially readable.
    TruncatedField,
}
58
/// Record of a single unclassified line, captured when tracking is enabled.
///
/// Records are only stored under [`UnclassifiedTracking::TrackLines`] and
/// [`UnclassifiedTracking::CaptureData`]; `CountOnly` just bumps a counter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnclassifiedLine {
    /// The stream's line counter at the moment the line was recorded.
    pub line_number: u64,
    /// Why the line could not be classified.
    pub reason: UnclassifiedReason,
    /// The full line content; only populated under [`UnclassifiedTracking::CaptureData`].
    pub data: Option<String>,
}
67
/// Cumulative parsing statistics, updated as lines flow through the stream.
#[derive(Debug, Clone, Default)]
pub struct ParseStats {
    /// Lines pulled from the underlying iterator. Lines starting with a NUL
    /// byte are treated as separators and are not counted.
    pub lines_processed: u64,
    /// Number of lines recorded as unclassified. Never reset by draining.
    pub lines_unclassified: u64,
    /// Lines that became part of entries (primary line + attached lines per entry).
    /// Updated when an entry is finalized, not when its lines arrive.
    pub lines_in_entries: u64,
    /// Empty lines that arrived with no pending entry to attach to.
    pub lines_empty_orphan: u64,
    /// Extra logical lines created by splitting collided physical lines
    /// (e.g. mod_logfile's 2048-byte snprintf truncation). Incremented once
    /// per chunk beyond the first, so a physical line split into three
    /// chunks adds 2.
    pub lines_split: u64,
    /// Populated only when tracking is `TrackLines` or `CaptureData`.
    pub unclassified_lines: Vec<UnclassifiedLine>,
}
83
84impl ParseStats {
85    /// Lines that were processed but not accounted for by any tracking category.
86    ///
87    /// Returns 0 when the parser correctly accounts for every input line.
88    /// A non-zero value indicates a parser bug — lines were silently lost.
89    ///
90    /// Invariant: `lines_processed + lines_split == lines_in_entries + lines_empty_orphan`
91    pub fn unaccounted_lines(&self) -> u64 {
92        let expected = self.lines_in_entries + self.lines_empty_orphan;
93        let actual = self.lines_processed + self.lines_split;
94        actual.saturating_sub(expected)
95    }
96}
97
/// A complete parsed log entry with all context resolved.
///
/// Produced by [`LogStream`]. Continuation lines have been grouped,
/// UUID/timestamp inherited from context where needed, and multi-line blocks
/// reassembled.
#[derive(Debug)]
pub struct LogEntry {
    /// Session UUID, or empty string for system lines.
    pub uuid: String,
    /// Timestamp with microsecond precision; inherited from the previous entry for continuations.
    /// Empty when no timestamped line has been seen yet.
    pub timestamp: String,
    /// `None` for continuation and truncated lines.
    pub level: Option<LogLevel>,
    /// Core scheduler idle percentage; `None` for continuations.
    pub idle_pct: Option<String>,
    /// Source file:line; `None` for continuations.
    pub source: Option<String>,
    /// The primary message text.
    pub message: String,
    /// Which line format originated this entry.
    pub kind: LineKind,
    /// Semantic classification of the message content.
    pub message_kind: MessageKind,
    /// Typed, parsed multi-line block; `None` for entries without a trailing block.
    /// Filled in when the entry is finalized.
    pub block: Option<Block>,
    /// Raw continuation lines that followed the primary line.
    pub attached: AttachedLines,
    /// 1-based line number in the input stream. Entries split out of one
    /// collided physical line all share that line's number.
    pub line_number: u64,
    /// Per-entry warnings about parsing anomalies.
    pub warnings: Vec<String>,
}
130
/// Split a `name: value` line into its two halves.
///
/// The split happens at the first `": "`. Returns `None` when there is no
/// such separator, or when the name is empty or contains a space. A leading
/// `[` is removed from the value, together with a trailing `]` if one is
/// present; a value without the leading bracket is returned untouched.
fn parse_field_line(msg: &str) -> Option<(String, String)> {
    let (name, raw) = msg.split_once(": ")?;
    if name.is_empty() || name.contains(' ') {
        return None;
    }
    let value = match raw.strip_prefix('[') {
        Some(inner) => inner.strip_suffix(']').unwrap_or(inner),
        None => raw,
    };
    Some((name.to_owned(), value.to_owned()))
}
145
/// Which multi-line block, if any, the stream is currently collecting for the
/// pending entry. Each non-idle variant holds the data gathered so far.
enum StreamState {
    /// No block is open.
    Idle,
    /// Collecting a channel data dump.
    InChannelData {
        /// Channel fields gathered so far.
        fields: Vec<(String, String)>,
        /// Completed variables gathered so far.
        variables: Vec<(String, String)>,
        /// Name of a variable whose bracketed value has not been closed yet.
        open_var_name: Option<String>,
        /// Value accumulated so far for the open variable; further lines are
        /// appended with `\n` separators until one ends with `]`.
        open_var_value: Option<String>,
    },
    /// Collecting SDP body lines.
    InSdp {
        direction: SdpDirection,
        body: Vec<String>,
    },
    /// Collecting codec comparison / selection lines.
    InCodecNegotiation {
        comparisons: Vec<(String, String)>,
        selected: Vec<String>,
    },
}
163
164impl StreamState {
165    fn take_idle(&mut self) -> StreamState {
166        std::mem::replace(self, StreamState::Idle)
167    }
168}
169
/// Layer 2 structural state machine — groups continuation lines, classifies
/// messages, and detects multi-line blocks (CHANNEL_DATA, SDP, codec negotiation).
///
/// Wraps any `Iterator<Item = String>` and yields [`LogEntry`] values.
/// Maintains `last_uuid` and `last_timestamp` to fill in context for
/// continuation lines that lack their own.
///
/// Use the builder method [`unclassified_tracking()`](LogStream::unclassified_tracking)
/// to control diagnostic detail before iterating.
pub struct LogStream<I> {
    /// Source of raw input lines.
    lines: I,
    /// Most recent non-empty UUID seen; used for orphan bare continuations.
    last_uuid: String,
    /// Most recent timestamp seen; used for lines that carry none.
    last_timestamp: String,
    /// Entry still collecting continuation lines; yielded once the next
    /// primary line (or end of input) arrives.
    pending: Option<LogEntry>,
    /// Block currently being assembled for `pending`.
    state: StreamState,
    /// Running counters and optional unclassified-line records.
    stats: ParseStats,
    /// How much detail to keep for unclassified lines.
    tracking: UnclassifiedTracking,
    /// Count of lines read so far; becomes each entry's 1-based line number.
    line_number: u64,
    /// Chunks split off a collided physical line, waiting to be processed
    /// ahead of the next input line.
    split_pending: VecDeque<String>,
    /// Oversize-line warning raised while no entry was pending; attached to
    /// the next entry that gets created.
    deferred_warning: Option<String>,
}
191
192impl<I: Iterator<Item = String>> LogStream<I> {
193    /// Create a new stream from any line iterator.
194    pub fn new(lines: I) -> Self {
195        LogStream {
196            lines,
197            last_uuid: String::new(),
198            last_timestamp: String::new(),
199            pending: None,
200            state: StreamState::Idle,
201            stats: ParseStats::default(),
202            tracking: UnclassifiedTracking::CountOnly,
203            line_number: 0,
204            split_pending: VecDeque::new(),
205            deferred_warning: None,
206        }
207    }
208
    /// Set the unclassified line tracking level (builder pattern). Defaults to `CountOnly`.
    ///
    /// Consumes and returns the stream so the call can be chained onto
    /// [`LogStream::new`] before iteration starts.
    pub fn unclassified_tracking(mut self, level: UnclassifiedTracking) -> Self {
        self.tracking = level;
        self
    }
214
    /// Cumulative parsing statistics up to the current position.
    ///
    /// `lines_in_entries` only includes entries that have been finalized, so
    /// the counters may lag behind the input while an entry is still pending.
    pub fn stats(&self) -> &ParseStats {
        &self.stats
    }
219
    /// Take all accumulated unclassified line records, leaving the internal vec empty.
    ///
    /// The `lines_unclassified` counter is not reset. Under `CountOnly`
    /// tracking no records are stored, so the result is empty.
    pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
        std::mem::take(&mut self.stats.unclassified_lines)
    }
226
227    fn record_unclassified(&mut self, reason: UnclassifiedReason, data: Option<&str>) {
228        self.stats.lines_unclassified += 1;
229        match self.tracking {
230            UnclassifiedTracking::CountOnly => {}
231            UnclassifiedTracking::TrackLines => {
232                self.stats.unclassified_lines.push(UnclassifiedLine {
233                    line_number: self.line_number,
234                    reason,
235                    data: None,
236                });
237            }
238            UnclassifiedTracking::CaptureData => {
239                self.stats.unclassified_lines.push(UnclassifiedLine {
240                    line_number: self.line_number,
241                    reason,
242                    data: data.map(|s| s.to_string()),
243                });
244            }
245        }
246    }
247
248    fn finalize_block(&mut self) -> (Option<Block>, Vec<String>) {
249        let mut warnings = Vec::new();
250        match self.state.take_idle() {
251            StreamState::Idle => (None, warnings),
252            StreamState::InChannelData {
253                fields,
254                mut variables,
255                open_var_name,
256                open_var_value,
257            } => {
258                if let (Some(ref name), Some(value)) = (&open_var_name, open_var_value) {
259                    warnings.push(format!("unclosed multi-line variable: {name}"));
260                    variables.push((name.clone(), value));
261                }
262                (Some(Block::ChannelData { fields, variables }), warnings)
263            }
264            StreamState::InSdp { direction, body } => {
265                (Some(Block::Sdp { direction, body }), warnings)
266            }
267            StreamState::InCodecNegotiation {
268                comparisons,
269                selected,
270            } => (
271                Some(Block::CodecNegotiation {
272                    comparisons,
273                    selected,
274                }),
275                warnings,
276            ),
277        }
278    }
279
280    fn finalize_pending(&mut self) -> Option<LogEntry> {
281        let (block, warnings) = self.finalize_block();
282        if let Some(ref mut p) = self.pending {
283            p.block = block;
284            p.warnings.extend(warnings);
285            self.stats.lines_in_entries += 1 + p.attached.len() as u64;
286        }
287        self.pending.take()
288    }
289
290    fn start_block_for_message(&mut self, message_kind: &MessageKind) {
291        self.state = match message_kind {
292            MessageKind::ChannelData => StreamState::InChannelData {
293                fields: Vec::new(),
294                variables: Vec::new(),
295                open_var_name: None,
296                open_var_value: None,
297            },
298            MessageKind::SdpMarker { direction } => StreamState::InSdp {
299                direction: direction.clone(),
300                body: Vec::new(),
301            },
302            MessageKind::CodecNegotiation => StreamState::InCodecNegotiation {
303                comparisons: Vec::new(),
304                selected: Vec::new(),
305            },
306            _ => StreamState::Idle,
307        };
308    }
309
310    fn accumulate_codec_entry(&mut self, msg: &str) {
311        let mut warning = None;
312        if let StreamState::InCodecNegotiation {
313            comparisons,
314            selected,
315        } = &mut self.state
316        {
317            let rest = msg.strip_prefix("Audio Codec Compare ").unwrap_or(msg);
318            if rest.contains("is saved as a match") {
319                let codec = rest.find(']').map(|end| &rest[1..end]).unwrap_or(rest);
320                selected.push(codec.to_string());
321            } else if let Some(slash) = rest.find("]/[") {
322                let offered = &rest[1..slash];
323                let local = &rest[slash + 3..rest.len().saturating_sub(1)];
324                comparisons.push((offered.to_string(), local.to_string()));
325            } else {
326                warning = Some(format!(
327                    "unrecognized codec negotiation line: {}",
328                    if msg.len() > 80 { &msg[..80] } else { msg }
329                ));
330            }
331        }
332        if let (Some(w), Some(ref mut pending)) = (warning, &mut self.pending) {
333            pending.warnings.push(w);
334        }
335    }
336
337    fn accumulate_continuation(&mut self, msg: &str, line: &str) {
338        let msg_kind = classify_message(msg);
339        let mut warning = None;
340        match &mut self.state {
341            StreamState::InChannelData {
342                fields,
343                variables,
344                open_var_name,
345                open_var_value,
346            } => {
347                if let Some(ref mut val) = open_var_value {
348                    val.push('\n');
349                    val.push_str(msg);
350                    if msg.ends_with(']') {
351                        let trimmed = val.trim_end_matches(']').to_string();
352                        let name = open_var_name.take().unwrap();
353                        *open_var_value = None;
354                        variables.push((name, trimmed));
355                    }
356                } else {
357                    match &msg_kind {
358                        MessageKind::ChannelField { name, value } => {
359                            fields.push((name.clone(), value.clone()));
360                        }
361                        MessageKind::Variable { name, value } => {
362                            if !msg.ends_with(']') && msg.contains(": [") {
363                                *open_var_name = Some(name.clone());
364                                *open_var_value = Some(value.clone());
365                            } else {
366                                variables.push((name.clone(), value.clone()));
367                            }
368                        }
369                        _ => {
370                            if let Some((name, value)) = parse_field_line(msg) {
371                                fields.push((name, value));
372                            } else {
373                                warning = Some(format!(
374                                    "unparseable CHANNEL_DATA line: {}",
375                                    if msg.len() > 80 { &msg[..80] } else { msg }
376                                ));
377                            }
378                        }
379                    }
380                }
381            }
382            StreamState::InSdp { body, .. } => {
383                body.push(msg.to_string());
384            }
385            StreamState::InCodecNegotiation { .. } => {
386                warning = Some(format!(
387                    "unexpected codec negotiation continuation: {}",
388                    if msg.len() > 80 { &msg[..80] } else { msg }
389                ));
390            }
391            StreamState::Idle => {}
392        }
393        if let Some(ref mut pending) = self.pending {
394            if let Some(w) = warning {
395                pending.warnings.push(w);
396            }
397            pending.attached.push(line);
398        }
399    }
400
401    fn new_entry(
402        &mut self,
403        uuid: String,
404        timestamp: String,
405        message: String,
406        kind: LineKind,
407        message_kind: MessageKind,
408    ) -> LogEntry {
409        let mut warnings = Vec::new();
410        if let Some(w) = self.deferred_warning.take() {
411            warnings.push(w);
412        }
413        LogEntry {
414            uuid,
415            timestamp,
416            message,
417            kind,
418            message_kind,
419            level: None,
420            idle_pct: None,
421            source: None,
422            block: None,
423            attached: AttachedLines::new(),
424            line_number: self.line_number,
425            warnings,
426        }
427    }
428}
429
/// mod_logfile's `snprintf` buffer size for UUID-prefixed lines.
/// Lines exceeding this in the formatted output lose their trailing newline,
/// causing the next queue entry to collide on the same physical line.
const MOD_LOGFILE_BUF_SIZE: usize = 2048;

/// Effective maximum payload per line (buffer minus the UUID prefix
/// `mod_logfile` prepends, minus the trailing newline). Lines longer than
/// this are treated as oversize by the collision scan.
const MAX_LINE_PAYLOAD: usize = MOD_LOGFILE_BUF_SIZE - UUID_PREFIX_LEN - 1;

/// Tolerance, in bytes on either side, around the expected truncation
/// boundary when scanning oversize lines for Format E collisions. The
/// boundary is deterministic (`MAX_LINE_PAYLOAD` from the start of the
/// truncated chunk) but a few bytes of slack covers minor variation in
/// `snprintf` accounting and any future drift in the prepend format.
const COLLISION_SCAN_SLACK: usize = 64;
445
impl<I: Iterator<Item = String>> LogStream<I> {
    /// Detect same-line collisions where multiple log entries were concatenated
    /// without a newline separator.
    ///
    /// Two collision mechanisms exist in production:
    ///
    /// 1. **Buffer truncation** (Format E): `mod_logfile`'s 2048-byte `snprintf`
    ///    buffer truncates a long line, losing the trailing `\n`. The next entry
    ///    from the log queue collides on the same physical line. These lines
    ///    always exceed `MAX_LINE_PAYLOAD`.
    ///
    /// 2. **Write contention**: multiple threads writing to the log file can
    ///    interleave output, producing concatenated entries at any line length.
    ///    Common with system lines (Format B) that lack UUID prefixes.
    ///
    /// Returns the first chunk of the line (the whole line when no collision
    /// is found). Every further chunk is queued in `split_pending`, in order,
    /// for processing in subsequent iterations. All split points are found in
    /// a single pass; queued chunks are not scanned again.
    ///
    /// Side effect: an oversize line adds a warning to the pending entry, or
    /// stores it as the deferred warning when nothing is pending.
    fn detect_collision(&mut self, line: String) -> String {
        if line.len() > MAX_LINE_PAYLOAD {
            // NOTE(review): the `+ 38` presumably re-adds the UUID prefix and
            // newline that `MAX_LINE_PAYLOAD` subtracts (`UUID_PREFIX_LEN + 1`)
            // — confirm and consider deriving it from the constants.
            let warning = format!(
                "line exceeds mod_logfile 2048-byte buffer ({} bytes), data may be truncated",
                line.len() + 38,
            );
            // The warning goes to whichever entry is pending right now; a
            // previously deferred warning is overwritten.
            if let Some(ref mut pending) = self.pending {
                pending.warnings.push(warning);
            } else {
                self.deferred_warning = Some(warning);
            }
        }

        // Skip past the line's own header to avoid matching itself.
        let bytes = line.as_bytes();
        let min_scan = if is_uuid_at(bytes, 0) {
            if bytes.len() > UUID_PREFIX_LEN && bytes[UUID_PREFIX_LEN].is_ascii_digit() {
                64 // Full line: UUID + timestamp
            } else {
                UUID_PREFIX_LEN // UUID continuation
            }
        } else if is_date_at(bytes, 0) {
            27 // System line: skip own timestamp
        } else {
            0
        };

        // Last offset worth testing. NOTE(review): 28 looks like the minimum
        // number of bytes a header needs after `offset` — confirm against
        // `is_log_header_at`.
        let end = bytes.len().saturating_sub(28);
        let oversize = bytes.len() > MAX_LINE_PAYLOAD;

        // Single linear pass collecting every split point. Two collision
        // mechanisms handled in one walk:
        //
        //   * `is_log_header_at`: timestamp header (Format B write
        //     contention, Full/System line collisions). Fast-fails after
        //     one byte for non-digit input, so the per-offset cost stays
        //     low even on hundreds-of-KB lines.
        //
        //   * `is_uuid_at` within a ±64-byte window around the next
        //     expected mod_logfile truncation boundary (Format E). The
        //     boundary is `MAX_LINE_PAYLOAD` bytes past the start of the
        //     current chunk; we advance it as splits are found. Bounding
        //     this check is what kept the previous optimization fast on
        //     60 KB embedded-SDP lines — a 36-byte hex pattern check at
        //     every offset would dominate the scan.
        //
        // Collecting all splits in one pass (rather than splitting,
        // re-feeding the suffix, and re-scanning from scratch) is the
        // structural fix for the prior O(n²) behavior.
        let mut splits: Vec<usize> = Vec::new();
        let mut chunk_start = 0usize;
        let mut offset = min_scan;
        while offset <= end {
            if is_log_header_at(bytes, offset) {
                // A timestamp preceded by a UUID prefix belongs to a Full
                // line, so the split goes before the UUID rather than at the
                // timestamp itself.
                let split_at = if offset >= chunk_start + UUID_PREFIX_LEN
                    && is_uuid_at(bytes, offset - UUID_PREFIX_LEN)
                {
                    offset - UUID_PREFIX_LEN
                } else {
                    offset
                };
                if split_at > chunk_start {
                    splits.push(split_at);
                    chunk_start = split_at;
                    offset += 27;
                } else {
                    // Header at current chunk's own start — already
                    // accounted for. Step past it without recording a
                    // split. The max guarantees forward progress when
                    // the UUID-prefix check rewinds split_at behind us.
                    offset = (offset + 27).max(offset + 1);
                }
                continue;
            }
            if oversize {
                // Only look for a bare UUID near the expected truncation
                // boundary of the current chunk.
                let boundary = chunk_start + MAX_LINE_PAYLOAD;
                if offset + COLLISION_SCAN_SLACK >= boundary
                    && offset <= boundary + COLLISION_SCAN_SLACK
                    && is_uuid_at(bytes, offset)
                {
                    splits.push(offset);
                    chunk_start = offset;
                    offset += UUID_PREFIX_LEN;
                    continue;
                }
            }
            offset += 1;
        }

        if splits.is_empty() {
            return line;
        }

        // First chunk returned; the rest queued for subsequent iterations.
        // Building right-to-left with split_off avoids intermediate copies.
        // `splits` is in ascending order, so walking it in reverse peels the
        // last chunk off first; the reverse below restores input order.
        let mut tail = line;
        let mut chunks: Vec<String> = Vec::with_capacity(splits.len());
        for &at in splits.iter().rev() {
            chunks.push(tail.split_off(at));
        }
        chunks.reverse();
        self.split_pending.extend(chunks);
        tail
    }
}
569
570impl<I: Iterator<Item = String>> Iterator for LogStream<I> {
571    type Item = LogEntry;
572
573    fn next(&mut self) -> Option<LogEntry> {
574        loop {
575            let line = if let Some(split) = self.split_pending.pop_front() {
576                self.stats.lines_split += 1;
577                // Already split out by a prior detect_collision pass —
578                // skip re-scanning, which would just walk the chunk again
579                // and find nothing.
580                split
581            } else {
582                let Some(line) = self.lines.next() else {
583                    return self.finalize_pending();
584                };
585
586                if line.starts_with('\x00') {
587                    let yielded = self.finalize_pending();
588                    self.last_uuid.clear();
589                    self.last_timestamp.clear();
590                    if yielded.is_some() {
591                        return yielded;
592                    }
593                    continue;
594                }
595
596                self.line_number += 1;
597                self.stats.lines_processed += 1;
598                self.detect_collision(line)
599            };
600
601            let parsed = parse_line(&line);
602
603            match parsed.kind {
604                LineKind::Full | LineKind::System | LineKind::Truncated => {
605                    let uuid = parsed.uuid.unwrap_or("").to_string();
606                    let message_kind = classify_message(parsed.message);
607
608                    // Merge consecutive codec negotiation entries with same UUID
609                    if message_kind == MessageKind::CodecNegotiation {
610                        if let (Some(ref pending), StreamState::InCodecNegotiation { .. }) =
611                            (&self.pending, &self.state)
612                        {
613                            if uuid == pending.uuid {
614                                self.accumulate_codec_entry(parsed.message);
615                                if let Some(ref mut p) = self.pending {
616                                    p.attached.push(&line);
617                                }
618                                continue;
619                            }
620                        }
621                    }
622
623                    let yielded = self.finalize_pending();
624
625                    let timestamp = parsed
626                        .timestamp
627                        .map(|t| t.to_string())
628                        .unwrap_or_else(|| self.last_timestamp.clone());
629
630                    if !uuid.is_empty() {
631                        self.last_uuid = uuid.clone();
632                    }
633                    if parsed.timestamp.is_some() {
634                        self.last_timestamp = timestamp.clone();
635                    }
636
637                    self.start_block_for_message(&message_kind);
638                    if message_kind == MessageKind::CodecNegotiation {
639                        self.accumulate_codec_entry(parsed.message);
640                    }
641
642                    let mut entry = self.new_entry(
643                        uuid,
644                        timestamp,
645                        parsed.message.to_string(),
646                        parsed.kind,
647                        message_kind,
648                    );
649                    entry.level = parsed.level;
650                    entry.idle_pct = parsed.idle_pct.map(|s| s.to_string());
651                    entry.source = parsed.source.map(|s| s.to_string());
652                    self.pending = Some(entry);
653
654                    if yielded.is_some() {
655                        return yielded;
656                    }
657                }
658
659                LineKind::UuidContinuation => {
660                    let uuid = parsed.uuid.unwrap_or("").to_string();
661                    let is_primary = parsed.message.starts_with("EXECUTE ");
662
663                    if let Some(ref pending) = self.pending {
664                        if !is_primary && uuid == pending.uuid {
665                            self.accumulate_continuation(parsed.message, &line);
666                        } else {
667                            let yielded = self.finalize_pending();
668                            let message_kind = classify_message(parsed.message);
669
670                            if !uuid.is_empty() {
671                                self.last_uuid = uuid.clone();
672                            }
673
674                            self.start_block_for_message(&message_kind);
675                            self.pending = Some(self.new_entry(
676                                uuid,
677                                self.last_timestamp.clone(),
678                                parsed.message.to_string(),
679                                parsed.kind,
680                                message_kind,
681                            ));
682
683                            return yielded;
684                        }
685                    } else {
686                        let message_kind = classify_message(parsed.message);
687
688                        if !uuid.is_empty() {
689                            self.last_uuid = uuid.clone();
690                        }
691
692                        self.start_block_for_message(&message_kind);
693                        self.pending = Some(self.new_entry(
694                            uuid,
695                            self.last_timestamp.clone(),
696                            parsed.message.to_string(),
697                            parsed.kind,
698                            message_kind,
699                        ));
700                    }
701                }
702
703                LineKind::BareContinuation => {
704                    if self.pending.is_some() {
705                        self.accumulate_continuation(parsed.message, &line);
706                    } else {
707                        self.record_unclassified(
708                            UnclassifiedReason::OrphanContinuation,
709                            Some(&line),
710                        );
711                        let message_kind = classify_message(parsed.message);
712                        self.pending = Some(self.new_entry(
713                            self.last_uuid.clone(),
714                            self.last_timestamp.clone(),
715                            parsed.message.to_string(),
716                            parsed.kind,
717                            message_kind,
718                        ));
719                    }
720                }
721
722                LineKind::Empty => {
723                    if let Some(ref mut pending) = self.pending {
724                        pending.attached.push(&line);
725                    } else {
726                        self.stats.lines_empty_orphan += 1;
727                    }
728                }
729            }
730        }
731    }
732}
733
734#[cfg(test)]
735mod tests {
736    use super::*;
737
738    const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
739    const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
740
741    fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
742        format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
743    }
744
745    const TS1: &str = "2025-01-15 10:30:45.123456";
746    const TS2: &str = "2025-01-15 10:30:46.234567";
747
748    // --- Existing behavior tests (preserved) ---
749
750    #[test]
751    fn inherits_uuid_for_bare_continuation() {
752        let lines = vec![
753            full_line(UUID1, TS1, "CHANNEL_DATA:"),
754            "variable_foo: [bar]".to_string(),
755            "variable_baz: [qux]".to_string(),
756        ];
757        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
758        assert_eq!(entries.len(), 1);
759        assert_eq!(entries[0].uuid, UUID1);
760        assert_eq!(entries[0].attached.len(), 2);
761        assert_eq!(entries[0].attached.get(0), Some("variable_foo: [bar]"));
762        assert_eq!(entries[0].attached.get(1), Some("variable_baz: [qux]"));
763    }
764
765    #[test]
766    fn inherits_timestamp_for_uuid_continuation() {
767        let lines = vec![
768            full_line(UUID1, TS1, "First"),
769            format!("{UUID2} Channel-State: [CS_EXECUTE]"),
770        ];
771        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
772        assert_eq!(entries.len(), 2);
773        assert_eq!(entries[0].timestamp, TS1);
774        assert_eq!(entries[1].uuid, UUID2);
775        assert_eq!(entries[1].timestamp, TS1);
776    }
777
778    #[test]
779    fn new_full_line_yields_previous() {
780        let lines = vec![
781            full_line(UUID1, TS1, "First"),
782            full_line(UUID2, TS2, "Second"),
783        ];
784        let mut stream = LogStream::new(lines.into_iter());
785        let first = stream.next().unwrap();
786        assert_eq!(first.uuid, UUID1);
787        assert_eq!(first.message, "First");
788        let second = stream.next().unwrap();
789        assert_eq!(second.uuid, UUID2);
790        assert_eq!(second.message, "Second");
791        assert!(stream.next().is_none());
792    }
793
794    #[test]
795    fn channel_data_collected_as_attached() {
796        let lines = vec![
797            full_line(UUID1, TS1, "CHANNEL_DATA:"),
798            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
799            format!("{UUID1} Unique-ID: [{UUID1}]"),
800            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
801        ];
802        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
803        assert_eq!(entries.len(), 1);
804        assert_eq!(entries[0].message, "CHANNEL_DATA:");
805        assert_eq!(entries[0].attached.len(), 3);
806    }
807
808    #[test]
809    fn sdp_body_collected_as_attached() {
810        let lines = vec![
811            full_line(UUID1, TS1, "Local SDP:"),
812            "v=0".to_string(),
813            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
814            "s=-".to_string(),
815            "c=IN IP4 192.0.2.1".to_string(),
816            "m=audio 10000 RTP/AVP 0".to_string(),
817        ];
818        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
819        assert_eq!(entries.len(), 1);
820        assert_eq!(entries[0].attached.len(), 5);
821    }
822
823    #[test]
824    fn truncated_starts_new_entry() {
825        let lines = vec![
826            full_line(UUID1, TS1, "First"),
827            format!(
828                "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
829            ),
830        ];
831        let mut stream = LogStream::new(lines.into_iter());
832        let first = stream.next().unwrap();
833        assert_eq!(first.uuid, UUID1);
834        assert_eq!(first.message, "First");
835        let second = stream.next().unwrap();
836        assert_eq!(second.uuid, UUID2);
837        assert_eq!(second.kind, LineKind::Truncated);
838    }
839
840    #[test]
841    fn empty_lines_in_attached() {
842        let lines = vec![
843            full_line(UUID1, TS1, "First"),
844            String::new(),
845            "continuation".to_string(),
846        ];
847        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
848        assert_eq!(entries.len(), 1);
849        assert_eq!(entries[0].attached.len(), 2);
850        assert_eq!(entries[0].attached.get(0), Some(""));
851        assert_eq!(entries[0].attached.get(1), Some("continuation"));
852    }
853
854    #[test]
855    fn system_line_no_uuid() {
856        let lines = vec![format!(
857            "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
858        )];
859        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
860        assert_eq!(entries.len(), 1);
861        assert_eq!(entries[0].uuid, "");
862        assert_eq!(entries[0].kind, LineKind::System);
863    }
864
865    #[test]
866    fn final_entry_on_exhaustion() {
867        let lines = vec![full_line(UUID1, TS1, "Only entry")];
868        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
869        assert_eq!(entries.len(), 1);
870        assert_eq!(entries[0].message, "Only entry");
871    }
872
873    #[test]
874    fn consecutive_full_lines() {
875        let lines = vec![
876            full_line(UUID1, TS1, "First"),
877            full_line(UUID1, TS2, "Second"),
878            full_line(UUID2, TS1, "Third"),
879        ];
880        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
881        assert_eq!(entries.len(), 3);
882        for entry in &entries {
883            assert!(entry.attached.is_empty());
884        }
885    }
886
887    #[test]
888    fn execute_after_channel_data_same_uuid() {
889        let lines = vec![
890            full_line(UUID1, TS1, "CHANNEL_DATA:"),
891            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
892            format!("{UUID1} variable_sip_call_id: [test@192.0.2.1]"),
893            "variable_foo: [bar]".to_string(),
894            String::new(),
895            String::new(),
896            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
897            full_line(UUID1, TS2, "EXPORT (export_vars) [originate_timeout]=[3600]"),
898        ];
899        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
900        assert_eq!(entries.len(), 3);
901        assert_eq!(entries[0].message, "CHANNEL_DATA:");
902        assert_eq!(entries[0].attached.len(), 5);
903        assert_eq!(entries[1].message, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)");
904        assert_eq!(entries[1].kind, LineKind::UuidContinuation);
905        assert_eq!(
906            entries[2].message,
907            "EXPORT (export_vars) [originate_timeout]=[3600]"
908        );
909    }
910
911    #[test]
912    fn execute_between_full_lines_same_uuid() {
913        let lines = vec![
914            full_line(UUID1, TS1, "CoreSession::setVariable(X-C911P-City, ST GEORGES)"),
915            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/city/ST GEORGES)"),
916            full_line(UUID1, TS2, "CoreSession::setVariable(X-C911P-Region, SGS)"),
917        ];
918        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
919        assert_eq!(entries.len(), 3);
920        assert_eq!(
921            entries[0].message,
922            "CoreSession::setVariable(X-C911P-City, ST GEORGES)"
923        );
924        assert!(entries[0].attached.is_empty());
925        assert!(entries[1].message.starts_with("EXECUTE "));
926        assert_eq!(entries[1].kind, LineKind::UuidContinuation);
927        assert_eq!(
928            entries[2].message,
929            "CoreSession::setVariable(X-C911P-Region, SGS)"
930        );
931    }
932
933    #[test]
934    fn multiple_execute_between_full_lines() {
935        let lines = vec![
936            full_line(UUID1, TS1, "CoreSession::setVariable(ngcs_call_id, urn:emergency:uid:callid:test)"),
937            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/call_id/urn:emergency:uid:callid:test)"),
938            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/callid_codecs/urn:emergency:uid:callid:test/PCMU@8000h)"),
939            full_line(UUID1, TS2, "CoreSession::setVariable(ngcs_short_call_id, test)"),
940        ];
941        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
942        assert_eq!(entries.len(), 4);
943        assert!(entries[0].attached.is_empty());
944        assert!(entries[1].message.contains("call_id"));
945        assert!(entries[2].message.contains("callid_codecs"));
946        assert_eq!(
947            entries[3].message,
948            "CoreSession::setVariable(ngcs_short_call_id, test)"
949        );
950    }
951
952    #[test]
953    fn uuid_continuation_different_uuid_yields() {
954        let lines = vec![
955            full_line(UUID1, TS1, "First"),
956            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
957            format!("{UUID2} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"),
958        ];
959        let mut stream = LogStream::new(lines.into_iter());
960        let first = stream.next().unwrap();
961        assert_eq!(first.uuid, UUID1);
962        assert_eq!(first.attached.len(), 1);
963        let second = stream.next().unwrap();
964        assert_eq!(second.uuid, UUID2);
965        assert_eq!(
966            second.message,
967            "Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"
968        );
969    }
970
971    // --- New: Block detection tests ---
972
973    #[test]
974    fn channel_data_block_fields_and_variables() {
975        let lines = vec![
976            full_line(UUID1, TS1, "CHANNEL_DATA:"),
977            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
978            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
979            format!("{UUID1} Unique-ID: [{UUID1}]"),
980            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
981            "variable_direction: [inbound]".to_string(),
982        ];
983        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
984        assert_eq!(entries.len(), 1);
985        assert_eq!(entries[0].message_kind, MessageKind::ChannelData);
986        let block = entries[0].block.as_ref().expect("should have block");
987        match block {
988            Block::ChannelData { fields, variables } => {
989                assert_eq!(fields.len(), 3);
990                assert_eq!(
991                    fields[0],
992                    (
993                        "Channel-Name".to_string(),
994                        "sofia/internal/+15550001234@192.0.2.1".to_string()
995                    )
996                );
997                assert_eq!(
998                    fields[1],
999                    ("Channel-State".to_string(), "CS_EXECUTE".to_string())
1000                );
1001                assert_eq!(fields[2], ("Unique-ID".to_string(), UUID1.to_string()));
1002                assert_eq!(variables.len(), 2);
1003                assert_eq!(
1004                    variables[0],
1005                    (
1006                        "variable_sip_call_id".to_string(),
1007                        "test123@192.0.2.1".to_string()
1008                    )
1009                );
1010                assert_eq!(
1011                    variables[1],
1012                    ("variable_direction".to_string(), "inbound".to_string())
1013                );
1014            }
1015            other => panic!("expected ChannelData block, got {other:?}"),
1016        }
1017    }
1018
1019    #[test]
1020    fn channel_data_multiline_variable_reassembly() {
1021        let lines = vec![
1022            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1023            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1024            "variable_switch_r_sdp: [v=0".to_string(),
1025            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1026            "s=-".to_string(),
1027            "c=IN IP4 192.0.2.1".to_string(),
1028            "m=audio 47758 RTP/AVP 0 101".to_string(),
1029            "a=rtpmap:0 PCMU/8000".to_string(),
1030            "]".to_string(),
1031            "variable_direction: [inbound]".to_string(),
1032        ];
1033        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1034        assert_eq!(entries.len(), 1);
1035        let block = entries[0].block.as_ref().expect("should have block");
1036        match block {
1037            Block::ChannelData { fields, variables } => {
1038                assert_eq!(fields.len(), 1);
1039                assert_eq!(variables.len(), 2);
1040                assert_eq!(variables[0].0, "variable_switch_r_sdp");
1041                assert!(variables[0].1.starts_with("v=0\n"));
1042                assert!(variables[0].1.contains("m=audio 47758 RTP/AVP 0 101"));
1043                assert!(!variables[0].1.ends_with(']'));
1044                assert_eq!(
1045                    variables[1],
1046                    ("variable_direction".to_string(), "inbound".to_string())
1047                );
1048            }
1049            other => panic!("expected ChannelData block, got {other:?}"),
1050        }
1051        assert_eq!(entries[0].attached.len(), 9);
1052    }
1053
1054    #[test]
1055    fn sdp_block_detection() {
1056        let lines = vec![
1057            full_line(UUID1, TS1, "Local SDP:"),
1058            "v=0".to_string(),
1059            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1060            "s=-".to_string(),
1061            "c=IN IP4 192.0.2.1".to_string(),
1062            "m=audio 10000 RTP/AVP 0".to_string(),
1063            "a=rtpmap:0 PCMU/8000".to_string(),
1064        ];
1065        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1066        assert_eq!(entries.len(), 1);
1067        match &entries[0].message_kind {
1068            MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1069            other => panic!("expected SdpMarker, got {other:?}"),
1070        }
1071        let block = entries[0].block.as_ref().expect("should have block");
1072        match block {
1073            Block::Sdp { direction, body } => {
1074                assert_eq!(*direction, SdpDirection::Local);
1075                assert_eq!(body.len(), 6);
1076                assert_eq!(body[0], "v=0");
1077                assert_eq!(body[5], "a=rtpmap:0 PCMU/8000");
1078            }
1079            other => panic!("expected Sdp block, got {other:?}"),
1080        }
1081    }
1082
1083    #[test]
1084    fn sdp_block_terminated_by_primary_line() {
1085        let lines = vec![
1086            full_line(UUID1, TS1, "Remote SDP:"),
1087            "v=0".to_string(),
1088            "m=audio 10000 RTP/AVP 0".to_string(),
1089            full_line(UUID1, TS2, "Next event"),
1090        ];
1091        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1092        assert_eq!(entries.len(), 2);
1093        let block = entries[0].block.as_ref().expect("should have block");
1094        match block {
1095            Block::Sdp { direction, body } => {
1096                assert_eq!(*direction, SdpDirection::Remote);
1097                assert_eq!(body.len(), 2);
1098            }
1099            other => panic!("expected Sdp block, got {other:?}"),
1100        }
1101        assert!(entries[1].block.is_none());
1102    }
1103
1104    #[test]
1105    fn sdp_from_uuid_continuation() {
1106        let lines = vec![
1107            format!("{UUID1} Local SDP:"),
1108            format!("{UUID1} v=0"),
1109            format!("{UUID1} o=FreeSWITCH 1234 5678 IN IP4 192.0.2.1"),
1110            format!("{UUID1} s=FreeSWITCH"),
1111            format!("{UUID1} c=IN IP4 192.0.2.1"),
1112        ];
1113        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1114        assert_eq!(entries.len(), 1);
1115        let block = entries[0].block.as_ref().expect("should have block");
1116        match block {
1117            Block::Sdp { direction, body } => {
1118                assert_eq!(*direction, SdpDirection::Local);
1119                assert_eq!(body.len(), 4);
1120                assert_eq!(body[0], "v=0");
1121            }
1122            other => panic!("expected Sdp block, got {other:?}"),
1123        }
1124    }
1125
1126    #[test]
1127    fn channel_data_interrupted_by_different_uuid() {
1128        let lines = vec![
1129            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1130            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1131            format!("{UUID2} Dialplan: sofia/internal/+15559999999@192.0.2.1 parsing [public]"),
1132        ];
1133        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1134        assert_eq!(entries.len(), 2);
1135        let block = entries[0].block.as_ref().expect("should have block");
1136        match block {
1137            Block::ChannelData { fields, .. } => {
1138                assert_eq!(fields.len(), 1);
1139            }
1140            other => panic!("expected ChannelData, got {other:?}"),
1141        }
1142    }
1143
1144    #[test]
1145    fn no_block_for_non_block_message() {
1146        let lines = vec![full_line(UUID1, TS1, "some random freeswitch log message")];
1147        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1148        assert_eq!(entries.len(), 1);
1149        assert!(entries[0].block.is_none());
1150        assert_eq!(entries[0].message_kind, MessageKind::General);
1151    }
1152
1153    #[test]
1154    fn message_kind_on_execute() {
1155        let lines = vec![
1156            full_line(UUID1, TS1, "First"),
1157            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
1158        ];
1159        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1160        assert_eq!(entries.len(), 2);
1161        match &entries[1].message_kind {
1162            MessageKind::Execute {
1163                application,
1164                arguments,
1165                ..
1166            } => {
1167                assert_eq!(application, "set");
1168                assert_eq!(arguments, "foo=bar");
1169            }
1170            other => panic!("expected Execute, got {other:?}"),
1171        }
1172    }
1173
1174    // --- New: ParseStats tests ---
1175
1176    #[test]
1177    fn stats_lines_processed() {
1178        let lines = vec![
1179            full_line(UUID1, TS1, "First"),
1180            full_line(UUID1, TS2, "Second"),
1181            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1182        ];
1183        let mut stream = LogStream::new(lines.into_iter());
1184        let _: Vec<_> = stream.by_ref().collect();
1185        assert_eq!(stream.stats().lines_processed, 3);
1186    }
1187
1188    #[test]
1189    fn stats_unclassified_orphan() {
1190        let lines = vec![
1191            "variable_foo: [bar]".to_string(),
1192            full_line(UUID1, TS1, "After orphan"),
1193        ];
1194        let mut stream = LogStream::new(lines.into_iter())
1195            .unclassified_tracking(UnclassifiedTracking::TrackLines);
1196        let _: Vec<_> = stream.by_ref().collect();
1197        assert_eq!(stream.stats().lines_unclassified, 1);
1198        assert_eq!(stream.stats().unclassified_lines.len(), 1);
1199        assert_eq!(
1200            stream.stats().unclassified_lines[0].reason,
1201            UnclassifiedReason::OrphanContinuation,
1202        );
1203    }
1204
1205    #[test]
1206    fn stats_capture_data() {
1207        let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1208        let mut stream = LogStream::new(lines.into_iter())
1209            .unclassified_tracking(UnclassifiedTracking::CaptureData);
1210        let _: Vec<_> = stream.by_ref().collect();
1211        assert_eq!(stream.stats().unclassified_lines.len(), 1);
1212        assert_eq!(
1213            stream.stats().unclassified_lines[0].data.as_deref(),
1214            Some("orphan line"),
1215        );
1216    }
1217
1218    #[test]
1219    fn stats_count_only_no_allocation() {
1220        let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1221        let mut stream = LogStream::new(lines.into_iter());
1222        let _: Vec<_> = stream.by_ref().collect();
1223        assert_eq!(stream.stats().lines_unclassified, 1);
1224        assert!(stream.stats().unclassified_lines.is_empty());
1225    }
1226
1227    #[test]
1228    fn line_number_tracking() {
1229        let lines = vec![
1230            full_line(UUID1, TS1, "First"),
1231            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1232            full_line(UUID2, TS2, "Third"),
1233        ];
1234        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1235        assert_eq!(entries[0].line_number, 1);
1236        assert_eq!(entries[1].line_number, 3);
1237    }
1238
1239    #[test]
1240    fn drain_unclassified() {
1241        let lines = vec![
1242            "orphan1".to_string(),
1243            "orphan2".to_string(),
1244            full_line(UUID1, TS1, "After"),
1245        ];
1246        let mut stream = LogStream::new(lines.into_iter())
1247            .unclassified_tracking(UnclassifiedTracking::TrackLines);
1248        let _: Vec<_> = stream.by_ref().collect();
1249        let drained = stream.drain_unclassified();
1250        assert_eq!(drained.len(), 1);
1251        assert!(stream.stats().unclassified_lines.is_empty());
1252        assert_eq!(stream.stats().lines_unclassified, 1);
1253    }
1254
1255    // BUG 1: When LogStream processes a TrackedChain of multiple file segments,
1256    // last_timestamp from the previous segment bleeds into continuation lines
1257    // at the start of the next segment. This causes entries to get timestamps
1258    // from a completely different file (potentially hours earlier).
1259    //
1260    // Reproduces: f2cb66d4 getting timestamp 23:58:03 from the rotated file
1261    // when freeswitch.log starts with its continuation lines.
1262    #[test]
1263    fn continuation_lines_at_file_boundary_must_not_inherit_previous_timestamp() {
1264        use crate::TrackedChain;
1265
1266        let uuid_a = "aaaaaaaa-1111-2222-3333-444444444444";
1267        let uuid_b = "bbbbbbbb-1111-2222-3333-444444444444";
1268        let ts_old = "2025-01-15 23:58:03.000000";
1269        let ts_new = "2025-01-16 08:37:12.000000";
1270
1271        let seg1: Vec<String> = vec![format!(
1272            "{uuid_a} {ts_old} 95.00% [DEBUG] test.c:1 Last line in rotated file"
1273        )];
1274
1275        // Segment 2 starts with UUID-continuation lines (Format C: UUID + message, no timestamp)
1276        // followed by a real timestamped line
1277        let seg2: Vec<String> = vec![
1278            format!("{uuid_b} CHANNEL_DATA:"),
1279            format!("{uuid_b} Channel-State: [CS_EXECUTE]"),
1280            format!("{uuid_b} {ts_new} 95.00% [DEBUG] test.c:1 First timestamped line in new file"),
1281        ];
1282
1283        let segments: Vec<(String, Box<dyn Iterator<Item = String>>)> = vec![
1284            ("rotated.log".to_string(), Box::new(seg1.into_iter())),
1285            ("freeswitch.log".to_string(), Box::new(seg2.into_iter())),
1286        ];
1287
1288        let (chain, _) = TrackedChain::new(segments);
1289        let entries: Vec<_> = LogStream::new(chain).collect();
1290
1291        let b_entry = entries
1292            .iter()
1293            .find(|e| e.uuid == uuid_b)
1294            .expect("should find entry for uuid_b");
1295
1296        // The CHANNEL_DATA entry for uuid_b must NOT have the timestamp from
1297        // segment 1 — it should either have the new file's first real timestamp
1298        // or be empty (indicating unknown).
1299        assert_ne!(
1300            b_entry.timestamp, ts_old,
1301            "continuation lines in a new file segment inherited timestamp \
1302             '{ts_old}' from the previous segment — timestamps must not bleed \
1303             across file boundaries"
1304        );
1305    }
1306
1307    // --- Line accounting tests ---
1308
1309    fn assert_accounting(stream: &LogStream<impl Iterator<Item = String>>) {
1310        let stats = stream.stats();
1311        assert_eq!(
1312            stats.unaccounted_lines(),
1313            0,
1314            "line accounting invariant violated: \
1315             processed={} + split={} != in_entries={} + empty_orphan={}",
1316            stats.lines_processed,
1317            stats.lines_split,
1318            stats.lines_in_entries,
1319            stats.lines_empty_orphan,
1320        );
1321    }
1322
1323    #[test]
1324    fn accounting_full_lines() {
1325        let lines = vec![
1326            full_line(UUID1, TS1, "First"),
1327            full_line(UUID2, TS2, "Second"),
1328        ];
1329        let mut stream = LogStream::new(lines.into_iter());
1330        let entries: Vec<_> = stream.by_ref().collect();
1331        assert_eq!(entries.len(), 2);
1332        assert_eq!(stream.stats().lines_in_entries, 2);
1333        assert_accounting(&stream);
1334    }
1335
1336    #[test]
1337    fn accounting_with_attached() {
1338        let lines = vec![
1339            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1340            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1341            "variable_foo: [bar]".to_string(),
1342            full_line(UUID2, TS2, "Next"),
1343        ];
1344        let mut stream = LogStream::new(lines.into_iter());
1345        let entries: Vec<_> = stream.by_ref().collect();
1346        assert_eq!(entries.len(), 2);
1347        // Entry 1: 1 primary + 2 attached = 3 lines
1348        // Entry 2: 1 primary = 1 line
1349        assert_eq!(stream.stats().lines_in_entries, 4);
1350        assert_accounting(&stream);
1351    }
1352
1353    #[test]
1354    fn accounting_system_line() {
1355        let lines = vec![format!(
1356            "{TS1} 95.97% [NOTICE] mod_logfile.c:217 New log started."
1357        )];
1358        let mut stream = LogStream::new(lines.into_iter());
1359        let _: Vec<_> = stream.by_ref().collect();
1360        assert_eq!(stream.stats().lines_in_entries, 1);
1361        assert_accounting(&stream);
1362    }
1363
1364    #[test]
1365    fn accounting_empty_orphan() {
1366        let lines = vec![
1367            String::new(),
1368            "   ".to_string(),
1369            full_line(UUID1, TS1, "After"),
1370        ];
1371        let mut stream = LogStream::new(lines.into_iter());
1372        let entries: Vec<_> = stream.by_ref().collect();
1373        assert_eq!(entries.len(), 1);
1374        assert_eq!(stream.stats().lines_empty_orphan, 2);
1375        assert_accounting(&stream);
1376    }
1377
1378    #[test]
1379    fn accounting_empty_attached() {
1380        let lines = vec![
1381            full_line(UUID1, TS1, "First"),
1382            String::new(),
1383            "continuation".to_string(),
1384        ];
1385        let mut stream = LogStream::new(lines.into_iter());
1386        let entries: Vec<_> = stream.by_ref().collect();
1387        assert_eq!(entries.len(), 1);
1388        assert_eq!(entries[0].attached.len(), 2);
1389        assert_eq!(stream.stats().lines_empty_orphan, 0);
1390        assert_eq!(stream.stats().lines_in_entries, 3);
1391        assert_accounting(&stream);
1392    }
1393
1394    #[test]
1395    fn accounting_orphan_continuation() {
1396        let lines = vec!["orphan line".to_string(), full_line(UUID1, TS1, "After")];
1397        let mut stream = LogStream::new(lines.into_iter());
1398        let _: Vec<_> = stream.by_ref().collect();
1399        assert_accounting(&stream);
1400    }
1401
1402    #[test]
1403    fn accounting_codec_merging() {
1404        let lines = vec![
1405            full_line(
1406                UUID1,
1407                TS1,
1408                "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1409            ),
1410            full_line(
1411                UUID1,
1412                TS1,
1413                "Audio Codec Compare [PCMU:0:8000:20:64000:1] is saved as a match",
1414            ),
1415            full_line(UUID2, TS2, "Next"),
1416        ];
1417        let mut stream = LogStream::new(lines.into_iter());
1418        let _: Vec<_> = stream.by_ref().collect();
1419        assert_accounting(&stream);
1420    }
1421
1422    #[test]
1423    fn accounting_truncated_line() {
1424        let lines = vec![
1425            full_line(UUID1, TS1, "First"),
1426            format!(
1427                "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
1428            ),
1429        ];
1430        let mut stream = LogStream::new(lines.into_iter());
1431        let _: Vec<_> = stream.by_ref().collect();
1432        assert_accounting(&stream);
1433    }
1434
1435    #[test]
1436    fn accounting_long_line_collision_split() {
1437        // Simulate a long variable value exceeding mod_logfile's 2048-byte buffer,
1438        // followed by a collision UUID on the same physical line.
1439        let long_value = "x".repeat(MAX_LINE_PAYLOAD + 10);
1440        let line = format!(
1441            "variable_sip_multipart: [{long_value}]{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"
1442        );
1443        let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1444        let mut stream = LogStream::new(lines.into_iter());
1445        let entries: Vec<_> = stream.by_ref().collect();
1446
1447        // The CHANNEL_DATA entry should have the truncated variable as attached
1448        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1449
1450        // The collision should have been split out as a separate entry
1451        let split_entry = entries.iter().find(|e| e.uuid == UUID2);
1452        assert!(
1453            split_entry.is_some(),
1454            "collision UUID should produce a separate entry"
1455        );
1456
1457        assert_eq!(stream.stats().lines_split, 1);
1458        assert_accounting(&stream);
1459    }
1460
1461    #[test]
1462    fn no_split_on_short_lines() {
1463        // Lines within the payload limit should never be split,
1464        // even if they happen to contain a UUID-like pattern.
1465        let line = format!("variable_call_uuid: [{UUID2}]");
1466        let lines = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), line];
1467        let mut stream = LogStream::new(lines.into_iter());
1468        let entries: Vec<_> = stream.by_ref().collect();
1469        assert_eq!(entries.len(), 1);
1470        assert_eq!(stream.stats().lines_split, 0);
1471        assert_accounting(&stream);
1472    }
1473
1474    #[test]
1475    fn timestamp_collision_splits_system_lines() {
1476        let line = format!(
1477            "{TS1} 98.03% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42864: api sofia jsonstatus{TS2} 97.93% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42898: api fsctl pause_check"
1478        );
1479        let mut stream = LogStream::new(std::iter::once(line));
1480        let entries: Vec<_> = stream.by_ref().collect();
1481        assert_eq!(entries.len(), 2);
1482        assert_eq!(
1483            entries[0].message,
1484            "Event Socket Command from ::1:42864: api sofia jsonstatus"
1485        );
1486        assert_eq!(
1487            entries[1].message,
1488            "Event Socket Command from ::1:42898: api fsctl pause_check"
1489        );
1490        assert_eq!(stream.stats().lines_split, 1);
1491        assert_accounting(&stream);
1492    }
1493
1494    #[test]
1495    fn timestamp_collision_splits_three_entries() {
1496        let ts3 = "2025-01-15 10:30:47.345678";
1497        let line = format!(
1498            "{TS1} 95.00% [INFO] mod.c:1 first{TS2} 96.00% [INFO] mod.c:1 second{ts3} 97.00% [INFO] mod.c:1 third"
1499        );
1500        let mut stream = LogStream::new(std::iter::once(line));
1501        let entries: Vec<_> = stream.by_ref().collect();
1502        assert_eq!(entries.len(), 3);
1503        assert_eq!(entries[0].message, "first");
1504        assert_eq!(entries[1].message, "second");
1505        assert_eq!(entries[2].message, "third");
1506        assert_eq!(stream.stats().lines_split, 2);
1507        assert_accounting(&stream);
1508    }
1509
1510    #[test]
1511    fn timestamp_collision_oversize_write_contention() {
1512        // Production case: write contention concatenates dozens of short
1513        // Event Socket entries into a single physical line whose total
1514        // length exceeds MAX_LINE_PAYLOAD. Timestamps appear at offsets
1515        // ~150 bytes apart — none aligned with the 2010-byte buffer
1516        // boundary. All entries must still split out.
1517        let entry = |n: usize| {
1518            format!(
1519                "{TS1} 98.77% [INFO] mod_event_socket.c:1754 Event Socket Command from ::1:42864: api db select/ngcs_sip_call_id/entry-{n:04}"
1520            )
1521        };
1522        let count: u64 = 20;
1523        let line: String = (0..count).map(|n| entry(n as usize)).collect();
1524        assert!(
1525            line.len() > super::MAX_LINE_PAYLOAD,
1526            "test fixture should exceed MAX_LINE_PAYLOAD, got {}",
1527            line.len()
1528        );
1529
1530        let mut stream = LogStream::new(std::iter::once(line));
1531        let entries: Vec<_> = stream.by_ref().collect();
1532        assert_eq!(entries.len() as u64, count);
1533        for (i, e) in entries.iter().enumerate() {
1534            assert_eq!(
1535                e.message,
1536                format!(
1537                    "Event Socket Command from ::1:42864: api db select/ngcs_sip_call_id/entry-{i:04}"
1538                )
1539            );
1540        }
1541        assert_eq!(stream.stats().lines_split, count - 1);
1542        assert_accounting(&stream);
1543    }
1544
1545    #[test]
1546    fn timestamp_collision_with_uuid_prefix() {
1547        // System line collides with Full line (UUID + timestamp)
1548        let line = format!(
1549            "{TS1} 95.00% [INFO] mod.c:1 first{UUID1} {TS2} 96.00% [DEBUG] sofia.c:100 second"
1550        );
1551        let mut stream = LogStream::new(std::iter::once(line));
1552        let entries: Vec<_> = stream.by_ref().collect();
1553        assert_eq!(entries.len(), 2);
1554        assert_eq!(entries[0].message, "first");
1555        assert_eq!(entries[1].uuid, UUID1);
1556        assert_eq!(entries[1].message, "second");
1557        assert_eq!(stream.stats().lines_split, 1);
1558        assert_accounting(&stream);
1559    }
1560
1561    #[test]
1562    fn channel_data_multiline_variable_spans_many_lines() {
1563        let lines = vec![
1564            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1565            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1566            "variable_switch_r_sdp: [v=0".to_string(),
1567            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1568            "s=-".to_string(),
1569            "c=IN IP4 192.0.2.1".to_string(),
1570            "t=0 0".to_string(),
1571            "m=audio 47758 RTP/AVP 0 8 101".to_string(),
1572            "a=rtpmap:0 PCMU/8000".to_string(),
1573            "a=rtpmap:8 PCMA/8000".to_string(),
1574            "a=rtpmap:101 telephone-event/8000".to_string(),
1575            "a=fmtp:101 0-16".to_string(),
1576            "]".to_string(),
1577            "variable_direction: [inbound]".to_string(),
1578        ];
1579        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1580        assert_eq!(entries.len(), 1);
1581        let block = entries[0].block.as_ref().expect("should have block");
1582        match block {
1583            Block::ChannelData { fields, variables } => {
1584                assert_eq!(fields.len(), 1);
1585                assert_eq!(variables.len(), 2);
1586                assert_eq!(variables[0].0, "variable_switch_r_sdp");
1587                let sdp = &variables[0].1;
1588                assert!(sdp.starts_with("v=0\n"));
1589                assert!(sdp.contains("a=fmtp:101 0-16"));
1590                assert!(!sdp.ends_with(']'));
1591                assert_eq!(variables[1].0, "variable_direction");
1592            }
1593            other => panic!("expected ChannelData block, got {other:?}"),
1594        }
1595    }
1596
1597    #[test]
1598    fn sdp_from_verto_update_media() {
1599        let lines = vec![
1600            full_line(UUID1, TS1, "updateMedia: Local SDP"),
1601            "v=0".to_string(),
1602            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1603            "m=audio 10000 RTP/AVP 0".to_string(),
1604        ];
1605        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1606        assert_eq!(entries.len(), 1);
1607        match &entries[0].message_kind {
1608            MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
1609            other => panic!("expected SdpMarker, got {other:?}"),
1610        }
1611        let block = entries[0].block.as_ref().expect("should have block");
1612        match block {
1613            Block::Sdp { direction, body } => {
1614                assert_eq!(*direction, SdpDirection::Local);
1615                assert_eq!(body.len(), 3);
1616            }
1617            other => panic!("expected Sdp block, got {other:?}"),
1618        }
1619    }
1620
1621    #[test]
1622    fn duplicate_sdp_marker() {
1623        let lines = vec![
1624            full_line(UUID1, TS1, "Duplicate SDP"),
1625            "v=0".to_string(),
1626            "m=audio 10000 RTP/AVP 0".to_string(),
1627        ];
1628        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1629        assert_eq!(entries.len(), 1);
1630        match &entries[0].message_kind {
1631            MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Unknown),
1632            other => panic!("expected SdpMarker, got {other:?}"),
1633        }
1634        assert!(entries[0].block.is_some());
1635    }
1636
1637    #[test]
1638    fn warning_on_unclosed_multiline_variable() {
1639        let lines = vec![
1640            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1641            "variable_switch_r_sdp: [v=0".to_string(),
1642            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1643            full_line(UUID2, TS2, "Next entry"),
1644        ];
1645        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1646        assert_eq!(entries.len(), 2);
1647        assert!(
1648            entries[0]
1649                .warnings
1650                .iter()
1651                .any(|w| w.contains("unclosed multi-line variable")),
1652            "expected unclosed variable warning, got: {:?}",
1653            entries[0].warnings
1654        );
1655    }
1656
1657    #[test]
1658    fn warning_on_unparseable_channel_data_line() {
1659        let lines = vec![
1660            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1661            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1662            format!("{UUID1} this is not a valid field line"),
1663        ];
1664        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1665        assert_eq!(entries.len(), 1);
1666        assert!(
1667            entries[0]
1668                .warnings
1669                .iter()
1670                .any(|w| w.contains("unparseable CHANNEL_DATA")),
1671            "expected unparseable warning, got: {:?}",
1672            entries[0].warnings
1673        );
1674    }
1675
1676    #[test]
1677    fn warning_on_unexpected_codec_continuation() {
1678        let lines = vec![
1679            full_line(
1680                UUID1,
1681                TS1,
1682                "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1683            ),
1684            format!("{UUID1} some unexpected continuation line"),
1685        ];
1686        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1687        assert_eq!(entries.len(), 1);
1688        assert!(
1689            entries[0]
1690                .warnings
1691                .iter()
1692                .any(|w| w.contains("unexpected codec negotiation")),
1693            "expected codec warning, got: {:?}",
1694            entries[0].warnings
1695        );
1696    }
1697
1698    #[test]
1699    fn system_line_uuid_continuation_not_absorbed() {
1700        // After the bug fix, a UUID continuation should NOT be absorbed
1701        // by a pending system line (empty UUID).
1702        let lines = vec![
1703            format!("{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"),
1704            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1705        ];
1706        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1707        assert_eq!(
1708            entries.len(),
1709            2,
1710            "UUID continuation should not be absorbed by system entry"
1711        );
1712        assert_eq!(entries[0].uuid, "");
1713        assert_eq!(entries[1].uuid, UUID1);
1714    }
1715
1716    #[test]
1717    fn truncated_collision_in_channel_data_variable() {
1718        // A CHANNEL_DATA block where a variable value exceeds the 2048-byte
1719        // mod_logfile buffer, causing a truncated collision (Format E).
1720        // The variable_long_xml value opens with [ but the buffer truncation
1721        // causes a UUID+EXECUTE to collide on the same physical line before
1722        // the closing ].
1723        let padding = "x".repeat(2000);
1724        let collision_line = format!(
1725            "{UUID1} variable_long_xml: [{padding}{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(foo=bar)"
1726        );
1727        assert!(
1728            collision_line.len() > super::MAX_LINE_PAYLOAD,
1729            "test line must exceed buffer limit, got {}",
1730            collision_line.len()
1731        );
1732
1733        let lines = vec![
1734            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1735            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1736            format!("{UUID1} variable_direction: [inbound]"),
1737            collision_line,
1738            full_line(UUID1, TS2, "Next log entry"),
1739        ];
1740
1741        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1742
1743        // Entry 0: CHANNEL_DATA with the variables
1744        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1745        let block = entries[0].block.as_ref().expect("should have block");
1746        match block {
1747            Block::ChannelData { fields, variables } => {
1748                assert_eq!(fields.len(), 1, "should have Channel-Name field");
1749                assert_eq!(fields[0].0, "Channel-Name");
1750                assert_eq!(
1751                    variables.len(),
1752                    2,
1753                    "should have direction + unclosed long_xml"
1754                );
1755                assert_eq!(variables[0].0, "variable_direction");
1756                assert_eq!(variables[0].1, "inbound");
1757                assert_eq!(variables[1].0, "variable_long_xml");
1758            }
1759            other => panic!("expected ChannelData block, got {other:?}"),
1760        }
1761        assert!(
1762            entries[0]
1763                .warnings
1764                .iter()
1765                .any(|w| w.contains("line exceeds mod_logfile 2048-byte buffer")),
1766            "expected buffer overflow warning, got: {:?}",
1767            entries[0].warnings
1768        );
1769        assert!(
1770            entries[0]
1771                .warnings
1772                .iter()
1773                .any(|w| w.contains("unclosed multi-line variable")),
1774            "expected unclosed variable warning, got: {:?}",
1775            entries[0].warnings
1776        );
1777
1778        // Entry 1: the split EXECUTE line
1779        assert_eq!(entries[1].uuid, UUID1);
1780        assert!(
1781            entries[1].message.starts_with("EXECUTE "),
1782            "split entry should be EXECUTE, got: {}",
1783            entries[1].message
1784        );
1785
1786        // Entry 2: the next full log line
1787        assert_eq!(entries.len(), 3);
1788        assert_eq!(entries[2].message, "Next log entry");
1789    }
1790
1791    #[test]
1792    fn channel_data_uuid_drops_mid_block() {
1793        // Production scenario: mod_logfile stops prepending the UUID mid-way
1794        // through a CHANNEL_DATA dump. The first few variable lines carry the
1795        // UUID prefix (UuidContinuation), then the remaining lines arrive as
1796        // bare continuations. All should be accumulated into the same block.
1797        let lines = vec![
1798            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1799            format!("{UUID1} variable_max_forwards: [69]"),
1800            format!("{UUID1} variable_presence_id: [1251@[2001:db8::10]]"),
1801            format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1802            // UUID drops — bare continuations for the rest
1803            "variable_sip_h_X-Call-Info: [<urn:test:callid:20260316>;purpose=emergency-CallId]"
1804                .to_string(),
1805            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1806            "variable_remote_media_ip: [2001:db8::10]".to_string(),
1807            "variable_remote_media_port: [9952]".to_string(),
1808            "variable_rtp_use_codec_name: [opus]".to_string(),
1809            full_line(UUID1, TS2, "Next entry"),
1810        ];
1811
1812        let mut stream = LogStream::new(lines.into_iter());
1813        let entries: Vec<_> = stream.by_ref().collect();
1814
1815        assert_eq!(entries.len(), 2);
1816        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1817        let block = entries[0].block.as_ref().expect("should have block");
1818        match block {
1819            Block::ChannelData { fields, variables } => {
1820                assert_eq!(fields.len(), 0);
1821                assert_eq!(variables.len(), 8);
1822                // UUID-prefixed variables
1823                assert_eq!(variables[0].0, "variable_max_forwards");
1824                assert_eq!(variables[0].1, "69");
1825                assert_eq!(variables[1].0, "variable_presence_id");
1826                assert_eq!(variables[1].1, "1251@[2001:db8::10]");
1827                assert_eq!(variables[2].0, "variable_sip_h_X-Custom-ID");
1828                // Bare variables (UUID dropped)
1829                assert_eq!(variables[3].0, "variable_sip_h_X-Call-Info");
1830                assert!(variables[3].1.contains("emergency-CallId"));
1831                assert_eq!(variables[4].0, "variable_ep_codec_string");
1832                assert_eq!(variables[7].0, "variable_rtp_use_codec_name");
1833                assert_eq!(variables[7].1, "opus");
1834            }
1835            other => panic!("expected ChannelData block, got {other:?}"),
1836        }
1837        assert_eq!(entries[0].attached.len(), 8);
1838        assert_eq!(entries[1].message, "Next entry");
1839        assert_accounting(&stream);
1840    }
1841
1842    #[test]
1843    fn channel_data_uuid_drops_with_multiline_variable() {
1844        // UUID drops mid-block AND a multi-line variable (SDP body embedded
1845        // in variable_switch_r_sdp) spans many bare continuation lines.
1846        // The \r characters are real — SDP uses \r\n per RFC 4566, and
1847        // mod_logfile splits on \n leaving \r in the content.
1848        let lines = vec![
1849            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1850            format!("{UUID1} variable_max_forwards: [69]"),
1851            format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1852            // UUID drops
1853            "variable_switch_r_sdp: [v=0\r".to_string(),
1854            "o=FreeSWITCH 1773663549 1773663550 IN IP6 2001:db8::10\r".to_string(),
1855            "s=FreeSWITCH\r".to_string(),
1856            "c=IN IP6 2001:db8::10\r".to_string(),
1857            "t=0 0\r".to_string(),
1858            "m=audio 9952 RTP/AVP 102 101 13\r".to_string(),
1859            "a=rtpmap:102 opus/48000/2\r".to_string(),
1860            "a=ptime:20\r".to_string(),
1861            "]".to_string(),
1862            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1863            "variable_direction: [inbound]".to_string(),
1864            full_line(UUID1, TS2, "Next entry"),
1865        ];
1866
1867        let mut stream = LogStream::new(lines.into_iter());
1868        let entries: Vec<_> = stream.by_ref().collect();
1869
1870        assert_eq!(entries.len(), 2);
1871        let block = entries[0].block.as_ref().expect("should have block");
1872        match block {
1873            Block::ChannelData { fields, variables } => {
1874                assert_eq!(fields.len(), 0);
1875                assert_eq!(variables.len(), 5);
1876                assert_eq!(variables[0].0, "variable_max_forwards");
1877                assert_eq!(variables[1].0, "variable_sip_h_X-Custom-ID");
1878                // Multi-line SDP variable reassembled from bare continuations
1879                assert_eq!(variables[2].0, "variable_switch_r_sdp");
1880                let sdp = &variables[2].1;
1881                assert!(
1882                    sdp.starts_with("v=0\r\n"),
1883                    "SDP should start with v=0\\r\\n, got: {sdp:?}"
1884                );
1885                assert!(sdp.contains("m=audio 9952 RTP/AVP 102 101 13\r"));
1886                assert!(sdp.contains("a=ptime:20\r"));
1887                assert!(!sdp.ends_with(']'), "closing bracket should be stripped");
1888                // Post-SDP bare variables
1889                assert_eq!(variables[3].0, "variable_ep_codec_string");
1890                assert_eq!(variables[4].0, "variable_direction");
1891                assert_eq!(variables[4].1, "inbound");
1892            }
1893            other => panic!("expected ChannelData block, got {other:?}"),
1894        }
1895        // 2 UUID continuations + 9 SDP lines (open + 7 content + close) + 2 bare = 13
1896        assert_eq!(entries[0].attached.len(), 13);
1897        assert_accounting(&stream);
1898    }
1899
1900    #[test]
1901    fn channel_data_bare_variable_collision_with_execute() {
1902        // Production collision: bare variable_call_uuid line on same physical
1903        // line as a UUID EXECUTE. The UUID appears at byte 20 ("variable_call_uuid: "
1904        // is 20 chars), within find_uuid_in's 50-byte scan window, so Layer 1
1905        // classifies it as Truncated — extracting the UUID and EXECUTE message.
1906        // The CHANNEL_DATA block loses variable_call_uuid (eaten as truncation
1907        // prefix) but correctly recovers the EXECUTE as a separate entry.
1908        let collision = format!(
1909            "variable_call_uuid: {UUID1} EXECUTE [depth=0] \
1910             sofia/internal-v6/1251@[2001:db8::10] export(nolocal:test_var=value)"
1911        );
1912
1913        let lines = vec![
1914            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1915            format!("{UUID1} variable_max_forwards: [69]"),
1916            // UUID drops — bare continuations
1917            "variable_DP_MATCH: [ARRAY::create_conference|:create_conference]".to_string(),
1918            collision,
1919            // Full line resumes normal logging
1920            full_line(
1921                UUID1,
1922                TS2,
1923                "EXPORT (export_vars) (REMOTE ONLY) [test_var]=[value]",
1924            ),
1925        ];
1926
1927        let mut stream = LogStream::new(lines.into_iter());
1928        let entries: Vec<_> = stream.by_ref().collect();
1929
1930        // Entry 0: CHANNEL_DATA — variable_call_uuid lost to truncation prefix
1931        assert_eq!(entries.len(), 3);
1932        let block = entries[0].block.as_ref().expect("should have block");
1933        match block {
1934            Block::ChannelData { fields, variables } => {
1935                assert_eq!(fields.len(), 0);
1936                assert_eq!(variables.len(), 2);
1937                assert_eq!(variables[0].0, "variable_max_forwards");
1938                assert_eq!(variables[1].0, "variable_DP_MATCH");
1939            }
1940            other => panic!("expected ChannelData block, got {other:?}"),
1941        }
1942
1943        // Entry 1: EXECUTE recovered from the Truncated classification
1944        assert_eq!(entries[1].uuid, UUID1);
1945        assert_eq!(entries[1].kind, LineKind::Truncated);
1946        assert!(
1947            entries[1].message.starts_with("EXECUTE "),
1948            "truncated line should yield EXECUTE, got: {}",
1949            entries[1].message
1950        );
1951
1952        // Entry 2: normal EXPORT line
1953        assert_eq!(entries[2].message_kind.label(), "variable");
1954        assert_accounting(&stream);
1955    }
1956
1957    #[test]
1958    fn system_line_with_embedded_uuid_gets_entry_uuid() {
1959        // System lines (Format B) where switch_cpp.cpp logs the UUID at the
1960        // start of the message body should produce entries with the correct UUID.
1961        let lines = vec![
1962            format!(
1963                "{TS1} 95.97% [DEBUG] switch_cpp.cpp:1466 {UUID1} DAA-LOG WaveManager originate"
1964            ),
1965            format!(
1966                "{TS1} 95.97% [WARNING] switch_cpp.cpp:1466 {UUID1} DAA-LOG Failed to create session"
1967            ),
1968            full_line(UUID1, TS2, "State Change CS_EXECUTE -> CS_HIBERNATE"),
1969        ];
1970
1971        let mut stream = LogStream::new(lines.into_iter());
1972        let entries: Vec<_> = stream.by_ref().collect();
1973
1974        assert_eq!(entries.len(), 3);
1975        // Both System lines should have the UUID extracted from the message
1976        assert_eq!(entries[0].uuid, UUID1);
1977        assert_eq!(entries[0].kind, LineKind::System);
1978        assert_eq!(entries[0].message, "DAA-LOG WaveManager originate");
1979
1980        assert_eq!(entries[1].uuid, UUID1);
1981        assert_eq!(entries[1].kind, LineKind::System);
1982        assert_eq!(entries[1].message, "DAA-LOG Failed to create session");
1983
1984        // Full line still works normally
1985        assert_eq!(entries[2].uuid, UUID1);
1986        assert_eq!(entries[2].kind, LineKind::Full);
1987        assert_accounting(&stream);
1988    }
1989}