Skip to main content

freeswitch_log_parser/
stream.rs

1use crate::level::LogLevel;
2use crate::line::{is_date_at, is_log_header_at, is_uuid_at, parse_line, LineKind};
3use crate::message::{classify_message, MessageKind, SdpDirection};
4
/// Structured data extracted from a multi-line dump that follows a primary log entry.
///
/// Each variant corresponds to a block type that the stream state machine
/// recognizes and reassembles from continuation lines.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum Block {
    /// Channel variable dump — `Channel-*` fields and `variable_*` key-value pairs.
    /// Multi-line variable values (e.g. embedded SDP) are reassembled with `\n` separators.
    ChannelData {
        /// `(name, value)` pairs from field lines, in the order they were read.
        fields: Vec<(String, String)>,
        /// `(name, value)` pairs from variable lines, in the order they were completed.
        variables: Vec<(String, String)>,
    },
    /// SDP session description body, collected line by line.
    Sdp {
        /// Direction reported by the marker message that opened the block.
        direction: SdpDirection,
        /// One element per continuation line that followed the marker.
        body: Vec<String>,
    },
    /// Codec negotiation sequence — offered/local comparisons and selected matches.
    CodecNegotiation {
        /// `(offered, local)` codec descriptions from each comparison line.
        comparisons: Vec<(String, String)>,
        /// Codec descriptions from lines reporting a saved match.
        selected: Vec<String>,
    },
}
29
/// Controls how much detail is recorded for lines that couldn't be fully classified.
///
/// Higher fidelity levels allocate more memory. The default is `CountOnly`,
/// which is also what [`Default::default`] returns.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum UnclassifiedTracking {
    /// Increment the counter only — zero allocation.
    #[default]
    CountOnly,
    /// Record line number and reason for each unclassified line.
    TrackLines,
    /// Like `TrackLines` plus the full line content.
    CaptureData,
}
42
/// Why a line was marked as unclassified.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum UnclassifiedReason {
    /// Bare continuation line arrived with no pending entry to attach to.
    OrphanContinuation,
    /// Line was parsed but the message didn't match any known pattern.
    UnknownMessageFormat,
    /// EXECUTE or variable line was only partially readable.
    TruncatedField,
}
54
/// Record of a single unclassified line, captured when tracking is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnclassifiedLine {
    /// Value of the stream's line counter when the line was recorded.
    pub line_number: u64,
    /// Why the line could not be classified.
    pub reason: UnclassifiedReason,
    /// The full line content; only populated under [`UnclassifiedTracking::CaptureData`].
    pub data: Option<String>,
}
63
/// Cumulative parsing statistics, updated as lines flow through the stream.
#[derive(Debug, Clone, Default)]
pub struct ParseStats {
    /// Lines pulled from the input iterator and counted by the stream.
    pub lines_processed: u64,
    /// Lines reported as unclassified, regardless of the tracking level.
    pub lines_unclassified: u64,
    /// Lines that became part of entries (primary line + attached lines per entry).
    /// Updated when an entry is finalized, not while it is still being assembled.
    pub lines_in_entries: u64,
    /// Empty lines that arrived with no pending entry to attach to.
    pub lines_empty_orphan: u64,
    /// Physical lines that were split into multiple logical entries due to
    /// mod_logfile's 2048-byte snprintf truncation causing same-line collisions.
    /// Incremented once for every split-off suffix that is processed.
    pub lines_split: u64,
    /// Populated only when tracking is `TrackLines` or `CaptureData`.
    pub unclassified_lines: Vec<UnclassifiedLine>,
}
79
80impl ParseStats {
81    /// Lines that were processed but not accounted for by any tracking category.
82    ///
83    /// Returns 0 when the parser correctly accounts for every input line.
84    /// A non-zero value indicates a parser bug — lines were silently lost.
85    ///
86    /// Invariant: `lines_processed + lines_split == lines_in_entries + lines_empty_orphan`
87    pub fn unaccounted_lines(&self) -> u64 {
88        let expected = self.lines_in_entries + self.lines_empty_orphan;
89        let actual = self.lines_processed + self.lines_split;
90        actual.saturating_sub(expected)
91    }
92}
93
/// A complete parsed log entry with all context resolved.
///
/// Produced by [`LogStream`]. Continuation lines have been grouped,
/// UUID/timestamp inherited from context where needed, and multi-line blocks
/// reassembled.
#[derive(Debug)]
pub struct LogEntry {
    /// Session UUID, or empty string for system lines.
    pub uuid: String,
    /// Timestamp with microsecond precision; inherited from the previous entry for continuations.
    /// Empty when no timestamp has been seen yet in the current context.
    pub timestamp: String,
    /// `None` for continuation and truncated lines.
    pub level: Option<LogLevel>,
    /// Core scheduler idle percentage; `None` for continuations.
    pub idle_pct: Option<String>,
    /// Source file:line; `None` for continuations.
    pub source: Option<String>,
    /// The primary message text.
    pub message: String,
    /// Which line format originated this entry.
    pub kind: LineKind,
    /// Semantic classification of the message content.
    pub message_kind: MessageKind,
    /// Typed, parsed multi-line block; `None` for entries without a trailing block.
    pub block: Option<Block>,
    /// Raw continuation lines that followed the primary line, including any
    /// empty lines and merged codec negotiation lines.
    pub attached: Vec<String>,
    /// 1-based line number in the input stream, taken from the stream's line
    /// counter at the moment the entry was created.
    pub line_number: u64,
    /// Per-entry warnings about parsing anomalies.
    pub warnings: Vec<String>,
}
126
/// Split a `name: value` line into an owned `(name, value)` pair.
///
/// The split happens at the first `": "`. Returns `None` when there is no such
/// separator, or when the name is empty or contains a space. If the value
/// starts with `[`, that bracket is removed, along with a trailing `]` when
/// one is present.
fn parse_field_line(msg: &str) -> Option<(String, String)> {
    let (name, raw_value) = msg.split_once(": ")?;
    if name.is_empty() || name.contains(' ') {
        return None;
    }
    let value = match raw_value.strip_prefix('[') {
        Some(unbracketed) => unbracketed.strip_suffix(']').unwrap_or(unbracketed),
        None => raw_value,
    };
    Some((name.to_owned(), value.to_owned()))
}
141
/// Accumulator for the multi-line block (if any) that belongs to the entry
/// currently being assembled.
enum StreamState {
    /// No block is being collected.
    Idle,
    /// Collecting a channel data dump.
    InChannelData {
        /// Field pairs collected so far.
        fields: Vec<(String, String)>,
        /// Completed variable pairs collected so far.
        variables: Vec<(String, String)>,
        /// Name of a variable whose bracketed value has not been closed yet.
        open_var_name: Option<String>,
        /// Value text gathered so far for the open variable; set and cleared
        /// together with `open_var_name`.
        open_var_value: Option<String>,
    },
    /// Collecting SDP body lines.
    InSdp {
        direction: SdpDirection,
        body: Vec<String>,
    },
    /// Collecting codec negotiation lines.
    InCodecNegotiation {
        comparisons: Vec<(String, String)>,
        selected: Vec<String>,
    },
}
159
160impl StreamState {
161    fn take_idle(&mut self) -> StreamState {
162        std::mem::replace(self, StreamState::Idle)
163    }
164}
165
/// Layer 2 structural state machine — groups continuation lines, classifies
/// messages, and detects multi-line blocks (CHANNEL_DATA, SDP, codec negotiation).
///
/// Wraps any `Iterator<Item = String>` and yields [`LogEntry`] values.
/// Maintains `last_uuid` and `last_timestamp` to fill in context for
/// continuation lines that lack their own.
///
/// Use the builder method [`unclassified_tracking()`](LogStream::unclassified_tracking)
/// to control diagnostic detail before iterating.
pub struct LogStream<I> {
    /// Underlying source of raw lines.
    lines: I,
    /// Most recent non-empty UUID seen; used for orphan bare continuations.
    last_uuid: String,
    /// Most recent timestamp parsed from a line; used when a line has none.
    last_timestamp: String,
    /// Entry currently being assembled; yielded once the next entry starts or
    /// the input ends.
    pending: Option<LogEntry>,
    /// Block accumulator belonging to `pending`.
    state: StreamState,
    /// Running counters and (optionally) unclassified line records.
    stats: ParseStats,
    /// How much detail to keep for unclassified lines.
    tracking: UnclassifiedTracking,
    /// Number of input lines counted so far; stamped onto new entries.
    line_number: u64,
    /// Suffix split off a collided physical line, processed on the next loop
    /// iteration before any new input is read.
    split_pending: Option<String>,
    /// Warning raised while no entry was pending; attached to the next entry
    /// that gets created.
    deferred_warning: Option<String>,
}
187
188impl<I: Iterator<Item = String>> LogStream<I> {
189    /// Create a new stream from any line iterator.
190    pub fn new(lines: I) -> Self {
191        LogStream {
192            lines,
193            last_uuid: String::new(),
194            last_timestamp: String::new(),
195            pending: None,
196            state: StreamState::Idle,
197            stats: ParseStats::default(),
198            tracking: UnclassifiedTracking::CountOnly,
199            line_number: 0,
200            split_pending: None,
201            deferred_warning: None,
202        }
203    }
204
205    /// Set the unclassified line tracking level (builder pattern). Defaults to `CountOnly`.
206    pub fn unclassified_tracking(mut self, level: UnclassifiedTracking) -> Self {
207        self.tracking = level;
208        self
209    }
210
    /// Cumulative parsing statistics up to the current position.
    ///
    /// `lines_in_entries` only grows when an entry is finalized, so the lines
    /// of an entry that is still being assembled are not yet included in it.
    pub fn stats(&self) -> &ParseStats {
        &self.stats
    }
215
    /// Take all accumulated unclassified line records, leaving the internal vec empty.
    ///
    /// The `lines_unclassified` counter is not reset. Under `CountOnly`
    /// tracking no records are kept, so the returned vec is empty.
    pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
        // `take` swaps in an empty Vec, so later records start from a clean list.
        std::mem::take(&mut self.stats.unclassified_lines)
    }
