1use std::collections::{HashMap, HashSet};
2use std::str::FromStr;
3
4use freeswitch_types::{BridgeDialString, CallDirection, DialString};
5
6use crate::line::parse_line;
7use crate::message::{classify_message, MessageKind};
8use crate::stream::{Block, LogEntry, LogStream, ParseStats, UnclassifiedLine};
9
10#[derive(Debug, Clone, Default)]
16pub struct SessionState {
17 pub channel_name: Option<String>,
19 pub channel_state: Option<String>,
21 pub initial_context: Option<String>,
23 pub dialplan_context: Option<String>,
25 pub dialplan_from: Option<String>,
27 pub dialplan_to: Option<String>,
29 pub call_direction: Option<CallDirection>,
31 pub caller_id_number: Option<String>,
33 pub destination_number: Option<String>,
35 pub other_leg_uuid: Option<String>,
38 pub(crate) pending_bridge_target: Option<String>,
40 pub variables: HashMap<String, String>,
42}
43
/// Reports which index-relevant fields moved during one state update.
///
/// Each field is `None` when the value did not change, otherwise
/// `Some((old, new))` holding the value before and after the update.
#[derive(Default)]
struct IndexedFieldChanges {
    /// Change of `SessionState::channel_name`, if any.
    channel_name: Option<(Option<String>, Option<String>)>,
    /// Change of `SessionState::pending_bridge_target`, if any.
    pending_bridge_target: Option<(Option<String>, Option<String>)>,
    /// Change of `SessionState::other_leg_uuid`, if any.
    other_leg_uuid: Option<(Option<String>, Option<String>)>,
}
51
52#[derive(Debug, Clone)]
57pub struct SessionSnapshot {
58 pub channel_name: Option<String>,
59 pub channel_state: Option<String>,
60 pub initial_context: Option<String>,
61 pub dialplan_context: Option<String>,
62 pub dialplan_from: Option<String>,
63 pub dialplan_to: Option<String>,
64 pub call_direction: Option<CallDirection>,
65 pub caller_id_number: Option<String>,
66 pub destination_number: Option<String>,
67 pub other_leg_uuid: Option<String>,
68}
69
70impl SessionState {
71 fn snapshot(&self) -> SessionSnapshot {
72 SessionSnapshot {
73 channel_name: self.channel_name.clone(),
74 channel_state: self.channel_state.clone(),
75 initial_context: self.initial_context.clone(),
76 dialplan_context: self.dialplan_context.clone(),
77 dialplan_from: self.dialplan_from.clone(),
78 dialplan_to: self.dialplan_to.clone(),
79 call_direction: self.call_direction,
80 caller_id_number: self.caller_id_number.clone(),
81 destination_number: self.destination_number.clone(),
82 other_leg_uuid: self.other_leg_uuid.clone(),
83 }
84 }
85
86 fn update_from_entry(&mut self, entry: &LogEntry) -> IndexedFieldChanges {
87 let old_channel_name = self.channel_name.clone();
88 let old_pending_bridge_target = self.pending_bridge_target.clone();
89 let old_other_leg_uuid = self.other_leg_uuid.clone();
90
91 if let Some(Block::ChannelData { fields, variables }) = &entry.block {
92 for (name, value) in fields {
93 match name.as_str() {
94 "Channel-Name" => self.channel_name = Some(value.clone()),
95 "Channel-State" => self.channel_state = Some(value.clone()),
96 "Call-Direction" => {
97 self.call_direction = CallDirection::from_str(value).ok();
98 }
99 "Caller-Caller-ID-Number" => {
100 self.caller_id_number = Some(value.clone());
101 }
102 "Caller-Destination-Number" => {
103 self.destination_number = Some(value.clone());
104 }
105 "Other-Leg-Unique-ID" => {
106 self.other_leg_uuid = Some(value.clone());
107 }
108 _ => {}
109 }
110 }
111 for (name, value) in variables {
112 let var_name = name.strip_prefix("variable_").unwrap_or(name);
113 self.variables.insert(var_name.to_string(), value.clone());
114 }
115 }
116
117 match &entry.message_kind {
118 MessageKind::Dialplan { detail, .. } => {
119 if let Some(dp) = parse_dialplan_context(detail) {
120 self.initial_context.get_or_insert(dp.context.clone());
121 self.dialplan_context = Some(dp.context);
122 self.dialplan_from = Some(dp.from);
123 self.dialplan_to = Some(dp.to);
124 }
125 }
126 MessageKind::Execute {
127 application,
128 arguments,
129 ..
130 } => match application.as_str() {
131 "set" | "export" => {
132 if let Some((name, value)) = arguments.split_once('=') {
133 self.variables.insert(name.to_string(), value.to_string());
134 }
135 }
136 "bridge" => {
137 if let Some(info) = parse_bridge_args(arguments) {
138 if let Some(uuid) = &info.origination_uuid {
139 self.other_leg_uuid = Some(uuid.clone());
140 }
141 self.pending_bridge_target = Some(info.target_channel);
142 }
143 }
144 _ => {}
145 },
146 MessageKind::Variable { name, value } => {
147 let var_name = name.strip_prefix("variable_").unwrap_or(name);
148 self.variables.insert(var_name.to_string(), value.clone());
149 }
150 MessageKind::ChannelField { name, value } => match name.as_str() {
151 "Channel-Name" => self.channel_name = Some(value.clone()),
152 "Channel-State" => self.channel_state = Some(value.clone()),
153 _ => {}
154 },
155 MessageKind::StateChange { detail } => {
156 if let Some(new_state) = parse_state_change(detail) {
157 self.channel_state = Some(new_state);
158 }
159 }
160 MessageKind::ChannelLifecycle { detail } => {
161 if let Some(name) = parse_new_channel(detail) {
162 if self.channel_name.is_none() {
163 self.channel_name = Some(name);
164 }
165 }
166 }
167 _ => {}
168 }
169
170 if entry.message.contains("Processing ") && entry.message.contains(" in context ") {
171 if let Some(dp) = parse_processing_line(&entry.message) {
172 self.initial_context.get_or_insert(dp.context.clone());
173 self.dialplan_context = Some(dp.context);
174 self.dialplan_from = Some(dp.from);
175 self.dialplan_to = Some(dp.to);
176 }
177 }
178
179 for attached in &entry.attached {
180 let parsed = parse_line(attached);
181 self.update_from_message(parsed.message);
182 }
183
184 let mut changes = IndexedFieldChanges::default();
185 if self.channel_name != old_channel_name {
186 changes.channel_name = Some((old_channel_name, self.channel_name.clone()));
187 }
188 if self.pending_bridge_target != old_pending_bridge_target {
189 changes.pending_bridge_target =
190 Some((old_pending_bridge_target, self.pending_bridge_target.clone()));
191 }
192 if self.other_leg_uuid != old_other_leg_uuid {
193 changes.other_leg_uuid = Some((old_other_leg_uuid, self.other_leg_uuid.clone()));
194 }
195 changes
196 }
197
198 fn update_from_message(&mut self, msg: &str) {
199 let kind = classify_message(msg);
200 match &kind {
201 MessageKind::Dialplan { detail, .. } => {
202 if let Some(dp) = parse_dialplan_context(detail) {
203 self.initial_context.get_or_insert(dp.context.clone());
204 self.dialplan_context = Some(dp.context);
205 self.dialplan_from = Some(dp.from);
206 self.dialplan_to = Some(dp.to);
207 }
208 }
209 MessageKind::Variable { name, value } => {
210 let var_name = name.strip_prefix("variable_").unwrap_or(name);
211 self.variables.insert(var_name.to_string(), value.clone());
212 }
213 MessageKind::ChannelField { name, value } => match name.as_str() {
214 "Channel-Name" => self.channel_name = Some(value.clone()),
215 "Channel-State" => self.channel_state = Some(value.clone()),
216 _ => {}
217 },
218 MessageKind::StateChange { detail } => {
219 if let Some(new_state) = parse_state_change(detail) {
220 self.channel_state = Some(new_state);
221 }
222 }
223 _ => {}
224 }
225 }
226}
227
/// Result of parsing a dialplan routing line.
struct DialplanContext {
    /// Text on the left of `->`.
    from: String,
    /// Text on the right of `->`.
    to: String,
    /// Dialplan context the line refers to.
    context: String,
}

