1use std::collections::HashMap;
2use std::str::FromStr;
3
4use freeswitch_types::{BridgeDialString, CallDirection, DialString};
5
6use crate::line::parse_line;
7use crate::message::{classify_message, MessageKind};
8use crate::stream::{Block, LogEntry, LogStream, ParseStats, UnclassifiedLine};
9
/// Accumulated per-channel state, built up from the log entries seen for one UUID.
///
/// Every field starts out empty (`Default`) and is filled in by
/// `update_from_entry` / `update_from_message` as matching lines are seen.
#[derive(Debug, Clone, Default)]
pub struct SessionState {
    /// Channel name, from a `Channel-Name` field or, if still unset, from a
    /// `New Channel ...` lifecycle line.
    pub channel_name: Option<String>,
    /// Latest channel state, from a `Channel-State` field or the right-hand
    /// side of a state-change message.
    pub channel_state: Option<String>,
    /// First dialplan context seen for this session; never overwritten once set.
    pub initial_context: Option<String>,
    /// Most recent dialplan context seen for this session.
    pub dialplan_context: Option<String>,
    /// Text before `->` in the most recent dialplan / `Processing` line.
    pub dialplan_from: Option<String>,
    /// Text after `->` in the most recent dialplan / `Processing` line.
    pub dialplan_to: Option<String>,
    /// Parsed from the `Call-Direction` channel-data field; `None` when the
    /// value does not parse.
    pub call_direction: Option<CallDirection>,
    /// Value of the `Caller-Caller-ID-Number` channel-data field.
    pub caller_id_number: Option<String>,
    /// Value of the `Caller-Destination-Number` channel-data field.
    pub destination_number: Option<String>,
    /// UUID of the other leg, from `Other-Leg-Unique-ID`, an `origination_uuid`
    /// in `bridge` arguments, or leg linking done by the tracker.
    pub other_leg_uuid: Option<String>,
    /// First endpoint of the most recent `bridge` execution (variables removed),
    /// kept until leg linking matches it and clears it.
    pub(crate) pending_bridge_target: Option<String>,
    /// Variables learned from channel data, variable messages and
    /// `set`/`export` executions.
    pub variables: HashMap<String, String>,
}
43
/// Owned copy of the `SessionState` fields that is attached to an `EnrichedEntry`.
///
/// Produced by `SessionState::snapshot`; it carries the same values as the
/// state at the moment of the call, minus `variables` and
/// `pending_bridge_target`, which have no counterpart here.
#[derive(Debug, Clone)]
pub struct SessionSnapshot {
    /// Copy of `SessionState::channel_name`.
    pub channel_name: Option<String>,
    /// Copy of `SessionState::channel_state`.
    pub channel_state: Option<String>,
    /// Copy of `SessionState::initial_context`.
    pub initial_context: Option<String>,
    /// Copy of `SessionState::dialplan_context`.
    pub dialplan_context: Option<String>,
    /// Copy of `SessionState::dialplan_from`.
    pub dialplan_from: Option<String>,
    /// Copy of `SessionState::dialplan_to`.
    pub dialplan_to: Option<String>,
    /// Copy of `SessionState::call_direction`.
    pub call_direction: Option<CallDirection>,
    /// Copy of `SessionState::caller_id_number`.
    pub caller_id_number: Option<String>,
    /// Copy of `SessionState::destination_number`.
    pub destination_number: Option<String>,
    /// Copy of `SessionState::other_leg_uuid`.
    pub other_leg_uuid: Option<String>,
}
61
62impl SessionState {
63 fn snapshot(&self) -> SessionSnapshot {
64 SessionSnapshot {
65 channel_name: self.channel_name.clone(),
66 channel_state: self.channel_state.clone(),
67 initial_context: self.initial_context.clone(),
68 dialplan_context: self.dialplan_context.clone(),
69 dialplan_from: self.dialplan_from.clone(),
70 dialplan_to: self.dialplan_to.clone(),
71 call_direction: self.call_direction,
72 caller_id_number: self.caller_id_number.clone(),
73 destination_number: self.destination_number.clone(),
74 other_leg_uuid: self.other_leg_uuid.clone(),
75 }
76 }
77
78 fn update_from_entry(&mut self, entry: &LogEntry) {
79 if let Some(Block::ChannelData { fields, variables }) = &entry.block {
80 for (name, value) in fields {
81 match name.as_str() {
82 "Channel-Name" => self.channel_name = Some(value.clone()),
83 "Channel-State" => self.channel_state = Some(value.clone()),
84 "Call-Direction" => {
85 self.call_direction = CallDirection::from_str(value).ok();
86 }
87 "Caller-Caller-ID-Number" => {
88 self.caller_id_number = Some(value.clone());
89 }
90 "Caller-Destination-Number" => {
91 self.destination_number = Some(value.clone());
92 }
93 "Other-Leg-Unique-ID" => {
94 self.other_leg_uuid = Some(value.clone());
95 }
96 _ => {}
97 }
98 }
99 for (name, value) in variables {
100 let var_name = name.strip_prefix("variable_").unwrap_or(name);
101 self.variables.insert(var_name.to_string(), value.clone());
102 }
103 }
104
105 match &entry.message_kind {
106 MessageKind::Dialplan { detail, .. } => {
107 if let Some(dp) = parse_dialplan_context(detail) {
108 self.initial_context.get_or_insert(dp.context.clone());
109 self.dialplan_context = Some(dp.context);
110 self.dialplan_from = Some(dp.from);
111 self.dialplan_to = Some(dp.to);
112 }
113 }
114 MessageKind::Execute {
115 application,
116 arguments,
117 ..
118 } => match application.as_str() {
119 "set" | "export" => {
120 if let Some((name, value)) = arguments.split_once('=') {
121 self.variables.insert(name.to_string(), value.to_string());
122 }
123 }
124 "bridge" => {
125 if let Some(info) = parse_bridge_args(arguments) {
126 if let Some(uuid) = &info.origination_uuid {
127 self.other_leg_uuid = Some(uuid.clone());
128 }
129 self.pending_bridge_target = Some(info.target_channel);
130 }
131 }
132 _ => {}
133 },
134 MessageKind::Variable { name, value } => {
135 let var_name = name.strip_prefix("variable_").unwrap_or(name);
136 self.variables.insert(var_name.to_string(), value.clone());
137 }
138 MessageKind::ChannelField { name, value } => match name.as_str() {
139 "Channel-Name" => self.channel_name = Some(value.clone()),
140 "Channel-State" => self.channel_state = Some(value.clone()),
141 _ => {}
142 },
143 MessageKind::StateChange { detail } => {
144 if let Some(new_state) = parse_state_change(detail) {
145 self.channel_state = Some(new_state);
146 }
147 }
148 MessageKind::ChannelLifecycle { detail } => {
149 if let Some(name) = parse_new_channel(detail) {
150 if self.channel_name.is_none() {
151 self.channel_name = Some(name);
152 }
153 }
154 }
155 _ => {}
156 }
157
158 if entry.message.contains("Processing ") && entry.message.contains(" in context ") {
159 if let Some(dp) = parse_processing_line(&entry.message) {
160 self.initial_context.get_or_insert(dp.context.clone());
161 self.dialplan_context = Some(dp.context);
162 self.dialplan_from = Some(dp.from);
163 self.dialplan_to = Some(dp.to);
164 }
165 }
166
167 for attached in &entry.attached {
168 let parsed = parse_line(attached);
169 self.update_from_message(parsed.message);
170 }
171 }
172
173 fn update_from_message(&mut self, msg: &str) {
174 let kind = classify_message(msg);
175 match &kind {
176 MessageKind::Dialplan { detail, .. } => {
177 if let Some(dp) = parse_dialplan_context(detail) {
178 self.initial_context.get_or_insert(dp.context.clone());
179 self.dialplan_context = Some(dp.context);
180 self.dialplan_from = Some(dp.from);
181 self.dialplan_to = Some(dp.to);
182 }
183 }
184 MessageKind::Variable { name, value } => {
185 let var_name = name.strip_prefix("variable_").unwrap_or(name);
186 self.variables.insert(var_name.to_string(), value.clone());
187 }
188 MessageKind::ChannelField { name, value } => match name.as_str() {
189 "Channel-Name" => self.channel_name = Some(value.clone()),
190 "Channel-State" => self.channel_state = Some(value.clone()),
191 _ => {}
192 },
193 MessageKind::StateChange { detail } => {
194 if let Some(new_state) = parse_state_change(detail) {
195 self.channel_state = Some(new_state);
196 }
197 }
198 _ => {}
199 }
200 }
201}
202
/// Result of parsing a dialplan or `Processing` log line.
struct DialplanContext {
    /// Text before the `->` arrow.
    from: String,
    /// Text after the `->` arrow.
    to: String,
    /// Dialplan context the line refers to.
    context: String,
}
208
209fn parse_dialplan_context(detail: &str) -> Option<DialplanContext> {
210 if !detail.starts_with("parsing [") {
211 return None;
212 }
213 let rest = &detail["parsing [".len()..];
214 let bracket_end = rest.find(']')?;
215 let inner = &rest[..bracket_end];
216
217 let arrow = inner.find("->")?;
218 let from_part = &inner[..arrow];
219 let to_part = &inner[arrow + 2..];
220
221 let context = if rest.len() > bracket_end + 1 {
222 let after = rest[bracket_end + 1..].trim();
223 if let Some(stripped) = after.strip_prefix("continue=") {
224 let _ = stripped;
225 }
226 from_part.to_string()
227 } else {
228 from_part.to_string()
229 };
230
231 Some(DialplanContext {
232 from: from_part.to_string(),
233 to: to_part.to_string(),
234 context,
235 })
236}
237
238fn parse_processing_line(msg: &str) -> Option<DialplanContext> {
239 let proc_idx = msg.find("Processing ")?;
240 let rest = &msg[proc_idx + "Processing ".len()..];
241
242 let arrow = rest.find("->")?;
243 let from = &rest[..arrow];
244
245 let after_arrow = &rest[arrow + 2..];
246 let space = after_arrow.find(' ')?;
247 let to = &after_arrow[..space];
248
249 let ctx_idx = after_arrow.find("in context ")?;
250 let ctx_rest = &after_arrow[ctx_idx + "in context ".len()..];
251 let context = ctx_rest.split_whitespace().next()?;
252
253 Some(DialplanContext {
254 from: from.to_string(),
255 to: to.to_string(),
256 context: context.to_string(),
257 })
258}
259
/// Extracts the channel name from a `New Channel <name> [<uuid>]` detail.
///
/// The name is everything between the `New Channel ` prefix and the last
/// ` [` in the string. Returns `None` when the prefix or the bracket is missing.
fn parse_new_channel(detail: &str) -> Option<String> {
    detail
        .strip_prefix("New Channel ")
        .and_then(|rest| rest.rsplit_once(" ["))
        .map(|(channel, _bracketed)| channel.to_owned())
}
265
/// Returns the trimmed text that follows the first ` -> ` in `detail`,
/// i.e. the state being switched to, or `None` when there is no arrow.
fn parse_state_change(detail: &str) -> Option<String> {
    let (_previous, next) = detail.split_once(" -> ")?;
    Some(next.trim().to_owned())
}
270
271fn parse_bridge_args(arguments: &str) -> Option<BridgeInfo> {
275 let dial = BridgeDialString::from_str(arguments).ok()?;
276 let first_ep = dial.groups.first()?.first()?;
277 let origination_uuid = first_ep
278 .variables()
279 .and_then(|v| v.get("origination_uuid"))
280 .map(|s| s.to_string());
281 let mut bare = first_ep.clone();
282 bare.set_variables(None);
283 let target_channel = bare.to_string();
284 Some(BridgeInfo {
285 origination_uuid,
286 target_channel,
287 })
288}
289
/// What `parse_bridge_args` extracts from a `bridge` argument string.
struct BridgeInfo {
    /// Value of the first endpoint's `origination_uuid` variable, if present.
    origination_uuid: Option<String>,
    /// First endpoint rendered as a string with its variables removed.
    target_channel: String,
}
294
/// Extracts the peer UUID from a message containing `Peer UUID: <uuid>`.
///
/// Only the first whitespace-delimited token after the marker is returned, so
/// trailing text on the line can never leak into the UUID (which is used as a
/// session-map key by the caller). Returns `None` when the marker is absent
/// or nothing but whitespace follows it.
fn parse_originate_success(msg: &str) -> Option<String> {
    const MARKER: &str = "Peer UUID: ";
    let (_, tail) = msg.split_once(MARKER)?;
    tail.split_whitespace().next().map(str::to_owned)
}
306
/// A log entry paired with the session state known right after processing it.
#[derive(Debug)]
pub struct EnrichedEntry {
    /// The entry as produced by the underlying `LogStream`.
    pub entry: LogEntry,
    /// Snapshot of the entry's session; `None` for entries with an empty UUID.
    pub session: Option<SessionSnapshot>,
}
314
/// Iterator adapter over a `LogStream` that maintains one `SessionState` per
/// UUID and yields every entry together with a snapshot of its session.
pub struct SessionTracker<I> {
    /// Underlying stream of parsed log entries.
    inner: LogStream<I>,
    /// Session state keyed by the entry UUID.
    sessions: HashMap<String, SessionState>,
}
325
326impl<I: Iterator<Item = String>> SessionTracker<I> {
327 pub fn new(inner: LogStream<I>) -> Self {
329 SessionTracker {
330 inner,
331 sessions: HashMap::new(),
332 }
333 }
334
335 pub fn sessions(&self) -> &HashMap<String, SessionState> {
337 &self.sessions
338 }
339
340 pub fn remove_session(&mut self, uuid: &str) -> Option<SessionState> {
343 self.sessions.remove(uuid)
344 }
345
346 pub fn stats(&self) -> &ParseStats {
348 self.inner.stats()
349 }
350
351 pub fn drain_unclassified(&mut self) -> Vec<UnclassifiedLine> {
353 self.inner.drain_unclassified()
354 }
355
356 fn link_legs(&mut self, uuid: &str, entry: &LogEntry) {
359 if entry.message.contains("Originate Resulted in Success") {
361 if let Some(peer_uuid) = parse_originate_success(&entry.message) {
362 let a_uuid = uuid.to_string();
363 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
364 a_state.other_leg_uuid = Some(peer_uuid.clone());
365 a_state.pending_bridge_target = None;
366 }
367 let b_state = self.sessions.entry(peer_uuid).or_default();
368 b_state.other_leg_uuid = Some(a_uuid);
369 }
370 return;
371 }
372
373 if let MessageKind::ChannelLifecycle { detail } = &entry.message_kind {
376 if let Some(channel_name) = parse_new_channel(detail) {
377 let b_uuid = uuid.to_string();
378 let mut a_uuid_found = None;
379
380 for (a_uuid, a_state) in &self.sessions {
381 if *a_uuid == b_uuid {
382 continue;
383 }
384 if a_state.other_leg_uuid.as_deref() == Some(&b_uuid) {
386 a_uuid_found = Some(a_uuid.clone());
387 break;
388 }
389 if a_state.pending_bridge_target.as_deref() == Some(channel_name.as_str()) {
391 a_uuid_found = Some(a_uuid.clone());
392 break;
393 }
394 }
395
396 if let Some(a_uuid) = a_uuid_found {
397 if let Some(a_state) = self.sessions.get_mut(&a_uuid) {
398 a_state.other_leg_uuid = Some(b_uuid.clone());
399 a_state.pending_bridge_target = None;
400 }
401 if let Some(b_state) = self.sessions.get_mut(&b_uuid) {
402 b_state.other_leg_uuid = Some(a_uuid);
403 }
404 }
405 }
406 }
407 }
408}
409
410impl<I: Iterator<Item = String>> Iterator for SessionTracker<I> {
411 type Item = EnrichedEntry;
412
413 fn next(&mut self) -> Option<EnrichedEntry> {
414 let entry = self.inner.next()?;
415
416 if entry.uuid.is_empty() {
417 return Some(EnrichedEntry {
418 entry,
419 session: None,
420 });
421 }
422
423 let uuid = entry.uuid.clone();
424 let state = self.sessions.entry(uuid.clone()).or_default();
425 state.update_from_entry(&entry);
426
427 self.link_legs(&uuid, &entry);
428
429 let snapshot = self.sessions.get(&uuid).unwrap().snapshot();
430
431 Some(EnrichedEntry {
432 entry,
433 session: Some(snapshot),
434 })
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;

    // Fixture UUIDs and timestamps shared by all tests.
    const UUID1: &str = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";
    const UUID2: &str = "b2c3d4e5-f6a7-8901-bcde-f12345678901";
    const UUID3: &str = "c3d4e5f6-a7b8-9012-cdef-234567890123";
    const TS1: &str = "2025-01-15 10:30:45.123456";
    const TS2: &str = "2025-01-15 10:30:46.234567";

    /// Builds a complete log line: UUID, timestamp, idle percentage, level,
    /// source location and message.
    fn full_line(uuid: &str, ts: &str, msg: &str) -> String {
        format!("{uuid} {ts} 95.97% [DEBUG] sofia.c:100 {msg}")
    }

    /// Runs `lines` through a fresh tracker and collects every enriched entry.
    fn collect_enriched(lines: Vec<String>) -> Vec<EnrichedEntry> {
        let stream = LogStream::new(lines.into_iter());
        SessionTracker::new(stream).collect()
    }

    /// A line without a UUID prefix yields an entry with no session attached.
    #[test]
    fn system_line_no_session() {
        let lines = vec![format!(
            "{TS1} 95.97% [INFO] mod_event_socket.c:1772 Event Socket command"
        )];
        let entries = collect_enriched(lines);
        assert_eq!(entries.len(), 1);
        assert!(entries[0].session.is_none());
    }

    /// Dialplan information seen on continuation lines is still visible on a
    /// later entry of the same UUID.
    #[test]
    fn dialplan_context_propagation() {
        let lines = vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 answer"),
            format!("{UUID1} Dialplan: sofia/internal/+15550001234@192.0.2.1 parsing [public->global] continue=true"),
            full_line(UUID1, TS2, "Some later event"),
        ];
        let entries = collect_enriched(lines);
        let last = entries.last().unwrap();
        let session = last.session.as_ref().unwrap();
        assert_eq!(session.dialplan_context.as_deref(), Some("public"));
        assert_eq!(session.dialplan_from.as_deref(), Some("public"));
        assert_eq!(session.dialplan_to.as_deref(), Some("global"));
    }

    /// A plain `Processing from->to in context ctx` line fills all three
    /// dialplan fields.
    #[test]
    fn processing_line_extracts_context() {
        let lines = vec![full_line(
            UUID1,
            TS1,
            "Processing 5551234567->5559876543 in context public",
        )];
        let entries = collect_enriched(lines);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(session.dialplan_context.as_deref(), Some("public"));
        assert_eq!(session.dialplan_from.as_deref(), Some("5551234567"));
        assert_eq!(session.dialplan_to.as_deref(), Some("5559876543"));
    }

    /// A second `Processing` line updates the current context but leaves
    /// `initial_context` at the first one seen.
    #[test]
    fn initial_context_preserved_across_transfers() {
        let lines = vec![
            full_line(
                UUID1,
                TS1,
                "Processing 5551234567->5559876543 in context public",
            ),
            full_line(
                UUID1,
                TS2,
                "Processing 5551234567->start_recording in context recordings",
            ),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let entries: Vec<_> = tracker.by_ref().collect();

        let first = entries[0].session.as_ref().unwrap();
        assert_eq!(
            first.initial_context.as_deref(),
            Some("public"),
            "initial_context set on first Processing line"
        );
        assert_eq!(first.dialplan_context.as_deref(), Some("public"));

        let state = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            state.initial_context.as_deref(),
            Some("public"),
            "initial_context keeps the first context seen"
        );
        assert_eq!(
            state.dialplan_context.as_deref(),
            Some("recordings"),
            "dialplan_context tracks the current context"
        );
        assert_eq!(state.dialplan_to.as_deref(), Some("start_recording"));
    }

    /// A `New Channel` line supplies the channel name when none is known yet.
    #[test]
    fn new_channel_sets_channel_name() {
        let lines = vec![full_line(
            UUID1,
            TS1,
            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
        )];
        let entries = collect_enriched(lines);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(
            session.channel_name.as_deref(),
            Some("sofia/internal-v4/sos")
        );
    }

    /// An `Originate Resulted in Success` line on the A-leg links both legs
    /// through the peer UUID it reports.
    #[test]
    fn originate_success_links_both_legs() {
        let lines = vec![
            // B-leg appears first, then the A-leg reports the peer UUID.
            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/esinet1-v6-tcp/sip:target.example.com] Peer UUID: b2c3d4e5-f6a7-8901-bcde-f12345678901"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();

        let a_leg = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            a_leg.other_leg_uuid.as_deref(),
            Some(UUID2),
            "A-leg other_leg_uuid set from Originate Resulted in Success"
        );

        let b_leg = tracker.sessions().get(UUID2).unwrap();
        assert_eq!(
            b_leg.other_leg_uuid.as_deref(),
            Some(UUID1),
            "B-leg other_leg_uuid points back to A-leg"
        );
    }

    /// An `origination_uuid` in the bridge arguments links the A-leg right
    /// away; the B-leg is linked back once its `New Channel` line arrives.
    #[test]
    fn bridge_origination_uuid_links_a_leg_immediately() {
        let lines = vec![
            // The bridge dial string names the B-leg UUID up front.
            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal-v6/1232@[2001:db8::10] bridge([origination_uuid=b2c3d4e5-f6a7-8901-bcde-f12345678901,leg_timeout=2]sofia/esinet1-v6-tcp/sip:target.example.com)"),
            full_line(UUID2, TS1, "New Channel sofia/esinet1-v6-tcp/sip:target.example.com [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();

        let a_leg = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            a_leg.other_leg_uuid.as_deref(),
            Some(UUID2),
            "A-leg knows B-leg UUID from origination_uuid in bridge args"
        );

        let b_leg = tracker.sessions().get(UUID2).unwrap();
        assert_eq!(
            b_leg.other_leg_uuid.as_deref(),
            Some(UUID1),
            "B-leg knows A-leg once New Channel correlates"
        );
    }

    /// Without an `origination_uuid`, legs are linked when a new channel's
    /// name equals the pending bridge target.
    #[test]
    fn bridge_target_matches_new_channel() {
        let lines = vec![
            // No origination_uuid here: correlation has to go through the channel name.
            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
            full_line(UUID1, TS1, "Parsing session specific variables"),
            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();

        let a_leg = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            a_leg.other_leg_uuid.as_deref(),
            Some(UUID2),
            "A-leg linked to B-leg via bridge target matching New Channel"
        );

        let b_leg = tracker.sessions().get(UUID2).unwrap();
        assert_eq!(
            b_leg.other_leg_uuid.as_deref(),
            Some(UUID1),
            "B-leg linked back to A-leg"
        );
    }

    /// A later `Originate Resulted in Success` line overrides a link that was
    /// guessed from the channel name.
    #[test]
    fn originate_success_corrects_wrong_target_match() {
        let lines = vec![
            // The name match links UUID1 to UUID2 first; the peer UUID reported afterwards is UUID3.
            full_line(UUID1, TS1, "EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 bridge(sofia/gateway/carrier/+15559876543)"),
            full_line(UUID2, TS1, "New Channel sofia/gateway/carrier/+15559876543 [b2c3d4e5-f6a7-8901-bcde-f12345678901]"),
            full_line(UUID1, TS2, "Originate Resulted in Success: [sofia/gateway/carrier/+15559876543] Peer UUID: c3d4e5f6-a7b8-9012-cdef-234567890123"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();

        let a_leg = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            a_leg.other_leg_uuid.as_deref(),
            Some(UUID3),
            "Originate success overrides earlier target-match guess"
        );

        let real_b_leg = tracker.sessions().get(UUID3).unwrap();
        assert_eq!(
            real_b_leg.other_leg_uuid.as_deref(),
            Some(UUID1),
            "Real B-leg points back to A-leg"
        );
    }

    /// The `Other-Leg-Unique-ID` channel-data field sets `other_leg_uuid`.
    #[test]
    fn channel_data_other_leg_uuid() {
        let lines = vec![
            // Field line following a CHANNEL_DATA header.
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Other-Leg-Unique-ID: [{UUID2}]"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();

        let state = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            state.other_leg_uuid.as_deref(),
            Some(UUID2),
            "other_leg_uuid set from Other-Leg-Unique-ID CHANNEL_DATA field"
        );
    }

    /// A CHANNEL_DATA dump collapses into a single entry whose snapshot
    /// carries the channel name and state from its fields.
    #[test]
    fn channel_data_populates_session() {
        let lines = vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
            format!("{UUID1} Channel-State: [CS_EXECUTE]"),
            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
            "variable_direction: [inbound]".to_string(),
        ];
        let entries = collect_enriched(lines);
        assert_eq!(entries.len(), 1);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(
            session.channel_name.as_deref(),
            Some("sofia/internal/+15550001234@192.0.2.1")
        );
        assert_eq!(session.channel_state.as_deref(), Some("CS_EXECUTE"));
    }

    /// `variable_*` lines of a CHANNEL_DATA dump are stored with the
    /// `variable_` prefix removed.
    #[test]
    fn variables_learned_from_channel_data() {
        let lines = vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            "variable_sip_call_id: [test123@192.0.2.1]".to_string(),
            "variable_direction: [inbound]".to_string(),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();
        let state = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            state.variables.get("sip_call_id").map(|s| s.as_str()),
            Some("test123@192.0.2.1")
        );
        assert_eq!(
            state.variables.get("direction").map(|s| s.as_str()),
            Some("inbound")
        );
    }

    /// An executed `set(name=value)` is recorded as a session variable.
    #[test]
    fn variables_learned_from_set_execute() {
        let lines = vec![
            full_line(UUID1, TS1, "First"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(call_direction=inbound)"),
            full_line(UUID1, TS2, "After set"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let entries: Vec<_> = tracker.by_ref().collect();
        assert_eq!(entries.len(), 3);
        let state = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            state.variables.get("call_direction").map(|s| s.as_str()),
            Some("inbound")
        );
    }

    /// An executed `export(name=value)` is recorded as a session variable.
    #[test]
    fn variables_learned_from_export_execute() {
        let lines = vec![
            full_line(UUID1, TS1, "First"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 export(originate_timeout=3600)"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();
        let state = tracker.sessions().get(UUID1).unwrap();
        assert_eq!(
            state.variables.get("originate_timeout").map(|s| s.as_str()),
            Some("3600")
        );
    }

    /// State learned for one UUID does not leak into another UUID's session.
    #[test]
    fn session_isolation_between_uuids() {
        let lines = vec![
            full_line(
                UUID1,
                TS1,
                "Processing 5551111111->5552222222 in context public",
            ),
            full_line(
                UUID2,
                TS2,
                "Processing 5553333333->5554444444 in context private",
            ),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();
        let s1 = tracker.sessions().get(UUID1).unwrap();
        let s2 = tracker.sessions().get(UUID2).unwrap();
        assert_eq!(s1.dialplan_context.as_deref(), Some("public"));
        assert_eq!(s2.dialplan_context.as_deref(), Some("private"));
        assert_eq!(s1.dialplan_from.as_deref(), Some("5551111111"));
        assert_eq!(s2.dialplan_from.as_deref(), Some("5553333333"));
    }

    /// The `from` part of a `Processing` line may contain spaces and an
    /// angle-bracketed number; it is kept verbatim.
    #[test]
    fn processing_line_with_regex_type_and_angle_bracket_caller() {
        let lines = vec![full_line(
            UUID1,
            TS1,
            "Processing Emergency S R <5550001234>->start_recording in context recordings",
        )];
        let entries = collect_enriched(lines);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(session.initial_context.as_deref(), Some("recordings"));
        assert_eq!(session.dialplan_context.as_deref(), Some("recordings"));
        assert_eq!(
            session.dialplan_from.as_deref(),
            Some("Emergency S R <5550001234>")
        );
        assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
    }

    /// Same as above for an `Extension NNNN <NNNN>` caller.
    #[test]
    fn processing_line_extension_format() {
        let lines = vec![full_line(
            UUID1,
            TS1,
            "Processing Extension 1263 <1263>->start_recording in context recordings",
        )];
        let entries = collect_enriched(lines);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(session.initial_context.as_deref(), Some("recordings"));
        assert_eq!(
            session.dialplan_from.as_deref(),
            Some("Extension 1263 <1263>")
        );
        assert_eq!(session.dialplan_to.as_deref(), Some("start_recording"));
    }

    /// A `State Change A -> B` line sets the channel state to `B`.
    #[test]
    fn state_change_updates_channel_state() {
        let lines = vec![full_line(UUID1, TS1, "State Change CS_INIT -> CS_ROUTING")];
        let entries = collect_enriched(lines);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(session.channel_state.as_deref(), Some("CS_ROUTING"));
    }

    /// A `Callstate Change A -> B` line also sets the channel state to `B`.
    #[test]
    fn callstate_change_updates_channel_state() {
        let lines = vec![full_line(
            UUID1,
            TS1,
            "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
        )];
        let entries = collect_enriched(lines);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(session.channel_state.as_deref(), Some("RINGING"));
    }

    /// Callstate and state changes write the same field; the later line wins.
    #[test]
    fn state_change_overrides_callstate() {
        let lines = vec![
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
            ),
            full_line(
                UUID1,
                TS2,
                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
            ),
        ];
        let entries = collect_enriched(lines);
        assert_eq!(
            entries[0]
                .session
                .as_ref()
                .unwrap()
                .channel_state
                .as_deref(),
            Some("RINGING")
        );
        assert_eq!(
            entries[1]
                .session
                .as_ref()
                .unwrap()
                .channel_state
                .as_deref(),
            Some("CS_EXCHANGE_MEDIA")
        );
    }

    /// Walks one leg through creation, state changes, a `Processing` line and
    /// hangup, checking the snapshot at three points along the way.
    #[test]
    fn bleg_lifecycle_extracts_data_from_processing() {
        let lines = vec![
            full_line(
                UUID1,
                TS1,
                "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) State Change CS_NEW -> CS_INIT",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) State Change CS_INIT -> CS_ROUTING",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) State Change CS_ROUTING -> CS_CONSUME_MEDIA",
            ),
            full_line(
                UUID1,
                TS1,
                "(sofia/internal-v4/sos) Callstate Change DOWN -> RINGING",
            ),
            full_line(
                UUID1,
                TS2,
                "(sofia/internal-v4/sos) State Change CS_CONSUME_MEDIA -> CS_EXCHANGE_MEDIA",
            ),
            full_line(
                UUID1,
                TS2,
                "Processing Emergency S R <5550001234>->start_recording in context recordings",
            ),
            full_line(
                UUID1,
                TS2,
                "(sofia/internal-v4/sos) State Change CS_EXCHANGE_MEDIA -> CS_HANGUP",
            ),
        ];
        let entries = collect_enriched(lines);

        // Before the Processing line no dialplan context is known.
        let after_ringing = entries[4].session.as_ref().unwrap();
        assert_eq!(after_ringing.channel_state.as_deref(), Some("RINGING"));
        assert!(after_ringing.initial_context.is_none());

        // The Processing line adds dialplan data without touching the state.
        let after_processing = entries[6].session.as_ref().unwrap();
        assert_eq!(
            after_processing.channel_state.as_deref(),
            Some("CS_EXCHANGE_MEDIA")
        );
        assert_eq!(
            after_processing.initial_context.as_deref(),
            Some("recordings")
        );
        assert_eq!(
            after_processing.dialplan_from.as_deref(),
            Some("Emergency S R <5550001234>")
        );
        assert_eq!(
            after_processing.dialplan_to.as_deref(),
            Some("start_recording")
        );

        // Dialplan data survives the final state change.
        let after_hangup = entries[7].session.as_ref().unwrap();
        assert_eq!(after_hangup.channel_state.as_deref(), Some("CS_HANGUP"));
        assert_eq!(after_hangup.initial_context.as_deref(), Some("recordings"));
    }

    /// Same scenario and assertions as `new_channel_sets_channel_name`.
    #[test]
    fn channel_name_from_new_channel() {
        let lines = vec![full_line(
            UUID1,
            TS1,
            "New Channel sofia/internal-v4/sos [a1b2c3d4-e5f6-7890-abcd-ef1234567890]",
        )];
        let entries = collect_enriched(lines);
        let session = entries[0].session.as_ref().unwrap();
        assert_eq!(
            session.channel_name.as_deref(),
            Some("sofia/internal-v4/sos")
        );
    }

    /// `remove_session` hands back the accumulated state and forgets the UUID.
    #[test]
    fn remove_session() {
        let lines = vec![full_line(
            UUID1,
            TS1,
            "Processing 5551111111->5552222222 in context public",
        )];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();
        assert!(tracker.sessions().contains_key(UUID1));
        let removed = tracker.remove_session(UUID1).unwrap();
        assert_eq!(removed.dialplan_context.as_deref(), Some("public"));
        assert!(!tracker.sessions().contains_key(UUID1));
    }

    /// `stats` exposes the counters of the wrapped stream.
    #[test]
    fn stats_delegation() {
        let lines = vec![
            full_line(UUID1, TS1, "First"),
            full_line(UUID1, TS2, "Second"),
        ];
        let stream = LogStream::new(lines.into_iter());
        let mut tracker = SessionTracker::new(stream);
        let _: Vec<_> = tracker.by_ref().collect();
        assert_eq!(tracker.stats().lines_processed, 2);
    }

    /// Each snapshot reflects everything learned up to and including its own
    /// entry, and nothing learned later.
    #[test]
    fn snapshot_reflects_cumulative_state() {
        let lines = vec![
            full_line(UUID1, TS1, "CHANNEL_DATA:"),
            format!("{UUID1} Channel-Name: [sofia/internal/+15550001234@192.0.2.1]"),
            format!("{UUID1} EXECUTE [depth=0] sofia/internal/+15550001234@192.0.2.1 set(foo=bar)"),
            full_line(
                UUID1,
                TS2,
                "Processing 5551111111->5552222222 in context public",
            ),
        ];
        let entries = collect_enriched(lines);
        assert_eq!(entries.len(), 3);
        let first = entries[0].session.as_ref().unwrap();
        assert_eq!(
            first.channel_name.as_deref(),
            Some("sofia/internal/+15550001234@192.0.2.1"),
        );
        assert!(first.dialplan_context.is_none());

        let last = entries[2].session.as_ref().unwrap();
        assert_eq!(
            last.channel_name.as_deref(),
            Some("sofia/internal/+15550001234@192.0.2.1"),
        );
        assert_eq!(last.dialplan_context.as_deref(), Some("public"));
    }
}