222
223    fn record_unclassified(&mut self, reason: UnclassifiedReason, data: Option<&str>) {
224        self.stats.lines_unclassified += 1;
225        match self.tracking {
226            UnclassifiedTracking::CountOnly => {}
227            UnclassifiedTracking::TrackLines => {
228                self.stats.unclassified_lines.push(UnclassifiedLine {
229                    line_number: self.line_number,
230                    reason,
231                    data: None,
232                });
233            }
234            UnclassifiedTracking::CaptureData => {
235                self.stats.unclassified_lines.push(UnclassifiedLine {
236                    line_number: self.line_number,
237                    reason,
238                    data: data.map(|s| s.to_string()),
239                });
240            }
241        }
242    }
243
244    fn finalize_block(&mut self) -> (Option<Block>, Vec<String>) {
245        let mut warnings = Vec::new();
246        match self.state.take_idle() {
247            StreamState::Idle => (None, warnings),
248            StreamState::InChannelData {
249                fields,
250                mut variables,
251                open_var_name,
252                open_var_value,
253            } => {
254                if let (Some(ref name), Some(value)) = (&open_var_name, open_var_value) {
255                    warnings.push(format!("unclosed multi-line variable: {name}"));
256                    variables.push((name.clone(), value));
257                }
258                (Some(Block::ChannelData { fields, variables }), warnings)
259            }
260            StreamState::InSdp { direction, body } => {
261                (Some(Block::Sdp { direction, body }), warnings)
262            }
263            StreamState::InCodecNegotiation {
264                comparisons,
265                selected,
266            } => (
267                Some(Block::CodecNegotiation {
268                    comparisons,
269                    selected,
270                }),
271                warnings,
272            ),
273        }
274    }
275
276    fn finalize_pending(&mut self) -> Option<LogEntry> {
277        let (block, warnings) = self.finalize_block();
278        if let Some(ref mut p) = self.pending {
279            p.block = block;
280            p.warnings.extend(warnings);
281            self.stats.lines_in_entries += 1 + p.attached.len() as u64;
282        }
283        self.pending.take()
284    }
285
286    fn start_block_for_message(&mut self, message_kind: &MessageKind) {
287        self.state = match message_kind {
288            MessageKind::ChannelData => StreamState::InChannelData {
289                fields: Vec::new(),
290                variables: Vec::new(),
291                open_var_name: None,
292                open_var_value: None,
293            },
294            MessageKind::SdpMarker { direction } => StreamState::InSdp {
295                direction: direction.clone(),
296                body: Vec::new(),
297            },
298            MessageKind::CodecNegotiation => StreamState::InCodecNegotiation {
299                comparisons: Vec::new(),
300                selected: Vec::new(),
301            },
302            _ => StreamState::Idle,
303        };
304    }
305
306    fn accumulate_codec_entry(&mut self, msg: &str) {
307        let mut warning = None;
308        if let StreamState::InCodecNegotiation {
309            comparisons,
310            selected,
311        } = &mut self.state
312        {
313            let rest = msg.strip_prefix("Audio Codec Compare ").unwrap_or(msg);
314            if rest.contains("is saved as a match") {
315                let codec = rest.find(']').map(|end| &rest[1..end]).unwrap_or(rest);
316                selected.push(codec.to_string());
317            } else if let Some(slash) = rest.find("]/[") {
318                let offered = &rest[1..slash];
319                let local = &rest[slash + 3..rest.len().saturating_sub(1)];
320                comparisons.push((offered.to_string(), local.to_string()));
321            } else {
322                warning = Some(format!(
323                    "unrecognized codec negotiation line: {}",
324                    if msg.len() > 80 { &msg[..80] } else { msg }
325                ));
326            }
327        }
328        if let (Some(w), Some(ref mut pending)) = (warning, &mut self.pending) {
329            pending.warnings.push(w);
330        }
331    }
332
333    fn accumulate_continuation(&mut self, msg: &str, line: &str) {
334        let msg_kind = classify_message(msg);
335        let mut warning = None;
336        match &mut self.state {
337            StreamState::InChannelData {
338                fields,
339                variables,
340                open_var_name,
341                open_var_value,
342            } => {
343                if let Some(ref mut val) = open_var_value {
344                    val.push('\n');
345                    val.push_str(msg);
346                    if msg.ends_with(']') {
347                        let trimmed = val.trim_end_matches(']').to_string();
348                        let name = open_var_name.take().unwrap();
349                        *open_var_value = None;
350                        variables.push((name, trimmed));
351                    }
352                } else {
353                    match &msg_kind {
354                        MessageKind::ChannelField { name, value } => {
355                            fields.push((name.clone(), value.clone()));
356                        }
357                        MessageKind::Variable { name, value } => {
358                            if !msg.ends_with(']') && msg.contains(": [") {
359                                *open_var_name = Some(name.clone());
360                                *open_var_value = Some(value.clone());
361                            } else {
362                                variables.push((name.clone(), value.clone()));
363                            }
364                        }
365                        _ => {
366                            if let Some((name, value)) = parse_field_line(msg) {
367                                fields.push((name, value));
368                            } else {
369                                warning = Some(format!(
370                                    "unparseable CHANNEL_DATA line: {}",
371                                    if msg.len() > 80 { &msg[..80] } else { msg }
372                                ));
373                            }
374                        }
375                    }
376                }
377            }
378            StreamState::InSdp { body, .. } => {
379                body.push(msg.to_string());
380            }
381            StreamState::InCodecNegotiation { .. } => {
382                warning = Some(format!(
383                    "unexpected codec negotiation continuation: {}",
384                    if msg.len() > 80 { &msg[..80] } else { msg }
385                ));
386            }
387            StreamState::Idle => {}
388        }
389        if let Some(ref mut pending) = self.pending {
390            if let Some(w) = warning {
391                pending.warnings.push(w);
392            }
393            pending.attached.push(line.to_string());
394        }
395    }
396
397    fn new_entry(
398        &mut self,
399        uuid: String,
400        timestamp: String,
401        message: String,
402        kind: LineKind,
403        message_kind: MessageKind,
404    ) -> LogEntry {
405        let mut warnings = Vec::new();
406        if let Some(w) = self.deferred_warning.take() {
407            warnings.push(w);
408        }
409        LogEntry {
410            uuid,
411            timestamp,
412            message,
413            kind,
414            message_kind,
415            level: None,
416            idle_pct: None,
417            source: None,
418            block: None,
419            attached: Vec::new(),
420            line_number: self.line_number,
421            warnings,
422        }
423    }
424}
425
/// mod_logfile's `snprintf` buffer size for UUID-prefixed lines.
/// Lines exceeding this in the formatted output lose their trailing newline,
/// causing the next queue entry to collide on the same physical line.
const MOD_LOGFILE_BUF_SIZE: usize = 2048;