/// Parses a dialplan detail of the form `parsing [from->to] ...`.
///
/// Returns `None` unless `detail` starts with `parsing [`, contains a
/// closing `]`, and has a `->` inside the brackets. The reported context is
/// the `from` part; anything after the closing bracket (for example
/// `continue=true`) does not influence the result.
fn parse_dialplan_context(detail: &str) -> Option<DialplanContext> {
    let rest = detail.strip_prefix("parsing [")?;
    let (inner, _trailer) = rest.split_once(']')?;
    let (from, to) = inner.split_once("->")?;

    Some(DialplanContext {
        from: from.to_string(),
        to: to.to_string(),
        context: from.to_string(),
    })
}

/// Parses a `Processing <from>-><to> in context <context>` message.
///
/// `from` is everything between `Processing ` and the first `->` (it may
/// contain spaces), `to` runs up to the next space, and `context` is the
/// first whitespace-delimited word after `in context `. Returns `None` when
/// any of those pieces is missing.
fn parse_processing_line(msg: &str) -> Option<DialplanContext> {
    let (_, rest) = msg.split_once("Processing ")?;
    let (from, after_arrow) = rest.split_once("->")?;
    let (to, _) = after_arrow.split_once(' ')?;
    let (_, ctx_rest) = after_arrow.split_once("in context ")?;
    let context = ctx_rest.split_whitespace().next()?;

    Some(DialplanContext {
        from: from.to_string(),
        to: to.to_string(),
        context: context.to_string(),
    })
}
284
/// Extracts the channel name from a `New Channel <name> [<uuid>]` detail.
///
/// The name is everything between the `New Channel ` prefix and the last
/// ` [` in the text, so names that themselves contain brackets are kept
/// intact. Returns `None` when the prefix or the ` [` separator is missing.
fn parse_new_channel(detail: &str) -> Option<String> {
    detail
        .strip_prefix("New Channel ")
        .and_then(|tail| tail.rsplit_once(" ["))
        .map(|(channel, _uuid_part)| channel.to_owned())
}
290
/// Returns the state named after the first ` -> ` in a state-change detail.
///
/// The result is trimmed of surrounding whitespace; `None` is returned when
/// the detail contains no ` -> ` separator.
fn parse_state_change(detail: &str) -> Option<String> {
    let (_previous, next) = detail.split_once(" -> ")?;
    Some(next.trim().to_owned())
}
295
296fn parse_bridge_args(arguments: &str) -> Option<BridgeInfo> {
300 let dial = BridgeDialString::from_str(arguments).ok()?;
301 let first_ep = dial.groups().first()?.first()?;
302 let origination_uuid = first_ep
303 .variables()
304 .and_then(|v| v.get("origination_uuid"))
305 .map(|s| s.to_string());
306 let mut bare = first_ep.clone();
307 bare.set_variables(None);
308 let target_channel = bare.to_string();
309 Some(BridgeInfo {
310 origination_uuid,
311 target_channel,
312 })
313}
314
/// What `parse_bridge_args` extracts from a `bridge` application call.
struct BridgeInfo {
    /// Value of the first endpoint's `origination_uuid` variable, if set.
    origination_uuid: Option<String>,
    /// First endpoint rendered without its variables.
    target_channel: String,
}
319
/// Extracts the peer UUID from a message containing `Peer UUID: <uuid>`.
///
/// The UUID is the first whitespace-delimited token after the marker, so
/// any text that follows it on the same line is not folded into the result.
/// Returns `None` when the marker is absent or nothing follows it.
fn parse_originate_success(msg: &str) -> Option<String> {
    const MARKER: &str = "Peer UUID: ";
    let (_, tail) = msg.split_once(MARKER)?;
    tail.split_whitespace().next().map(str::to_string)
}
331
/// Extracts the bracketed channel name from an originate-result message.
///
/// The name starts after the first ` [` and ends at the `]` that balances
/// that opening bracket, so channel names containing their own brackets
/// (for example an IPv6 host such as `user@[2001:db8::10]`) are returned
/// whole. If the brackets never balance, the first `]` is used instead.
/// Returns `None` when no bracketed section exists or it is empty.
fn parse_originate_channel(msg: &str) -> Option<&str> {
    let start = msg.find(" [")? + 2;
    let tail = &msg[start..];

    // Walk the bytes tracking nesting depth; `[` and `]` are ASCII, so the
    // byte offsets found here are always valid `str` boundaries.
    let mut depth = 1usize;
    let mut balanced_end = None;
    for (offset, byte) in tail.bytes().enumerate() {
        match byte {
            b'[' => depth += 1,
            b']' => {
                depth -= 1;
                if depth == 0 {
                    balanced_end = Some(offset);
                    break;
                }
            }
            _ => {}
        }
    }

    let end = balanced_end.or_else(|| tail.find(']'))?;
    let chan = &tail[..end];
    if chan.is_empty() {
        None
    } else {
        Some(chan)
    }
}
345
/// Reports whether `state` is one of the channel states this module treats
/// as terminal; an unknown (`None`) state is not terminal.
fn is_terminal_channel_state(state: Option<&str>) -> bool {
    const TERMINAL_STATES: [&str; 5] = [
        "CS_HANGUP",
        "CS_REPORTING",
        "CS_DESTROY",
        "CS_NONE",
        "HANGUP",
    ];
    match state {
        Some(name) => TERMINAL_STATES.contains(&name),
        None => false,
    }
}
359
360#[derive(Debug)]
362pub struct EnrichedEntry {
363 pub entry: LogEntry,
364 pub session: Option<SessionSnapshot>,
366}
367
368pub struct SessionTracker<I> {
375 inner: LogStream<I>,
376 sessions: HashMap<String, SessionState>,
377 by_channel_name: HashMap<String, HashSet<String>>,
378 by_pending_target: HashMap<String, String>,
379 by_other_leg: HashMap<String, String>,
380}
381
382impl<I: Iterator<Item = String>> SessionTracker<I> {
383 pub fn new(inner: LogStream<I>) -> Self {
385 SessionTracker {
386 inner,
387 sessions: HashMap::new(),
388 by_channel_name: HashMap::new(),
389 by_pending_target: HashMap::new(),
390 by_other_leg: HashMap::new(),
391 }
392 }
393
394 pub fn sessions(&self) -> &HashMap<String, SessionState> {
396 &self.sessions
397 }
398
399 pub fn remove_session(&mut self, uuid: &str) -> Option<SessionState> {
402 let state = self.sessions.remove(uuid)?;
403 if let Some(chan) = &state.channel_name {
404 if let Some(set) = self.by_channel_name.get_mut(chan) {
405 set.remove(uuid);
406 if set.is_empty() {
407 self.by_channel_name.remove(chan);
408 }
409 }
410 }
411 if let Some(target) = &state.pending_bridge_target {
412 self.by_pending_target.remove(target);
413 }
414 if let Some(other) = &state.other_leg_uuid {
415 self.by_other_leg.remove(other);
416 }
417 Some(state)
418 }
419
420 pub fn stats(&self) -> &ParseStats {
422 self.inner.stats()
423 }
424
425 pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
427 self.inner.drain_unclassified()
428 }
429
430 fn apply_index_changes(&mut self, uuid: &str, changes: &IndexedFieldChanges) {
431 if let Some((old, new)) = &changes.channel_name {
432 if let Some(old_name) = old {
433 if let Some(set) = self.by_channel_name.get_mut(old_name) {
434 set.remove(uuid);
435 if set.is_empty() {
436 self.by_channel_name.remove(old_name);
437 }
438 }
439 }
440 if let Some(new_name) = new {
441 self.by_channel_name
442 .entry(new_name.clone())
443 .or_default()
444 .insert(uuid.to_string());
445 }
446 }
447 if let Some((old, new)) = &changes.pending_bridge_target {
448 if let Some(old_target) = old {
449 self.by_pending_target.remove(old_target);
450 }
451 if let Some(new_target) = new {
452 self.by_pending_target
453 .insert(new_target.clone(), uuid.to_string());
454 }
455 }
456 if let Some((old, new)) = &changes.other_leg_uuid {
457 if let Some(old_leg) = old {
458 self.by_other_leg.remove(old_leg);
459 }
460 if let Some(new_leg) = new {
461 self.by_other_leg.insert(new_leg.clone(), uuid.to_string());
462 }
463 }
464 }
465
466 fn link_legs(&mut self, uuid: &str, entry: &LogEntry) {
469 if entry.message.contains("Originate Resulted in Success") {
471 let a_uuid = uuid.to_string();
472 if let Some(peer_uuid) = parse_originate_success(&entry.message) {
473 let a_old_pending = self
474 .sessions
475 .get(&a_uuid)
476 .and_then(|s| s.pending_bridge_target.clone());
477
478 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
479 a_state.other_leg_uuid = Some(peer_uuid.clone());
480 a_state.pending_bridge_target = None;
481 }
482 self.by_other_leg
483 .insert(peer_uuid.clone(), a_uuid.clone());
484 if let Some(old_target) = a_old_pending {
485 self.by_pending_target.remove(&old_target);
486 }
487
488 let b_state = self.sessions.entry(peer_uuid.clone()).or_default();
489 b_state.other_leg_uuid = Some(a_uuid.clone());
490 self.by_other_leg.insert(a_uuid, peer_uuid);
491 } else if let Some(chan) = parse_originate_channel(&entry.message) {
492 let candidates: Vec<String> = self
497 .by_channel_name
498 .get(chan)
499 .map(|set| {
500 set.iter()
501 .filter(|u| *u != &a_uuid)
502 .filter(|u| {
503 self.sessions
504 .get(*u)
505 .map(|s| !is_terminal_channel_state(s.channel_state.as_deref()))
506 .unwrap_or(false)
507 })
508 .cloned()
509 .collect()
510 })
511 .unwrap_or_default();
512
513 if candidates.len() == 1 {
514 let b_uuid = candidates.into_iter().next().unwrap();
515 let a_old_pending = self
516 .sessions
517 .get(&a_uuid)
518 .and_then(|s| s.pending_bridge_target.clone());
519
520 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
521 a_state.other_leg_uuid = Some(b_uuid.clone());
522 a_state.pending_bridge_target = None;
523 }
524 if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
525 b_state.other_leg_uuid = Some(a_uuid.clone());
526 }
527
528 self.by_other_leg.insert(b_uuid.clone(), a_uuid.clone());
529 self.by_other_leg.insert(a_uuid, b_uuid);
530 if let Some(old_target) = a_old_pending {
531 self.by_pending_target.remove(&old_target);
532 }
533 }
534 }
535 return;
536 }
537
538 if let MessageKind::ChannelLifecycle { detail } = &entry.message_kind {
541 if let Some(channel_name) = parse_new_channel(detail) {
542 let b_uuid = uuid.to_string();
543
544 let a_uuid_found = self
546 .by_other_leg
547 .get(&b_uuid)
548 .cloned()
549 .or_else(|| self.by_pending_target.get(&channel_name).cloned())
550 .filter(|a| a != &b_uuid);
551
552 if let Some(a_uuid) = a_uuid_found {
553 let a_old_pending = self
554 .sessions
555 .get(&a_uuid)
556 .and_then(|s| s.pending_bridge_target.clone());
557
558 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
559 a_state.other_leg_uuid = Some(b_uuid.clone());
560 a_state.pending_bridge_target = None;
561 }
562 if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
563 b_state.other_leg_uuid = Some(a_uuid.clone());
564 }
565
566 self.by_other_leg.insert(b_uuid.clone(), a_uuid.clone());
567 self.by_other_leg.insert(a_uuid, b_uuid);
568 if let Some(old_target) = a_old_pending {
569 self.by_pending_target.remove(&old_target);
570 }
571 }
572 }
573 }
574 }
575}
576
577impl<I: Iterator<Item = String>> Iterator for SessionTracker<I> {
578 type Item = EnrichedEntry;
579
580 fn next(&mut self) -> Option<EnrichedEntry> {
581 let entry = self.inner.next()?;
582
583 if entry.uuid.is_empty() {
584 return Some(EnrichedEntry {
585 entry,
586 session: None,
587 });
588 }
589
590 let uuid = entry.uuid.clone();
591 let changes = self
592 .sessions
593 .entry(uuid.clone())
594 .or_default()
595 .update_from_entry(&entry);
596 self.apply_index_changes(&uuid, &changes);
597
598 self.link_legs(&uuid, &entry);
599
600 let snapshot = self.sessions.get(&uuid).unwrap().snapshot();
601
602 Some(EnrichedEntry {
603 entry,
604 session: Some(snapshot),
605 })
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use super::*;
612
613 const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
614 const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
615 const UUID3: &str = "c3d4e5f6-a7b8-9012-cdef-234567890123";
616 const TS1: &str = "2025-01-15 10:30:45.123456";
617 const TS2: &str = "2025-01-15 10:30:46.234567";
618
619 fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
620 format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
621 }
622
623 fn collect_enriched(lines: Vec<String>) -> Vec<EnrichedEntry> {
624 let stream = LogStream::new(lines.into_iter());
625 SessionTracker::new(stream).collect()
626 }
627
628 #[test]
629 fn system_line_no_session() {
630 let lines = vec![format!(
631 "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
632 )];
633 let entries = collect_enriched(lines);
634 assert_eq!(entries.len(), 1);
635 assert!(entries[0].session.is_none());
636 }
637
638 #[test]
639 fn dialplan_context_propagation() {
640 let lines = vec![
641 full_line(UUID1, TS1, "CHANNEL_DATA:"),
642 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
643 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 answer"),
644 format!("{UUID1} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public->global] continue=true"),
645 full_line(UUID1, TS2, "Some later event"),
646 ];
647 let entries = collect_enriched(lines);
648 let last = entries.last().unwrap();
649 let session = last.session.as_ref().unwrap();
650 assert_eq!(session.dialplan_context.as_deref(), Some("public"));
651 assert_eq!(session.dialplan_from.as_deref(), Some("public"));
652 assert_eq!(session.dialplan_to.as_deref(), Some("global"));
653 }
654
655 #[test]
656 fn processing_line_extracts_context() {
657 let lines = vec![full_line(
658 UUID1,
659 TS1,
660 "Processing 5551234567->5559876543 in context public",
661 )];
662 let entries = collect_enriched(lines);
663 let session = entries[0].session.as_ref().unwrap();
664 assert_eq!(session.dialplan_context.as_deref(), Some("public"));
665 assert_eq!(session.dialplan_from.as_deref(), Some("5551234567"));
666 assert_eq!(session.dialplan_to.as_deref(), Some("5559876543"));
667 }
668
669 #[test]
670 fn initial_context_preserved_across_transfers() {
671 let lines = vec![
672 full_line(
673 UUID1,
674 TS1,
675 "Processing 5551234567->5559876543 in context public",
676 ),
677 full_line(
678 UUID1,
679 TS2,
680 "Processing 5551234567->start_recording in context recordings",
681 ),
682 ];
683 let stream = LogStream::new(lines.into_iter());
684 let mut tracker = SessionTracker::new(stream);
685 let entries: Vec<_> = tracker.by_ref().collect();
686
687 let first = entries[0].session.as_ref().unwrap();
688 assert_eq!(
689 first.initial_context.as_deref(),
690 Some("public"),
691 "initial_context set on first Processing line"
692 );
693 assert_eq!(first.dialplan_context.as_deref(), Some("public"));
694
695 let state = tracker.sessions().get(UUID1).unwrap();
696 assert_eq!(
697 state.initial_context.as_deref(),
698 Some("public"),
699 "initial_context keeps the first context seen"
700 );
701 assert_eq!(
702 state.dialplan_context.as_deref(),
703 Some("recordings"),
704 "dialplan_context tracks the current context"
705 );
706 assert_eq!(state.dialplan_to.as_deref(), Some("start_recording"));
707 }
708
709 #[test]
710 fn new_channel_sets_channel_name() {
711 let lines = vec![full_line(
712 UUID1,
713 TS1,
714 "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
715 )];
716 let entries = collect_enriched(lines);
717 let session = entries[0].session.as_ref().unwrap();
718 assert_eq!(
719 session.channel_name.as_deref(),
720 Some("sofia/internal-v4/sos")
721 );
722 }
723
724 #[test]
725 fn originate_success_links_both_legs() {
726 let lines = vec![
729 full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
730 full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/esinet1-v6-tcp/sip:target.example.com] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901"),
731 ];
732 let stream = LogStream::new(lines.into_iter());
733 let mut tracker = SessionTracker::new(stream);
734 let _: Vec<_> = tracker.by_ref().collect();
735
736 let a_leg = tracker.sessions().get(UUID1).unwrap();
737 assert_eq!(
738 a_leg.other_leg_uuid.as_deref(),
739 Some(UUID2),
740 "A-leg other_leg_uuid set from Originate Resulted in Success"
741 );
742
743 let b_leg = tracker.sessions().get(UUID2).unwrap();
744 assert_eq!(
745 b_leg.other_leg_uuid.as_deref(),
746 Some(UUID1),
747 "B-leg other_leg_uuid points back to A-leg"
748 );
749 }
750
751 #[test]
752 fn originate_success_channel_fallback_links_legs() {
753 let lines = vec![
757 full_line(
758 UUID2,
759 TS1,
760 "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
761 ),
762 full_line(
763 UUID1,
764 TS2,
765 "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
766 ),
767 ];
768 let stream = LogStream::new(lines.into_iter());
769 let mut tracker = SessionTracker::new(stream);
770 let _: Vec<_> = tracker.by_ref().collect();
771
772 let a_leg = tracker.sessions().get(UUID1).unwrap();
773 assert_eq!(
774 a_leg.other_leg_uuid.as_deref(),
775 Some(UUID2),
776 "A-leg linked to B-leg via channel-name fallback when Peer UUID absent"
777 );
778
779 let b_leg = tracker.sessions().get(UUID2).unwrap();
780 assert_eq!(
781 b_leg.other_leg_uuid.as_deref(),
782 Some(UUID1),
783 "B-leg linked back to A-leg"
784 );
785 }
786
787 #[test]
788 fn originate_success_peer_uuid_wins_over_channel_fallback() {
789 let lines = vec![
792 full_line(
793 UUID2,
794 TS1,
795 "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
796 ),
797 full_line(
798 UUID3,
799 TS1,
800 "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
801 ),
802 full_line(
803 UUID1,
804 TS2,
805 "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901",
806 ),
807 ];
808 let stream = LogStream::new(lines.into_iter());
809 let mut tracker = SessionTracker::new(stream);
810 let _: Vec<_> = tracker.by_ref().collect();
811
812 let a_leg = tracker.sessions().get(UUID1).unwrap();
813 assert_eq!(
814 a_leg.other_leg_uuid.as_deref(),
815 Some(UUID2),
816 "Peer UUID wins over channel-name match"
817 );
818
819 let decoy = tracker.sessions().get(UUID3).unwrap();
820 assert_eq!(
821 decoy.other_leg_uuid, None,
822 "Decoy session sharing channel name is not touched"
823 );
824 }
825
826 #[test]
827 fn originate_success_channel_fallback_skips_when_ambiguous() {
828 let lines = vec![
831 full_line(
832 UUID2,
833 TS1,
834 "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
835 ),
836 full_line(
837 UUID3,
838 TS1,
839 "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
840 ),
841 full_line(
842 UUID1,
843 TS2,
844 "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
845 ),
846 ];
847 let stream = LogStream::new(lines.into_iter());
848 let mut tracker = SessionTracker::new(stream);
849 let _: Vec<_> = tracker.by_ref().collect();
850
851 let a_leg = tracker.sessions().get(UUID1).unwrap();
852 assert_eq!(
853 a_leg.other_leg_uuid, None,
854 "Ambiguous channel name yields no link"
855 );
856 assert_eq!(tracker.sessions().get(UUID2).unwrap().other_leg_uuid, None);
857 assert_eq!(tracker.sessions().get(UUID3).unwrap().other_leg_uuid, None);
858 }
859
860 #[test]
861 fn originate_success_channel_fallback_skips_terminated_candidates() {
862 let lines = vec![
867 full_line(
868 UUID2,
869 TS1,
870 "New Channel sofia/internal/6244@192.0.2.72:50744 [b2c3d4e5-f6a7-8901-bcde-f12345678901]",
871 ),
872 full_line(
873 UUID2,
874 TS1,
875 "(sofia/internal/6244@192.0.2.72:50744) State Change CS_EXECUTE -> CS_DESTROY",
876 ),
877 full_line(
878 UUID3,
879 TS1,
880 "New Channel sofia/internal/6244@192.0.2.72:50744 [c3d4e5f6-a7b8-9012-cdef-234567890123]",
881 ),
882 full_line(
883 UUID1,
884 TS2,
885 "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
886 ),
887 ];
888 let stream = LogStream::new(lines.into_iter());
889 let mut tracker = SessionTracker::new(stream);
890 let _: Vec<_> = tracker.by_ref().collect();
891
892 let a_leg = tracker.sessions().get(UUID1).unwrap();
893 assert_eq!(
894 a_leg.other_leg_uuid.as_deref(),
895 Some(UUID3),
896 "Live b-leg wins over CS_DESTROY straggler"
897 );
898
899 let live_b = tracker.sessions().get(UUID3).unwrap();
900 assert_eq!(
901 live_b.other_leg_uuid.as_deref(),
902 Some(UUID1),
903 "Live b-leg points back to a-leg"
904 );
905
906 let stale_b = tracker.sessions().get(UUID2).unwrap();
907 assert_eq!(
908 stale_b.other_leg_uuid, None,
909 "Terminated b-leg is not touched"
910 );
911 }
912
913 #[test]
914 fn originate_success_channel_fallback_skips_when_no_match() {
915 let lines = vec![full_line(
918 UUID1,
919 TS2,
920 "Originate Resulted in Success: [sofia/internal/6244@192.0.2.72:50744]",
921 )];
922 let stream = LogStream::new(lines.into_iter());
923 let mut tracker = SessionTracker::new(stream);
924 let _: Vec<_> = tracker.by_ref().collect();
925
926 let a_leg = tracker.sessions().get(UUID1).unwrap();
927 assert_eq!(a_leg.other_leg_uuid, None);
928 assert_eq!(a_leg.pending_bridge_target, None);
929 }
930
931 #[test]
932 fn bridge_origination_uuid_links_a_leg_immediately() {
933 let lines = vec![
936 full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal-v6/1232@[2001:db8::10] bridge([origination_uuid=b2c3d4e5-f6a7-8901-bcde-f12345678901,leg_timeout=2]sofia/esinet1-v6-tcp/sip:target.example.com)"),
937 full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
938 ];
939 let stream = LogStream::new(lines.into_iter());
940 let mut tracker = SessionTracker::new(stream);
941 let _: Vec<_> = tracker.by_ref().collect();
942
943 let a_leg = tracker.sessions().get(UUID1).unwrap();
944 assert_eq!(
945 a_leg.other_leg_uuid.as_deref(),
946 Some(UUID2),
947 "A-leg knows B-leg UUID from origination_uuid in bridge args"
948 );
949
950 let b_leg = tracker.sessions().get(UUID2).unwrap();
951 assert_eq!(
952 b_leg.other_leg_uuid.as_deref(),
953 Some(UUID1),
954 "B-leg knows A-leg once New Channel correlates"
955 );
956 }
957
958 #[test]
959 fn bridge_target_matches_new_channel() {
960 let lines = vec![
963 full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
964 full_line(UUID1, TS1, "Parsing session specific variables"),
965 full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
966 ];
967 let stream = LogStream::new(lines.into_iter());
968 let mut tracker = SessionTracker::new(stream);
969 let _: Vec<_> = tracker.by_ref().collect();
970
971 let a_leg = tracker.sessions().get(UUID1).unwrap();
972 assert_eq!(
973 a_leg.other_leg_uuid.as_deref(),
974 Some(UUID2),
975 "A-leg linked to B-leg via bridge target matching New Channel"
976 );
977
978 let b_leg = tracker.sessions().get(UUID2).unwrap();
979 assert_eq!(
980 b_leg.other_leg_uuid.as_deref(),
981 Some(UUID1),
982 "B-leg linked back to A-leg"
983 );
984 }
985
986 #[test]
987 fn originate_success_corrects_wrong_target_match() {
988 let lines = vec![
991 full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
992 full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
993 full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/gateway/carrier/+15559876543] Peer UUID: c3d4e5f6-a7b8-9012-cdef-234567890123"),
994 ];
995 let stream = LogStream::new(lines.into_iter());
996 let mut tracker = SessionTracker::new(stream);
997 let _: Vec<_> = tracker.by_ref().collect();
998
999 let a_leg = tracker.sessions().get(UUID1).unwrap();
1000 assert_eq!(
1001 a_leg.other_leg_uuid.as_deref(),
1002 Some(UUID3),
1003 "Originate success overrides earlier target-match guess"
1004 );
1005
1006 let real_b_leg = tracker.sessions().get(UUID3).unwrap();
1007 assert_eq!(
1008 real_b_leg.other_leg_uuid.as_deref(),
1009 Some(UUID1),
1010 "Real B-leg points back to A-leg"
1011 );
1012 }
1013
1014 #[test]
1015 fn channel_data_other_leg_uuid() {
1016 let lines = vec![
1018 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1019 format!("{UUID1} Other-Leg-Unique-ID: [{UUID2}]"),
1020 ];
1021 let stream = LogStream::new(lines.into_iter());
1022 let mut tracker = SessionTracker::new(stream);
1023 let _: Vec<_> = tracker.by_ref().collect();
1024
1025 let state = tracker.sessions().get(UUID1).unwrap();
1026 assert_eq!(
1027 state.other_leg_uuid.as_deref(),
1028 Some(UUID2),
1029 "other_leg_uuid set from Other-Leg-Unique-ID CHANNEL_DATA field"
1030 );
1031 }
1032
1033 #[test]
1034 fn channel_data_populates_session() {
1035 let lines = vec![
1036 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1037 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1038 format!("{UUID1} Channel-State: [CS_EXECUTE]"),
1039 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
1040 "variable_direction: [inbound]".to_string(),
1041 ];
1042 let entries = collect_enriched(lines);
1043 assert_eq!(entries.len(), 1);
1044 let session = entries[0].session.as_ref().unwrap();
1045 assert_eq!(
1046 session.channel_name.as_deref(),
1047 Some("sofia/internal/+15550001234@192.0.2.1")
1048 );
1049 assert_eq!(session.channel_state.as_deref(), Some("CS_EXECUTE"));
1050 }
1051
1052 #[test]
1053 fn variables_learned_from_channel_data() {
1054 let lines = vec![
1055 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1056 "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
1057 "variable_direction: [inbound]".to_string(),
1058 ];
1059 let stream = LogStream::new(lines.into_iter());
1060 let mut tracker = SessionTracker::new(stream);
1061 let _: Vec<_> = tracker.by_ref().collect();
1062 let state = tracker.sessions().get(UUID1).unwrap();
1063 assert_eq!(
1064 state.variables.get("sip_call_id").map(|s| s.as_str()),
1065 Some("test123@192.0.2.1")
1066 );
1067 assert_eq!(
1068 state.variables.get("direction").map(|s| s.as_str()),
1069 Some("inbound")
1070 );
1071 }
1072
1073 #[test]
1074 fn variables_learned_from_set_execute() {
1075 let lines = vec![
1076 full_line(UUID1, TS1, "First"),
1077 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(call_direction=inbound)"),
1078 full_line(UUID1, TS2, "After set"),
1079 ];
1080 let stream = LogStream::new(lines.into_iter());
1081 let mut tracker = SessionTracker::new(stream);
1082 let entries: Vec<_> = tracker.by_ref().collect();
1083 assert_eq!(entries.len(), 3);
1084 let state = tracker.sessions().get(UUID1).unwrap();
1085 assert_eq!(
1086 state.variables.get("call_direction").map(|s| s.as_str()),
1087 Some("inbound")
1088 );
1089 }
1090
1091 #[test]
1092 fn variables_learned_from_export_execute() {
1093 let lines = vec![
1094 full_line(UUID1, TS1, "First"),
1095 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
1096 ];
1097 let stream = LogStream::new(lines.into_iter());
1098 let mut tracker = SessionTracker::new(stream);
1099 let _: Vec<_> = tracker.by_ref().collect();
1100 let state = tracker.sessions().get(UUID1).unwrap();
1101 assert_eq!(
1102 state.variables.get("originate_timeout").map(|s| s.as_str()),
1103 Some("3600")
1104 );
1105 }
1106
1107 #[test]
1108 fn session_isolation_between_uuids() {
1109 let lines = vec![
1110 full_line(
1111 UUID1,
1112 TS1,
1113 "Processing 5551111111->5552222222 in context public",
1114 ),
1115 full_line(
1116 UUID2,
1117 TS2,
1118 "Processing 5553333333->5554444444 in context private",
1119 ),
1120 ];
1121 let stream = LogStream::new(lines.into_iter());
1122 let mut tracker = SessionTracker::new(stream);
1123 let _: Vec<_> = tracker.by_ref().collect();
1124 let s1 = tracker.sessions().get(UUID1).unwrap();
1125 let s2 = tracker.sessions().get(UUID2).unwrap();
1126 assert_eq!(s1.dialplan_context.as_deref(), Some("public"));
1127 assert_eq!(s2.dialplan_context.as_deref(), Some("private"));
1128 assert_eq!(s1.dialplan_from.as_deref(), Some("5551111111"));
1129 assert_eq!(s2.dialplan_from.as_deref(), Some("5553333333"));
1130 }
1131
1132 #[test]
1133 fn processing_line_with_regex_type_and_angle_bracket_caller() {
1134 let lines = vec![full_line(
1135 UUID1,
1136 TS1,
1137 "Processing Emergency S R <5550001234>->start_recording in context recordings",
1138 )];
1139 let entries = collect_enriched(lines);
1140 let session = entries[0].session.as_ref().unwrap();
1141 assert_eq!(session.initial_context.as_deref(), Some("recordings"));
1142 assert_eq!(session.dialplan_context.as_deref(), Some("recordings"));
1143 assert_eq!(
1144 session.dialplan_from.as_deref(),
1145 Some("Emergency S R <5550001234>")
1146 );
1147 assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
1148 }
1149
1150 #[test]
1151 fn processing_line_extension_format() {
1152 let lines = vec![full_line(
1153 UUID1,
1154 TS1,
1155 "Processing Extension 1263 <1263>->start_recording in context recordings",
1156 )];
1157 let entries = collect_enriched(lines);
1158 let session = entries[0].session.as_ref().unwrap();
1159 assert_eq!(session.initial_context.as_deref(), Some("recordings"));
1160 assert_eq!(
1161 session.dialplan_from.as_deref(),
1162 Some("Extension 1263 <1263>")
1163 );
1164 assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
1165 }
1166
1167 #[test]
1168 fn state_change_updates_channel_state() {
1169 let lines = vec![full_line(UUID1, TS1, "State Change CS_INIT -> CS_ROUTING")];
1170 let entries = collect_enriched(lines);
1171 let session = entries[0].session.as_ref().unwrap();
1172 assert_eq!(session.channel_state.as_deref(), Some("CS_ROUTING"));
1173 }
1174
1175 #[test]
1176 fn callstate_change_updates_channel_state() {
1177 let lines = vec![full_line(
1178 UUID1,
1179 TS1,
1180 "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
1181 )];
1182 let entries = collect_enriched(lines);
1183 let session = entries[0].session.as_ref().unwrap();
1184 assert_eq!(session.channel_state.as_deref(), Some("RINGING"));
1185 }
1186
1187 #[test]
1188 fn state_change_overrides_callstate() {
1189 let lines = vec![
1190 full_line(
1191 UUID1,
1192 TS1,
1193 "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
1194 ),
1195 full_line(
1196 UUID1,
1197 TS2,
1198 "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
1199 ),
1200 ];
1201 let entries = collect_enriched(lines);
1202 assert_eq!(
1203 entries[0]
1204 .session
1205 .as_ref()
1206 .unwrap()
1207 .channel_state
1208 .as_deref(),
1209 Some("RINGING")
1210 );
1211 assert_eq!(
1212 entries[1]
1213 .session
1214 .as_ref()
1215 .unwrap()
1216 .channel_state
1217 .as_deref(),
1218 Some("CS_EXCHANGE_MEDIA")
1219 );
1220 }
1221
1222 #[test]
1223 fn bleg_lifecycle_extracts_data_from_processing() {
1224 let lines = vec![
1225 full_line(
1226 UUID1,
1227 TS1,
1228 "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
1229 ),
1230 full_line(
1231 UUID1,
1232 TS1,
1233 "(sofia/internal-v4/sos) State Change CS_NEW -> CS_INIT",
1234 ),
1235 full_line(
1236 UUID1,
1237 TS1,
1238 "(sofia/internal-v4/sos) State Change CS_INIT -> CS_ROUTING",
1239 ),
1240 full_line(
1241 UUID1,
1242 TS1,
1243 "(sofia/internal-v4/sos) State Change CS_ROUTING -> CS_CONSUME_MEDIA",
1244 ),
1245 full_line(
1246 UUID1,
1247 TS1,
1248 "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
1249 ),
1250 full_line(
1251 UUID1,
1252 TS2,
1253 "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
1254 ),
1255 full_line(
1256 UUID1,
1257 TS2,
1258 "Processing Emergency S R <5550001234>->start_recording in context recordings",
1259 ),
1260 full_line(
1261 UUID1,
1262 TS2,
1263 "(sofia/internal-v4/sos) State Change CS_EXCHANGE_MEDIA -> CS_HANGUP",
1264 ),
1265 ];
1266 let entries = collect_enriched(lines);
1267
1268 let after_ringing = entries[4].session.as_ref().unwrap();
1269 assert_eq!(after_ringing.channel_state.as_deref(), Some("RINGING"));
1270 assert!(after_ringing.initial_context.is_none());
1271
1272 let after_processing = entries[6].session.as_ref().unwrap();
1273 assert_eq!(
1274 after_processing.channel_state.as_deref(),
1275 Some("CS_EXCHANGE_MEDIA")
1276 );
1277 assert_eq!(
1278 after_processing.initial_context.as_deref(),
1279 Some("recordings")
1280 );
1281 assert_eq!(
1282 after_processing.dialplan_from.as_deref(),
1283 Some("Emergency S R <5550001234>")
1284 );
1285 assert_eq!(
1286 after_processing.dialplan_to.as_deref(),
1287 Some("start_recording")
1288 );
1289
1290 let after_hangup = entries[7].session.as_ref().unwrap();
1291 assert_eq!(after_hangup.channel_state.as_deref(), Some("CS_HANGUP"));
1292 assert_eq!(after_hangup.initial_context.as_deref(), Some("recordings"));
1293 }
1294
1295 #[test]
1296 fn channel_name_from_new_channel() {
1297 let lines = vec![full_line(
1298 UUID1,
1299 TS1,
1300 "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
1301 )];
1302 let entries = collect_enriched(lines);
1303 let session = entries[0].session.as_ref().unwrap();
1304 assert_eq!(
1305 session.channel_name.as_deref(),
1306 Some("sofia/internal-v4/sos")
1307 );
1308 }
1309
1310 #[test]
1311 fn remove_session() {
1312 let lines = vec![full_line(
1313 UUID1,
1314 TS1,
1315 "Processing 5551111111->5552222222 in context public",
1316 )];
1317 let stream = LogStream::new(lines.into_iter());
1318 let mut tracker = SessionTracker::new(stream);
1319 let _: Vec<_> = tracker.by_ref().collect();
1320 assert!(tracker.sessions().contains_key(UUID1));
1321 let removed = tracker.remove_session(UUID1).unwrap();
1322 assert_eq!(removed.dialplan_context.as_deref(), Some("public"));
1323 assert!(!tracker.sessions().contains_key(UUID1));
1324 }
1325
1326 #[test]
1327 fn stats_delegation() {
1328 let lines = vec![
1329 full_line(UUID1, TS1, "First"),
1330 full_line(UUID1, TS2, "Second"),
1331 ];
1332 let stream = LogStream::new(lines.into_iter());
1333 let mut tracker = SessionTracker::new(stream);
1334 let _: Vec<_> = tracker.by_ref().collect();
1335 assert_eq!(tracker.stats().lines_processed, 2);
1336 }
1337
1338 #[test]
1339 fn snapshot_reflects_cumulative_state() {
1340 let lines = vec![
1341 full_line(UUID1, TS1, "CHANNEL_DATA:"),
1342 format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
1343 format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
1344 full_line(
1345 UUID1,
1346 TS2,
1347 "Processing 5551111111->5552222222 in context public",
1348 ),
1349 ];
1350 let entries = collect_enriched(lines);
1351 assert_eq!(entries.len(), 3);
1352 let first = entries[0].session.as_ref().unwrap();
1353 assert_eq!(
1354 first.channel_name.as_deref(),
1355 Some("sofia/internal/+15550001234@192.0.2.1"),
1356 );
1357 assert!(first.dialplan_context.is_none());
1358
1359 let last = entries[2].session.as_ref().unwrap();
1360 assert_eq!(
1361 last.channel_name.as_deref(),
1362 Some("sofia/internal/+15550001234@192.0.2.1"),
1363 );
1364 assert_eq!(last.dialplan_context.as_deref(), Some("public"));
1365 }
1366}