1use std::collections::HashMap;
2use std::str::FromStr;
3
4use freeswitch_types::{BridgeDialString, CallDirection, DialString};
5
6use crate::line::parse_line;
7use crate::message::{classify_message, MessageKind};
8use crate::stream::{Block, LogEntry, LogStream, ParseStats, UnclassifiedLine};
9
/// Mutable call state accumulated for one session UUID as log entries are processed.
///
/// Every field starts out empty (`Default`) and is filled in by
/// `SessionState::update_from_entry` and by the tracker's leg-linking logic.
#[derive(Debug, Clone, Default)]
pub struct SessionState {
    /// Channel name, taken from a `Channel-Name` field or a `New Channel` line.
    pub channel_name: Option<String>,
    /// Latest channel state, from a `Channel-State` field or the right-hand side
    /// of a state-change message.
    pub channel_state: Option<String>,
    /// First dialplan context seen for this session; not overwritten once set.
    pub initial_context: Option<String>,
    /// Most recently seen dialplan context.
    pub dialplan_context: Option<String>,
    /// Text left of `->` in the most recent dialplan / `Processing` line.
    pub dialplan_from: Option<String>,
    /// Text right of `->` in the most recent dialplan / `Processing` line.
    pub dialplan_to: Option<String>,
    /// Parsed `Call-Direction` field; `None` when absent or unparsable.
    pub call_direction: Option<CallDirection>,
    /// Value of the `Caller-Caller-ID-Number` field.
    pub caller_id_number: Option<String>,
    /// Value of the `Caller-Destination-Number` field.
    pub destination_number: Option<String>,
    /// UUID of the leg this session is linked to, if known.
    pub other_leg_uuid: Option<String>,
    /// First endpoint of the latest `bridge` call (variables stripped), kept until
    /// the tracker links this session to another leg.
    pub(crate) pending_bridge_target: Option<String>,
    /// Channel variables learned so far, keyed without the `variable_` prefix.
    pub variables: HashMap<String, String>,
}
43
/// Detached copy of the scalar `SessionState` fields at one point in time.
///
/// Unlike `SessionState` it carries neither the variable map nor the pending
/// bridge target.
#[derive(Debug, Clone)]
pub struct SessionSnapshot {
    /// Channel name known when the snapshot was taken.
    pub channel_name: Option<String>,
    /// Channel state known when the snapshot was taken.
    pub channel_state: Option<String>,
    /// First dialplan context seen for the session.
    pub initial_context: Option<String>,
    /// Most recent dialplan context.
    pub dialplan_context: Option<String>,
    /// Left-hand side of the most recent dialplan / `Processing` arrow.
    pub dialplan_from: Option<String>,
    /// Right-hand side of the most recent dialplan / `Processing` arrow.
    pub dialplan_to: Option<String>,
    /// Call direction, if one was parsed.
    pub call_direction: Option<CallDirection>,
    /// Caller ID number, if seen.
    pub caller_id_number: Option<String>,
    /// Destination number, if seen.
    pub destination_number: Option<String>,
    /// UUID of the linked leg, if known.
    pub other_leg_uuid: Option<String>,
}
61
62impl SessionState {
63 fn snapshot(&self) -> SessionSnapshot {
64 SessionSnapshot {
65 channel_name: self.channel_name.clone(),
66 channel_state: self.channel_state.clone(),
67 initial_context: self.initial_context.clone(),
68 dialplan_context: self.dialplan_context.clone(),
69 dialplan_from: self.dialplan_from.clone(),
70 dialplan_to: self.dialplan_to.clone(),
71 call_direction: self.call_direction,
72 caller_id_number: self.caller_id_number.clone(),
73 destination_number: self.destination_number.clone(),
74 other_leg_uuid: self.other_leg_uuid.clone(),
75 }
76 }
77
78 fn update_from_entry(&mut self, entry: &LogEntry) {
79 if let Some(Block::ChannelData { fields, variables }) = &entry.block {
80 for (name, value) in fields {
81 match name.as_str() {
82 "Channel-Name" => self.channel_name = Some(value.clone()),
83 "Channel-State" => self.channel_state = Some(value.clone()),
84 "Call-Direction" => {
85 self.call_direction = CallDirection::from_str(value).ok();
86 }
87 "Caller-Caller-ID-Number" => {
88 self.caller_id_number = Some(value.clone());
89 }
90 "Caller-Destination-Number" => {
91 self.destination_number = Some(value.clone());
92 }
93 "Other-Leg-Unique-ID" => {
94 self.other_leg_uuid = Some(value.clone());
95 }
96 _ => {}
97 }
98 }
99 for (name, value) in variables {
100 let var_name = name.strip_prefix("variable_").unwrap_or(name);
101 self.variables.insert(var_name.to_string(), value.clone());
102 }
103 }
104
105 match &entry.message_kind {
106 MessageKind::Dialplan { detail, .. } => {
107 if let Some(dp) = parse_dialplan_context(detail) {
108 self.initial_context.get_or_insert(dp.context.clone());
109 self.dialplan_context = Some(dp.context);
110 self.dialplan_from = Some(dp.from);
111 self.dialplan_to = Some(dp.to);
112 }
113 }
114 MessageKind::Execute {
115 application,
116 arguments,
117 ..
118 } => match application.as_str() {
119 "set" | "export" => {
120 if let Some((name, value)) = arguments.split_once('=') {
121 self.variables.insert(name.to_string(), value.to_string());
122 }
123 }
124 "bridge" => {
125 if let Some(info) = parse_bridge_args(arguments) {
126 if let Some(uuid) = &info.origination_uuid {
127 self.other_leg_uuid = Some(uuid.clone());
128 }
129 self.pending_bridge_target = Some(info.target_channel);
130 }
131 }
132 _ => {}
133 },
134 MessageKind::Variable { name, value } => {
135 let var_name = name.strip_prefix("variable_").unwrap_or(name);
136 self.variables.insert(var_name.to_string(), value.clone());
137 }
138 MessageKind::ChannelField { name, value } => match name.as_str() {
139 "Channel-Name" => self.channel_name = Some(value.clone()),
140 "Channel-State" => self.channel_state = Some(value.clone()),
141 _ => {}
142 },
143 MessageKind::StateChange { detail } => {
144 if let Some(new_state) = parse_state_change(detail) {
145 self.channel_state = Some(new_state);
146 }
147 }
148 MessageKind::ChannelLifecycle { detail } => {
149 if let Some(name) = parse_new_channel(detail) {
150 if self.channel_name.is_none() {
151 self.channel_name = Some(name);
152 }
153 }
154 }
155 _ => {}
156 }
157
158 if entry.message.contains("Processing ") && entry.message.contains(" in context ") {
159 if let Some(dp) = parse_processing_line(&entry.message) {
160 self.initial_context.get_or_insert(dp.context.clone());
161 self.dialplan_context = Some(dp.context);
162 self.dialplan_from = Some(dp.from);
163 self.dialplan_to = Some(dp.to);
164 }
165 }
166
167 for attached in &entry.attached {
168 let parsed = parse_line(attached);
169 self.update_from_message(parsed.message);
170 }
171 }
172
173 fn update_from_message(&mut self, msg: &str) {
174 let kind = classify_message(msg);
175 match &kind {
176 MessageKind::Dialplan { detail, .. } => {
177 if let Some(dp) = parse_dialplan_context(detail) {
178 self.initial_context.get_or_insert(dp.context.clone());
179 self.dialplan_context = Some(dp.context);
180 self.dialplan_from = Some(dp.from);
181 self.dialplan_to = Some(dp.to);
182 }
183 }
184 MessageKind::Variable { name, value } => {
185 let var_name = name.strip_prefix("variable_").unwrap_or(name);
186 self.variables.insert(var_name.to_string(), value.clone());
187 }
188 MessageKind::ChannelField { name, value } => match name.as_str() {
189 "Channel-Name" => self.channel_name = Some(value.clone()),
190 "Channel-State" => self.channel_state = Some(value.clone()),
191 _ => {}
192 },
193 MessageKind::StateChange { detail } => {
194 if let Some(new_state) = parse_state_change(detail) {
195 self.channel_state = Some(new_state);
196 }
197 }
198 _ => {}
199 }
200 }
201}
202
/// Result of parsing a dialplan `parsing [...]` detail or a `Processing` line.
struct DialplanContext {
    /// Text to the left of the `->` arrow.
    from: String,
    /// Text to the right of the `->` arrow.
    to: String,
    /// Dialplan context the line refers to.
    context: String,
}
208
209fn parse_dialplan_context(detail: &str) -> Option<DialplanContext> {
210 if !detail.starts_with("parsing [") {
211 return None;
212 }
213 let rest = &detail["parsing [".len()..];
214 let bracket_end = rest.find(']')?;
215 let inner = &rest[..bracket_end];
216
217 let arrow = inner.find("->")?;
218 let from_part = &inner[..arrow];
219 let to_part = &inner[arrow + 2..];
220
221 let context = if rest.len() > bracket_end + 1 {
222 let after = rest[bracket_end + 1..].trim();
223 if let Some(stripped) = after.strip_prefix("continue=") {
224 let _ = stripped;
225 }
226 from_part.to_string()
227 } else {
228 from_part.to_string()
229 };
230
231 Some(DialplanContext {
232 from: from_part.to_string(),
233 to: to_part.to_string(),
234 context,
235 })
236}
237
238fn parse_processing_line(msg: &str) -> Option<DialplanContext> {
239 let proc_idx = msg.find("Processing ")?;
240 let rest = &msg[proc_idx + "Processing ".len()..];
241
242 let arrow = rest.find("->")?;
243 let from = &rest[..arrow];
244
245 let after_arrow = &rest[arrow + 2..];
246 let space = after_arrow.find(' ')?;
247 let to = &after_arrow[..space];
248
249 let ctx_idx = after_arrow.find("in context ")?;
250 let ctx_rest = &after_arrow[ctx_idx + "in context ".len()..];
251 let context = ctx_rest.split_whitespace().next()?;
252
253 Some(DialplanContext {
254 from: from.to_string(),
255 to: to.to_string(),
256 context: context.to_string(),
257 })
258}
259
/// Extracts the channel name from a `New Channel <name> [<uuid>]` detail.
///
/// The name is everything between the `New Channel ` prefix and the LAST
/// ` [`, so names that themselves contain brackets (IPv6 literals) survive.
/// Returns `None` when the prefix or the bracket is missing, or when the name
/// would be empty — an empty name must not be stored, because the session
/// only fills `channel_name` from lifecycle lines while it is still unset.
fn parse_new_channel(detail: &str) -> Option<String> {
    let rest = detail.strip_prefix("New Channel ")?;
    let (name, _uuid_part) = rest.rsplit_once(" [")?;
    if name.is_empty() {
        None
    } else {
        Some(name.to_string())
    }
}
265
/// Extracts the new state from a `<old> -> <new>` state-change detail.
///
/// Everything after the first ` -> ` is trimmed and returned. Returns `None`
/// when there is no arrow or nothing but whitespace follows it, so a
/// truncated line cannot replace a known state with an empty string.
fn parse_state_change(detail: &str) -> Option<String> {
    let (_, new_state) = detail.split_once(" -> ")?;
    let new_state = new_state.trim();
    if new_state.is_empty() {
        None
    } else {
        Some(new_state.to_string())
    }
}
270
271fn parse_bridge_args(arguments: &str) -> Option<BridgeInfo> {
275 let dial = BridgeDialString::from_str(arguments).ok()?;
276 let first_ep = dial.groups.first()?.first()?;
277 let origination_uuid = first_ep
278 .variables()
279 .and_then(|v| v.get("origination_uuid"))
280 .map(|s| s.to_string());
281 let mut bare = first_ep.clone();
282 bare.set_variables(None);
283 let target_channel = bare.to_string();
284 Some(BridgeInfo {
285 origination_uuid,
286 target_channel,
287 })
288}
289
/// What `parse_bridge_args` extracts from a `bridge` application call.
struct BridgeInfo {
    /// Value of the first endpoint's `origination_uuid` variable, if present.
    origination_uuid: Option<String>,
    /// First endpoint rendered as a string with its variables removed.
    target_channel: String,
}
294
/// Returns the trimmed text following `Peer UUID: ` in `msg`.
///
/// Yields `None` when the marker is absent or nothing but whitespace
/// follows it.
fn parse_originate_success(msg: &str) -> Option<String> {
    const MARKER: &str = "Peer UUID: ";

    msg.split_once(MARKER)
        .map(|(_, tail)| tail.trim())
        .filter(|peer| !peer.is_empty())
        .map(String::from)
}
306
/// Extracts the bracketed channel name from an originate-success message.
///
/// The name starts right after the first ` [` and ends at the `]` that
/// balances that opening bracket. Nested brackets are kept, so a channel with
/// an IPv6 host such as `sofia/internal-v6/1232@[2001:db8::10]` is returned
/// whole rather than cut at the first `]`.
///
/// Returns `None` when there is no ` [`, the brackets never balance, or the
/// name is empty.
fn parse_originate_channel(msg: &str) -> Option<&str> {
    let start = msg.find(" [")? + 2;

    // Number of nested '[' currently open inside the channel name.
    let mut depth = 0usize;
    for (offset, ch) in msg[start..].char_indices() {
        match ch {
            '[' => depth += 1,
            ']' if depth == 0 => {
                let chan = &msg[start..start + offset];
                return if chan.is_empty() { None } else { Some(chan) };
            }
            ']' => depth -= 1,
            _ => {}
        }
    }
    None
}
320
/// Reports whether `state` is one of the state names this module treats as
/// terminal: `CS_HANGUP`, `CS_REPORTING`, `CS_DESTROY`, `CS_NONE` or `HANGUP`.
///
/// An unknown (`None`) state is not terminal. The comparison is exact and
/// case-sensitive.
fn is_terminal_channel_state(state: Option<&str>) -> bool {
    const TERMINAL_STATES: [&str; 5] = [
        "CS_HANGUP",
        "CS_REPORTING",
        "CS_DESTROY",
        "CS_NONE",
        "HANGUP",
    ];

    match state {
        Some(name) => TERMINAL_STATES.contains(&name),
        None => false,
    }
}
334
/// A log entry paired with the state of its session right after the entry
/// was applied.
#[derive(Debug)]
pub struct EnrichedEntry {
    /// The entry exactly as produced by the underlying `LogStream`.
    pub entry: LogEntry,
    /// Session snapshot taken after processing `entry`; `None` for entries
    /// whose UUID is empty.
    pub session: Option<SessionSnapshot>,
}
342
/// Iterator adapter over a `LogStream` that maintains per-UUID session state
/// and yields each entry together with a snapshot of that state.
pub struct SessionTracker<I> {
    /// Underlying stream of parsed log entries.
    inner: LogStream<I>,
    /// Session state keyed by entry UUID; entries are only removed through
    /// `remove_session`.
    sessions: HashMap<String, SessionState>,
}
353
354impl<I: Iterator<Item = String>> SessionTracker<I> {
355 pub fn new(inner: LogStream<I>) -> Self {
357 SessionTracker {
358 inner,
359 sessions: HashMap::new(),
360 }
361 }
362
363 pub fn sessions(&self) -> &HashMap<String, SessionState> {
365 &self.sessions
366 }
367
368 pub fn remove_session(&mut self, uuid: &str) -> Option<SessionState> {
371 self.sessions.remove(uuid)
372 }
373
374 pub fn stats(&self) -> &ParseStats {
376 self.inner.stats()
377 }
378
379 pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
381 self.inner.drain_unclassified()
382 }
383
384 fn link_legs(&mut self, uuid: &str, entry: &LogEntry) {
387 if entry.message.contains("Originate Resulted in Success") {
389 let a_uuid = uuid.to_string();
390 if let Some(peer_uuid) = parse_originate_success(&entry.message) {
391 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
392 a_state.other_leg_uuid = Some(peer_uuid.clone());
393 a_state.pending_bridge_target = None;
394 }
395 let b_state = self.sessions.entry(peer_uuid).or_default();
396 b_state.other_leg_uuid = Some(a_uuid);
397 } else if let Some(chan) = parse_originate_channel(&entry.message) {
398 let mut found: Option<String> = None;
407 let mut ambiguous = false;
408 for (u, s) in &self.sessions {
409 if u == &a_uuid {
410 continue;
411 }
412 if s.channel_name.as_deref() == Some(chan)
413 && !is_terminal_channel_state(s.channel_state.as_deref())
414 {
415 if found.is_some() {
416 ambiguous = true;
417 break;
418 }
419 found = Some(u.clone());
420 }
421 }
422 if !ambiguous {
423 if let Some(b_uuid) = found {
424 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
425 a_state.other_leg_uuid = Some(b_uuid.clone());
426 a_state.pending_bridge_target = None;
427 }
428 if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
429 b_state.other_leg_uuid = Some(a_uuid);
430 }
431 }
432 }
433 }
434 return;
435 }
436
437 if let MessageKind::ChannelLifecycle { detail } = &entry.message_kind {
440 if let Some(channel_name) = parse_new_channel(detail) {
441 let b_uuid = uuid.to_string();
442 let mut a_uuid_found = None;
443
444 for (a_uuid, a_state) in &self.sessions {
445 if *a_uuid == b_uuid {
446 continue;
447 }
448 if a_state.other_leg_uuid.as_deref() == Some(&b_uuid) {
450 a_uuid_found = Some(a_uuid.clone());
451 break;
452 }
453 if a_state.pending_bridge_target.as_deref() == Some(channel_name.as_str()) {
455 a_uuid_found = Some(a_uuid.clone());
456 break;
457 }
458 }
459
460 if let Some(a_uuid) = a_uuid_found {
461 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
462 a_state.other_leg_uuid = Some(b_uuid.clone());
463 a_state.pending_bridge_target = None;
464 }
465 if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
466 b_state.other_leg_uuid = Some(a_uuid);
467 }
468 }
469 }
470 }
471 }
472}
473
474impl<I: Iterator<Item = String>> Iterator for SessionTracker<I> {
475 type Item = EnrichedEntry;
476
477 fn next(&mut self) -> Option<EnrichedEntry> {
478 let entry = self.inner.next()?;
479
480 if entry.uuid.is_empty() {
481 return Some(EnrichedEntry {
482 entry,
483 session: None,
484 });
485 }
486
487 let uuid = entry.uuid.clone();
488 let state = self.sessions.entry(uuid.clone()).or_default();
489 state.update_from_entry(&entry);
490
491 self.link_legs(&uuid, &entry);
492
493 let snapshot = self.sessions.get(&uuid).unwrap().snapshot();
494
495 Some(EnrichedEntry {
496 entry,
497 session: Some(snapshot),
498 })
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
    const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
    const UUID3: &str = "c3d4e5f6-a7b8-9012-cdef-234567890123";
    const TS1: &str = "2025-01-15 10:30:45.123456";
    const TS2: &str = "2025-01-15 10:30:46.234567";

    /// Builds a complete log line (UUID, timestamp, idle %, level, source, message).
    fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
        format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
    }

    /// Runs `lines` through a tracker and returns both the emitted entries and
    /// the fully drained tracker.
    fn run(
        lines: Vec<String>,
    ) -> (
        Vec<EnrichedEntry>,
        SessionTracker<impl Iterator<Item = String>>,
    ) {
        let mut tracker = SessionTracker::new(LogStream::new(lines.into_iter()));
        let enriched: Vec<EnrichedEntry> = tracker.by_ref().collect();
        (enriched, tracker)
    }

    /// Entries only.
    fn collect_enriched(lines: Vec<String>) -> Vec<EnrichedEntry> {
        run(lines).0
    }

    /// Drained tracker only.
    fn drain(lines: Vec<String>) -> SessionTracker<impl Iterator<Item = String>> {
        run(lines).1
    }

    /// Snapshot attached to an entry; panics when the entry has none.
    fn snap(entry: &EnrichedEntry) -> &SessionSnapshot {
        entry.session.as_ref().unwrap()
    }

    /// Final state of `uuid`; panics when the session is unknown.
    fn state<'t, I: Iterator<Item = String>>(
        tracker: &'t SessionTracker<I>,
        uuid: &str,
    ) -> &'t SessionState {
        tracker.sessions().get(uuid).unwrap()
    }

    /// Final `other_leg_uuid` of `uuid`.
    fn other_leg<'t, I: Iterator<Item = String>>(
        tracker: &'t SessionTracker<I>,
        uuid: &str,
    ) -> Option<&'t str> {
        state(tracker, uuid).other_leg_uuid.as_deref()
    }

    /// Final value of variable `name` in session `uuid`.
    fn variable<'t, I: Iterator<Item = String>>(
        tracker: &'t SessionTracker<I>,
        uuid: &str,
        name: &str,
    ) -> Option<&'t str> {
        state(tracker, uuid).variables.get(name).map(|s| s.as_str())
    }

    #[test]
    fn system_line_no_session() {
        let enriched = collect_enriched(vec![format!(
            "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
        )]);
        assert_eq!(enriched.len(), 1);
        assert!(enriched[0].session.is_none());
    }

    #[test]
    fn dialplan_context_propagation() {
        let enriched = collect_enriched(vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 answer"),
            format!("{UUID1} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public->global] continue=true"),
            full_line(UUID1, TS2, "Some later event"),
        ]);
        let latest = snap(enriched.last().unwrap());
        assert_eq!(latest.dialplan_context.as_deref(), Some("public"));
        assert_eq!(latest.dialplan_from.as_deref(), Some("public"));
        assert_eq!(latest.dialplan_to.as_deref(), Some("global"));
    }

    #[test]
    fn processing_line_extracts_context() {
        let enriched = collect_enriched(vec![full_line(
            UUID1,
            TS1,
            "Processing 5551234567->5559876543 in context public",
        )]);
        let only = snap(&enriched[0]);
        assert_eq!(only.dialplan_context.as_deref(), Some("public"));
        assert_eq!(only.dialplan_from.as_deref(), Some("5551234567"));
        assert_eq!(only.dialplan_to.as_deref(), Some("5559876543"));
    }

    #[test]
    fn initial_context_preserved_across_transfers() {
        let (enriched, tracker) = run(vec![
            full_line(
                UUID1,
                TS1,
                "Processing 5551234567->5559876543 in context public",
            ),
            full_line(
                UUID1,
                TS2,
                "Processing 5551234567->start_recording in context recordings",
            ),
        ]);

        let first = snap(&enriched[0]);
        assert_eq!(
            first.initial_context.as_deref(),
            Some("public"),
            "initial_context set on first Processing line"
        );
        assert_eq!(first.dialplan_context.as_deref(), Some("public"));

        let end_state = state(&tracker, UUID1);
        assert_eq!(
            end_state.initial_context.as_deref(),
            Some("public"),
            "initial_context keeps the first context seen"
        );
        assert_eq!(
            end_state.dialplan_context.as_deref(),
            Some("recordings"),
            "dialplan_context tracks the current context"
        );
        assert_eq!(end_state.dialplan_to.as_deref(), Some("start_recording"));
    }

    #[test]
    fn new_channel_sets_channel_name() {
        let enriched = collect_enriched(vec![full_line(
            UUID1,
            TS1,
            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
        )]);
        assert_eq!(
            snap(&enriched[0]).channel_name.as_deref(),
            Some("sofia/internal-v4/sos")
        );
    }

    #[test]
    fn originate_success_links_both_legs() {
        let tracker = drain(vec![
            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/esinet1-v6-tcp/sip:target.example.com] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901"),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID2),
            "A-leg other_leg_uuid set from Originate Resulted in Success"
        );
        assert_eq!(
            other_leg(&tracker, UUID2),
            Some(UUID1),
            "B-leg other_leg_uuid points back to A-leg"
        );
    }

    #[test]
    fn originate_success_channel_fallback_links_legs() {
        let tracker = drain(vec![
            full_line(
                UUID2,
                TS1,
                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
            ),
            full_line(
                UUID1,
                TS2,
                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
            ),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID2),
            "A-leg linked to B-leg via channel-name fallback when Peer UUID absent"
        );
        assert_eq!(
            other_leg(&tracker, UUID2),
            Some(UUID1),
            "B-leg linked back to A-leg"
        );
    }

    #[test]
    fn originate_success_peer_uuid_wins_over_channel_fallback() {
        let tracker = drain(vec![
            full_line(
                UUID2,
                TS1,
                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
            ),
            full_line(
                UUID3,
                TS1,
                "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
            ),
            full_line(
                UUID1,
                TS2,
                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901",
            ),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID2),
            "Peer UUID wins over channel-name match"
        );
        assert_eq!(
            other_leg(&tracker, UUID3),
            None,
            "Decoy session sharing channel name is not touched"
        );
    }

    #[test]
    fn originate_success_channel_fallback_skips_when_ambiguous() {
        let tracker = drain(vec![
            full_line(
                UUID2,
                TS1,
                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
            ),
            full_line(
                UUID3,
                TS1,
                "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
            ),
            full_line(
                UUID1,
                TS2,
                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
            ),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            None,
            "Ambiguous channel name yields no link"
        );
        assert_eq!(other_leg(&tracker, UUID2), None);
        assert_eq!(other_leg(&tracker, UUID3), None);
    }

    #[test]
    fn originate_success_channel_fallback_skips_terminated_candidates() {
        let tracker = drain(vec![
            full_line(
                UUID2,
                TS1,
                "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
            ),
            full_line(
                UUID2,
                TS1,
                "(sofia/internal/6244@192.0.2.72:50744) State Change CS_EXECUTE -> CS_DESTROY",
            ),
            full_line(
                UUID3,
                TS1,
                "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
            ),
            full_line(
                UUID1,
                TS2,
                "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
            ),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID3),
            "Live b-leg wins over CS_DESTROY straggler"
        );
        assert_eq!(
            other_leg(&tracker, UUID3),
            Some(UUID1),
            "Live b-leg points back to a-leg"
        );
        assert_eq!(
            other_leg(&tracker, UUID2),
            None,
            "Terminated b-leg is not touched"
        );
    }

    #[test]
    fn originate_success_channel_fallback_skips_when_no_match() {
        let tracker = drain(vec![full_line(
            UUID1,
            TS2,
            "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
        )]);

        let a_leg = state(&tracker, UUID1);
        assert_eq!(a_leg.other_leg_uuid, None);
        assert_eq!(a_leg.pending_bridge_target, None);
    }

    #[test]
    fn bridge_origination_uuid_links_a_leg_immediately() {
        let tracker = drain(vec![
            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal-v6/1232@[2001:db8::10] bridge([origination_uuid=b2c3d4e5-f6a7-8901-bcde-f12345678901,leg_timeout=2]sofia/esinet1-v6-tcp/sip:target.example.com)"),
            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID2),
            "A-leg knows B-leg UUID from origination_uuid in bridge args"
        );
        assert_eq!(
            other_leg(&tracker, UUID2),
            Some(UUID1),
            "B-leg knows A-leg once New Channel correlates"
        );
    }

    #[test]
    fn bridge_target_matches_new_channel() {
        let tracker = drain(vec![
            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
            full_line(UUID1, TS1, "Parsing session specific variables"),
            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID2),
            "A-leg linked to B-leg via bridge target matching New Channel"
        );
        assert_eq!(
            other_leg(&tracker, UUID2),
            Some(UUID1),
            "B-leg linked back to A-leg"
        );
    }

    #[test]
    fn originate_success_corrects_wrong_target_match() {
        let tracker = drain(vec![
            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/gateway/carrier/+15559876543] Peer UUID: c3d4e5f6-a7b8-9012-cdef-234567890123"),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID3),
            "Originate success overrides earlier target-match guess"
        );
        assert_eq!(
            other_leg(&tracker, UUID3),
            Some(UUID1),
            "Real B-leg points back to A-leg"
        );
    }

    #[test]
    fn channel_data_other_leg_uuid() {
        let tracker = drain(vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Other-Leg-Unique-ID: [{UUID2}]"),
        ]);

        assert_eq!(
            other_leg(&tracker, UUID1),
            Some(UUID2),
            "other_leg_uuid set from Other-Leg-Unique-ID CHANNEL_DATA field"
        );
    }

    #[test]
    fn channel_data_populates_session() {
        let enriched = collect_enriched(vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
            "variable_direction: [inbound]".to_string(),
        ]);
        assert_eq!(enriched.len(), 1);
        let only = snap(&enriched[0]);
        assert_eq!(
            only.channel_name.as_deref(),
            Some("sofia/internal/+15550001234@192.0.2.1")
        );
        assert_eq!(only.channel_state.as_deref(), Some("CS_EXECUTE"));
    }

    #[test]
    fn variables_learned_from_channel_data() {
        let tracker = drain(vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
            "variable_direction: [inbound]".to_string(),
        ]);
        assert_eq!(
            variable(&tracker, UUID1, "sip_call_id"),
            Some("test123@192.0.2.1")
        );
        assert_eq!(variable(&tracker, UUID1, "direction"), Some("inbound"));
    }

    #[test]
    fn variables_learned_from_set_execute() {
        let (enriched, tracker) = run(vec![
            full_line(UUID1, TS1, "First"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(call_direction=inbound)"),
            full_line(UUID1, TS2, "After set"),
        ]);
        assert_eq!(enriched.len(), 3);
        assert_eq!(
            variable(&tracker, UUID1, "call_direction"),
            Some("inbound")
        );
    }

    #[test]
    fn variables_learned_from_export_execute() {
        let tracker = drain(vec![
            full_line(UUID1, TS1, "First"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
        ]);
        assert_eq!(
            variable(&tracker, UUID1, "originate_timeout"),
            Some("3600")
        );
    }

    #[test]
    fn session_isolation_between_uuids() {
        let tracker = drain(vec![
            full_line(
                UUID1,
                TS1,
                "Processing 5551111111->5552222222 in context public",
            ),
            full_line(
                UUID2,
                TS2,
                "Processing 5553333333->5554444444 in context private",
            ),
        ]);
        let first = state(&tracker, UUID1);
        let second = state(&tracker, UUID2);
        assert_eq!(first.dialplan_context.as_deref(), Some("public"));
        assert_eq!(second.dialplan_context.as_deref(), Some("private"));
        assert_eq!(first.dialplan_from.as_deref(), Some("5551111111"));
        assert_eq!(second.dialplan_from.as_deref(), Some("5553333333"));
    }

    #[test]
    fn processing_line_with_regex_type_and_angle_bracket_caller() {
        let enriched = collect_enriched(vec![full_line(
            UUID1,
            TS1,
            "Processing Emergency S R <5550001234>->start_recording in context recordings",
        )]);
        let only = snap(&enriched[0]);
        assert_eq!(only.initial_context.as_deref(), Some("recordings"));
        assert_eq!(only.dialplan_context.as_deref(), Some("recordings"));
        assert_eq!(
            only.dialplan_from.as_deref(),
            Some("Emergency S R <5550001234>")
        );
        assert_eq!(only.dialplan_to.as_deref(), Some("start_recording"));
    }

    #[test]
    fn processing_line_extension_format() {
        let enriched = collect_enriched(vec![full_line(
            UUID1,
            TS1,
            "Processing Extension 1263 <1263>->start_recording in context recordings",
        )]);
        let only = snap(&enriched[0]);
        assert_eq!(only.initial_context.as_deref(), Some("recordings"));
        assert_eq!(
            only.dialplan_from.as_deref(),
            Some("Extension 1263 <1263>")
        );
        assert_eq!(only.dialplan_to.as_deref(), Some("start_recording"));
    }

    #[test]
    fn state_change_updates_channel_state() {
        let enriched =
            collect_enriched(vec![full_line(UUID1, TS1, "State Change CS_INIT -> CS_ROUTING")]);
        assert_eq!(
            snap(&enriched[0]).channel_state.as_deref(),
            Some("CS_ROUTING")
        );
    }

    #[test]
    fn callstate_change_updates_channel_state() {
        let enriched = collect_enriched(vec![full_line(
            UUID1,
            TS1,
            "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
        )]);
        assert_eq!(snap(&enriched[0]).channel_state.as_deref(), Some("RINGING"));
    }

    #[test]
    fn state_change_overrides_callstate() {
        let enriched = collect_enriched(vec![
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
            ),
            full_line(
                UUID1,
                TS2,
                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
            ),
        ]);
        assert_eq!(snap(&enriched[0]).channel_state.as_deref(), Some("RINGING"));
        assert_eq!(
            snap(&enriched[1]).channel_state.as_deref(),
            Some("CS_EXCHANGE_MEDIA")
        );
    }

    #[test]
    fn bleg_lifecycle_extracts_data_from_processing() {
        let enriched = collect_enriched(vec![
            full_line(
                UUID1,
                TS1,
                "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) State Change CS_NEW -> CS_INIT",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) State Change CS_INIT -> CS_ROUTING",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) State Change CS_ROUTING -> CS_CONSUME_MEDIA",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
            ),
            full_line(
                UUID1,
                TS2,
                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
            ),
            full_line(
                UUID1,
                TS2,
                "Processing Emergency S R <5550001234>->start_recording in context recordings",
            ),
            full_line(
                UUID1,
                TS2,
                "(sofia/internal-v4/sos) State Change CS_EXCHANGE_MEDIA -> CS_HANGUP",
            ),
        ]);

        let after_ringing = snap(&enriched[4]);
        assert_eq!(after_ringing.channel_state.as_deref(), Some("RINGING"));
        assert!(after_ringing.initial_context.is_none());

        let after_processing = snap(&enriched[6]);
        assert_eq!(
            after_processing.channel_state.as_deref(),
            Some("CS_EXCHANGE_MEDIA")
        );
        assert_eq!(
            after_processing.initial_context.as_deref(),
            Some("recordings")
        );
        assert_eq!(
            after_processing.dialplan_from.as_deref(),
            Some("Emergency S R <5550001234>")
        );
        assert_eq!(
            after_processing.dialplan_to.as_deref(),
            Some("start_recording")
        );

        let after_hangup = snap(&enriched[7]);
        assert_eq!(after_hangup.channel_state.as_deref(), Some("CS_HANGUP"));
        assert_eq!(after_hangup.initial_context.as_deref(), Some("recordings"));
    }

    #[test]
    fn channel_name_from_new_channel() {
        let enriched = collect_enriched(vec![full_line(
            UUID1,
            TS1,
            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
        )]);
        assert_eq!(
            snap(&enriched[0]).channel_name.as_deref(),
            Some("sofia/internal-v4/sos")
        );
    }

    #[test]
    fn remove_session() {
        let mut tracker = drain(vec![full_line(
            UUID1,
            TS1,
            "Processing 5551111111->5552222222 in context public",
        )]);
        assert!(tracker.sessions().contains_key(UUID1));
        let removed = tracker.remove_session(UUID1).unwrap();
        assert_eq!(removed.dialplan_context.as_deref(), Some("public"));
        assert!(!tracker.sessions().contains_key(UUID1));
    }

    #[test]
    fn stats_delegation() {
        let tracker = drain(vec![
            full_line(UUID1, TS1, "First"),
            full_line(UUID1, TS2, "Second"),
        ]);
        assert_eq!(tracker.stats().lines_processed, 2);
    }

    #[test]
    fn snapshot_reflects_cumulative_state() {
        let enriched = collect_enriched(vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
            full_line(
                UUID1,
                TS2,
                "Processing 5551111111->5552222222 in context public",
            ),
        ]);
        assert_eq!(enriched.len(), 3);

        let first = snap(&enriched[0]);
        assert_eq!(
            first.channel_name.as_deref(),
            Some("sofia/internal/+15550001234@192.0.2.1"),
        );
        assert!(first.dialplan_context.is_none());

        let last = snap(&enriched[2]);
        assert_eq!(
            last.channel_name.as_deref(),
            Some("sofia/internal/+15550001234@192.0.2.1"),
        );
        assert_eq!(last.dialplan_context.as_deref(), Some("public"));
    }
}