Skip to main content

freeswitch_log_parser/
session.rs

1use std::collections::{HashMap, HashSet};
2use std::str::FromStr;
3
4use freeswitch_types::{BridgeDialString, CallDirection, DialString};
5
6use crate::line::parse_line;
7use crate::message::{classify_message, MessageKind};
8use crate::stream::{Block, LogEntry, LogStream, ParseStats, UnclassifiedLine};
9
10/// Mutable per-UUID state accumulator, updated as entries are processed.
11///
12/// Fields are `None` until the corresponding data is first seen in the stream.
13/// Variables accumulate from CHANNEL_DATA dumps, `set()`/`export()` executions,
14/// `SET`/`EXPORT` log lines, and inline `variable_*` lines.
15#[derive(Debug, Clone, Default)]
16pub struct SessionState {
17    /// `None` until a `Channel-Name` field is encountered.
18    pub channel_name: Option<String>,
19    /// `None` until a state change or `Channel-State` field is encountered.
20    pub channel_state: Option<String>,
21    /// First dialplan context seen; set once and never overwritten.
22    pub initial_context: Option<String>,
23    /// Current dialplan context; updated on each transfer/continue.
24    pub dialplan_context: Option<String>,
25    /// Source extension in the dialplan routing; `None` until a dialplan line is processed.
26    pub dialplan_from: Option<String>,
27    /// Target extension in the dialplan routing; `None` until a dialplan line is processed.
28    pub dialplan_to: Option<String>,
29    /// Call direction from `Call-Direction` CHANNEL_DATA field; `None` until seen.
30    pub call_direction: Option<CallDirection>,
31    /// Caller ID number from `Caller-Caller-ID-Number` CHANNEL_DATA field; `None` until seen.
32    pub caller_id_number: Option<String>,
33    /// Destination number from `Caller-Destination-Number` CHANNEL_DATA field; `None` until seen.
34    pub destination_number: Option<String>,
35    /// Other leg's UUID; `None` until bridged. Set from `Originate Resulted in Success` on A-leg,
36    /// and from `New Channel` on B-leg (back-pointing to A-leg via originate context).
37    pub other_leg_uuid: Option<String>,
38    /// Pending bridge target channel from `EXECUTE bridge()`, consumed when B-leg `New Channel` matches.
39    pub(crate) pending_bridge_target: Option<String>,
40    /// All variables learned so far, with the `variable_` prefix stripped from names.
41    pub variables: HashMap<String, String>,
42}
43
44/// Changes to indexed fields reported by `update_from_entry` for index maintenance.
45#[derive(Default)]
46struct IndexedFieldChanges {
47    channel_name: Option<(Option<String>, Option<String>)>,
48    pending_bridge_target: Option<(Option<String>, Option<String>)>,
49    other_leg_uuid: Option<(Option<String>, Option<String>)>,
50}
51
52/// Immutable point-in-time copy of a session's state, attached to each [`EnrichedEntry`].
53///
54/// Does not include `variables` to keep snapshots lightweight — access the full
55/// variable map via [`SessionTracker::sessions()`].
56#[derive(Debug, Clone)]
57pub struct SessionSnapshot {
58    pub channel_name: Option<String>,
59    pub channel_state: Option<String>,
60    pub initial_context: Option<String>,
61    pub dialplan_context: Option<String>,
62    pub dialplan_from: Option<String>,
63    pub dialplan_to: Option<String>,
64    pub call_direction: Option<CallDirection>,
65    pub caller_id_number: Option<String>,
66    pub destination_number: Option<String>,
67    pub other_leg_uuid: Option<String>,
68}
69
70impl SessionState {
71    fn snapshot(&self) -> SessionSnapshot {
72        SessionSnapshot {
73            channel_name: self.channel_name.clone(),
74            channel_state: self.channel_state.clone(),
75            initial_context: self.initial_context.clone(),
76            dialplan_context: self.dialplan_context.clone(),
77            dialplan_from: self.dialplan_from.clone(),
78            dialplan_to: self.dialplan_to.clone(),
79            call_direction: self.call_direction,
80            caller_id_number: self.caller_id_number.clone(),
81            destination_number: self.destination_number.clone(),
82            other_leg_uuid: self.other_leg_uuid.clone(),
83        }
84    }
85
86    fn update_from_entry(&mut self, entry: &LogEntry) -> IndexedFieldChanges {
87        let old_channel_name = self.channel_name.clone();
88        let old_pending_bridge_target = self.pending_bridge_target.clone();
89        let old_other_leg_uuid = self.other_leg_uuid.clone();
90
91        if let Some(Block::ChannelData { fields, variables }) = &entry.block {
92            for (name, value) in fields {
93                match name.as_str() {
94                    "Channel-Name" => self.channel_name = Some(value.clone()),
95                    "Channel-State" => self.channel_state = Some(value.clone()),
96                    "Call-Direction" => {
97                        self.call_direction = CallDirection::from_str(value).ok();
98                    }
99                    "Caller-Caller-ID-Number" => {
100                        self.caller_id_number = Some(value.clone());
101                    }
102                    "Caller-Destination-Number" => {
103                        self.destination_number = Some(value.clone());
104                    }
105                    "Other-Leg-Unique-ID" => {
106                        self.other_leg_uuid = Some(value.clone());
107                    }
108                    _ => {}
109                }
110            }
111            for (name, value) in variables {
112                let var_name = name.strip_prefix("variable_").unwrap_or(name);
113                self.variables.insert(var_name.to_string(), value.clone());
114            }
115        }
116
117        match &entry.message_kind {
118            MessageKind::Dialplan { detail, .. } => {
119                if let Some(dp) = parse_dialplan_context(detail) {
120                    self.initial_context.get_or_insert(dp.context.clone());
121                    self.dialplan_context = Some(dp.context);
122                    self.dialplan_from = Some(dp.from);
123                    self.dialplan_to = Some(dp.to);
124                }
125            }
126            MessageKind::Execute {
127                application,
128                arguments,
129                ..
130            } => match application.as_str() {
131                "set" | "export" => {
132                    if let Some((name, value)) = arguments.split_once('=') {
133                        self.variables.insert(name.to_string(), value.to_string());
134                    }
135                }
136                "bridge" => {
137                    if let Some(info) = parse_bridge_args(arguments) {
138                        if let Some(uuid) = &info.origination_uuid {
139                            self.other_leg_uuid = Some(uuid.clone());
140                        }
141                        self.pending_bridge_target = Some(info.target_channel);
142                    }
143                }
144                _ => {}
145            },
146            MessageKind::Variable { name, value } => {
147                let var_name = name.strip_prefix("variable_").unwrap_or(name);
148                self.variables.insert(var_name.to_string(), value.clone());
149            }
150            MessageKind::ChannelField { name, value } => match name.as_str() {
151                "Channel-Name" => self.channel_name = Some(value.clone()),
152                "Channel-State" => self.channel_state = Some(value.clone()),
153                _ => {}
154            },
155            MessageKind::StateChange { detail } => {
156                if let Some(new_state) = parse_state_change(detail) {
157                    self.channel_state = Some(new_state);
158                }
159            }
160            MessageKind::ChannelLifecycle { detail } => {
161                if let Some(name) = parse_new_channel(detail) {
162                    if self.channel_name.is_none() {
163                        self.channel_name = Some(name);
164                    }
165                }
166            }
167            _ => {}
168        }
169
170        if entry.message.contains("Processing ") && entry.message.contains(" in context ") {
171            if let Some(dp) = parse_processing_line(&entry.message) {
172                self.initial_context.get_or_insert(dp.context.clone());
173                self.dialplan_context = Some(dp.context);
174                self.dialplan_from = Some(dp.from);
175                self.dialplan_to = Some(dp.to);
176            }
177        }
178
179        for attached in &entry.attached {
180            let parsed = parse_line(attached);
181            self.update_from_message(parsed.message);
182        }
183
184        let mut changes = IndexedFieldChanges::default();
185        if self.channel_name != old_channel_name {
186            changes.channel_name = Some((old_channel_name, self.channel_name.clone()));
187        }
188        if self.pending_bridge_target != old_pending_bridge_target {
189            changes.pending_bridge_target =
190                Some((old_pending_bridge_target, self.pending_bridge_target.clone()));
191        }
192        if self.other_leg_uuid != old_other_leg_uuid {
193            changes.other_leg_uuid = Some((old_other_leg_uuid, self.other_leg_uuid.clone()));
194        }
195        changes
196    }
197
198    fn update_from_message(&mut self, msg: &str) {
199        let kind = classify_message(msg);
200        match &kind {
201            MessageKind::Dialplan { detail, .. } => {
202                if let Some(dp) = parse_dialplan_context(detail) {
203                    self.initial_context.get_or_insert(dp.context.clone());
204                    self.dialplan_context = Some(dp.context);
205                    self.dialplan_from = Some(dp.from);
206                    self.dialplan_to = Some(dp.to);
207                }
208            }
209            MessageKind::Variable { name, value } => {
210                let var_name = name.strip_prefix("variable_").unwrap_or(name);
211                self.variables.insert(var_name.to_string(), value.clone());
212            }
213            MessageKind::ChannelField { name, value } => match name.as_str() {
214                "Channel-Name" => self.channel_name = Some(value.clone()),
215                "Channel-State" => self.channel_state = Some(value.clone()),
216                _ => {}
217            },
218            MessageKind::StateChange { detail } => {
219                if let Some(new_state) = parse_state_change(detail) {
220                    self.channel_state = Some(new_state);
221                }
222            }
223            _ => {}
224        }
225    }
226}
227
228struct DialplanContext {
229    from: String,
230    to: String,
231    context: String,
232}
233
234fn parse_dialplan_context(detail: &str) -> Option<DialplanContext> {
235    if !detail.starts_with("parsing [") {
236        return None;
237    }
238    let rest = &detail["parsing [".len()..];
239    let bracket_end = rest.find(']')?;
240    let inner = &rest[..bracket_end];
241
242    let arrow = inner.find("->")?;
243    let from_part = &inner[..arrow];
244    let to_part = &inner[arrow + 2..];
245
246    let context = if rest.len() > bracket_end + 1 {
247        let after = rest[bracket_end + 1..].trim();
248        if let Some(stripped) = after.strip_prefix("continue=") {
249            let _ = stripped;
250        }
251        from_part.to_string()
252    } else {
253        from_part.to_string()
254    };
255
256    Some(DialplanContext {
257        from: from_part.to_string(),
258        to: to_part.to_string(),
259        context,
260    })
261}
262
263fn parse_processing_line(msg: &str) -> Option<DialplanContext> {
264    let proc_idx = msg.find("Processing ")?;
265    let rest = &msg[proc_idx + "Processing ".len()..];
266
267    let arrow = rest.find("->")?;
268    let from = &rest[..arrow];
269
270    let after_arrow = &rest[arrow + 2..];
271    let space = after_arrow.find(' ')?;
272    let to = &after_arrow[..space];
273
274    let ctx_idx = after_arrow.find("in context ")?;
275    let ctx_rest = &after_arrow[ctx_idx + "in context ".len()..];
276    let context = ctx_rest.split_whitespace().next()?;
277
278    Some(DialplanContext {
279        from: from.to_string(),
280        to: to.to_string(),
281        context: context.to_string(),
282    })
283}
284
285fn parse_new_channel(detail: &str) -> Option<String> {
286    let rest = detail.strip_prefix("New Channel ")?;
287    let bracket = rest.rfind(" [")?;
288    Some(rest[..bracket].to_string())
289}
290
291fn parse_state_change(detail: &str) -> Option<String> {
292    let arrow = detail.find(" -> ")?;
293    Some(detail[arrow + 4..].trim().to_string())
294}
295
296/// Extract `origination_uuid` and the bridge target channel from bridge() arguments.
297/// Uses `BridgeDialString` from freeswitch-types for correct parsing of `[]`, `{}`,
298/// `|` failover, and `,` simultaneous ring syntax.
299fn parse_bridge_args(arguments: &str) -> Option<BridgeInfo> {
300    let dial = BridgeDialString::from_str(arguments).ok()?;
301    let first_ep = dial.groups().first()?.first()?;
302    let origination_uuid = first_ep
303        .variables()
304        .and_then(|v| v.get("origination_uuid"))
305        .map(|s| s.to_string());
306    let mut bare = first_ep.clone();
307    bare.set_variables(None);
308    let target_channel = bare.to_string();
309    Some(BridgeInfo {
310        origination_uuid,
311        target_channel,
312    })
313}
314
315struct BridgeInfo {
316    origination_uuid: Option<String>,
317    target_channel: String,
318}
319
320/// Parse "Originate Resulted in Success: [channel] Peer UUID: uuid"
321fn parse_originate_success(msg: &str) -> Option<String> {
322    let marker = "Peer UUID: ";
323    let idx = msg.find(marker)?;
324    let uuid = msg[idx + marker.len()..].trim();
325    if uuid.is_empty() {
326        None
327    } else {
328        Some(uuid.to_string())
329    }
330}
331
332/// Parse the bracketed channel name from "Originate Resulted in Success: [<chan>] …".
333/// Used as a fallback when the `Peer UUID:` suffix is absent (FS 1.10.5-dev and
334/// similar builds). Returns the channel name borrowed from `msg`.
335fn parse_originate_channel(msg: &str) -> Option<&str> {
336    let start = msg.find(" [")? + 2;
337    let end = msg[start..].find(']')?;
338    let chan = &msg[start..start + end];
339    if chan.is_empty() {
340        None
341    } else {
342        Some(chan)
343    }
344}
345
346/// Terminal channel-/callstate values — sessions left in one of these are
347/// stragglers from prior calls and must not be considered candidates when
348/// disambiguating channel-name collisions in the originate-success fallback.
349///
350/// Covers both `Channel-State` (`CS_*`) and `Callstate` (`HANGUP`). `DOWN` is
351/// excluded because it doubles as the initial Callstate before any change is
352/// observed.
353fn is_terminal_channel_state(state: Option<&str>) -> bool {
354    matches!(
355        state,
356        Some("CS_HANGUP" | "CS_REPORTING" | "CS_DESTROY" | "CS_NONE" | "HANGUP")
357    )
358}
359
360/// A [`LogEntry`] paired with the session's state snapshot at that point in time.
361#[derive(Debug)]
362pub struct EnrichedEntry {
363    pub entry: LogEntry,
364    /// `None` for system lines (entries with an empty UUID).
365    pub session: Option<SessionSnapshot>,
366}
367
368/// Layer 3 per-session state machine — tracks per-UUID state (dialplan context,
369/// channel state, variables) across entries and yields [`EnrichedEntry`] values.
370///
371/// Wraps a [`LogStream`] and maintains a `HashMap<String, SessionState>` keyed by UUID.
372/// Sessions are never automatically cleaned up; call [`remove_session()`](SessionTracker::remove_session)
373/// when a call ends.
374pub struct SessionTracker<I> {
375    inner: LogStream<I>,
376    sessions: HashMap<String, SessionState>,
377    by_channel_name: HashMap<String, HashSet<String>>,
378    by_pending_target: HashMap<String, String>,
379    by_other_leg: HashMap<String, String>,
380}
381
382impl<I: Iterator<Item = String>> SessionTracker<I> {
383    /// Wrap a [`LogStream`] to add per-session state tracking.
384    pub fn new(inner: LogStream<I>) -> Self {
385        SessionTracker {
386            inner,
387            sessions: HashMap::new(),
388            by_channel_name: HashMap::new(),
389            by_pending_target: HashMap::new(),
390            by_other_leg: HashMap::new(),
391        }
392    }
393
394    /// All currently tracked sessions, keyed by UUID.
395    pub fn sessions(&self) -> &HashMap<String, SessionState> {
396        &self.sessions
397    }
398
399    /// Remove and return a session's accumulated state. Call this when a call ends
400    /// (e.g. `CS_DESTROY` or hangup) to free memory.
401    pub fn remove_session(&mut self, uuid: &str) -> Option<SessionState> {
402        let state = self.sessions.remove(uuid)?;
403        if let Some(chan) = &state.channel_name {
404            if let Some(set) = self.by_channel_name.get_mut(chan) {
405                set.remove(uuid);
406                if set.is_empty() {
407                    self.by_channel_name.remove(chan);
408                }
409            }
410        }
411        if let Some(target) = &state.pending_bridge_target {
412            self.by_pending_target.remove(target);
413        }
414        if let Some(other) = &state.other_leg_uuid {
415            self.by_other_leg.remove(other);
416        }
417        Some(state)
418    }
419
420    /// Delegates to [`LogStream::stats()`].
421    pub fn stats(&self) -> &ParseStats {
422        self.inner.stats()
423    }
424
425    /// Delegates to [`LogStream::drain_unclassified()`].
426    pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
427        self.inner.drain_unclassified()
428    }
429
430    fn apply_index_changes(&mut self, uuid: &str, changes: &IndexedFieldChanges) {
431        if let Some((old, new)) = &changes.channel_name {
432            if let Some(old_name) = old {
433                if let Some(set) = self.by_channel_name.get_mut(old_name) {
434                    set.remove(uuid);
435                    if set.is_empty() {
436                        self.by_channel_name.remove(old_name);
437                    }
438                }
439            }
440            if let Some(new_name) = new {
441                self.by_channel_name
442                    .entry(new_name.clone())
443                    .or_default()
444                    .insert(uuid.to_string());
445            }
446        }
447        if let Some((old, new)) = &changes.pending_bridge_target {
448            if let Some(old_target) = old {
449                self.by_pending_target.remove(old_target);
450            }
451            if let Some(new_target) = new {
452                self.by_pending_target
453                    .insert(new_target.clone(), uuid.to_string());
454            }
455        }
456        if let Some((old, new)) = &changes.other_leg_uuid {
457            if let Some(old_leg) = old {
458                self.by_other_leg.remove(old_leg);
459            }
460            if let Some(new_leg) = new {
461                self.by_other_leg.insert(new_leg.clone(), uuid.to_string());
462            }
463        }
464    }
465
466    /// Cross-session leg linking. Called after `update_from_entry` so per-session
467    /// state (bridge target, channel name) is already populated.
468    fn link_legs(&mut self, uuid: &str, entry: &LogEntry) {
469        // 1. "Originate Resulted in Success ... Peer UUID: BLEG" — authoritative
470        if entry.message.contains("Originate Resulted in Success") {
471            let a_uuid = uuid.to_string();
472            if let Some(peer_uuid) = parse_originate_success(&entry.message) {
473                let a_old_pending = self
474                    .sessions
475                    .get(&a_uuid)
476                    .and_then(|s| s.pending_bridge_target.clone());
477
478                if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
479                    a_state.other_leg_uuid = Some(peer_uuid.clone());
480                    a_state.pending_bridge_target = None;
481                }
482                self.by_other_leg
483                    .insert(peer_uuid.clone(), a_uuid.clone());
484                if let Some(old_target) = a_old_pending {
485                    self.by_pending_target.remove(&old_target);
486                }
487
488                let b_state = self.sessions.entry(peer_uuid.clone()).or_default();
489                b_state.other_leg_uuid = Some(a_uuid.clone());
490                self.by_other_leg.insert(a_uuid, peer_uuid);
491            } else if let Some(chan) = parse_originate_channel(&entry.message) {
492                // Fallback for FS builds without `Peer UUID:` suffix (e.g. 1.10.5-dev):
493                // link via unique non-terminated b-leg session whose channel_name
494                // matches. Candidates in terminal states are stragglers; if zero or
495                // multiple live candidates remain, skip (correctness over coverage).
496                let candidates: Vec<String> = self
497                    .by_channel_name
498                    .get(chan)
499                    .map(|set| {
500                        set.iter()
501                            .filter(|u| *u != &a_uuid)
502                            .filter(|u| {
503                                self.sessions
504                                    .get(*u)
505                                    .map(|s| !is_terminal_channel_state(s.channel_state.as_deref()))
506                                    .unwrap_or(false)
507                            })
508                            .cloned()
509                            .collect()
510                    })
511                    .unwrap_or_default();
512
513                if candidates.len() == 1 {
514                    let b_uuid = candidates.into_iter().next().unwrap();
515                    let a_old_pending = self
516                        .sessions
517                        .get(&a_uuid)
518                        .and_then(|s| s.pending_bridge_target.clone());
519
520                    if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
521                        a_state.other_leg_uuid = Some(b_uuid.clone());
522                        a_state.pending_bridge_target = None;
523                    }
524                    if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
525                        b_state.other_leg_uuid = Some(a_uuid.clone());
526                    }
527
528                    self.by_other_leg.insert(b_uuid.clone(), a_uuid.clone());
529                    self.by_other_leg.insert(a_uuid, b_uuid);
530                    if let Some(old_target) = a_old_pending {
531                        self.by_pending_target.remove(&old_target);
532                    }
533                }
534            }
535            return;
536        }
537
538        // 2. New Channel on this UUID — check if any other session has a pending bridge
539        //    with origination_uuid matching this UUID, or target matching this channel name.
540        if let MessageKind::ChannelLifecycle { detail } = &entry.message_kind {
541            if let Some(channel_name) = parse_new_channel(detail) {
542                let b_uuid = uuid.to_string();
543
544                // O(1) index lookups instead of full scan
545                let a_uuid_found = self
546                    .by_other_leg
547                    .get(&b_uuid)
548                    .cloned()
549                    .or_else(|| self.by_pending_target.get(&channel_name).cloned())
550                    .filter(|a| a != &b_uuid);
551
552                if let Some(a_uuid) = a_uuid_found {
553                    let a_old_pending = self
554                        .sessions
555                        .get(&a_uuid)
556                        .and_then(|s| s.pending_bridge_target.clone());
557
558                    if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
559                        a_state.other_leg_uuid = Some(b_uuid.clone());
560                        a_state.pending_bridge_target = None;
561                    }
562                    if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
563                        b_state.other_leg_uuid = Some(a_uuid.clone());
564                    }
565
566                    self.by_other_leg.insert(b_uuid.clone(), a_uuid.clone());
567                    self.by_other_leg.insert(a_uuid, b_uuid);
568                    if let Some(old_target) = a_old_pending {
569                        self.by_pending_target.remove(&old_target);
570                    }
571                }
572            }
573        }
574    }
575}
576
577impl<I: Iterator<Item = String>> Iterator for SessionTracker<I> {
578    type Item = EnrichedEntry;
579
580    fn next(&mut self) -> Option<EnrichedEntry> {
581        let entry = self.inner.next()?;
582
583        if entry.uuid.is_empty() {
584            return Some(EnrichedEntry {
585                entry,
586                session: None,
587            });
588        }
589
590        let uuid = entry.uuid.clone();
591        let changes = self
592            .sessions
593            .entry(uuid.clone())
594            .or_default()
595            .update_from_entry(&entry);
596        self.apply_index_changes(&uuid, &changes);
597
598        self.link_legs(&uuid, &entry);
599
600        let snapshot = self.sessions.get(&uuid).unwrap().snapshot();
601
602        Some(EnrichedEntry {
603            entry,
604            session: Some(snapshot),
605        })
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612
613    const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
614    const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
615    const UUID3: &str = "c3d4e5f6-a7b8-9012-cdef-234567890123";
616    const TS1: &str = "2025-01-15 10:30:45.123456";
617    const TS2: &str = "2025-01-15 10:30:46.234567";
618
619    fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
620        format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
621    }
622
623    fn collect_enriched(lines: Vec<String>) -> Vec<EnrichedEntry> {
624        let stream = LogStream::new(lines.into_iter());
625        SessionTracker::new(stream).collect()
626    }
627
628    #[test]
629    fn system_line_no_session() {
630        let lines = vec![format!(
631            "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
632        )];
633        let entries = collect_enriched(lines);
634        assert_eq!(entries.len(), 1);
635        assert!(entries[0].session.is_none());
636    }
637
638    #[test]
639    fn dialplan_context_propagation() {
640        let lines = vec![
641            full_line(UUID1, TS1, "CHANNEL_DATA:"),
642            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
643            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 answer"),
644            format!("{UUID1} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public->global] continue=true"),
645            full_line(UUID1, TS2, "Some later event"),
646        ];
647        let entries = collect_enriched(lines);
648        let last = entries.last().unwrap();
649        let session = last.session.as_ref().unwrap();
650        assert_eq!(session.dialplan_context.as_deref(), Some("public"));
651        assert_eq!(session.dialplan_from.as_deref(), Some("public"));
652        assert_eq!(session.dialplan_to.as_deref(), Some("global"));
653    }
654
655    #[test]
656    fn processing_line_extracts_context() {
657        let lines = vec![full_line(
658            UUID1,
659            TS1,
660            "Processing 5551234567->5559876543 in context public",
661        )];
662        let entries = collect_enriched(lines);
663        let session = entries[0].session.as_ref().unwrap();
664        assert_eq!(session.dialplan_context.as_deref(), Some("public"));
665        assert_eq!(session.dialplan_from.as_deref(), Some("5551234567"));
666        assert_eq!(session.dialplan_to.as_deref(), Some("5559876543"));
667    }
668
669    #[test]
670    fn initial_context_preserved_across_transfers() {
671        let lines = vec![
672            full_line(
673                UUID1,
674                TS1,
675                "Processing 5551234567->5559876543 in context public",
676            ),
677            full_line(
678                UUID1,
679                TS2,
680                "Processing 5551234567->start_recording in context recordings",
681            ),
682        ];
683        let stream = LogStream::new(lines.into_iter());
684        let mut tracker = SessionTracker::new(stream);
685        let entries: Vec<_> = tracker.by_ref().collect();
686
687        let first = entries[0].session.as_ref().unwrap();
688        assert_eq!(
689            first.initial_context.as_deref(),
690            Some("public"),
691            "initial_context set on first Processing line"
692        );
693        assert_eq!(first.dialplan_context.as_deref(), Some("public"));
694
695        let state = tracker.sessions().get(UUID1).unwrap();
696        assert_eq!(
697            state.initial_context.as_deref(),
698            Some("public"),
699            "initial_context keeps the first context seen"
700        );
701        assert_eq!(
702            state.dialplan_context.as_deref(),
703            Some("recordings"),
704            "dialplan_context tracks the current context"
705        );
706        assert_eq!(state.dialplan_to.as_deref(), Some("start_recording"));
707    }
708
709    #[test]
710    fn new_channel_sets_channel_name() {
711        let lines = vec![full_line(
712            UUID1,
713            TS1,
714            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
715        )];
716        let entries = collect_enriched(lines);
717        let session = entries[0].session.as_ref().unwrap();
718        assert_eq!(
719            session.channel_name.as_deref(),
720            Some("sofia/internal-v4/sos")
721        );
722    }
723
724    #[test]
725    fn originate_success_links_both_legs() {
726        // "Originate Resulted in Success" contains both the A-leg UUID (line prefix)
727        // and B-leg UUID (Peer UUID field). Both legs should learn about each other.
728        let lines = vec![
729            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
730            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/esinet1-v6-tcp/sip:target.example.com] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901"),
731        ];
732        let stream = LogStream::new(lines.into_iter());
733        let mut tracker = SessionTracker::new(stream);
734        let _: Vec<_> = tracker.by_ref().collect();
735
736        let a_leg = tracker.sessions().get(UUID1).unwrap();
737        assert_eq!(
738            a_leg.other_leg_uuid.as_deref(),
739            Some(UUID2),
740            "A-leg other_leg_uuid set from Originate Resulted in Success"
741        );
742
743        let b_leg = tracker.sessions().get(UUID2).unwrap();
744        assert_eq!(
745            b_leg.other_leg_uuid.as_deref(),
746            Some(UUID1),
747            "B-leg other_leg_uuid points back to A-leg"
748        );
749    }
750
751    #[test]
752    fn originate_success_channel_fallback_links_legs() {
753        // FS 1.10.5-dev and similar omit `Peer UUID:` from "Originate Resulted in Success".
754        // The b-leg's New Channel populates channel_name 3.5 s before originate; the
755        // fallback path matches by channel name when the Peer UUID is absent.
756        let lines = vec![
757            full_line(
758                UUID2,
759                TS1,
760                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
761            ),
762            full_line(
763                UUID1,
764                TS2,
765                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
766            ),
767        ];
768        let stream = LogStream::new(lines.into_iter());
769        let mut tracker = SessionTracker::new(stream);
770        let _: Vec<_> = tracker.by_ref().collect();
771
772        let a_leg = tracker.sessions().get(UUID1).unwrap();
773        assert_eq!(
774            a_leg.other_leg_uuid.as_deref(),
775            Some(UUID2),
776            "A-leg linked to B-leg via channel-name fallback when Peer UUID absent"
777        );
778
779        let b_leg = tracker.sessions().get(UUID2).unwrap();
780        assert_eq!(
781            b_leg.other_leg_uuid.as_deref(),
782            Some(UUID1),
783            "B-leg linked back to A-leg"
784        );
785    }
786
787    #[test]
788    fn originate_success_peer_uuid_wins_over_channel_fallback() {
789        // When Peer UUID is present, channel-name fallback must not fire — even if
790        // another session shares the channel name. Peer UUID is authoritative.
791        let lines = vec![
792            full_line(
793                UUID2,
794                TS1,
795                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
796            ),
797            full_line(
798                UUID3,
799                TS1,
800                "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
801            ),
802            full_line(
803                UUID1,
804                TS2,
805                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901",
806            ),
807        ];
808        let stream = LogStream::new(lines.into_iter());
809        let mut tracker = SessionTracker::new(stream);
810        let _: Vec<_> = tracker.by_ref().collect();
811
812        let a_leg = tracker.sessions().get(UUID1).unwrap();
813        assert_eq!(
814            a_leg.other_leg_uuid.as_deref(),
815            Some(UUID2),
816            "Peer UUID wins over channel-name match"
817        );
818
819        let decoy = tracker.sessions().get(UUID3).unwrap();
820        assert_eq!(
821            decoy.other_leg_uuid, None,
822            "Decoy session sharing channel name is not touched"
823        );
824    }
825
826    #[test]
827    fn originate_success_channel_fallback_skips_when_ambiguous() {
828        // Two b-leg candidates share the same channel name. The fallback must not
829        // guess — correctness over coverage.
830        let lines = vec![
831            full_line(
832                UUID2,
833                TS1,
834                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
835            ),
836            full_line(
837                UUID3,
838                TS1,
839                "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
840            ),
841            full_line(
842                UUID1,
843                TS2,
844                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
845            ),
846        ];
847        let stream = LogStream::new(lines.into_iter());
848        let mut tracker = SessionTracker::new(stream);
849        let _: Vec<_> = tracker.by_ref().collect();
850
851        let a_leg = tracker.sessions().get(UUID1).unwrap();
852        assert_eq!(
853            a_leg.other_leg_uuid, None,
854            "Ambiguous channel name yields no link"
855        );
856        assert_eq!(tracker.sessions().get(UUID2).unwrap().other_leg_uuid, None);
857        assert_eq!(tracker.sessions().get(UUID3).unwrap().other_leg_uuid, None);
858    }
859
860    #[test]
861    fn originate_success_channel_fallback_skips_terminated_candidates() {
862        // Two b-leg sessions share the same channel_name, but one is in
863        // CS_DESTROY (stale prior call on the same registered phone). The
864        // liveness filter must drop the terminated candidate so the live one
865        // becomes the unambiguous match.
866        let lines = vec![
867            full_line(
868                UUID2,
869                TS1,
870                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
871            ),
872            full_line(
873                UUID2,
874                TS1,
875                "(sofia/internal/6244@192.0.2.72:50744) State Change CS_EXECUTE -> CS_DESTROY",
876            ),
877            full_line(
878                UUID3,
879                TS1,
880                "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
881            ),
882            full_line(
883                UUID1,
884                TS2,
885                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
886            ),
887        ];
888        let stream = LogStream::new(lines.into_iter());
889        let mut tracker = SessionTracker::new(stream);
890        let _: Vec<_> = tracker.by_ref().collect();
891
892        let a_leg = tracker.sessions().get(UUID1).unwrap();
893        assert_eq!(
894            a_leg.other_leg_uuid.as_deref(),
895            Some(UUID3),
896            "Live b-leg wins over CS_DESTROY straggler"
897        );
898
899        let live_b = tracker.sessions().get(UUID3).unwrap();
900        assert_eq!(
901            live_b.other_leg_uuid.as_deref(),
902            Some(UUID1),
903            "Live b-leg points back to a-leg"
904        );
905
906        let stale_b = tracker.sessions().get(UUID2).unwrap();
907        assert_eq!(
908            stale_b.other_leg_uuid, None,
909            "Terminated b-leg is not touched"
910        );
911    }
912
913    #[test]
914    fn originate_success_channel_fallback_skips_when_no_match() {
915        // a-leg fires Originate with a bracketed channel name no session has.
916        // Must not panic, must not create a spurious link.
917        let lines = vec![full_line(
918            UUID1,
919            TS2,
920            "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
921        )];
922        let stream = LogStream::new(lines.into_iter());
923        let mut tracker = SessionTracker::new(stream);
924        let _: Vec<_> = tracker.by_ref().collect();
925
926        let a_leg = tracker.sessions().get(UUID1).unwrap();
927        assert_eq!(a_leg.other_leg_uuid, None);
928        assert_eq!(a_leg.pending_bridge_target, None);
929    }
930
931    #[test]
932    fn bridge_origination_uuid_links_a_leg_immediately() {
933        // bridge([origination_uuid=BLEG_UUID,...]) guarantees B-leg UUID from execute args alone.
934        // A-leg knows B-leg immediately, B-leg learns A-leg when New Channel appears.
935        let lines = vec![
936            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal-v6/1232@[2001:db8::10] bridge([origination_uuid=b2c3d4e5-f6a7-8901-bcde-f12345678901,leg_timeout=2]sofia/esinet1-v6-tcp/sip:target.example.com)"),
937            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
938        ];
939        let stream = LogStream::new(lines.into_iter());
940        let mut tracker = SessionTracker::new(stream);
941        let _: Vec<_> = tracker.by_ref().collect();
942
943        let a_leg = tracker.sessions().get(UUID1).unwrap();
944        assert_eq!(
945            a_leg.other_leg_uuid.as_deref(),
946            Some(UUID2),
947            "A-leg knows B-leg UUID from origination_uuid in bridge args"
948        );
949
950        let b_leg = tracker.sessions().get(UUID2).unwrap();
951        assert_eq!(
952            b_leg.other_leg_uuid.as_deref(),
953            Some(UUID1),
954            "B-leg knows A-leg once New Channel correlates"
955        );
956    }
957
958    #[test]
959    fn bridge_target_matches_new_channel() {
960        // bridge() without origination_uuid — B-leg UUID is auto-generated by FS.
961        // Match via bridge target channel matching next New Channel with same target.
962        let lines = vec![
963            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
964            full_line(UUID1, TS1, "Parsing session specific variables"),
965            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
966        ];
967        let stream = LogStream::new(lines.into_iter());
968        let mut tracker = SessionTracker::new(stream);
969        let _: Vec<_> = tracker.by_ref().collect();
970
971        let a_leg = tracker.sessions().get(UUID1).unwrap();
972        assert_eq!(
973            a_leg.other_leg_uuid.as_deref(),
974            Some(UUID2),
975            "A-leg linked to B-leg via bridge target matching New Channel"
976        );
977
978        let b_leg = tracker.sessions().get(UUID2).unwrap();
979        assert_eq!(
980            b_leg.other_leg_uuid.as_deref(),
981            Some(UUID1),
982            "B-leg linked back to A-leg"
983        );
984    }
985
986    #[test]
987    fn originate_success_corrects_wrong_target_match() {
988        // Bridge target matching guessed UUID2 as B-leg, but originate success reveals
989        // the actual B-leg is UUID3. The authoritative success message must override.
990        let lines = vec![
991            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
992            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
993            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/gateway/carrier/+15559876543] Peer UUID: c3d4e5f6-a7b8-9012-cdef-234567890123"),
994        ];
995        let stream = LogStream::new(lines.into_iter());
996        let mut tracker = SessionTracker::new(stream);
997        let _: Vec<_> = tracker.by_ref().collect();
998
999        let a_leg = tracker.sessions().get(UUID1).unwrap();
1000        assert_eq!(
1001            a_leg.other_leg_uuid.as_deref(),
1002            Some(UUID3),
1003            "Originate success overrides earlier target-match guess"
1004        );
1005
1006        let real_b_leg = tracker.sessions().get(UUID3).unwrap();
1007        assert_eq!(
1008            real_b_leg.other_leg_uuid.as_deref(),
1009            Some(UUID1),
1010            "Real B-leg points back to A-leg"
1011        );
1012    }
1013
1014    #[test]
1015    fn channel_data_other_leg_uuid() {
1016        // Other-Leg-Unique-ID in CHANNEL_DATA (post-bridge info dump) sets other_leg_uuid
1017        let lines = vec![
1018            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1019            format!("{UUID1} Other-Leg-Unique-ID: [{UUID2}]"),
1020        ];
1021        let stream = LogStream::new(lines.into_iter());
1022        let mut tracker = SessionTracker::new(stream);
1023        let _: Vec<_> = tracker.by_ref().collect();
1024
1025        let state = tracker.sessions().get(UUID1).unwrap();
1026        assert_eq!(
1027            state.other_leg_uuid.as_deref(),
1028            Some(UUID2),
1029            "other_leg_uuid set from Other-Leg-Unique-ID CHANNEL_DATA field"
1030        );
1031    }
1032
1033    #[test]
1034    fn channel_data_populates_session() {
1035        let lines = vec![
1036            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1037            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1038            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1039            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
1040            "variable_direction: [inbound]".to_string(),
1041        ];
1042        let entries = collect_enriched(lines);
1043        assert_eq!(entries.len(), 1);
1044        let session = entries[0].session.as_ref().unwrap();
1045        assert_eq!(
1046            session.channel_name.as_deref(),
1047            Some("sofia/internal/+15550001234@192.0.2.1")
1048        );
1049        assert_eq!(session.channel_state.as_deref(), Some("CS_EXECUTE"));
1050    }
1051
1052    #[test]
1053    fn variables_learned_from_channel_data() {
1054        let lines = vec![
1055            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1056            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
1057            "variable_direction: [inbound]".to_string(),
1058        ];
1059        let stream = LogStream::new(lines.into_iter());
1060        let mut tracker = SessionTracker::new(stream);
1061        let _: Vec<_> = tracker.by_ref().collect();
1062        let state = tracker.sessions().get(UUID1).unwrap();
1063        assert_eq!(
1064            state.variables.get("sip_call_id").map(|s| s.as_str()),
1065            Some("test123@192.0.2.1")
1066        );
1067        assert_eq!(
1068            state.variables.get("direction").map(|s| s.as_str()),
1069            Some("inbound")
1070        );
1071    }
1072
1073    #[test]
1074    fn variables_learned_from_set_execute() {
1075        let lines = vec![
1076            full_line(UUID1, TS1, "First"),
1077            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(call_direction=inbound)"),
1078            full_line(UUID1, TS2, "After set"),
1079        ];
1080        let stream = LogStream::new(lines.into_iter());
1081        let mut tracker = SessionTracker::new(stream);
1082        let entries: Vec<_> = tracker.by_ref().collect();
1083        assert_eq!(entries.len(), 3);
1084        let state = tracker.sessions().get(UUID1).unwrap();
1085        assert_eq!(
1086            state.variables.get("call_direction").map(|s| s.as_str()),
1087            Some("inbound")
1088        );
1089    }
1090
1091    #[test]
1092    fn variables_learned_from_export_execute() {
1093        let lines = vec![
1094            full_line(UUID1, TS1, "First"),
1095            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
1096        ];
1097        let stream = LogStream::new(lines.into_iter());
1098        let mut tracker = SessionTracker::new(stream);
1099        let _: Vec<_> = tracker.by_ref().collect();
1100        let state = tracker.sessions().get(UUID1).unwrap();
1101        assert_eq!(
1102            state.variables.get("originate_timeout").map(|s| s.as_str()),
1103            Some("3600")
1104        );
1105    }
1106
1107    #[test]
1108    fn session_isolation_between_uuids() {
1109        let lines = vec![
1110            full_line(
1111                UUID1,
1112                TS1,
1113                "Processing 5551111111->5552222222 in context public",
1114            ),
1115            full_line(
1116                UUID2,
1117                TS2,
1118                "Processing 5553333333->5554444444 in context private",
1119            ),
1120        ];
1121        let stream = LogStream::new(lines.into_iter());
1122        let mut tracker = SessionTracker::new(stream);
1123        let _: Vec<_> = tracker.by_ref().collect();
1124        let s1 = tracker.sessions().get(UUID1).unwrap();
1125        let s2 = tracker.sessions().get(UUID2).unwrap();
1126        assert_eq!(s1.dialplan_context.as_deref(), Some("public"));
1127        assert_eq!(s2.dialplan_context.as_deref(), Some("private"));
1128        assert_eq!(s1.dialplan_from.as_deref(), Some("5551111111"));
1129        assert_eq!(s2.dialplan_from.as_deref(), Some("5553333333"));
1130    }
1131
1132    #[test]
1133    fn processing_line_with_regex_type_and_angle_bracket_caller() {
1134        let lines = vec![full_line(
1135            UUID1,
1136            TS1,
1137            "Processing Emergency S R <5550001234>->start_recording in context recordings",
1138        )];
1139        let entries = collect_enriched(lines);
1140        let session = entries[0].session.as_ref().unwrap();
1141        assert_eq!(session.initial_context.as_deref(), Some("recordings"));
1142        assert_eq!(session.dialplan_context.as_deref(), Some("recordings"));
1143        assert_eq!(
1144            session.dialplan_from.as_deref(),
1145            Some("Emergency S R <5550001234>")
1146        );
1147        assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
1148    }
1149
1150    #[test]
1151    fn processing_line_extension_format() {
1152        let lines = vec![full_line(
1153            UUID1,
1154            TS1,
1155            "Processing Extension 1263 <1263>->start_recording in context recordings",
1156        )];
1157        let entries = collect_enriched(lines);
1158        let session = entries[0].session.as_ref().unwrap();
1159        assert_eq!(session.initial_context.as_deref(), Some("recordings"));
1160        assert_eq!(
1161            session.dialplan_from.as_deref(),
1162            Some("Extension 1263 <1263>")
1163        );
1164        assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
1165    }
1166
1167    #[test]
1168    fn state_change_updates_channel_state() {
1169        let lines = vec![full_line(UUID1, TS1, "State Change CS_INIT -> CS_ROUTING")];
1170        let entries = collect_enriched(lines);
1171        let session = entries[0].session.as_ref().unwrap();
1172        assert_eq!(session.channel_state.as_deref(), Some("CS_ROUTING"));
1173    }
1174
1175    #[test]
1176    fn callstate_change_updates_channel_state() {
1177        let lines = vec![full_line(
1178            UUID1,
1179            TS1,
1180            "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
1181        )];
1182        let entries = collect_enriched(lines);
1183        let session = entries[0].session.as_ref().unwrap();
1184        assert_eq!(session.channel_state.as_deref(), Some("RINGING"));
1185    }
1186
1187    #[test]
1188    fn state_change_overrides_callstate() {
1189        let lines = vec![
1190            full_line(
1191                UUID1,
1192                TS1,
1193                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
1194            ),
1195            full_line(
1196                UUID1,
1197                TS2,
1198                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
1199            ),
1200        ];
1201        let entries = collect_enriched(lines);
1202        assert_eq!(
1203            entries[0]
1204                .session
1205                .as_ref()
1206                .unwrap()
1207                .channel_state
1208                .as_deref(),
1209            Some("RINGING")
1210        );
1211        assert_eq!(
1212            entries[1]
1213                .session
1214                .as_ref()
1215                .unwrap()
1216                .channel_state
1217                .as_deref(),
1218            Some("CS_EXCHANGE_MEDIA")
1219        );
1220    }
1221
1222    #[test]
1223    fn bleg_lifecycle_extracts_data_from_processing() {
1224        let lines = vec![
1225            full_line(
1226                UUID1,
1227                TS1,
1228                "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
1229            ),
1230            full_line(
1231                UUID1,
1232                TS1,
1233                "(sofia/internal-v4/sos) State Change CS_NEW -> CS_INIT",
1234            ),
1235            full_line(
1236                UUID1,
1237                TS1,
1238                "(sofia/internal-v4/sos) State Change CS_INIT -> CS_ROUTING",
1239            ),
1240            full_line(
1241                UUID1,
1242                TS1,
1243                "(sofia/internal-v4/sos) State Change CS_ROUTING -> CS_CONSUME_MEDIA",
1244            ),
1245            full_line(
1246                UUID1,
1247                TS1,
1248                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
1249            ),
1250            full_line(
1251                UUID1,
1252                TS2,
1253                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
1254            ),
1255            full_line(
1256                UUID1,
1257                TS2,
1258                "Processing Emergency S R <5550001234>->start_recording in context recordings",
1259            ),
1260            full_line(
1261                UUID1,
1262                TS2,
1263                "(sofia/internal-v4/sos) State Change CS_EXCHANGE_MEDIA -> CS_HANGUP",
1264            ),
1265        ];
1266        let entries = collect_enriched(lines);
1267
1268        let after_ringing = entries[4].session.as_ref().unwrap();
1269        assert_eq!(after_ringing.channel_state.as_deref(), Some("RINGING"));
1270        assert!(after_ringing.initial_context.is_none());
1271
1272        let after_processing = entries[6].session.as_ref().unwrap();
1273        assert_eq!(
1274            after_processing.channel_state.as_deref(),
1275            Some("CS_EXCHANGE_MEDIA")
1276        );
1277        assert_eq!(
1278            after_processing.initial_context.as_deref(),
1279            Some("recordings")
1280        );
1281        assert_eq!(
1282            after_processing.dialplan_from.as_deref(),
1283            Some("Emergency S R <5550001234>")
1284        );
1285        assert_eq!(
1286            after_processing.dialplan_to.as_deref(),
1287            Some("start_recording")
1288        );
1289
1290        let after_hangup = entries[7].session.as_ref().unwrap();
1291        assert_eq!(after_hangup.channel_state.as_deref(), Some("CS_HANGUP"));
1292        assert_eq!(after_hangup.initial_context.as_deref(), Some("recordings"));
1293    }
1294
1295    #[test]
1296    fn channel_name_from_new_channel() {
1297        let lines = vec![full_line(
1298            UUID1,
1299            TS1,
1300            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
1301        )];
1302        let entries = collect_enriched(lines);
1303        let session = entries[0].session.as_ref().unwrap();
1304        assert_eq!(
1305            session.channel_name.as_deref(),
1306            Some("sofia/internal-v4/sos")
1307        );
1308    }
1309
1310    #[test]
1311    fn remove_session() {
1312        let lines = vec![full_line(
1313            UUID1,
1314            TS1,
1315            "Processing 5551111111->5552222222 in context public",
1316        )];
1317        let stream = LogStream::new(lines.into_iter());
1318        let mut tracker = SessionTracker::new(stream);
1319        let _: Vec<_> = tracker.by_ref().collect();
1320        assert!(tracker.sessions().contains_key(UUID1));
1321        let removed = tracker.remove_session(UUID1).unwrap();
1322        assert_eq!(removed.dialplan_context.as_deref(), Some("public"));
1323        assert!(!tracker.sessions().contains_key(UUID1));
1324    }
1325
1326    #[test]
1327    fn stats_delegation() {
1328        let lines = vec![
1329            full_line(UUID1, TS1, "First"),
1330            full_line(UUID1, TS2, "Second"),
1331        ];
1332        let stream = LogStream::new(lines.into_iter());
1333        let mut tracker = SessionTracker::new(stream);
1334        let _: Vec<_> = tracker.by_ref().collect();
1335        assert_eq!(tracker.stats().lines_processed, 2);
1336    }
1337
1338    #[test]
1339    fn snapshot_reflects_cumulative_state() {
1340        let lines = vec![
1341            full_line(UUID1, TS1, "CHANNEL_DATA:"),
1342            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1343            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
1344            full_line(
1345                UUID1,
1346                TS2,
1347                "Processing 5551111111->5552222222 in context public",
1348            ),
1349        ];
1350        let entries = collect_enriched(lines);
1351        assert_eq!(entries.len(), 3);
1352        let first = entries[0].session.as_ref().unwrap();
1353        assert_eq!(
1354            first.channel_name.as_deref(),
1355            Some("sofia/internal/+15550001234@192.0.2.1"),
1356        );
1357        assert!(first.dialplan_context.is_none());
1358
1359        let last = entries[2].session.as_ref().unwrap();
1360        assert_eq!(
1361            last.channel_name.as_deref(),
1362            Some("sofia/internal/+15550001234@192.0.2.1"),
1363        );
1364        assert_eq!(last.dialplan_context.as_deref(), Some("public"));
1365    }
1366}