Skip to main content

asterisk_rs_ami/
tracker.rs

1//! call correlation engine — tracks AMI events by UniqueID into call lifecycle objects.
2
3use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
6use tokio::sync::{mpsc, watch};
7
8use crate::event::AmiEvent;
9use asterisk_rs_core::event::EventSubscription;
10
11/// a fully resolved call with all collected events
12#[derive(Debug, Clone)]
13pub struct CompletedCall {
14    /// channel name at creation
15    pub channel: String,
16    /// per-channel unique identifier
17    pub unique_id: String,
18    /// links bridged channels together
19    pub linked_id: String,
20    /// when the channel was created
21    pub start_time: Instant,
22    /// when the channel hung up
23    pub end_time: Instant,
24    /// total call duration
25    pub duration: Duration,
26    /// hangup cause code
27    pub cause: u32,
28    /// hangup cause description
29    pub cause_txt: String,
30    /// all events collected during this call's lifetime
31    pub events: Vec<AmiEvent>,
32}
33
34/// tracks an in-progress call
35struct ActiveCall {
36    channel: String,
37    unique_id: String,
38    linked_id: String,
39    start_time: Instant,
40    events: Vec<AmiEvent>,
41}
42
43/// correlates AMI events by UniqueID into complete call records
44///
45/// spawns a background task that consumes events from an EventSubscription,
46/// tracks active calls, and emits CompletedCall records when channels hang up.
47pub struct CallTracker {
48    shutdown_tx: watch::Sender<bool>,
49    task_handle: tokio::task::JoinHandle<()>,
50}
51
52impl std::fmt::Debug for CallTracker {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct("CallTracker").finish_non_exhaustive()
55    }
56}
57
58impl CallTracker {
59    /// create a tracker that consumes events and produces completed call records
60    pub fn new(subscription: EventSubscription<AmiEvent>) -> (Self, mpsc::Receiver<CompletedCall>) {
61        let (completed_tx, completed_rx) = mpsc::channel(256);
62        let (shutdown_tx, shutdown_rx) = watch::channel(false);
63
64        let task_handle = tokio::spawn(track_loop(
65            subscription,
66            completed_tx,
67            shutdown_rx,
68            DEFAULT_CALL_TTL,
69        ));
70
71        let tracker = Self {
72            shutdown_tx,
73            task_handle,
74        };
75
76        (tracker, completed_rx)
77    }
78
79    /// stop the background tracking task
80    pub fn shutdown(&self) {
81        let _ = self.shutdown_tx.send(true);
82        self.task_handle.abort();
83    }
84}
85
86impl Drop for CallTracker {
87    fn drop(&mut self) {
88        self.shutdown();
89    }
90}
91
92const DEFAULT_CALL_TTL: Duration = Duration::from_secs(3600);
93
94async fn track_loop(
95    mut subscription: EventSubscription<AmiEvent>,
96    completed_tx: mpsc::Sender<CompletedCall>,
97    mut shutdown_rx: watch::Receiver<bool>,
98    ttl: Duration,
99) {
100    let mut active: HashMap<String, ActiveCall> = HashMap::new();
101
102    loop {
103        tokio::select! {
104            event = subscription.recv() => {
105                let Some(event) = event else { break };
106                evict_stale(&mut active, &completed_tx, ttl);
107                handle_event(&mut active, &completed_tx, event);
108            }
109            _ = shutdown_rx.changed() => {
110                break;
111            }
112        }
113    }
114}
115
116fn handle_event(
117    active: &mut HashMap<String, ActiveCall>,
118    completed_tx: &mpsc::Sender<CompletedCall>,
119    event: AmiEvent,
120) {
121    // handle new channel creation
122    if let AmiEvent::NewChannel {
123        ref channel,
124        ref unique_id,
125        ref linked_id,
126        ..
127    } = event
128    {
129        let call = ActiveCall {
130            channel: channel.clone(),
131            unique_id: unique_id.clone(),
132            linked_id: linked_id.clone(),
133            start_time: Instant::now(),
134            events: vec![event],
135        };
136        active.insert(call.unique_id.clone(), call);
137        return;
138    }
139
140    // rename — update the tracked channel name before appending
141    if let AmiEvent::Rename {
142        ref unique_id,
143        ref new_name,
144        ..
145    } = event
146    {
147        if let Some(call) = active.get_mut(unique_id.as_str()) {
148            call.channel = new_name.clone();
149            call.events.push(event);
150        }
151        return;
152    }
153
154    // handle hangup — finalize the call
155    if let AmiEvent::Hangup {
156        ref unique_id,
157        cause,
158        ref cause_txt,
159        ..
160    } = event
161    {
162        if let Some(mut call) = active.remove(unique_id.as_str()) {
163            let end_time = Instant::now();
164            let cause_txt = cause_txt.clone();
165            call.events.push(event);
166            let completed = CompletedCall {
167                channel: call.channel,
168                unique_id: call.unique_id,
169                linked_id: call.linked_id,
170                start_time: call.start_time,
171                end_time,
172                duration: end_time.duration_since(call.start_time),
173                cause,
174                cause_txt,
175                events: call.events,
176            };
177            // receiver may have been dropped or channel full — drop rather than block the tracker
178            if completed_tx.try_send(completed).is_err() {
179                tracing::warn!("completed_tx full or closed, dropping completed call");
180            }
181        }
182        return;
183    }
184
185    // for all other events, append to the matching active call if tracked
186    if let Some(uid) = extract_unique_id(&event) {
187        if let Some(call) = active.get_mut(uid) {
188            call.events.push(event);
189        }
190    }
191}
192
193/// extract the unique_id field from an event, if present
194fn extract_unique_id(event: &AmiEvent) -> Option<&str> {
195    match event {
196        // variants with a unique_id: String field
197        AmiEvent::NewChannel { unique_id, .. }
198        | AmiEvent::Hangup { unique_id, .. }
199        | AmiEvent::Newstate { unique_id, .. }
200        | AmiEvent::DialBegin { unique_id, .. }
201        | AmiEvent::DialEnd { unique_id, .. }
202        | AmiEvent::DtmfBegin { unique_id, .. }
203        | AmiEvent::DtmfEnd { unique_id, .. }
204        | AmiEvent::BridgeEnter { unique_id, .. }
205        | AmiEvent::BridgeLeave { unique_id, .. }
206        | AmiEvent::VarSet { unique_id, .. }
207        | AmiEvent::Hold { unique_id, .. }
208        | AmiEvent::Unhold { unique_id, .. }
209        | AmiEvent::HangupRequest { unique_id, .. }
210        | AmiEvent::SoftHangupRequest { unique_id, .. }
211        | AmiEvent::NewExten { unique_id, .. }
212        | AmiEvent::NewCallerid { unique_id, .. }
213        | AmiEvent::NewConnectedLine { unique_id, .. }
214        | AmiEvent::NewAccountCode { unique_id, .. }
215        | AmiEvent::Rename { unique_id, .. }
216        | AmiEvent::OriginateResponse { unique_id, .. }
217        | AmiEvent::DialState { unique_id, .. }
218        | AmiEvent::Flash { unique_id, .. }
219        | AmiEvent::Wink { unique_id, .. }
220        | AmiEvent::BridgeInfoChannel { unique_id, .. }
221        | AmiEvent::LocalBridge { unique_id, .. }
222        | AmiEvent::LocalOptimizationBegin { unique_id, .. }
223        | AmiEvent::LocalOptimizationEnd { unique_id, .. }
224        | AmiEvent::Cdr { unique_id, .. }
225        | AmiEvent::Cel { unique_id, .. }
226        | AmiEvent::QueueCallerAbandon { unique_id, .. }
227        | AmiEvent::QueueCallerJoin { unique_id, .. }
228        | AmiEvent::QueueCallerLeave { unique_id, .. }
229        | AmiEvent::QueueEntry { unique_id, .. }
230        | AmiEvent::AgentCalled { unique_id, .. }
231        | AmiEvent::AgentConnect { unique_id, .. }
232        | AmiEvent::AgentComplete { unique_id, .. }
233        | AmiEvent::AgentDump { unique_id, .. }
234        | AmiEvent::AgentLogin { unique_id, .. }
235        | AmiEvent::AgentRingNoAnswer { unique_id, .. }
236        | AmiEvent::ConfbridgeJoin { unique_id, .. }
237        | AmiEvent::ConfbridgeLeave { unique_id, .. }
238        | AmiEvent::ConfbridgeList { unique_id, .. }
239        | AmiEvent::ConfbridgeMute { unique_id, .. }
240        | AmiEvent::ConfbridgeUnmute { unique_id, .. }
241        | AmiEvent::ConfbridgeTalking { unique_id, .. }
242        | AmiEvent::MixMonitorStart { unique_id, .. }
243        | AmiEvent::MixMonitorStop { unique_id, .. }
244        | AmiEvent::MixMonitorMute { unique_id, .. }
245        | AmiEvent::MusicOnHoldStart { unique_id, .. }
246        | AmiEvent::MusicOnHoldStop { unique_id, .. }
247        | AmiEvent::ParkedCall { unique_id, .. }
248        | AmiEvent::ParkedCallGiveUp { unique_id, .. }
249        | AmiEvent::ParkedCallTimeOut { unique_id, .. }
250        | AmiEvent::ParkedCallSwap { unique_id, .. }
251        | AmiEvent::UnParkedCall { unique_id, .. }
252        | AmiEvent::Pickup { unique_id, .. }
253        | AmiEvent::ChanSpyStart { unique_id, .. }
254        | AmiEvent::ChanSpyStop { unique_id, .. }
255        | AmiEvent::ChannelTalkingStart { unique_id, .. }
256        | AmiEvent::ChannelTalkingStop { unique_id, .. }
257        | AmiEvent::RTCPReceived { unique_id, .. }
258        | AmiEvent::RTCPSent { unique_id, .. }
259        | AmiEvent::AsyncAGIStart { unique_id, .. }
260        | AmiEvent::AsyncAGIExec { unique_id, .. }
261        | AmiEvent::AsyncAGIEnd { unique_id, .. }
262        | AmiEvent::AGIExecStart { unique_id, .. }
263        | AmiEvent::AGIExecEnd { unique_id, .. }
264        | AmiEvent::HangupHandlerPush { unique_id, .. }
265        | AmiEvent::HangupHandlerPop { unique_id, .. }
266        | AmiEvent::HangupHandlerRun { unique_id, .. }
267        | AmiEvent::Status { unique_id, .. }
268        | AmiEvent::CoreShowChannel { unique_id, .. }
269        | AmiEvent::AocD { unique_id, .. }
270        | AmiEvent::AocE { unique_id, .. }
271        | AmiEvent::AocS { unique_id, .. }
272        | AmiEvent::FAXStatus { unique_id, .. }
273        | AmiEvent::ReceiveFAX { unique_id, .. }
274        | AmiEvent::SendFAX { unique_id, .. }
275        | AmiEvent::MeetmeJoin { unique_id, .. }
276        | AmiEvent::MeetmeLeave { unique_id, .. }
277        | AmiEvent::MeetmeMute { unique_id, .. }
278        | AmiEvent::MeetmeTalking { unique_id, .. }
279        | AmiEvent::MeetmeTalkRequest { unique_id, .. }
280        | AmiEvent::MeetmeList { unique_id, .. }
281        | AmiEvent::MiniVoiceMail { unique_id, .. }
282        | AmiEvent::FAXSession { unique_id, .. }
283        | AmiEvent::MCID { unique_id, .. } => Some(unique_id.as_str()),
284
285        // transferer_unique_id — not named unique_id but still useful
286        AmiEvent::AttendedTransfer {
287            transferer_unique_id,
288            ..
289        } => Some(transferer_unique_id.as_str()),
290        AmiEvent::BlindTransfer {
291            transferer_unique_id,
292            ..
293        } => Some(transferer_unique_id.as_str()),
294
295        // unique_id is Option<String>
296        AmiEvent::UserEvent { unique_id, .. } => unique_id.as_deref(),
297        AmiEvent::DAHDIChannel { unique_id, .. } => unique_id.as_deref(),
298
299        // variants without unique_id
300        AmiEvent::FullyBooted { .. }
301        | AmiEvent::PeerStatus { .. }
302        | AmiEvent::BridgeCreate { .. }
303        | AmiEvent::BridgeDestroy { .. }
304        | AmiEvent::BridgeMerge { .. }
305        | AmiEvent::BridgeInfoComplete { .. }
306        | AmiEvent::BridgeVideoSourceUpdate { .. }
307        | AmiEvent::QueueMemberAdded { .. }
308        | AmiEvent::QueueMemberRemoved { .. }
309        | AmiEvent::QueueMemberPause { .. }
310        | AmiEvent::QueueMemberStatus { .. }
311        | AmiEvent::QueueMemberPenalty { .. }
312        | AmiEvent::QueueMemberRinginuse { .. }
313        | AmiEvent::QueueParams { .. }
314        | AmiEvent::AgentLogoff { .. }
315        | AmiEvent::Agents { .. }
316        | AmiEvent::AgentsComplete
317        | AmiEvent::ConfbridgeStart { .. }
318        | AmiEvent::ConfbridgeEnd { .. }
319        | AmiEvent::ConfbridgeRecord { .. }
320        | AmiEvent::ConfbridgeStopRecord { .. }
321        | AmiEvent::ConfbridgeListRooms { .. }
322        | AmiEvent::DeviceStateChange { .. }
323        | AmiEvent::ExtensionStatus { .. }
324        | AmiEvent::PresenceStateChange { .. }
325        | AmiEvent::PresenceStatus { .. }
326        | AmiEvent::ContactStatus { .. }
327        | AmiEvent::Registry { .. }
328        | AmiEvent::MessageWaiting { .. }
329        | AmiEvent::VoicemailPasswordChange { .. }
330        | AmiEvent::FailedACL { .. }
331        | AmiEvent::InvalidAccountID { .. }
332        | AmiEvent::InvalidPassword { .. }
333        | AmiEvent::ChallengeResponseFailed { .. }
334        | AmiEvent::ChallengeSent { .. }
335        | AmiEvent::SuccessfulAuth { .. }
336        | AmiEvent::SessionLimit { .. }
337        | AmiEvent::UnexpectedAddress { .. }
338        | AmiEvent::RequestBadFormat { .. }
339        | AmiEvent::RequestNotAllowed { .. }
340        | AmiEvent::RequestNotSupported { .. }
341        | AmiEvent::InvalidTransport { .. }
342        | AmiEvent::AuthMethodNotAllowed { .. }
343        | AmiEvent::Shutdown { .. }
344        | AmiEvent::Reload { .. }
345        | AmiEvent::Load { .. }
346        | AmiEvent::Unload { .. }
347        | AmiEvent::LogChannel { .. }
348        | AmiEvent::LoadAverageLimit
349        | AmiEvent::MemoryLimit
350        | AmiEvent::StatusComplete { .. }
351        | AmiEvent::CoreShowChannelsComplete { .. }
352        | AmiEvent::CoreShowChannelMapComplete
353        | AmiEvent::Alarm { .. }
354        | AmiEvent::AlarmClear { .. }
355        | AmiEvent::SpanAlarm { .. }
356        | AmiEvent::SpanAlarmClear { .. }
357        | AmiEvent::MeetmeEnd { .. }
358        | AmiEvent::MeetmeListRooms { .. }
359        | AmiEvent::DeviceStateListComplete { .. }
360        | AmiEvent::ExtensionStateListComplete { .. }
361        | AmiEvent::PresenceStateListComplete { .. }
362        | AmiEvent::AorDetail { .. }
363        | AmiEvent::AorList { .. }
364        | AmiEvent::AorListComplete { .. }
365        | AmiEvent::AuthDetail { .. }
366        | AmiEvent::AuthList { .. }
367        | AmiEvent::AuthListComplete { .. }
368        | AmiEvent::ContactList { .. }
369        | AmiEvent::ContactListComplete { .. }
370        | AmiEvent::ContactStatusDetail { .. }
371        | AmiEvent::EndpointDetail { .. }
372        | AmiEvent::EndpointDetailComplete { .. }
373        | AmiEvent::EndpointList { .. }
374        | AmiEvent::EndpointListComplete { .. }
375        | AmiEvent::IdentifyDetail { .. }
376        | AmiEvent::TransportDetail { .. }
377        | AmiEvent::ResourceListDetail { .. }
378        | AmiEvent::InboundRegistrationDetail { .. }
379        | AmiEvent::OutboundRegistrationDetail { .. }
380        | AmiEvent::InboundSubscriptionDetail { .. }
381        | AmiEvent::OutboundSubscriptionDetail { .. }
382        | AmiEvent::MWIGet { .. }
383        | AmiEvent::MWIGetComplete { .. }
384        | AmiEvent::FAXSessionsEntry { .. }
385        | AmiEvent::FAXSessionsComplete { .. }
386        | AmiEvent::FAXStats { .. }
387        | AmiEvent::DNDState { .. }
388        | AmiEvent::DeadlockStart
389        | AmiEvent::Unknown { .. } => None,
390    }
391}
392
393/// emit completed records for calls that have exceeded the maximum tracked age
394///
395/// called on every incoming event so the sweep cost is proportional to churn, not wall-clock time.
396/// stale calls are emitted with cause 0 and a synthetic cause_txt so callers can distinguish them
397/// from normal hangups.
398fn evict_stale(
399    active: &mut HashMap<String, ActiveCall>,
400    completed_tx: &mpsc::Sender<CompletedCall>,
401    ttl: Duration,
402) {
403    let now = Instant::now();
404    active.retain(|_, call| {
405        if now.duration_since(call.start_time) <= ttl {
406            return true;
407        }
408        let completed = CompletedCall {
409            channel: call.channel.clone(),
410            unique_id: call.unique_id.clone(),
411            linked_id: call.linked_id.clone(),
412            start_time: call.start_time,
413            end_time: now,
414            duration: now.duration_since(call.start_time),
415            cause: 0,
416            cause_txt: "ttl eviction: no hangup received".to_string(),
417            events: std::mem::take(&mut call.events),
418        };
419        if completed_tx.try_send(completed).is_err() {
420            tracing::warn!(unique_id = %call.unique_id, "completed_tx full, dropping stale evicted call");
421        }
422        false
423    });
424}