/// Effective maximum payload per line (buffer minus UUID, space, newline):
/// 2048 - 36 - 1 - 1 = 2010 bytes.
const MAX_LINE_PAYLOAD: usize = MOD_LOGFILE_BUF_SIZE - 36 - 1 - 1;
433
impl<I: Iterator<Item = String>> LogStream<I> {
    /// Detect same-line collisions where multiple log entries were concatenated
    /// without a newline separator.
    ///
    /// Two collision mechanisms exist in production:
    ///
    /// 1. **Buffer truncation** (Format E): `mod_logfile`'s 2048-byte `snprintf`
    ///    buffer truncates a long line, losing the trailing `\n`. The next entry
    ///    from the log queue collides on the same physical line. These lines
    ///    always exceed `MAX_LINE_PAYLOAD`.
    ///
    /// 2. **Write contention**: multiple threads writing to the log file can
    ///    interleave output, producing concatenated entries at any line length.
    ///    Common with system lines (Format B) that lack UUID prefixes.
    ///
    /// Returns the (possibly truncated) line. If a collision is detected,
    /// the suffix is stored in `split_pending` for processing in the next
    /// iteration. Only the first collision is split off per call; the stored
    /// suffix passes through this function again when the iterator picks it
    /// up, so further collisions inside it are found then.
    ///
    /// Independently of splitting, a line longer than `MAX_LINE_PAYLOAD`
    /// produces a truncation warning on the pending entry, or a deferred
    /// warning when no entry is pending.
    fn detect_collision(&mut self, line: String) -> String {
        if line.len() > MAX_LINE_PAYLOAD {
            // `+ 38` is the same UUID + space + newline overhead that
            // `MAX_LINE_PAYLOAD` subtracts from the buffer size.
            let warning = format!(
                "line exceeds mod_logfile 2048-byte buffer ({} bytes), data may be truncated",
                line.len() + 38,
            );
            if let Some(ref mut pending) = self.pending {
                pending.warnings.push(warning);
            } else {
                // No entry to hold the warning yet; replaces any earlier deferred warning.
                self.deferred_warning = Some(warning);
            }
        }

        // Skip past the line's own header to avoid matching itself.
        let bytes = line.as_bytes();
        let min_scan = if is_uuid_at(bytes, 0) {
            // Byte 37 is the first byte after a 36-byte UUID and its separator.
            if bytes.len() > 37 && bytes[37].is_ascii_digit() {
                64 // Full line: UUID + timestamp
            } else {
                37 // UUID continuation
            }
        } else if is_date_at(bytes, 0) {
            27 // System line: skip own timestamp
        } else {
            0
        };

        // NOTE(review): 28 looks like the shortest tail that could still hold
        // a header worth matching — confirm against `is_log_header_at`.
        let end = bytes.len().saturating_sub(28);
        // When `min_scan > end` the range is empty and the line is returned unchanged.
        for offset in min_scan..=end {
            // Timestamp collision (System or Full line header)
            if is_log_header_at(bytes, offset) {
                // Check if a UUID precedes the timestamp (Full line collision)
                let split_at = if offset >= 37 && is_uuid_at(bytes, offset - 37) {
                    offset - 37
                } else {
                    offset
                };
                self.split_pending = Some(line[split_at..].to_string());
                return line[..split_at].to_string();
            }
            // UUID collision without timestamp (Format E — truncated buffer)
            if is_uuid_at(bytes, offset) && bytes.len() > MAX_LINE_PAYLOAD {
                self.split_pending = Some(line[offset..].to_string());
                return line[..offset].to_string();
            }
        }

        line
    }
}
502
/// Drives the state machine: each call consumes input lines until one complete
/// [`LogEntry`] can be yielded.
///
/// An entry is held in `pending` while its continuation lines arrive and is
/// yielded when a line that starts the next entry is seen, or when the input
/// runs out.
impl<I: Iterator<Item = String>> Iterator for LogStream<I> {
    type Item = LogEntry;

