1use std::collections::HashMap;
4use std::time::{Duration, Instant};
5
6use tokio::sync::{mpsc, watch};
7
8use crate::event::AmiEvent;
9use asterisk_rs_core::event::EventSubscription;
10
/// Record describing a call that is no longer being tracked.
///
/// One of these is built when a `Hangup` event arrives for a tracked call,
/// or when a tracked call is evicted because it outlived the tracker's TTL.
#[derive(Debug, Clone)]
pub struct CompletedCall {
    /// Channel name from the `NewChannel` event, replaced by `new_name` if a
    /// `Rename` event was seen for the call.
    pub channel: String,
    /// Unique id from the `NewChannel` event; also the key the call was tracked under.
    pub unique_id: String,
    /// Linked id from the `NewChannel` event.
    pub linked_id: String,
    /// Local monotonic time at which the tracker processed the `NewChannel` event.
    pub start_time: Instant,
    /// Local monotonic time at which the tracker processed the `Hangup` event,
    /// or at which the call was evicted.
    pub end_time: Instant,
    /// `end_time - start_time`.
    pub duration: Duration,
    /// Cause code copied from the `Hangup` event; `0` for TTL evictions.
    pub cause: u32,
    /// Cause text copied from the `Hangup` event, or a fixed explanatory
    /// string for TTL evictions.
    pub cause_txt: String,
    /// Events recorded for this call, in the order the tracker processed them.
    pub events: Vec<AmiEvent>,
}
33
/// Book-keeping for a call that has been opened by `NewChannel` and has not
/// yet been completed or evicted.
struct ActiveCall {
    /// Current channel name; overwritten when a `Rename` event is handled.
    channel: String,
    /// Unique id from the `NewChannel` event (same value as the map key).
    unique_id: String,
    /// Linked id from the `NewChannel` event.
    linked_id: String,
    /// When the tracker processed the `NewChannel` event; the TTL is measured from here.
    start_time: Instant,
    /// Events accumulated so far, starting with the `NewChannel` event itself.
    events: Vec<AmiEvent>,
}
42
/// Handle to the background task that correlates AMI events into calls.
///
/// Dropping the handle shuts the task down (see the `Drop` impl).
pub struct CallTracker {
    /// Watch channel used to tell the background task to stop.
    shutdown_tx: watch::Sender<bool>,
    /// Join handle of the spawned tracking task; aborted on shutdown.
    task_handle: tokio::task::JoinHandle<()>,
}
51
52impl std::fmt::Debug for CallTracker {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct("CallTracker").finish_non_exhaustive()
55 }
56}
57
58impl CallTracker {
59 pub fn new(subscription: EventSubscription<AmiEvent>) -> (Self, mpsc::Receiver<CompletedCall>) {
61 let (completed_tx, completed_rx) = mpsc::channel(256);
62 let (shutdown_tx, shutdown_rx) = watch::channel(false);
63
64 let task_handle = tokio::spawn(track_loop(
65 subscription,
66 completed_tx,
67 shutdown_rx,
68 DEFAULT_CALL_TTL,
69 ));
70
71 let tracker = Self {
72 shutdown_tx,
73 task_handle,
74 };
75
76 (tracker, completed_rx)
77 }
78
79 pub fn shutdown(&self) {
81 let _ = self.shutdown_tx.send(true);
82 self.task_handle.abort();
83 }
84}
85
86impl Drop for CallTracker {
87 fn drop(&mut self) {
88 self.shutdown();
89 }
90}
91
/// How long a call may stay in the active set before it is evicted as stale
/// (one hour).
const DEFAULT_CALL_TTL: Duration = Duration::from_secs(60 * 60);
93
94async fn track_loop(
95 mut subscription: EventSubscription<AmiEvent>,
96 completed_tx: mpsc::Sender<CompletedCall>,
97 mut shutdown_rx: watch::Receiver<bool>,
98 ttl: Duration,
99) {
100 let mut active: HashMap<String, ActiveCall> = HashMap::new();
101
102 loop {
103 tokio::select! {
104 event = subscription.recv() => {
105 let Some(event) = event else { break };
106 evict_stale(&mut active, &completed_tx, ttl);
107 handle_event(&mut active, &completed_tx, event);
108 }
109 _ = shutdown_rx.changed() => {
110 break;
111 }
112 }
113 }
114}
115
116fn handle_event(
117 active: &mut HashMap<String, ActiveCall>,
118 completed_tx: &mpsc::Sender<CompletedCall>,
119 event: AmiEvent,
120) {
121 if let AmiEvent::NewChannel {
123 ref channel,
124 ref unique_id,
125 ref linked_id,
126 ..
127 } = event
128 {
129 let call = ActiveCall {
130 channel: channel.clone(),
131 unique_id: unique_id.clone(),
132 linked_id: linked_id.clone(),
133 start_time: Instant::now(),
134 events: vec![event],
135 };
136 active.insert(call.unique_id.clone(), call);
137 return;
138 }
139
140 if let AmiEvent::Rename {
142 ref unique_id,
143 ref new_name,
144 ..
145 } = event
146 {
147 if let Some(call) = active.get_mut(unique_id.as_str()) {
148 call.channel = new_name.clone();
149 call.events.push(event);
150 }
151 return;
152 }
153
154 if let AmiEvent::Hangup {
156 ref unique_id,
157 cause,
158 ref cause_txt,
159 ..
160 } = event
161 {
162 if let Some(mut call) = active.remove(unique_id.as_str()) {
163 let end_time = Instant::now();
164 let cause_txt = cause_txt.clone();
165 call.events.push(event);
166 let completed = CompletedCall {
167 channel: call.channel,
168 unique_id: call.unique_id,
169 linked_id: call.linked_id,
170 start_time: call.start_time,
171 end_time,
172 duration: end_time.duration_since(call.start_time),
173 cause,
174 cause_txt,
175 events: call.events,
176 };
177 if completed_tx.try_send(completed).is_err() {
179 tracing::warn!("completed_tx full or closed, dropping completed call");
180 }
181 }
182 return;
183 }
184
185 if let Some(uid) = extract_unique_id(&event) {
187 if let Some(call) = active.get_mut(uid) {
188 call.events.push(event);
189 }
190 }
191}
192
/// Returns the unique id under which `event` should be filed, if it has one.
///
/// The match lists every variant explicitly and has no wildcard arm, so a
/// variant added to `AmiEvent` must be classified here before this function
/// compiles again.
fn extract_unique_id(event: &AmiEvent) -> Option<&str> {
    match event {
        // Variants with a `unique_id` field that is always present.
        AmiEvent::NewChannel { unique_id, .. }
        | AmiEvent::Hangup { unique_id, .. }
        | AmiEvent::Newstate { unique_id, .. }
        | AmiEvent::DialBegin { unique_id, .. }
        | AmiEvent::DialEnd { unique_id, .. }
        | AmiEvent::DtmfBegin { unique_id, .. }
        | AmiEvent::DtmfEnd { unique_id, .. }
        | AmiEvent::BridgeEnter { unique_id, .. }
        | AmiEvent::BridgeLeave { unique_id, .. }
        | AmiEvent::VarSet { unique_id, .. }
        | AmiEvent::Hold { unique_id, .. }
        | AmiEvent::Unhold { unique_id, .. }
        | AmiEvent::HangupRequest { unique_id, .. }
        | AmiEvent::SoftHangupRequest { unique_id, .. }
        | AmiEvent::NewExten { unique_id, .. }
        | AmiEvent::NewCallerid { unique_id, .. }
        | AmiEvent::NewConnectedLine { unique_id, .. }
        | AmiEvent::NewAccountCode { unique_id, .. }
        | AmiEvent::Rename { unique_id, .. }
        | AmiEvent::OriginateResponse { unique_id, .. }
        | AmiEvent::DialState { unique_id, .. }
        | AmiEvent::Flash { unique_id, .. }
        | AmiEvent::Wink { unique_id, .. }
        | AmiEvent::BridgeInfoChannel { unique_id, .. }
        | AmiEvent::LocalBridge { unique_id, .. }
        | AmiEvent::LocalOptimizationBegin { unique_id, .. }
        | AmiEvent::LocalOptimizationEnd { unique_id, .. }
        | AmiEvent::Cdr { unique_id, .. }
        | AmiEvent::Cel { unique_id, .. }
        | AmiEvent::QueueCallerAbandon { unique_id, .. }
        | AmiEvent::QueueCallerJoin { unique_id, .. }
        | AmiEvent::QueueCallerLeave { unique_id, .. }
        | AmiEvent::QueueEntry { unique_id, .. }
        | AmiEvent::AgentCalled { unique_id, .. }
        | AmiEvent::AgentConnect { unique_id, .. }
        | AmiEvent::AgentComplete { unique_id, .. }
        | AmiEvent::AgentDump { unique_id, .. }
        | AmiEvent::AgentLogin { unique_id, .. }
        | AmiEvent::AgentRingNoAnswer { unique_id, .. }
        | AmiEvent::ConfbridgeJoin { unique_id, .. }
        | AmiEvent::ConfbridgeLeave { unique_id, .. }
        | AmiEvent::ConfbridgeList { unique_id, .. }
        | AmiEvent::ConfbridgeMute { unique_id, .. }
        | AmiEvent::ConfbridgeUnmute { unique_id, .. }
        | AmiEvent::ConfbridgeTalking { unique_id, .. }
        | AmiEvent::MixMonitorStart { unique_id, .. }
        | AmiEvent::MixMonitorStop { unique_id, .. }
        | AmiEvent::MixMonitorMute { unique_id, .. }
        | AmiEvent::MusicOnHoldStart { unique_id, .. }
        | AmiEvent::MusicOnHoldStop { unique_id, .. }
        | AmiEvent::ParkedCall { unique_id, .. }
        | AmiEvent::ParkedCallGiveUp { unique_id, .. }
        | AmiEvent::ParkedCallTimeOut { unique_id, .. }
        | AmiEvent::ParkedCallSwap { unique_id, .. }
        | AmiEvent::UnParkedCall { unique_id, .. }
        | AmiEvent::Pickup { unique_id, .. }
        | AmiEvent::ChanSpyStart { unique_id, .. }
        | AmiEvent::ChanSpyStop { unique_id, .. }
        | AmiEvent::ChannelTalkingStart { unique_id, .. }
        | AmiEvent::ChannelTalkingStop { unique_id, .. }
        | AmiEvent::RTCPReceived { unique_id, .. }
        | AmiEvent::RTCPSent { unique_id, .. }
        | AmiEvent::AsyncAGIStart { unique_id, .. }
        | AmiEvent::AsyncAGIExec { unique_id, .. }
        | AmiEvent::AsyncAGIEnd { unique_id, .. }
        | AmiEvent::AGIExecStart { unique_id, .. }
        | AmiEvent::AGIExecEnd { unique_id, .. }
        | AmiEvent::HangupHandlerPush { unique_id, .. }
        | AmiEvent::HangupHandlerPop { unique_id, .. }
        | AmiEvent::HangupHandlerRun { unique_id, .. }
        | AmiEvent::Status { unique_id, .. }
        | AmiEvent::CoreShowChannel { unique_id, .. }
        | AmiEvent::AocD { unique_id, .. }
        | AmiEvent::AocE { unique_id, .. }
        | AmiEvent::AocS { unique_id, .. }
        | AmiEvent::FAXStatus { unique_id, .. }
        | AmiEvent::ReceiveFAX { unique_id, .. }
        | AmiEvent::SendFAX { unique_id, .. }
        | AmiEvent::MeetmeJoin { unique_id, .. }
        | AmiEvent::MeetmeLeave { unique_id, .. }
        | AmiEvent::MeetmeMute { unique_id, .. }
        | AmiEvent::MeetmeTalking { unique_id, .. }
        | AmiEvent::MeetmeTalkRequest { unique_id, .. }
        | AmiEvent::MeetmeList { unique_id, .. }
        | AmiEvent::MiniVoiceMail { unique_id, .. }
        | AmiEvent::FAXSession { unique_id, .. }
        | AmiEvent::MCID { unique_id, .. } => Some(unique_id.as_str()),

        // Transfer events are filed under the transferer's unique id.
        AmiEvent::AttendedTransfer {
            transferer_unique_id,
            ..
        } => Some(transferer_unique_id.as_str()),
        AmiEvent::BlindTransfer {
            transferer_unique_id,
            ..
        } => Some(transferer_unique_id.as_str()),

        // Variants whose `unique_id` is optional; `None` passes through.
        AmiEvent::UserEvent { unique_id, .. } => unique_id.as_deref(),
        AmiEvent::DAHDIChannel { unique_id, .. } => unique_id.as_deref(),

        // Variants from which no unique id is extracted; they are never
        // attached to a call.
        AmiEvent::FullyBooted { .. }
        | AmiEvent::PeerStatus { .. }
        | AmiEvent::BridgeCreate { .. }
        | AmiEvent::BridgeDestroy { .. }
        | AmiEvent::BridgeMerge { .. }
        | AmiEvent::BridgeInfoComplete { .. }
        | AmiEvent::BridgeVideoSourceUpdate { .. }
        | AmiEvent::QueueMemberAdded { .. }
        | AmiEvent::QueueMemberRemoved { .. }
        | AmiEvent::QueueMemberPause { .. }
        | AmiEvent::QueueMemberStatus { .. }
        | AmiEvent::QueueMemberPenalty { .. }
        | AmiEvent::QueueMemberRinginuse { .. }
        | AmiEvent::QueueParams { .. }
        | AmiEvent::AgentLogoff { .. }
        | AmiEvent::Agents { .. }
        | AmiEvent::AgentsComplete
        | AmiEvent::ConfbridgeStart { .. }
        | AmiEvent::ConfbridgeEnd { .. }
        | AmiEvent::ConfbridgeRecord { .. }
        | AmiEvent::ConfbridgeStopRecord { .. }
        | AmiEvent::ConfbridgeListRooms { .. }
        | AmiEvent::DeviceStateChange { .. }
        | AmiEvent::ExtensionStatus { .. }
        | AmiEvent::PresenceStateChange { .. }
        | AmiEvent::PresenceStatus { .. }
        | AmiEvent::ContactStatus { .. }
        | AmiEvent::Registry { .. }
        | AmiEvent::MessageWaiting { .. }
        | AmiEvent::VoicemailPasswordChange { .. }
        | AmiEvent::FailedACL { .. }
        | AmiEvent::InvalidAccountID { .. }
        | AmiEvent::InvalidPassword { .. }
        | AmiEvent::ChallengeResponseFailed { .. }
        | AmiEvent::ChallengeSent { .. }
        | AmiEvent::SuccessfulAuth { .. }
        | AmiEvent::SessionLimit { .. }
        | AmiEvent::UnexpectedAddress { .. }
        | AmiEvent::RequestBadFormat { .. }
        | AmiEvent::RequestNotAllowed { .. }
        | AmiEvent::RequestNotSupported { .. }
        | AmiEvent::InvalidTransport { .. }
        | AmiEvent::AuthMethodNotAllowed { .. }
        | AmiEvent::Shutdown { .. }
        | AmiEvent::Reload { .. }
        | AmiEvent::Load { .. }
        | AmiEvent::Unload { .. }
        | AmiEvent::LogChannel { .. }
        | AmiEvent::LoadAverageLimit
        | AmiEvent::MemoryLimit
        | AmiEvent::StatusComplete { .. }
        | AmiEvent::CoreShowChannelsComplete { .. }
        | AmiEvent::CoreShowChannelMapComplete
        | AmiEvent::Alarm { .. }
        | AmiEvent::AlarmClear { .. }
        | AmiEvent::SpanAlarm { .. }
        | AmiEvent::SpanAlarmClear { .. }
        | AmiEvent::MeetmeEnd { .. }
        | AmiEvent::MeetmeListRooms { .. }
        | AmiEvent::DeviceStateListComplete { .. }
        | AmiEvent::ExtensionStateListComplete { .. }
        | AmiEvent::PresenceStateListComplete { .. }
        | AmiEvent::AorDetail { .. }
        | AmiEvent::AorList { .. }
        | AmiEvent::AorListComplete { .. }
        | AmiEvent::AuthDetail { .. }
        | AmiEvent::AuthList { .. }
        | AmiEvent::AuthListComplete { .. }
        | AmiEvent::ContactList { .. }
        | AmiEvent::ContactListComplete { .. }
        | AmiEvent::ContactStatusDetail { .. }
        | AmiEvent::EndpointDetail { .. }
        | AmiEvent::EndpointDetailComplete { .. }
        | AmiEvent::EndpointList { .. }
        | AmiEvent::EndpointListComplete { .. }
        | AmiEvent::IdentifyDetail { .. }
        | AmiEvent::TransportDetail { .. }
        | AmiEvent::ResourceListDetail { .. }
        | AmiEvent::InboundRegistrationDetail { .. }
        | AmiEvent::OutboundRegistrationDetail { .. }
        | AmiEvent::InboundSubscriptionDetail { .. }
        | AmiEvent::OutboundSubscriptionDetail { .. }
        | AmiEvent::MWIGet { .. }
        | AmiEvent::MWIGetComplete { .. }
        | AmiEvent::FAXSessionsEntry { .. }
        | AmiEvent::FAXSessionsComplete { .. }
        | AmiEvent::FAXStats { .. }
        | AmiEvent::DNDState { .. }
        | AmiEvent::DeadlockStart
        | AmiEvent::Unknown { .. } => None,
    }
}
392
393fn evict_stale(
399 active: &mut HashMap<String, ActiveCall>,
400 completed_tx: &mpsc::Sender<CompletedCall>,
401 ttl: Duration,
402) {
403 let now = Instant::now();
404 active.retain(|_, call| {
405 if now.duration_since(call.start_time) <= ttl {
406 return true;
407 }
408 let completed = CompletedCall {
409 channel: call.channel.clone(),
410 unique_id: call.unique_id.clone(),
411 linked_id: call.linked_id.clone(),
412 start_time: call.start_time,
413 end_time: now,
414 duration: now.duration_since(call.start_time),
415 cause: 0,
416 cause_txt: "ttl eviction: no hangup received".to_string(),
417 events: std::mem::take(&mut call.events),
418 };
419 if completed_tx.try_send(completed).is_err() {
420 tracing::warn!(unique_id = %call.unique_id, "completed_tx full, dropping stale evicted call");
421 }
422 false
423 });
424}