Skip to main content

freeswitch_log_parser/
session.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3
4use freeswitch_types::{BridgeDialString, CallDirection, DialString};
5
6use crate::line::parse_line;
7use crate::message::{classify_message, MessageKind};
8use crate::stream::{Block, LogEntry, LogStream, ParseStats, UnclassifiedLine};
9
10/// Mutable per-UUID state accumulator, updated as entries are processed.
11///
12/// Fields are `None` until the corresponding data is first seen in the stream.
13/// Variables accumulate from CHANNEL_DATA dumps, `set()`/`export()` executions,
14/// `SET`/`EXPORT` log lines, and inline `variable_*` lines.
15#[derive(Debug, Clone, Default)]
16pub struct SessionState {
17    /// `None` until a `Channel-Name` field is encountered.
18    pub channel_name: Option<String>,
19    /// `None` until a state change or `Channel-State` field is encountered.
20    pub channel_state: Option<String>,
21    /// First dialplan context seen; set once and never overwritten.
22    pub initial_context: Option<String>,
23    /// Current dialplan context; updated on each transfer/continue.
24    pub dialplan_context: Option<String>,
25    /// Source extension in the dialplan routing; `None` until a dialplan line is processed.
26    pub dialplan_from: Option<String>,
27    /// Target extension in the dialplan routing; `None` until a dialplan line is processed.
28    pub dialplan_to: Option<String>,
29    /// Call direction from `Call-Direction` CHANNEL_DATA field; `None` until seen.
30    pub call_direction: Option<CallDirection>,
31    /// Caller ID number from `Caller-Caller-ID-Number` CHANNEL_DATA field; `None` until seen.
32    pub caller_id_number: Option<String>,
33    /// Destination number from `Caller-Destination-Number` CHANNEL_DATA field; `None` until seen.
34    pub destination_number: Option<String>,
35    /// Other leg's UUID; `None` until bridged. Set from `Originate Resulted in Success` on A-leg,
36    /// and from `New Channel` on B-leg (back-pointing to A-leg via originate context).
37    pub other_leg_uuid: Option<String>,
38    /// Pending bridge target channel from `EXECUTE bridge()`, consumed when B-leg `New Channel` matches.
39    pub(crate) pending_bridge_target: Option<String>,
40    /// All variables learned so far, with the `variable_` prefix stripped from names.
41    pub variables: HashMap<String, String>,
42}
43
44/// Immutable point-in-time copy of a session's state, attached to each [`EnrichedEntry`].
45///
46/// Does not include `variables` to keep snapshots lightweight — access the full
47/// variable map via [`SessionTracker::sessions()`].
48#[derive(Debug, Clone)]
49pub struct SessionSnapshot {
50    pub channel_name: Option<String>,
51    pub channel_state: Option<String>,
52    pub initial_context: Option<String>,
53    pub dialplan_context: Option<String>,
54    pub dialplan_from: Option<String>,
55    pub dialplan_to: Option<String>,
56    pub call_direction: Option<CallDirection>,
57    pub caller_id_number: Option<String>,
58    pub destination_number: Option<String>,
59    pub other_leg_uuid: Option<String>,
60}
61
62impl SessionState {
63    fn snapshot(&self) -> SessionSnapshot {
64        SessionSnapshot {
65            channel_name: self.channel_name.clone(),
66            channel_state: self.channel_state.clone(),
67            initial_context: self.initial_context.clone(),
68            dialplan_context: self.dialplan_context.clone(),
69            dialplan_from: self.dialplan_from.clone(),
70            dialplan_to: self.dialplan_to.clone(),
71            call_direction: self.call_direction,
72            caller_id_number: self.caller_id_number.clone(),
73            destination_number: self.destination_number.clone(),
74            other_leg_uuid: self.other_leg_uuid.clone(),
75        }
76    }
77
78    fn update_from_entry(&mut self, entry: &LogEntry) {
79        if let Some(Block::ChannelData { fields, variables }) = &entry.block {
80            for (name, value) in fields {
81                match name.as_str() {
82                    "Channel-Name" => self.channel_name = Some(value.clone()),
83                    "Channel-State" => self.channel_state = Some(value.clone()),
84                    "Call-Direction" => {
85                        self.call_direction = CallDirection::from_str(value).ok();
86                    }
87                    "Caller-Caller-ID-Number" => {
88                        self.caller_id_number = Some(value.clone());
89                    }
90                    "Caller-Destination-Number" => {
91                        self.destination_number = Some(value.clone());
92                    }
93                    "Other-Leg-Unique-ID" => {
94                        self.other_leg_uuid = Some(value.clone());
95                    }
96                    _ => {}
97                }
98            }
99            for (name, value) in variables {
100                let var_name = name.strip_prefix("variable_").unwrap_or(name);
101                self.variables.insert(var_name.to_string(), value.clone());
102            }
103        }
104
105        match &entry.message_kind {
106            MessageKind::Dialplan { detail, .. } => {
107                if let Some(dp) = parse_dialplan_context(detail) {
108                    self.initial_context.get_or_insert(dp.context.clone());
109                    self.dialplan_context = Some(dp.context);
110                    self.dialplan_from = Some(dp.from);
111                    self.dialplan_to = Some(dp.to);
112                }
113            }
114            MessageKind::Execute {
115                application,
116                arguments,
117                ..
118            } => match application.as_str() {
119                "set" | "export" => {
120                    if let Some((name, value)) = arguments.split_once('=') {
121                        self.variables.insert(name.to_string(), value.to_string());
122                    }
123                }
124                "bridge" => {
125                    if let Some(info) = parse_bridge_args(arguments) {
126                        if let Some(uuid) = &info.origination_uuid {
127                            self.other_leg_uuid = Some(uuid.clone());
128                        }
129                        self.pending_bridge_target = Some(info.target_channel);
130                    }
131                }
132                _ => {}
133            },
134            MessageKind::Variable { name, value } => {
135                let var_name = name.strip_prefix("variable_").unwrap_or(name);
136                self.variables.insert(var_name.to_string(), value.clone());
137            }
138            MessageKind::ChannelField { name, value } => match name.as_str() {
139                "Channel-Name" => self.channel_name = Some(value.clone()),
140                "Channel-State" => self.channel_state = Some(value.clone()),
141                _ => {}
142            },
143            MessageKind::StateChange { detail } => {
144                if let Some(new_state) = parse_state_change(detail) {
145                    self.channel_state = Some(new_state);
146                }
147            }
148            MessageKind::ChannelLifecycle { detail } => {
149                if let Some(name) = parse_new_channel(detail) {
150                    if self.channel_name.is_none() {
151                        self.channel_name = Some(name);
152                    }
153                }
154            }
155            _ => {}
156        }
157
158        if entry.message.contains("Processing ") && entry.message.contains(" in context ") {
159            if let Some(dp) = parse_processing_line(&entry.message) {
160                self.initial_context.get_or_insert(dp.context.clone());
161                self.dialplan_context = Some(dp.context);
162                self.dialplan_from = Some(dp.from);
163                self.dialplan_to = Some(dp.to);
164            }
165        }
166
167        for attached in &entry.attached {
168            let parsed = parse_line(attached);
169            self.update_from_message(parsed.message);
170        }
171    }
172
173    fn update_from_message(&mut self, msg: &str) {
174        let kind = classify_message(msg);
175        match &kind {
176            MessageKind::Dialplan { detail, .. } => {
177                if let Some(dp) = parse_dialplan_context(detail) {
178                    self.initial_context.get_or_insert(dp.context.clone());
179                    self.dialplan_context = Some(dp.context);
180                    self.dialplan_from = Some(dp.from);
181                    self.dialplan_to = Some(dp.to);
182                }
183            }
184            MessageKind::Variable { name, value } => {
185                let var_name = name.strip_prefix("variable_").unwrap_or(name);
186                self.variables.insert(var_name.to_string(), value.clone());
187            }
188            MessageKind::ChannelField { name, value } => match name.as_str() {
189                "Channel-Name" => self.channel_name = Some(value.clone()),
190                "Channel-State" => self.channel_state = Some(value.clone()),
191                _ => {}
192            },
193            MessageKind::StateChange { detail } => {
194                if let Some(new_state) = parse_state_change(detail) {
195                    self.channel_state = Some(new_state);
196                }
197            }
198            _ => {}
199        }
200    }
201}
202
203struct DialplanContext {
204    from: String,
205    to: String,
206    context: String,
207}
208
209fn parse_dialplan_context(detail: &str) -> Option<DialplanContext> {
210    if !detail.starts_with("parsing [") {
211        return None;
212    }
213    let rest = &detail["parsing [".len()..];
214    let bracket_end = rest.find(']')?;
215    let inner = &rest[..bracket_end];
216
217    let arrow = inner.find("->")?;
218    let from_part = &inner[..arrow];
219    let to_part = &inner[arrow + 2..];
220
221    let context = if rest.len() > bracket_end + 1 {
222        let after = rest[bracket_end + 1..].trim();
223        if let Some(stripped) = after.strip_prefix("continue=") {
224            let _ = stripped;
225        }
226        from_part.to_string()
227    } else {
228        from_part.to_string()
229    };
230
231    Some(DialplanContext {
232        from: from_part.to_string(),
233        to: to_part.to_string(),
234        context,
235    })
236}
237
238fn parse_processing_line(msg: &str) -> Option<DialplanContext> {
239    let proc_idx = msg.find("Processing ")?;
240    let rest = &msg[proc_idx + "Processing ".len()..];
241
242    let arrow = rest.find("->")?;
243    let from = &rest[..arrow];
244
245    let after_arrow = &rest[arrow + 2..];
246    let space = after_arrow.find(' ')?;
247    let to = &after_arrow[..space];
248
249    let ctx_idx = after_arrow.find("in context ")?;
250    let ctx_rest = &after_arrow[ctx_idx + "in context ".len()..];
251    let context = ctx_rest.split_whitespace().next()?;
252
253    Some(DialplanContext {
254        from: from.to_string(),
255        to: to.to_string(),
256        context: context.to_string(),
257    })
258}
259
260fn parse_new_channel(detail: &str) -> Option<String> {
261    let rest = detail.strip_prefix("New Channel ")?;
262    let bracket = rest.rfind(" [")?;
263    Some(rest[..bracket].to_string())
264}
265
266fn parse_state_change(detail: &str) -> Option<String> {
267    let arrow = detail.find(" -> ")?;
268    Some(detail[arrow + 4..].trim().to_string())
269}
270
271/// Extract `origination_uuid` and the bridge target channel from bridge() arguments.
272/// Uses `BridgeDialString` from freeswitch-types for correct parsing of `[]`, `{}`,
273/// `|` failover, and `,` simultaneous ring syntax.
274fn parse_bridge_args(arguments: &str) -> Option<BridgeInfo> {
275    let dial = BridgeDialString::from_str(arguments).ok()?;
276    let first_ep = dial.groups.first()?.first()?;
277    let origination_uuid = first_ep
278        .variables()
279        .and_then(|v| v.get("origination_uuid"))
280        .map(|s| s.to_string());
281    let mut bare = first_ep.clone();
282    bare.set_variables(None);
283    let target_channel = bare.to_string();
284    Some(BridgeInfo {
285        origination_uuid,
286        target_channel,
287    })
288}
289
290struct BridgeInfo {
291    origination_uuid: Option<String>,
292    target_channel: String,
293}
294
295/// Parse "Originate Resulted in Success: [channel] Peer UUID: uuid"
296fn parse_originate_success(msg: &str) -> Option<String> {
297    let marker = "Peer UUID: ";
298    let idx = msg.find(marker)?;
299    let uuid = msg[idx + marker.len()..].trim();
300    if uuid.is_empty() {
301        None
302    } else {
303        Some(uuid.to_string())
304    }
305}
306
307/// A [`LogEntry`] paired with the session's state snapshot at that point in time.
308#[derive(Debug)]
309pub struct EnrichedEntry {
310    pub entry: LogEntry,
311    /// `None` for system lines (entries with an empty UUID).
312    pub session: Option<SessionSnapshot>,
313}
314
315/// Layer 3 per-session state machine — tracks per-UUID state (dialplan context,
316/// channel state, variables) across entries and yields [`EnrichedEntry`] values.
317///
318/// Wraps a [`LogStream`] and maintains a `HashMap<String, SessionState>` keyed by UUID.
319/// Sessions are never automatically cleaned up; call [`remove_session()`](SessionTracker::remove_session)
320/// when a call ends.
321pub struct SessionTracker<I> {
322    inner: LogStream<I>,
323    sessions: HashMap<String, SessionState>,
324}
325
326impl<I: Iterator<Item = String>> SessionTracker<I> {
327    /// Wrap a [`LogStream`] to add per-session state tracking.
328    pub fn new(inner: LogStream<I>) -> Self {
329        SessionTracker {
330            inner,
331            sessions: HashMap::new(),
332        }
333    }
334
335    /// All currently tracked sessions, keyed by UUID.
336    pub fn sessions(&self) -> &HashMap<String, SessionState> {
337        &self.sessions
338    }
339
340    /// Remove and return a session's accumulated state. Call this when a call ends
341    /// (e.g. `CS_DESTROY` or hangup) to free memory.
342    pub fn remove_session(&mut self, uuid: &str) -> Option<SessionState> {
343        self.sessions.remove(uuid)
344    }
345
346    /// Delegates to [`LogStream::stats()`].
347    pub fn stats(&self) -> &ParseStats {
348        self.inner.stats()
349    }
350
351    /// Delegates to [`LogStream::drain_unclassified()`].
352    pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
353        self.inner.drain_unclassified()
354    }
355
356    /// Cross-session leg linking. Called after `update_from_entry` so per-session
357    /// state (bridge target, channel name) is already populated.
358    fn link_legs(&mut self, uuid: &str, entry: &LogEntry) {
359        // 1. "Originate Resulted in Success ... Peer UUID: BLEG" — authoritative
360        if entry.message.contains("Originate Resulted in Success") {
361            if let Some(peer_uuid) = parse_originate_success(&entry.message) {
362                let a_uuid = uuid.to_string();
363                if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
364                    a_state.other_leg_uuid = Some(peer_uuid.clone());
365                    a_state.pending_bridge_target = None;
366                }
367                let b_state = self.sessions.entry(peer_uuid).or_default();
368                b_state.other_leg_uuid = Some(a_uuid);
369            }
370            return;
371        }
372
373        // 2. New Channel on this UUID — check if any other session has a pending bridge
374        //    with origination_uuid matching this UUID, or target matching this channel name.
375        if let MessageKind::ChannelLifecycle { detail } = &entry.message_kind {
376            if let Some(channel_name) = parse_new_channel(detail) {
377                let b_uuid = uuid.to_string();
378                let mut a_uuid_found = None;
379
380                for (a_uuid, a_state) in &self.sessions {
381                    if *a_uuid == b_uuid {
382                        continue;
383                    }
384                    // origination_uuid match: A-leg already set other_leg_uuid during bridge parse
385                    if a_state.other_leg_uuid.as_deref() == Some(&b_uuid) {
386                        a_uuid_found = Some(a_uuid.clone());
387                        break;
388                    }
389                    // Target channel match
390                    if a_state.pending_bridge_target.as_deref() == Some(channel_name.as_str()) {
391                        a_uuid_found = Some(a_uuid.clone());
392                        break;
393                    }
394                }
395
396                if let Some(a_uuid) = a_uuid_found {
397                    if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
398                        a_state.other_leg_uuid = Some(b_uuid.clone());
399                        a_state.pending_bridge_target = None;
400                    }
401                    if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
402                        b_state.other_leg_uuid = Some(a_uuid);
403                    }
404                }
405            }
406        }
407    }
408}
409
410impl<I: Iterator<Item = String>> Iterator for SessionTracker<I> {
411    type Item = EnrichedEntry;
412
413    fn next(&mut self) -> Option<EnrichedEntry> {
414        let entry = self.inner.next()?;
415
416        if entry.uuid.is_empty() {
417            return Some(EnrichedEntry {
418                entry,
419                session: None,
420            });
421        }
422
423        let uuid = entry.uuid.clone();
424        let state = self.sessions.entry(uuid.clone()).or_default();
425        state.update_from_entry(&entry);
426
427        self.link_legs(&uuid, &entry);
428
429        let snapshot = self.sessions.get(&uuid).unwrap().snapshot();
430
431        Some(EnrichedEntry {
432            entry,
433            session: Some(snapshot),
434        })
435    }
436}
437
438#[cfg(test)]
439mod tests {
440    use super::*;
441
442    const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
443    const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
444    const UUID3: &str = "c3d4e5f6-a7b8-9012-cdef-234567890123";
445    const TS1: &str = "2025-01-15 10:30:45.123456";
446    const TS2: &str = "2025-01-15 10:30:46.234567";
447
448    fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
449        format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
450    }
451
452    fn collect_enriched(lines: Vec<String>) -> Vec<EnrichedEntry> {
453        let stream = LogStream::new(lines.into_iter());
454        SessionTracker::new(stream).collect()
455    }
456
457    #[test]
458    fn system_line_no_session() {
459        let lines = vec![format!(
460            "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
461        )];
462        let entries = collect_enriched(lines);
463        assert_eq!(entries.len(), 1);
464        assert!(entries[0].session.is_none());
465    }
466
467    #[test]
468    fn dialplan_context_propagation() {
469        let lines = vec![
470            full_line(UUID1, TS1, "CHANNEL_DATA:"),
471            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
472            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 answer"),
473            format!("{UUID1} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public->global] continue=true"),
474            full_line(UUID1, TS2, "Some later event"),
475        ];
476        let entries = collect_enriched(lines);
477        let last = entries.last().unwrap();
478        let session = last.session.as_ref().unwrap();
479        assert_eq!(session.dialplan_context.as_deref(), Some("public"));
480        assert_eq!(session.dialplan_from.as_deref(), Some("public"));
481        assert_eq!(session.dialplan_to.as_deref(), Some("global"));
482    }
483
484    #[test]
485    fn processing_line_extracts_context() {
486        let lines = vec![full_line(
487            UUID1,
488            TS1,
489            "Processing 5551234567->5559876543 in context public",
490        )];
491        let entries = collect_enriched(lines);
492        let session = entries[0].session.as_ref().unwrap();
493        assert_eq!(session.dialplan_context.as_deref(), Some("public"));
494        assert_eq!(session.dialplan_from.as_deref(), Some("5551234567"));
495        assert_eq!(session.dialplan_to.as_deref(), Some("5559876543"));
496    }
497
498    #[test]
499    fn initial_context_preserved_across_transfers() {
500        let lines = vec![
501            full_line(
502                UUID1,
503                TS1,
504                "Processing 5551234567->5559876543 in context public",
505            ),
506            full_line(
507                UUID1,
508                TS2,
509                "Processing 5551234567->start_recording in context recordings",
510            ),
511        ];
512        let stream = LogStream::new(lines.into_iter());
513        let mut tracker = SessionTracker::new(stream);
514        let entries: Vec<_> = tracker.by_ref().collect();
515
516        let first = entries[0].session.as_ref().unwrap();
517        assert_eq!(
518            first.initial_context.as_deref(),
519            Some("public"),
520            "initial_context set on first Processing line"
521        );
522        assert_eq!(first.dialplan_context.as_deref(), Some("public"));
523
524        let state = tracker.sessions().get(UUID1).unwrap();
525        assert_eq!(
526            state.initial_context.as_deref(),
527            Some("public"),
528            "initial_context keeps the first context seen"
529        );
530        assert_eq!(
531            state.dialplan_context.as_deref(),
532            Some("recordings"),
533            "dialplan_context tracks the current context"
534        );
535        assert_eq!(state.dialplan_to.as_deref(), Some("start_recording"));
536    }
537
538    #[test]
539    fn new_channel_sets_channel_name() {
540        let lines = vec![full_line(
541            UUID1,
542            TS1,
543            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
544        )];
545        let entries = collect_enriched(lines);
546        let session = entries[0].session.as_ref().unwrap();
547        assert_eq!(
548            session.channel_name.as_deref(),
549            Some("sofia/internal-v4/sos")
550        );
551    }
552
553    #[test]
554    fn originate_success_links_both_legs() {
555        // "Originate Resulted in Success" contains both the A-leg UUID (line prefix)
556        // and B-leg UUID (Peer UUID field). Both legs should learn about each other.
557        let lines = vec![
558            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
559            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/esinet1-v6-tcp/sip:target.example.com] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901"),
560        ];
561        let stream = LogStream::new(lines.into_iter());
562        let mut tracker = SessionTracker::new(stream);
563        let _: Vec<_> = tracker.by_ref().collect();
564
565        let a_leg = tracker.sessions().get(UUID1).unwrap();
566        assert_eq!(
567            a_leg.other_leg_uuid.as_deref(),
568            Some(UUID2),
569            "A-leg other_leg_uuid set from Originate Resulted in Success"
570        );
571
572        let b_leg = tracker.sessions().get(UUID2).unwrap();
573        assert_eq!(
574            b_leg.other_leg_uuid.as_deref(),
575            Some(UUID1),
576            "B-leg other_leg_uuid points back to A-leg"
577        );
578    }
579
580    #[test]
581    fn bridge_origination_uuid_links_a_leg_immediately() {
582        // bridge([origination_uuid=BLEG_UUID,...]) guarantees B-leg UUID from execute args alone.
583        // A-leg knows B-leg immediately, B-leg learns A-leg when New Channel appears.
584        let lines = vec![
585            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal-v6/1232@[2001:db8::10] bridge([origination_uuid=b2c3d4e5-f6a7-8901-bcde-f12345678901,leg_timeout=2]sofia/esinet1-v6-tcp/sip:target.example.com)"),
586            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
587        ];
588        let stream = LogStream::new(lines.into_iter());
589        let mut tracker = SessionTracker::new(stream);
590        let _: Vec<_> = tracker.by_ref().collect();
591
592        let a_leg = tracker.sessions().get(UUID1).unwrap();
593        assert_eq!(
594            a_leg.other_leg_uuid.as_deref(),
595            Some(UUID2),
596            "A-leg knows B-leg UUID from origination_uuid in bridge args"
597        );
598
599        let b_leg = tracker.sessions().get(UUID2).unwrap();
600        assert_eq!(
601            b_leg.other_leg_uuid.as_deref(),
602            Some(UUID1),
603            "B-leg knows A-leg once New Channel correlates"
604        );
605    }
606
607    #[test]
608    fn bridge_target_matches_new_channel() {
609        // bridge() without origination_uuid — B-leg UUID is auto-generated by FS.
610        // Match via bridge target channel matching next New Channel with same target.
611        let lines = vec![
612            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
613            full_line(UUID1, TS1, "Parsing session specific variables"),
614            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
615        ];
616        let stream = LogStream::new(lines.into_iter());
617        let mut tracker = SessionTracker::new(stream);
618        let _: Vec<_> = tracker.by_ref().collect();
619
620        let a_leg = tracker.sessions().get(UUID1).unwrap();
621        assert_eq!(
622            a_leg.other_leg_uuid.as_deref(),
623            Some(UUID2),
624            "A-leg linked to B-leg via bridge target matching New Channel"
625        );
626
627        let b_leg = tracker.sessions().get(UUID2).unwrap();
628        assert_eq!(
629            b_leg.other_leg_uuid.as_deref(),
630            Some(UUID1),
631            "B-leg linked back to A-leg"
632        );
633    }
634
635    #[test]
636    fn originate_success_corrects_wrong_target_match() {
637        // Bridge target matching guessed UUID2 as B-leg, but originate success reveals
638        // the actual B-leg is UUID3. The authoritative success message must override.
639        let lines = vec![
640            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
641            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
642            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/gateway/carrier/+15559876543] Peer UUID: c3d4e5f6-a7b8-9012-cdef-234567890123"),
643        ];
644        let stream = LogStream::new(lines.into_iter());
645        let mut tracker = SessionTracker::new(stream);
646        let _: Vec<_> = tracker.by_ref().collect();
647
648        let a_leg = tracker.sessions().get(UUID1).unwrap();
649        assert_eq!(
650            a_leg.other_leg_uuid.as_deref(),
651            Some(UUID3),
652            "Originate success overrides earlier target-match guess"
653        );
654
655        let real_b_leg = tracker.sessions().get(UUID3).unwrap();
656        assert_eq!(
657            real_b_leg.other_leg_uuid.as_deref(),
658            Some(UUID1),
659            "Real B-leg points back to A-leg"
660        );
661    }
662
663    #[test]
664    fn channel_data_other_leg_uuid() {
665        // Other-Leg-Unique-ID in CHANNEL_DATA (post-bridge info dump) sets other_leg_uuid
666        let lines = vec![
667            full_line(UUID1, TS1, "CHANNEL_DATA:"),
668            format!("{UUID1} Other-Leg-Unique-ID: [{UUID2}]"),
669        ];
670        let stream = LogStream::new(lines.into_iter());
671        let mut tracker = SessionTracker::new(stream);
672        let _: Vec<_> = tracker.by_ref().collect();
673
674        let state = tracker.sessions().get(UUID1).unwrap();
675        assert_eq!(
676            state.other_leg_uuid.as_deref(),
677            Some(UUID2),
678            "other_leg_uuid set from Other-Leg-Unique-ID CHANNEL_DATA field"
679        );
680    }
681
682    #[test]
683    fn channel_data_populates_session() {
684        let lines = vec![
685            full_line(UUID1, TS1, "CHANNEL_DATA:"),
686            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
687            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
688            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
689            "variable_direction: [inbound]".to_string(),
690        ];
691        let entries = collect_enriched(lines);
692        assert_eq!(entries.len(), 1);
693        let session = entries[0].session.as_ref().unwrap();
694        assert_eq!(
695            session.channel_name.as_deref(),
696            Some("sofia/internal/+15550001234@192.0.2.1")
697        );
698        assert_eq!(session.channel_state.as_deref(), Some("CS_EXECUTE"));
699    }
700
701    #[test]
702    fn variables_learned_from_channel_data() {
703        let lines = vec![
704            full_line(UUID1, TS1, "CHANNEL_DATA:"),
705            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
706            "variable_direction: [inbound]".to_string(),
707        ];
708        let stream = LogStream::new(lines.into_iter());
709        let mut tracker = SessionTracker::new(stream);
710        let _: Vec<_> = tracker.by_ref().collect();
711        let state = tracker.sessions().get(UUID1).unwrap();
712        assert_eq!(
713            state.variables.get("sip_call_id").map(|s| s.as_str()),
714            Some("test123@192.0.2.1")
715        );
716        assert_eq!(
717            state.variables.get("direction").map(|s| s.as_str()),
718            Some("inbound")
719        );
720    }
721
722    #[test]
723    fn variables_learned_from_set_execute() {
724        let lines = vec![
725            full_line(UUID1, TS1, "First"),
726            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(call_direction=inbound)"),
727            full_line(UUID1, TS2, "After set"),
728        ];
729        let stream = LogStream::new(lines.into_iter());
730        let mut tracker = SessionTracker::new(stream);
731        let entries: Vec<_> = tracker.by_ref().collect();
732        assert_eq!(entries.len(), 3);
733        let state = tracker.sessions().get(UUID1).unwrap();
734        assert_eq!(
735            state.variables.get("call_direction").map(|s| s.as_str()),
736            Some("inbound")
737        );
738    }
739
740    #[test]
741    fn variables_learned_from_export_execute() {
742        let lines = vec![
743            full_line(UUID1, TS1, "First"),
744            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
745        ];
746        let stream = LogStream::new(lines.into_iter());
747        let mut tracker = SessionTracker::new(stream);
748        let _: Vec<_> = tracker.by_ref().collect();
749        let state = tracker.sessions().get(UUID1).unwrap();
750        assert_eq!(
751            state.variables.get("originate_timeout").map(|s| s.as_str()),
752            Some("3600")
753        );
754    }
755
756    #[test]
757    fn session_isolation_between_uuids() {
758        let lines = vec![
759            full_line(
760                UUID1,
761                TS1,
762                "Processing 5551111111->5552222222 in context public",
763            ),
764            full_line(
765                UUID2,
766                TS2,
767                "Processing 5553333333->5554444444 in context private",
768            ),
769        ];
770        let stream = LogStream::new(lines.into_iter());
771        let mut tracker = SessionTracker::new(stream);
772        let _: Vec<_> = tracker.by_ref().collect();
773        let s1 = tracker.sessions().get(UUID1).unwrap();
774        let s2 = tracker.sessions().get(UUID2).unwrap();
775        assert_eq!(s1.dialplan_context.as_deref(), Some("public"));
776        assert_eq!(s2.dialplan_context.as_deref(), Some("private"));
777        assert_eq!(s1.dialplan_from.as_deref(), Some("5551111111"));
778        assert_eq!(s2.dialplan_from.as_deref(), Some("5553333333"));
779    }
780
781    #[test]
782    fn processing_line_with_regex_type_and_angle_bracket_caller() {
783        let lines = vec![full_line(
784            UUID1,
785            TS1,
786            "Processing Emergency S R <5550001234>->start_recording in context recordings",
787        )];
788        let entries = collect_enriched(lines);
789        let session = entries[0].session.as_ref().unwrap();
790        assert_eq!(session.initial_context.as_deref(), Some("recordings"));
791        assert_eq!(session.dialplan_context.as_deref(), Some("recordings"));
792        assert_eq!(
793            session.dialplan_from.as_deref(),
794            Some("Emergency S R <5550001234>")
795        );
796        assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
797    }
798
799    #[test]
800    fn processing_line_extension_format() {
801        let lines = vec![full_line(
802            UUID1,
803            TS1,
804            "Processing Extension 1263 <1263>->start_recording in context recordings",
805        )];
806        let entries = collect_enriched(lines);
807        let session = entries[0].session.as_ref().unwrap();
808        assert_eq!(session.initial_context.as_deref(), Some("recordings"));
809        assert_eq!(
810            session.dialplan_from.as_deref(),
811            Some("Extension 1263 <1263>")
812        );
813        assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
814    }
815
816    #[test]
817    fn state_change_updates_channel_state() {
818        let lines = vec![full_line(UUID1, TS1, "State Change CS_INIT -> CS_ROUTING")];
819        let entries = collect_enriched(lines);
820        let session = entries[0].session.as_ref().unwrap();
821        assert_eq!(session.channel_state.as_deref(), Some("CS_ROUTING"));
822    }
823
824    #[test]
825    fn callstate_change_updates_channel_state() {
826        let lines = vec![full_line(
827            UUID1,
828            TS1,
829            "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
830        )];
831        let entries = collect_enriched(lines);
832        let session = entries[0].session.as_ref().unwrap();
833        assert_eq!(session.channel_state.as_deref(), Some("RINGING"));
834    }
835
836    #[test]
837    fn state_change_overrides_callstate() {
838        let lines = vec![
839            full_line(
840                UUID1,
841                TS1,
842                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
843            ),
844            full_line(
845                UUID1,
846                TS2,
847                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
848            ),
849        ];
850        let entries = collect_enriched(lines);
851        assert_eq!(
852            entries[0]
853                .session
854                .as_ref()
855                .unwrap()
856                .channel_state
857                .as_deref(),
858            Some("RINGING")
859        );
860        assert_eq!(
861            entries[1]
862                .session
863                .as_ref()
864                .unwrap()
865                .channel_state
866                .as_deref(),
867            Some("CS_EXCHANGE_MEDIA")
868        );
869    }
870
871    #[test]
872    fn bleg_lifecycle_extracts_data_from_processing() {
873        let lines = vec![
874            full_line(
875                UUID1,
876                TS1,
877                "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
878            ),
879            full_line(
880                UUID1,
881                TS1,
882                "(sofia/internal-v4/sos) State Change CS_NEW -> CS_INIT",
883            ),
884            full_line(
885                UUID1,
886                TS1,
887                "(sofia/internal-v4/sos) State Change CS_INIT -> CS_ROUTING",
888            ),
889            full_line(
890                UUID1,
891                TS1,
892                "(sofia/internal-v4/sos) State Change CS_ROUTING -> CS_CONSUME_MEDIA",
893            ),
894            full_line(
895                UUID1,
896                TS1,
897                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
898            ),
899            full_line(
900                UUID1,
901                TS2,
902                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
903            ),
904            full_line(
905                UUID1,
906                TS2,
907                "Processing Emergency S R <5550001234>->start_recording in context recordings",
908            ),
909            full_line(
910                UUID1,
911                TS2,
912                "(sofia/internal-v4/sos) State Change CS_EXCHANGE_MEDIA -> CS_HANGUP",
913            ),
914        ];
915        let entries = collect_enriched(lines);
916
917        let after_ringing = entries[4].session.as_ref().unwrap();
918        assert_eq!(after_ringing.channel_state.as_deref(), Some("RINGING"));
919        assert!(after_ringing.initial_context.is_none());
920
921        let after_processing = entries[6].session.as_ref().unwrap();
922        assert_eq!(
923            after_processing.channel_state.as_deref(),
924            Some("CS_EXCHANGE_MEDIA")
925        );
926        assert_eq!(
927            after_processing.initial_context.as_deref(),
928            Some("recordings")
929        );
930        assert_eq!(
931            after_processing.dialplan_from.as_deref(),
932            Some("Emergency S R <5550001234>")
933        );
934        assert_eq!(
935            after_processing.dialplan_to.as_deref(),
936            Some("start_recording")
937        );
938
939        let after_hangup = entries[7].session.as_ref().unwrap();
940        assert_eq!(after_hangup.channel_state.as_deref(), Some("CS_HANGUP"));
941        assert_eq!(after_hangup.initial_context.as_deref(), Some("recordings"));
942    }
943
944    #[test]
945    fn channel_name_from_new_channel() {
946        let lines = vec![full_line(
947            UUID1,
948            TS1,
949            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
950        )];
951        let entries = collect_enriched(lines);
952        let session = entries[0].session.as_ref().unwrap();
953        assert_eq!(
954            session.channel_name.as_deref(),
955            Some("sofia/internal-v4/sos")
956        );
957    }
958
959    #[test]
960    fn remove_session() {
961        let lines = vec![full_line(
962            UUID1,
963            TS1,
964            "Processing 5551111111->5552222222 in context public",
965        )];
966        let stream = LogStream::new(lines.into_iter());
967        let mut tracker = SessionTracker::new(stream);
968        let _: Vec<_> = tracker.by_ref().collect();
969        assert!(tracker.sessions().contains_key(UUID1));
970        let removed = tracker.remove_session(UUID1).unwrap();
971        assert_eq!(removed.dialplan_context.as_deref(), Some("public"));
972        assert!(!tracker.sessions().contains_key(UUID1));
973    }
974
975    #[test]
976    fn stats_delegation() {
977        let lines = vec![
978            full_line(UUID1, TS1, "First"),
979            full_line(UUID1, TS2, "Second"),
980        ];
981        let stream = LogStream::new(lines.into_iter());
982        let mut tracker = SessionTracker::new(stream);
983        let _: Vec<_> = tracker.by_ref().collect();
984        assert_eq!(tracker.stats().lines_processed, 2);
985    }
986
987    #[test]
988    fn snapshot_reflects_cumulative_state() {
989        let lines = vec![
990            full_line(UUID1, TS1, "CHANNEL_DATA:"),
991            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
992            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
993            full_line(
994                UUID1,
995                TS2,
996                "Processing 5551111111->5552222222 in context public",
997            ),
998        ];
999        let entries = collect_enriched(lines);
1000        assert_eq!(entries.len(), 3);
1001        let first = entries[0].session.as_ref().unwrap();
1002        assert_eq!(
1003            first.channel_name.as_deref(),
1004            Some("sofia/internal/+15550001234@192.0.2.1"),
1005        );
1006        assert!(first.dialplan_context.is_none());
1007
1008        let last = entries[2].session.as_ref().unwrap();
1009        assert_eq!(
1010            last.channel_name.as_deref(),
1011            Some("sofia/internal/+15550001234@192.0.2.1"),
1012        );
1013        assert_eq!(last.dialplan_context.as_deref(), Some("public"));
1014    }
1015}