    /// Return the next completed entry, or `None` once the input is exhausted
    /// and nothing is pending.
    fn next(&mut self) -> Option<LogEntry> {
        loop {
            // A suffix split off by collision detection takes priority over new input.
            let line = if let Some(split) = self.split_pending.take() {
                self.stats.lines_split += 1;
                split
            } else {
                let Some(line) = self.lines.next() else {
                    // Input exhausted: flush whatever is still pending.
                    return self.finalize_pending();
                };

                // A line starting with NUL flushes the pending entry and clears the
                // inherited UUID/timestamp. It is not counted in `line_number` or
                // `lines_processed`.
                // NOTE(review): presumably a boundary marker injected by the caller — confirm.
                if line.starts_with('\x00') {
                    let yielded = self.finalize_pending();
                    self.last_uuid.clear();
                    self.last_timestamp.clear();
                    if yielded.is_some() {
                        return yielded;
                    }
                    continue;
                }

                self.line_number += 1;
                self.stats.lines_processed += 1;
                line
            };

            // May shorten the line and stash the remainder in `split_pending`.
            let line = self.detect_collision(line);

            let parsed = parse_line(&line);

            match parsed.kind {
                // Lines that start a new entry.
                LineKind::Full | LineKind::System | LineKind::Truncated => {
                    let uuid = parsed.uuid.unwrap_or("").to_string();
                    let message_kind = classify_message(parsed.message);

                    // Merge consecutive codec negotiation entries with same UUID
                    if message_kind == MessageKind::CodecNegotiation {
                        if let (Some(ref pending), StreamState::InCodecNegotiation { .. }) =
                            (&self.pending, &self.state)
                        {
                            if uuid == pending.uuid {
                                self.accumulate_codec_entry(parsed.message);
                                if let Some(ref mut p) = self.pending {
                                    p.attached.push(line);
                                }
                                continue;
                            }
                        }
                    }

                    // Close the previous entry before the context is updated for the new one.
                    let yielded = self.finalize_pending();

                    let timestamp = parsed
                        .timestamp
                        .map(|t| t.to_string())
                        .unwrap_or_else(|| self.last_timestamp.clone());

                    // Only non-empty values replace the inherited context.
                    if !uuid.is_empty() {
                        self.last_uuid = uuid.clone();
                    }
                    if parsed.timestamp.is_some() {
                        self.last_timestamp = timestamp.clone();
                    }

                    self.start_block_for_message(&message_kind);
                    if message_kind == MessageKind::CodecNegotiation {
                        // The opening codec line is itself the first item of the block.
                        self.accumulate_codec_entry(parsed.message);
                    }

                    let mut entry = self.new_entry(
                        uuid,
                        timestamp,
                        parsed.message.to_string(),
                        parsed.kind,
                        message_kind,
                    );
                    entry.level = parsed.level;
                    entry.idle_pct = parsed.idle_pct.map(|s| s.to_string());
                    entry.source = parsed.source.map(|s| s.to_string());
                    self.pending = Some(entry);

                    // Nothing to yield when this was the first entry; keep reading.
                    if yielded.is_some() {
                        return yielded;
                    }
                }

                LineKind::UuidContinuation => {
                    let uuid = parsed.uuid.unwrap_or("").to_string();
                    // EXECUTE lines always start their own entry, even with a matching UUID.
                    let is_primary = parsed.message.starts_with("EXECUTE ");

                    if let Some(ref pending) = self.pending {
                        if !is_primary && uuid == pending.uuid {
                            self.accumulate_continuation(parsed.message, &line);
                        } else {
                            // Different UUID or EXECUTE: close the pending entry and start a new
                            // one that inherits the last seen timestamp.
                            let yielded = self.finalize_pending();
                            let message_kind = classify_message(parsed.message);

                            if !uuid.is_empty() {
                                self.last_uuid = uuid.clone();
                            }

                            self.start_block_for_message(&message_kind);
                            self.pending = Some(self.new_entry(
                                uuid,
                                self.last_timestamp.clone(),
                                parsed.message.to_string(),
                                parsed.kind,
                                message_kind,
                            ));

                            return yielded;
                        }
                    } else {
                        // Nothing pending: the continuation becomes an entry of its own.
                        let message_kind = classify_message(parsed.message);

                        if !uuid.is_empty() {
                            self.last_uuid = uuid.clone();
                        }

                        self.start_block_for_message(&message_kind);
                        self.pending = Some(self.new_entry(
                            uuid,
                            self.last_timestamp.clone(),
                            parsed.message.to_string(),
                            parsed.kind,
                            message_kind,
                        ));
                    }
                }

                LineKind::BareContinuation => {
                    if self.pending.is_some() {
                        self.accumulate_continuation(parsed.message, &line);
                    } else {
                        // Orphan: record it, then wrap it in an entry using the inherited
                        // UUID and timestamp so the line is still accounted for.
                        self.record_unclassified(
                            UnclassifiedReason::OrphanContinuation,
                            Some(&line),
                        );
                        let message_kind = classify_message(parsed.message);
                        self.pending = Some(self.new_entry(
                            self.last_uuid.clone(),
                            self.last_timestamp.clone(),
                            parsed.message.to_string(),
                            parsed.kind,
                            message_kind,
                        ));
                    }
                }

                LineKind::Empty => {
                    // Empty lines ride along with the pending entry, or are counted as orphans.
                    if let Some(ref mut pending) = self.pending {
                        pending.attached.push(line);
                    } else {
                        self.stats.lines_empty_orphan += 1;
                    }
                }
            }
        }
    }
}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669
670    const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
671    const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
672
673    fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
674        format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
675    }
676
677    const TS1: &str = "2025-01-15 10:30:45.123456";
678    const TS2: &str = "2025-01-15 10:30:46.234567";
679
680    // --- Existing behavior tests (preserved) ---
681
682    #[test]
683    fn inherits_uuid_for_bare_continuation() {
684        let lines = vec![
685            full_line(UUID1, TS1, "CHANNEL_DATA:"),
686            "variable_foo: [bar]".to_string(),
687            "variable_baz: [qux]".to_string(),
688        ];
689        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
690        assert_eq!(entries.len(), 1);
691        assert_eq!(entries[0].uuid, UUID1);
692        assert_eq!(entries[0].attached.len(), 2);
693        assert_eq!(entries[0].attached[0], "variable_foo: [bar]");
694        assert_eq!(entries[0].attached[1], "variable_baz: [qux]");
695    }
696
697    #[test]
698    fn inherits_timestamp_for_uuid_continuation() {
699        let lines = vec![
700            full_line(UUID1, TS1, "First"),
701            format!("{UUID2} Channel-State: [CS_EXECUTE]"),
702        ];
703        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
704        assert_eq!(entries.len(), 2);
705        assert_eq!(entries[0].timestamp, TS1);
706        assert_eq!(entries[1].uuid, UUID2);
707        assert_eq!(entries[1].timestamp, TS1);
708    }
709
710    #[test]
711    fn new_full_line_yields_previous() {
712        let lines = vec![
713            full_line(UUID1, TS1, "First"),
714            full_line(UUID2, TS2, "Second"),
715        ];
716        let mut stream = LogStream::new(lines.into_iter());
717        let first = stream.next().unwrap();
718        assert_eq!(first.uuid, UUID1);
719        assert_eq!(first.message, "First");
720        let second = stream.next().unwrap();
721        assert_eq!(second.uuid, UUID2);
722        assert_eq!(second.message, "Second");
723        assert!(stream.next().is_none());
724    }
725
726    #[test]
727    fn channel_data_collected_as_attached() {
728        let lines = vec![
729            full_line(UUID1, TS1, "CHANNEL_DATA:"),
730            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
731            format!("{UUID1} Unique-ID: [{UUID1}]"),
732            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
733        ];
734        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
735        assert_eq!(entries.len(), 1);
736        assert_eq!(entries[0].message, "CHANNEL_DATA:");
737        assert_eq!(entries[0].attached.len(), 3);
738    }
739
740    #[test]
741    fn sdp_body_collected_as_attached() {
742        let lines = vec![
743            full_line(UUID1, TS1, "Local SDP:"),
744            "v=0".to_string(),
745            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
746            "s=-".to_string(),
747            "c=IN IP4 192.0.2.1".to_string(),
748            "m=audio 10000 RTP/AVP 0".to_string(),
749        ];
750        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
751        assert_eq!(entries.len(), 1);
752        assert_eq!(entries[0].attached.len(), 5);
753    }
754
755    #[test]
756    fn truncated_starts_new_entry() {
757        let lines = vec![
758            full_line(UUID1, TS1, "First"),
759            format!(
760                "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
761            ),
762        ];
763        let mut stream = LogStream::new(lines.into_iter());
764        let first = stream.next().unwrap();
765        assert_eq!(first.uuid, UUID1);
766        assert_eq!(first.message, "First");
767        let second = stream.next().unwrap();
768        assert_eq!(second.uuid, UUID2);
769        assert_eq!(second.kind, LineKind::Truncated);
770    }
771
772    #[test]
773    fn empty_lines_in_attached() {
774        let lines = vec![
775            full_line(UUID1, TS1, "First"),
776            String::new(),
777            "continuation".to_string(),
778        ];
779        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
780        assert_eq!(entries.len(), 1);
781        assert_eq!(entries[0].attached.len(), 2);
782        assert_eq!(entries[0].attached[0], "");
783        assert_eq!(entries[0].attached[1], "continuation");
784    }
785
// A line that starts with a timestamp (no UUID) produces one `System`
// entry whose `uuid` is empty.
#[test]
fn system_line_no_uuid() {
    let raw = format!(
        "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
    );
    let input = vec![raw];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let only = &parsed[0];
    assert_eq!(only.uuid, "");
    assert_eq!(only.kind, LineKind::System);
}
796
// A single full line is still yielded once the input iterator runs dry.
#[test]
fn final_entry_on_exhaustion() {
    let input = vec![full_line(UUID1, TS1, "Only entry")];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].message, "Only entry");
}
804
// Three back-to-back full lines yield three entries, none of which has
// anything attached.
#[test]
fn consecutive_full_lines() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        full_line(UUID1, TS2, "Second"),
        full_line(UUID2, TS1, "Third"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 3);
    parsed
        .iter()
        .for_each(|entry| assert!(entry.attached.is_empty()));
}
818
// After a CHANNEL_DATA dump, a same-UUID `EXECUTE` line is expected to come
// out as its own `UuidContinuation` entry rather than being attached to the
// dump; the following full line is the third entry.
#[test]
fn execute_after_channel_data_same_uuid() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
        format!("{UUID1} variable_sip_call_id: [test@192.0.2.1]"),
        String::from("variable_foo: [bar]"),
        String::new(),
        String::new(),
        format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
        full_line(UUID1, TS2, "EXPORT (export_vars) [originate_timeout]=[3600]"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 3);

    // The dump keeps its five continuation lines (including the two blanks).
    let dump = &parsed[0];
    assert_eq!(dump.message, "CHANNEL_DATA:");
    assert_eq!(dump.attached.len(), 5);

    let execute = &parsed[1];
    assert_eq!(execute.message, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)");
    assert_eq!(execute.kind, LineKind::UuidContinuation);

    let export = &parsed[2];
    assert_eq!(
        export.message,
        "EXPORT (export_vars) [originate_timeout]=[3600]"
    );
}
842
// A same-UUID `EXECUTE` line sandwiched between two full lines is expected
// to be a separate `UuidContinuation` entry, leaving the first entry with
// no attached lines.
#[test]
fn execute_between_full_lines_same_uuid() {
    let input = vec![
        full_line(UUID1, TS1, "CoreSession::setVariable(X-C911P-City, ST GEORGES)"),
        format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/city/ST GEORGES)"),
        full_line(UUID1, TS2, "CoreSession::setVariable(X-C911P-Region, SGS)"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 3);

    let before = &parsed[0];
    assert_eq!(
        before.message,
        "CoreSession::setVariable(X-C911P-City, ST GEORGES)"
    );
    assert!(before.attached.is_empty());

    let execute = &parsed[1];
    assert!(execute.message.starts_with("EXECUTE "));
    assert_eq!(execute.kind, LineKind::UuidContinuation);

    let after = &parsed[2];
    assert_eq!(
        after.message,
        "CoreSession::setVariable(X-C911P-Region, SGS)"
    );
}
864
// Two consecutive same-UUID `EXECUTE` lines between full lines are each
// expected to become their own entry (four entries in total).
#[test]
fn multiple_execute_between_full_lines() {
    let input = vec![
        full_line(UUID1, TS1, "CoreSession::setVariable(ngcs_call_id, urn:emergency:uid:callid:test)"),
        format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/ng_{UUID1}/call_id/urn:emergency:uid:callid:test)"),
        format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 db(insert/callid_codecs/urn:emergency:uid:callid:test/PCMU@8000h)"),
        full_line(UUID1, TS2, "CoreSession::setVariable(ngcs_short_call_id, test)"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 4);

    assert!(parsed[0].attached.is_empty());

    let first_execute = &parsed[1].message;
    assert!(first_execute.contains("call_id"));
    let second_execute = &parsed[2].message;
    assert!(second_execute.contains("callid_codecs"));

    assert_eq!(
        parsed[3].message,
        "CoreSession::setVariable(ngcs_short_call_id, test)"
    );
}
883
// A UUID-prefixed line carrying a different UUID than the pending entry is
// yielded as a new entry; the same-UUID line before it stays attached.
#[test]
fn uuid_continuation_different_uuid_yields() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
        format!("{UUID2} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"),
    ];
    let mut parser = LogStream::new(input.into_iter());

    let owner = parser.next().unwrap();
    assert_eq!(owner.uuid, UUID1);
    assert_eq!(owner.attached.len(), 1);

    let intruder = parser.next().unwrap();
    assert_eq!(intruder.uuid, UUID2);
    assert_eq!(
        intruder.message,
        "Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public]"
    );
}
902
903    // --- New: Block detection tests ---
904
// A CHANNEL_DATA dump is parsed into a `Block::ChannelData`: the three
// UUID-prefixed header lines land in `fields`, the two `variable_*` lines
// in `variables`, and the surrounding `[...]` is stripped from each value.
#[test]
fn channel_data_block_fields_and_variables() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
        format!("{UUID1} Unique-ID: [{UUID1}]"),
        String::from("variable_sip_call_id: [test123@192.0.2.1]"),
        String::from("variable_direction: [inbound]"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let entry = &parsed[0];
    assert_eq!(entry.message_kind, MessageKind::ChannelData);

    // Builds the owned (key, value) tuple the block stores.
    let pair = |key: &str, value: &str| (key.to_string(), value.to_string());

    match entry.block.as_ref().expect("should have block") {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 3);
            assert_eq!(
                fields[0],
                pair("Channel-Name", "sofia/internal/+15550001234@192.0.2.1")
            );
            assert_eq!(fields[1], pair("Channel-State", "CS_EXECUTE"));
            assert_eq!(fields[2], ("Unique-ID".to_string(), UUID1.to_string()));

            assert_eq!(variables.len(), 2);
            assert_eq!(
                variables[0],
                pair("variable_sip_call_id", "test123@192.0.2.1")
            );
            assert_eq!(variables[1], pair("variable_direction", "inbound"));
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }
}
950
// A variable whose `[` opens on one line and whose `]` closes several
// lines later is reassembled into a single newline-joined value; the
// closing bracket is not part of the value, and all nine raw lines remain
// in `attached`.
#[test]
fn channel_data_multiline_variable_reassembly() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
        String::from("variable_switch_r_sdp: [v=0"),
        String::from("o=- 1234 5678 IN IP4 192.0.2.1"),
        String::from("s=-"),
        String::from("c=IN IP4 192.0.2.1"),
        String::from("m=audio 47758 RTP/AVP 0 101"),
        String::from("a=rtpmap:0 PCMU/8000"),
        String::from("]"),
        String::from("variable_direction: [inbound]"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let entry = &parsed[0];
    match entry.block.as_ref().expect("should have block") {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 1);
            assert_eq!(variables.len(), 2);

            assert_eq!(variables[0].0, "variable_switch_r_sdp");
            let sdp = &variables[0].1;
            assert!(sdp.starts_with("v=0\n"));
            assert!(sdp.contains("m=audio 47758 RTP/AVP 0 101"));
            assert!(!sdp.ends_with(']'));

            assert_eq!(
                variables[1],
                ("variable_direction".to_string(), "inbound".to_string())
            );
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }
    assert_eq!(entry.attached.len(), 9);
}
985
// "Local SDP:" is classified as an `SdpMarker` with `Local` direction, and
// the six bare lines after it are collected verbatim into `Block::Sdp`.
#[test]
fn sdp_block_detection() {
    let input = vec![
        full_line(UUID1, TS1, "Local SDP:"),
        String::from("v=0"),
        String::from("o=- 1234 5678 IN IP4 192.0.2.1"),
        String::from("s=-"),
        String::from("c=IN IP4 192.0.2.1"),
        String::from("m=audio 10000 RTP/AVP 0"),
        String::from("a=rtpmap:0 PCMU/8000"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let entry = &parsed[0];
    match &entry.message_kind {
        MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
        other => panic!("expected SdpMarker, got {other:?}"),
    }

    match entry.block.as_ref().expect("should have block") {
        Block::Sdp { direction, body } => {
            assert_eq!(*direction, SdpDirection::Local);
            assert_eq!(body.len(), 6);
            assert_eq!(body[0], "v=0");
            assert_eq!(body[5], "a=rtpmap:0 PCMU/8000");
        }
        other => panic!("expected Sdp block, got {other:?}"),
    }
}
1014
// A following full line ends the SDP body: the first entry keeps a
// two-line `Remote` SDP block and the second entry has no block.
#[test]
fn sdp_block_terminated_by_primary_line() {
    let input = vec![
        full_line(UUID1, TS1, "Remote SDP:"),
        String::from("v=0"),
        String::from("m=audio 10000 RTP/AVP 0"),
        full_line(UUID1, TS2, "Next event"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 2);

    match parsed[0].block.as_ref().expect("should have block") {
        Block::Sdp { direction, body } => {
            assert_eq!(*direction, SdpDirection::Remote);
            assert_eq!(body.len(), 2);
        }
        other => panic!("expected Sdp block, got {other:?}"),
    }

    let follower = &parsed[1];
    assert!(follower.block.is_none());
}
1035
// An SDP marker that arrives as a UUID-prefixed line (no timestamp) still
// opens an SDP block; the UUID prefix is stripped from the body lines.
#[test]
fn sdp_from_uuid_continuation() {
    let input = vec![
        format!("{UUID1} Local SDP:"),
        format!("{UUID1} v=0"),
        format!("{UUID1} o=FreeSWITCH 1234 5678 IN IP4 192.0.2.1"),
        format!("{UUID1} s=FreeSWITCH"),
        format!("{UUID1} c=IN IP4 192.0.2.1"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    match parsed[0].block.as_ref().expect("should have block") {
        Block::Sdp { direction, body } => {
            assert_eq!(*direction, SdpDirection::Local);
            assert_eq!(body.len(), 4);
            assert_eq!(body[0], "v=0");
        }
        other => panic!("expected Sdp block, got {other:?}"),
    }
}
1057
// A line from another UUID cuts the CHANNEL_DATA dump short: the first
// entry keeps the single field seen so far and a second entry is yielded.
#[test]
fn channel_data_interrupted_by_different_uuid() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
        format!("{UUID2} Dialplan: sofia/internal/+15559999999@192.0.2.1 parsing [public]"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 2);

    match parsed[0].block.as_ref().expect("should have block") {
        Block::ChannelData { fields, .. } => {
            assert_eq!(fields.len(), 1);
        }
        other => panic!("expected ChannelData, got {other:?}"),
    }
}
1075
// An ordinary message is classified as `General` and carries no block.
#[test]
fn no_block_for_non_block_message() {
    let input = vec![full_line(UUID1, TS1, "some random freeswitch log message")];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let only = &parsed[0];
    assert!(only.block.is_none());
    assert_eq!(only.message_kind, MessageKind::General);
}
1084
// An `EXECUTE ... set(foo=bar)` continuation is classified as
// `MessageKind::Execute` with the application and arguments split out.
#[test]
fn message_kind_on_execute() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 2);

    let execute_kind = &parsed[1].message_kind;
    match execute_kind {
        MessageKind::Execute {
            application,
            arguments,
            ..
        } => {
            assert_eq!(application, "set");
            assert_eq!(arguments, "foo=bar");
        }
        other => panic!("expected Execute, got {other:?}"),
    }
}
1105
1106    // --- New: ParseStats tests ---
1107
// `lines_processed` counts every input line, continuation lines included.
#[test]
fn stats_lines_processed() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        full_line(UUID1, TS2, "Second"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    // Drain through a borrow so the stats can be read afterwards.
    let _drained = parser.by_ref().collect::<Vec<_>>();
    assert_eq!(parser.stats().lines_processed, 3);
}
1119
// With `TrackLines`, a continuation line that arrives before any primary
// line is counted and recorded with reason `OrphanContinuation`.
#[test]
fn stats_unclassified_orphan() {
    let input = vec![
        String::from("variable_foo: [bar]"),
        full_line(UUID1, TS1, "After orphan"),
    ];
    let mut parser = LogStream::new(input.into_iter())
        .unclassified_tracking(UnclassifiedTracking::TrackLines);
    let _drained = parser.by_ref().collect::<Vec<_>>();

    let stats = parser.stats();
    assert_eq!(stats.lines_unclassified, 1);
    assert_eq!(stats.unclassified_lines.len(), 1);
    assert_eq!(
        stats.unclassified_lines[0].reason,
        UnclassifiedReason::OrphanContinuation,
    );
}
1136
// With `CaptureData`, the recorded unclassified line also carries the raw
// line text in `data`.
#[test]
fn stats_capture_data() {
    let input = vec![String::from("orphan line"), full_line(UUID1, TS1, "After")];
    let mut parser = LogStream::new(input.into_iter())
        .unclassified_tracking(UnclassifiedTracking::CaptureData);
    let _drained = parser.by_ref().collect::<Vec<_>>();

    let stats = parser.stats();
    assert_eq!(stats.unclassified_lines.len(), 1);
    assert_eq!(
        stats.unclassified_lines[0].data.as_deref(),
        Some("orphan line"),
    );
}
1149
// Without an explicit tracking mode the orphan is counted but no
// per-line record is stored.
#[test]
fn stats_count_only_no_allocation() {
    let input = vec![String::from("orphan line"), full_line(UUID1, TS1, "After")];
    let mut parser = LogStream::new(input.into_iter());
    let _drained = parser.by_ref().collect::<Vec<_>>();

    let stats = parser.stats();
    assert_eq!(stats.lines_unclassified, 1);
    assert!(stats.unclassified_lines.is_empty());
}
1158
// `line_number` is the 1-based input line of each entry's primary line;
// the attached line in between shifts the second entry to line 3.
#[test]
fn line_number_tracking() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
        full_line(UUID2, TS2, "Third"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();

    let (first, second) = (&parsed[0], &parsed[1]);
    assert_eq!(first.line_number, 1);
    assert_eq!(second.line_number, 3);
}
1170
// `drain_unclassified` hands back the recorded lines and empties the
// stored list, while the `lines_unclassified` counter keeps its value.
#[test]
fn drain_unclassified() {
    let input = vec![
        String::from("orphan1"),
        String::from("orphan2"),
        full_line(UUID1, TS1, "After"),
    ];
    let mut parser = LogStream::new(input.into_iter())
        .unclassified_tracking(UnclassifiedTracking::TrackLines);
    let _drained_entries = parser.by_ref().collect::<Vec<_>>();

    let taken = parser.drain_unclassified();
    assert_eq!(taken.len(), 1);

    let stats = parser.stats();
    assert!(stats.unclassified_lines.is_empty());
    assert_eq!(stats.lines_unclassified, 1);
}
1186
// BUG 1 regression test: when a `LogStream` reads a `TrackedChain` made of
// several file segments, the last timestamp seen in one segment must not
// be applied to timestamp-less continuation lines that open the next
// segment. Otherwise an entry can be stamped with a time taken from a
// different file (potentially hours earlier).
//
// Reproduces: f2cb66d4 getting timestamp 23:58:03 from the rotated file
// when freeswitch.log starts with its continuation lines.
#[test]
fn continuation_lines_at_file_boundary_must_not_inherit_previous_timestamp() {
    use crate::TrackedChain;

    let uuid_a = "aaaaaaaa-1111-2222-3333-444444444444";
    let uuid_b = "bbbbbbbb-1111-2222-3333-444444444444";
    let ts_old = "2025-01-15 23:58:03.000000";
    let ts_new = "2025-01-16 08:37:12.000000";

    // Rotated file: a single, fully timestamped line.
    let rotated: Vec<String> = vec![format!(
        "{uuid_a} {ts_old} 95.00% [DEBUG] test.c:1 Last line in rotated file"
    )];

    // Current file: opens with UUID + message lines that carry no timestamp
    // (Format C), and only then a fully timestamped line.
    let current: Vec<String> = vec![
        format!("{uuid_b} CHANNEL_DATA:"),
        format!("{uuid_b} Channel-State: [CS_EXECUTE]"),
        format!("{uuid_b} {ts_new} 95.00% [DEBUG] test.c:1 First timestamped line in new file"),
    ];

    let segments: Vec<(String, Box<dyn Iterator<Item = String>>)> = vec![
        (String::from("rotated.log"), Box::new(rotated.into_iter())),
        (String::from("freeswitch.log"), Box::new(current.into_iter())),
    ];

    let (chain, _) = TrackedChain::new(segments);
    let parsed = LogStream::new(chain).collect::<Vec<_>>();

    let b_entry = parsed
        .iter()
        .find(|candidate| candidate.uuid == uuid_b)
        .expect("should find entry for uuid_b");

    // Either the new file's first real timestamp or an empty (unknown)
    // timestamp is acceptable here; the previous segment's value is not.
    assert_ne!(
        b_entry.timestamp, ts_old,
        "continuation lines in a new file segment inherited timestamp \
         '{ts_old}' from the previous segment — timestamps must not bleed \
         across file boundaries"
    );
}
1238
1239    // --- Line accounting tests ---
1240
1241    fn assert_accounting(stream: &LogStream<impl Iterator<Item = String>>) {
1242        let stats = stream.stats();
1243        assert_eq!(
1244            stats.unaccounted_lines(),
1245            0,
1246            "line accounting invariant violated: \
1247             processed={} + split={} != in_entries={} + empty_orphan={}",
1248            stats.lines_processed,
1249            stats.lines_split,
1250            stats.lines_in_entries,
1251            stats.lines_empty_orphan,
1252        );
1253    }
1254
// Two full lines: two entries, two lines accounted for.
#[test]
fn accounting_full_lines() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        full_line(UUID2, TS2, "Second"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 2);
    assert_eq!(parser.stats().lines_in_entries, 2);
    assert_accounting(&parser);
}
1267
// Attached lines count towards `lines_in_entries` together with the
// primary line that owns them.
#[test]
fn accounting_with_attached() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-State: [CS_EXECUTE]"),
        String::from("variable_foo: [bar]"),
        full_line(UUID2, TS2, "Next"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();
    assert_eq!(parsed.len(), 2);

    // First entry: 1 primary + 2 attached = 3 lines.
    // Second entry: 1 primary line.
    assert_eq!(parser.stats().lines_in_entries, 4);
    assert_accounting(&parser);
}
1284
// A lone UUID-less line counts as one line in entries.
#[test]
fn accounting_system_line() {
    let raw = format!(
        "{TS1} 95.97% [NOTICE] mod_logfile.c:217 New log started."
    );
    let mut parser = LogStream::new(vec![raw].into_iter());
    let _drained = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parser.stats().lines_in_entries, 1);
    assert_accounting(&parser);
}
1295
// Empty and whitespace-only lines that precede any entry are tallied in
// `lines_empty_orphan` and do not produce entries of their own.
#[test]
fn accounting_empty_orphan() {
    let input = vec![
        String::new(),
        String::from("   "),
        full_line(UUID1, TS1, "After"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parser.stats().lines_empty_orphan, 2);
    assert_accounting(&parser);
}
1309
// An empty line that follows a primary line is attached to it, so it is
// counted in `lines_in_entries` and not as an empty orphan.
#[test]
fn accounting_empty_attached() {
    let input = vec![
        full_line(UUID1, TS1, "First"),
        String::new(),
        String::from("continuation"),
    ];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);
    assert_eq!(parsed[0].attached.len(), 2);

    let stats = parser.stats();
    assert_eq!(stats.lines_empty_orphan, 0);
    assert_eq!(stats.lines_in_entries, 3);
    assert_accounting(&parser);
}
1325
// The accounting invariant must hold when the input opens with an orphan
// continuation line.
#[test]
fn accounting_orphan_continuation() {
    let input = vec![String::from("orphan line"), full_line(UUID1, TS1, "After")];
    let mut parser = LogStream::new(input.into_iter());
    let _drained = parser.by_ref().collect::<Vec<_>>();
    assert_accounting(&parser);
}
1333
// The accounting invariant must hold for two consecutive "Audio Codec
// Compare" lines followed by an unrelated line.
#[test]
fn accounting_codec_merging() {
    let comparison = full_line(
        UUID1,
        TS1,
        "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
    );
    let saved_match = full_line(
        UUID1,
        TS1,
        "Audio Codec Compare [PCMU:0:8000:20:64000:1] is saved as a match",
    );
    let unrelated = full_line(UUID2, TS2, "Next");

    let mut parser = LogStream::new(vec![comparison, saved_match, unrelated].into_iter());
    let _drained = parser.by_ref().collect::<Vec<_>>();
    assert_accounting(&parser);
}
1353
// The accounting invariant must hold when a line has stray bytes (`varia`)
// in front of its UUID.
#[test]
fn accounting_truncated_line() {
    let opening = full_line(UUID1, TS1, "First");
    let damaged = format!(
        "varia{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(x=y)"
    );
    let mut parser = LogStream::new(vec![opening, damaged].into_iter());
    let _drained = parser.by_ref().collect::<Vec<_>>();
    assert_accounting(&parser);
}
1366
// Simulates a variable value longer than mod_logfile's 2048-byte buffer
// with another UUID's line fused onto the same physical line. The fused
// part must come out as its own entry and be counted as one split.
#[test]
fn accounting_long_line_collision_split() {
    let long_value = "x".repeat(MAX_LINE_PAYLOAD + 10);
    let fused = format!(
        "variable_sip_multipart: [{long_value}]{UUID2} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"
    );
    let input = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), fused];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    // The dump itself is still the first entry.
    assert_eq!(parsed[0].message, "CHANNEL_DATA:");

    // Some entry must belong to the colliding UUID.
    let collision_found = parsed.iter().any(|entry| entry.uuid == UUID2);
    assert!(
        collision_found,
        "collision UUID should produce a separate entry"
    );

    assert_eq!(parser.stats().lines_split, 1);
    assert_accounting(&parser);
}
1392
// A short line that merely contains a UUID inside a value must not be
// split: one entry, zero splits.
#[test]
fn no_split_on_short_lines() {
    let uuid_valued = format!("variable_call_uuid: [{UUID2}]");
    let input = vec![full_line(UUID1, TS1, "CHANNEL_DATA:"), uuid_valued];
    let mut parser = LogStream::new(input.into_iter());
    let parsed = parser.by_ref().collect::<Vec<_>>();

    assert_eq!(parsed.len(), 1);
    assert_eq!(parser.stats().lines_split, 0);
    assert_accounting(&parser);
}
1405
// Two UUID-less lines fused on one physical line (the second timestamp
// directly follows the first message) are split into two entries.
#[test]
fn timestamp_collision_splits_system_lines() {
    let fused = format!(
        "{TS1} 98.03% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42864: api sofia jsonstatus{TS2} 97.93% [INFO] mod_event_socket.c:1752 Event Socket Command from ::1:42898: api fsctl pause_check"
    );
    let mut parser = LogStream::new(std::iter::once(fused));
    let parsed = parser.by_ref().collect::<Vec<_>>();
    assert_eq!(parsed.len(), 2);

    let (left, right) = (&parsed[0], &parsed[1]);
    assert_eq!(
        left.message,
        "Event Socket Command from ::1:42864: api sofia jsonstatus"
    );
    assert_eq!(
        right.message,
        "Event Socket Command from ::1:42898: api fsctl pause_check"
    );

    assert_eq!(parser.stats().lines_split, 1);
    assert_accounting(&parser);
}
1425
// Three lines fused on one physical line yield three entries and two
// recorded splits.
#[test]
fn timestamp_collision_splits_three_entries() {
    let ts3 = "2025-01-15 10:30:47.345678";
    let fused = format!(
        "{TS1} 95.00% [INFO] mod.c:1 first{TS2} 96.00% [INFO] mod.c:1 second{ts3} 97.00% [INFO] mod.c:1 third"
    );
    let mut parser = LogStream::new(std::iter::once(fused));
    let parsed = parser.by_ref().collect::<Vec<_>>();
    assert_eq!(parsed.len(), 3);

    assert_eq!(parsed[0].message, "first");
    assert_eq!(parsed[1].message, "second");
    assert_eq!(parsed[2].message, "third");

    assert_eq!(parser.stats().lines_split, 2);
    assert_accounting(&parser);
}
1441
// A UUID-less line fused with a full line (UUID + timestamp): the split
// must keep the UUID with the second entry.
#[test]
fn timestamp_collision_with_uuid_prefix() {
    let fused = format!(
        "{TS1} 95.00% [INFO] mod.c:1 first{UUID1} {TS2} 96.00% [DEBUG] sofia.c:100 second"
    );
    let mut parser = LogStream::new(std::iter::once(fused));
    let parsed = parser.by_ref().collect::<Vec<_>>();
    assert_eq!(parsed.len(), 2);

    assert_eq!(parsed[0].message, "first");

    let with_uuid = &parsed[1];
    assert_eq!(with_uuid.uuid, UUID1);
    assert_eq!(with_uuid.message, "second");

    assert_eq!(parser.stats().lines_split, 1);
    assert_accounting(&parser);
}
1457
// Same reassembly as the shorter multi-line test, but with a ten-line SDP
// value: everything up to the closing `]` belongs to the first variable,
// and the variable after it is still parsed.
#[test]
fn channel_data_multiline_variable_spans_many_lines() {
    let input = vec![
        full_line(UUID1, TS1, "CHANNEL_DATA:"),
        format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
        String::from("variable_switch_r_sdp: [v=0"),
        String::from("o=- 1234 5678 IN IP4 192.0.2.1"),
        String::from("s=-"),
        String::from("c=IN IP4 192.0.2.1"),
        String::from("t=0 0"),
        String::from("m=audio 47758 RTP/AVP 0 8 101"),
        String::from("a=rtpmap:0 PCMU/8000"),
        String::from("a=rtpmap:8 PCMA/8000"),
        String::from("a=rtpmap:101 telephone-event/8000"),
        String::from("a=fmtp:101 0-16"),
        String::from("]"),
        String::from("variable_direction: [inbound]"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    match parsed[0].block.as_ref().expect("should have block") {
        Block::ChannelData { fields, variables } => {
            assert_eq!(fields.len(), 1);
            assert_eq!(variables.len(), 2);

            assert_eq!(variables[0].0, "variable_switch_r_sdp");
            let reassembled = &variables[0].1;
            assert!(reassembled.starts_with("v=0\n"));
            assert!(reassembled.contains("a=fmtp:101 0-16"));
            assert!(!reassembled.ends_with(']'));

            assert_eq!(variables[1].0, "variable_direction");
        }
        other => panic!("expected ChannelData block, got {other:?}"),
    }
}
1493
// The "updateMedia: Local SDP" wording is also recognized as a `Local`
// SDP marker and collects the following lines as its body.
#[test]
fn sdp_from_verto_update_media() {
    let input = vec![
        full_line(UUID1, TS1, "updateMedia: Local SDP"),
        String::from("v=0"),
        String::from("o=- 1234 5678 IN IP4 192.0.2.1"),
        String::from("m=audio 10000 RTP/AVP 0"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let entry = &parsed[0];
    match &entry.message_kind {
        MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Local),
        other => panic!("expected SdpMarker, got {other:?}"),
    }

    match entry.block.as_ref().expect("should have block") {
        Block::Sdp { direction, body } => {
            assert_eq!(*direction, SdpDirection::Local);
            assert_eq!(body.len(), 3);
        }
        other => panic!("expected Sdp block, got {other:?}"),
    }
}
1517
// "Duplicate SDP" is an SDP marker with `Unknown` direction and still
// gets a block.
#[test]
fn duplicate_sdp_marker() {
    let input = vec![
        full_line(UUID1, TS1, "Duplicate SDP"),
        String::from("v=0"),
        String::from("m=audio 10000 RTP/AVP 0"),
    ];
    let parsed = LogStream::new(input.into_iter()).collect::<Vec<_>>();
    assert_eq!(parsed.len(), 1);

    let entry = &parsed[0];
    match &entry.message_kind {
        MessageKind::SdpMarker { direction } => assert_eq!(*direction, SdpDirection::Unknown),
        other => panic!("expected SdpMarker, got {other:?}"),
    }
    assert!(entry.block.is_some());
}
1533
1534    #[test]
1535    fn warning_on_unclosed_multiline_variable() {
1536        let lines = vec![
1537            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1538            "variable_switch_r_sdp: [v=0".to_string(),
1539            "o=- 1234 5678 IN IP4 192.0.2.1".to_string(),
1540            full_line(UUID2, TS2, "Next entry"),
1541        ];
1542        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1543        assert_eq!(entries.len(), 2);
1544        assert!(
1545            entries[0]
1546                .warnings
1547                .iter()
1548                .any(|w| w.contains("unclosed multi-line variable")),
1549            "expected unclosed variable warning, got: {:?}",
1550            entries[0].warnings
1551        );
1552    }
1553
1554    #[test]
1555    fn warning_on_unparseable_channel_data_line() {
1556        let lines = vec![
1557            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1558            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1559            format!("{UUID1} this is not a valid field line"),
1560        ];
1561        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1562        assert_eq!(entries.len(), 1);
1563        assert!(
1564            entries[0]
1565                .warnings
1566                .iter()
1567                .any(|w| w.contains("unparseable CHANNEL_DATA")),
1568            "expected unparseable warning, got: {:?}",
1569            entries[0].warnings
1570        );
1571    }
1572
1573    #[test]
1574    fn warning_on_unexpected_codec_continuation() {
1575        let lines = vec![
1576            full_line(
1577                UUID1,
1578                TS1,
1579                "Audio Codec Compare [PCMU:0:8000:20:64000:1]/[PCMU:0:8000:20:64000:1]",
1580            ),
1581            format!("{UUID1} some unexpected continuation line"),
1582        ];
1583        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1584        assert_eq!(entries.len(), 1);
1585        assert!(
1586            entries[0]
1587                .warnings
1588                .iter()
1589                .any(|w| w.contains("unexpected codec negotiation")),
1590            "expected codec warning, got: {:?}",
1591            entries[0].warnings
1592        );
1593    }
1594
1595    #[test]
1596    fn system_line_uuid_continuation_not_absorbed() {
1597        // After the bug fix, a UUID continuation should NOT be absorbed
1598        // by a pending system line (empty UUID).
1599        let lines = vec![
1600            format!("{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"),
1601            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1602        ];
1603        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1604        assert_eq!(
1605            entries.len(),
1606            2,
1607            "UUID continuation should not be absorbed by system entry"
1608        );
1609        assert_eq!(entries[0].uuid, "");
1610        assert_eq!(entries[1].uuid, UUID1);
1611    }
1612
1613    #[test]
1614    fn truncated_collision_in_channel_data_variable() {
1615        // A CHANNEL_DATA block where a variable value exceeds the 2048-byte
1616        // mod_logfile buffer, causing a truncated collision (Format E).
1617        // The variable_long_xml value opens with [ but the buffer truncation
1618        // causes a UUID+EXECUTE to collide on the same physical line before
1619        // the closing ].
1620        let padding = "x".repeat(2000);
1621        let collision_line = format!(
1622            "{UUID1} variable_long_xml: [{padding}{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(foo=bar)"
1623        );
1624        assert!(
1625            collision_line.len() > super::MAX_LINE_PAYLOAD,
1626            "test line must exceed buffer limit, got {}",
1627            collision_line.len()
1628        );
1629
1630        let lines = vec![
1631            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1632            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1633            format!("{UUID1} variable_direction: [inbound]"),
1634            collision_line,
1635            full_line(UUID1, TS2, "Next log entry"),
1636        ];
1637
1638        let entries: Vec<_> = LogStream::new(lines.into_iter()).collect();
1639
1640        // Entry 0: CHANNEL_DATA with the variables
1641        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1642        let block = entries[0].block.as_ref().expect("should have block");
1643        match block {
1644            Block::ChannelData { fields, variables } => {
1645                assert_eq!(fields.len(), 1, "should have Channel-Name field");
1646                assert_eq!(fields[0].0, "Channel-Name");
1647                assert_eq!(
1648                    variables.len(),
1649                    2,
1650                    "should have direction + unclosed long_xml"
1651                );
1652                assert_eq!(variables[0].0, "variable_direction");
1653                assert_eq!(variables[0].1, "inbound");
1654                assert_eq!(variables[1].0, "variable_long_xml");
1655            }
1656            other => panic!("expected ChannelData block, got {other:?}"),
1657        }
1658        assert!(
1659            entries[0]
1660                .warnings
1661                .iter()
1662                .any(|w| w.contains("line exceeds mod_logfile 2048-byte buffer")),
1663            "expected buffer overflow warning, got: {:?}",
1664            entries[0].warnings
1665        );
1666        assert!(
1667            entries[0]
1668                .warnings
1669                .iter()
1670                .any(|w| w.contains("unclosed multi-line variable")),
1671            "expected unclosed variable warning, got: {:?}",
1672            entries[0].warnings
1673        );
1674
1675        // Entry 1: the split EXECUTE line
1676        assert_eq!(entries[1].uuid, UUID1);
1677        assert!(
1678            entries[1].message.starts_with("EXECUTE "),
1679            "split entry should be EXECUTE, got: {}",
1680            entries[1].message
1681        );
1682
1683        // Entry 2: the next full log line
1684        assert_eq!(entries.len(), 3);
1685        assert_eq!(entries[2].message, "Next log entry");
1686    }
1687
1688    #[test]
1689    fn channel_data_uuid_drops_mid_block() {
1690        // Production scenario: mod_logfile stops prepending the UUID mid-way
1691        // through a CHANNEL_DATA dump. The first few variable lines carry the
1692        // UUID prefix (UuidContinuation), then the remaining lines arrive as
1693        // bare continuations. All should be accumulated into the same block.
1694        let lines = vec![
1695            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1696            format!("{UUID1} variable_max_forwards: [69]"),
1697            format!("{UUID1} variable_presence_id: [1251@[2001:db8::10]]"),
1698            format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1699            // UUID drops — bare continuations for the rest
1700            "variable_sip_h_X-Call-Info: [<urn:test:callid:20260316>;purpose=emergency-CallId]"
1701                .to_string(),
1702            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1703            "variable_remote_media_ip: [2001:db8::10]".to_string(),
1704            "variable_remote_media_port: [9952]".to_string(),
1705            "variable_rtp_use_codec_name: [opus]".to_string(),
1706            full_line(UUID1, TS2, "Next entry"),
1707        ];
1708
1709        let mut stream = LogStream::new(lines.into_iter());
1710        let entries: Vec<_> = stream.by_ref().collect();
1711
1712        assert_eq!(entries.len(), 2);
1713        assert_eq!(entries[0].message, "CHANNEL_DATA:");
1714        let block = entries[0].block.as_ref().expect("should have block");
1715        match block {
1716            Block::ChannelData { fields, variables } => {
1717                assert_eq!(fields.len(), 0);
1718                assert_eq!(variables.len(), 8);
1719                // UUID-prefixed variables
1720                assert_eq!(variables[0].0, "variable_max_forwards");
1721                assert_eq!(variables[0].1, "69");
1722                assert_eq!(variables[1].0, "variable_presence_id");
1723                assert_eq!(variables[1].1, "1251@[2001:db8::10]");
1724                assert_eq!(variables[2].0, "variable_sip_h_X-Custom-ID");
1725                // Bare variables (UUID dropped)
1726                assert_eq!(variables[3].0, "variable_sip_h_X-Call-Info");
1727                assert!(variables[3].1.contains("emergency-CallId"));
1728                assert_eq!(variables[4].0, "variable_ep_codec_string");
1729                assert_eq!(variables[7].0, "variable_rtp_use_codec_name");
1730                assert_eq!(variables[7].1, "opus");
1731            }
1732            other => panic!("expected ChannelData block, got {other:?}"),
1733        }
1734        assert_eq!(entries[0].attached.len(), 8);
1735        assert_eq!(entries[1].message, "Next entry");
1736        assert_accounting(&stream);
1737    }
1738
1739    #[test]
1740    fn channel_data_uuid_drops_with_multiline_variable() {
1741        // UUID drops mid-block AND a multi-line variable (SDP body embedded
1742        // in variable_switch_r_sdp) spans many bare continuation lines.
1743        // The \r characters are real — SDP uses \r\n per RFC 4566, and
1744        // mod_logfile splits on \n leaving \r in the content.
1745        let lines = vec![
1746            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1747            format!("{UUID1} variable_max_forwards: [69]"),
1748            format!("{UUID1} variable_sip_h_X-Custom-ID: [c4da84eb-88a7-40b2-b90d-e5bc2a0f634e]"),
1749            // UUID drops
1750            "variable_switch_r_sdp: [v=0\r".to_string(),
1751            "o=FreeSWITCH 1773663549 1773663550 IN IP6 2001:db8::10\r".to_string(),
1752            "s=FreeSWITCH\r".to_string(),
1753            "c=IN IP6 2001:db8::10\r".to_string(),
1754            "t=0 0\r".to_string(),
1755            "m=audio 9952 RTP/AVP 102 101 13\r".to_string(),
1756            "a=rtpmap:102 opus/48000/2\r".to_string(),
1757            "a=ptime:20\r".to_string(),
1758            "]".to_string(),
1759            "variable_ep_codec_string: [mod_opus.opus@48000h@20i@2c]".to_string(),
1760            "variable_direction: [inbound]".to_string(),
1761            full_line(UUID1, TS2, "Next entry"),
1762        ];
1763
1764        let mut stream = LogStream::new(lines.into_iter());
1765        let entries: Vec<_> = stream.by_ref().collect();
1766
1767        assert_eq!(entries.len(), 2);
1768        let block = entries[0].block.as_ref().expect("should have block");
1769        match block {
1770            Block::ChannelData { fields, variables } => {
1771                assert_eq!(fields.len(), 0);
1772                assert_eq!(variables.len(), 5);
1773                assert_eq!(variables[0].0, "variable_max_forwards");
1774                assert_eq!(variables[1].0, "variable_sip_h_X-Custom-ID");
1775                // Multi-line SDP variable reassembled from bare continuations
1776                assert_eq!(variables[2].0, "variable_switch_r_sdp");
1777                let sdp = &variables[2].1;
1778                assert!(
1779                    sdp.starts_with("v=0\r\n"),
1780                    "SDP should start with v=0\\r\\n, got: {sdp:?}"
1781                );
1782                assert!(sdp.contains("m=audio 9952 RTP/AVP 102 101 13\r"));
1783                assert!(sdp.contains("a=ptime:20\r"));
1784                assert!(!sdp.ends_with(']'), "closing bracket should be stripped");
1785                // Post-SDP bare variables
1786                assert_eq!(variables[3].0, "variable_ep_codec_string");
1787                assert_eq!(variables[4].0, "variable_direction");
1788                assert_eq!(variables[4].1, "inbound");
1789            }
1790            other => panic!("expected ChannelData block, got {other:?}"),
1791        }
1792        // 2 UUID continuations + 9 SDP lines (open + 7 content + close) + 2 bare = 13
1793        assert_eq!(entries[0].attached.len(), 13);
1794        assert_accounting(&stream);
1795    }
1796
1797    #[test]
1798    fn channel_data_bare_variable_collision_with_execute() {
1799        // Production collision: bare variable_call_uuid line on same physical
1800        // line as a UUID EXECUTE. The UUID appears at byte 20 ("variable_call_uuid: "
1801        // is 20 chars), within find_uuid_in's 50-byte scan window, so Layer 1
1802        // classifies it as Truncated — extracting the UUID and EXECUTE message.
1803        // The CHANNEL_DATA block loses variable_call_uuid (eaten as truncation
1804        // prefix) but correctly recovers the EXECUTE as a separate entry.
1805        let collision = format!(
1806            "variable_call_uuid: {UUID1} EXECUTE [depth=0] \
1807             sofia/internal-v6/1251@[2001:db8::10] export(nolocal:test_var=value)"
1808        );
1809
1810        let lines = vec![
1811            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1812            format!("{UUID1} variable_max_forwards: [69]"),
1813            // UUID drops — bare continuations
1814            "variable_DP_MATCH: [ARRAY::create_conference|:create_conference]".to_string(),
1815            collision,
1816            // Full line resumes normal logging
1817            full_line(
1818                UUID1,
1819                TS2,
1820                "EXPORT (export_vars) (REMOTE ONLY) [test_var]=[value]",
1821            ),
1822        ];
1823
1824        let mut stream = LogStream::new(lines.into_iter());
1825        let entries: Vec<_> = stream.by_ref().collect();
1826
1827        // Entry 0: CHANNEL_DATA — variable_call_uuid lost to truncation prefix
1828        assert_eq!(entries.len(), 3);
1829        let block = entries[0].block.as_ref().expect("should have block");
1830        match block {
1831            Block::ChannelData { fields, variables } => {
1832                assert_eq!(fields.len(), 0);
1833                assert_eq!(variables.len(), 2);
1834                assert_eq!(variables[0].0, "variable_max_forwards");
1835                assert_eq!(variables[1].0, "variable_DP_MATCH");
1836            }
1837            other => panic!("expected ChannelData block, got {other:?}"),
1838        }
1839
1840        // Entry 1: EXECUTE recovered from the Truncated classification
1841        assert_eq!(entries[1].uuid, UUID1);
1842        assert_eq!(entries[1].kind, LineKind::Truncated);
1843        assert!(
1844            entries[1].message.starts_with("EXECUTE "),
1845            "truncated line should yield EXECUTE, got: {}",
1846            entries[1].message
1847        );
1848
1849        // Entry 2: normal EXPORT line
1850        assert_eq!(entries[2].message_kind.label(), "variable");
1851        assert_accounting(&stream);
1852    }
1853
1854    #[test]
1855    fn system_line_with_embedded_uuid_gets_entry_uuid() {
1856        // System lines (Format B) where switch_cpp.cpp logs the UUID at the
1857        // start of the message body should produce entries with the correct UUID.
1858        let lines = vec![
1859            format!(
1860                "{TS1} 95.97% [DEBUG] switch_cpp.cpp:1466 {UUID1} DAA-LOG WaveManager originate"
1861            ),
1862            format!(
1863                "{TS1} 95.97% [WARNING] switch_cpp.cpp:1466 {UUID1} DAA-LOG Failed to create session"
1864            ),
1865            full_line(UUID1, TS2, "State Change CS_EXECUTE -> CS_HIBERNATE"),
1866        ];
1867
1868        let mut stream = LogStream::new(lines.into_iter());
1869        let entries: Vec<_> = stream.by_ref().collect();
1870
1871        assert_eq!(entries.len(), 3);
1872        // Both System lines should have the UUID extracted from the message
1873        assert_eq!(entries[0].uuid, UUID1);
1874        assert_eq!(entries[0].kind, LineKind::System);
1875        assert_eq!(entries[0].message, "DAA-LOG WaveManager originate");
1876
1877        assert_eq!(entries[1].uuid, UUID1);
1878        assert_eq!(entries[1].kind, LineKind::System);
1879        assert_eq!(entries[1].message, "DAA-LOG Failed to create session");
1880
1881        // Full line still works normally
1882        assert_eq!(entries[2].uuid, UUID1);
1883        assert_eq!(entries[2].kind, LineKind::Full);
1884        assert_accounting(&stream);
1885    }
1886}