Skip to main content

asterisk_rs/
pbx.rs

//! high-level PBX abstraction for call management over AMI
//!
//! wraps [`AmiClient`] with call lifecycle tracking and convenience
//! methods for common telephony operations
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use asterisk_rs_ami::action::{HangupAction, OriginateAction};
10use asterisk_rs_ami::event::AmiEvent;
11use asterisk_rs_ami::tracker::{CallTracker, CompletedCall};
12use asterisk_rs_ami::AmiClient;
13use asterisk_rs_core::event::EventSubscription;
14use tokio::sync::{mpsc, Mutex};
15
16/// a live call being tracked by the PBX
17///
18/// wraps a channel name and unique_id with the AMI client for
19/// issuing commands and tracking events
20#[derive(Debug, Clone)]
21pub struct Call {
22    /// channel name (e.g. "PJSIP/100-00000001")
23    pub channel: String,
24    /// per-channel unique identifier
25    pub unique_id: String,
26    client: AmiClient,
27    // pre-started subscription created before originate so that any Newstate/Hangup
28    // events arriving before wait_for_answer is called are buffered and not lost;
29    // Arc+Mutex keeps Call Clone and lets wait_for_answer take &self
30    answer_sub: Arc<Mutex<EventSubscription<AmiEvent>>>,
31}
32
33impl Call {
34    /// hang up this call
35    pub async fn hangup(
36        &self,
37    ) -> asterisk_rs_ami::error::Result<asterisk_rs_ami::response::AmiResponse> {
38        self.client.hangup(HangupAction::new(&self.channel)).await
39    }
40
41    /// wait for this channel to reach "Up" state (answered)
42    ///
43    /// listens for Newstate events with channel_state_desc "Up".
44    /// returns Err if the channel hangs up before answering
45    pub async fn wait_for_answer(&self, timeout: Duration) -> Result<(), PbxError> {
46        let uid = self.unique_id.clone();
47
48        let result = tokio::time::timeout(timeout, async {
49            let mut sub = self.answer_sub.lock().await;
50            loop {
51                let Some(event) = sub.recv().await else {
52                    return Err(PbxError::Disconnected);
53                };
54                match event {
55                    AmiEvent::Newstate {
56                        unique_id,
57                        channel_state_desc,
58                        ..
59                    } if unique_id == uid => {
60                        if channel_state_desc == "Up" {
61                            return Ok(());
62                        }
63                    }
64                    AmiEvent::Hangup {
65                        unique_id,
66                        cause,
67                        cause_txt,
68                        ..
69                    } if unique_id == uid => {
70                        return Err(PbxError::CallFailed { cause, cause_txt });
71                    }
72                    _ => {}
73                }
74            }
75        })
76        .await;
77
78        match result {
79            Ok(inner) => inner,
80            Err(_) => Err(PbxError::Timeout),
81        }
82    }
83}
84
/// options for originating a call
///
/// every field is optional and starts out unset; build a value with
/// [`DialOptions::new`] followed by the chained setters, or fill the
/// public fields directly
#[derive(Debug, Clone, Default)]
#[must_use]
pub struct DialOptions {
    /// caller ID to present
    pub caller_id: Option<String>,
    /// maximum time to wait for answer in milliseconds
    pub timeout_ms: Option<u64>,
    /// channel variables to set
    pub variables: Option<std::collections::HashMap<String, String>>,
}

impl DialOptions {
    /// create default dial options (nothing set)
    pub fn new() -> Self {
        Self::default()
    }

    /// set the caller id to present
    pub fn caller_id(mut self, cid: impl Into<String>) -> Self {
        self.caller_id = Some(cid.into());
        self
    }

    /// set max wait time in milliseconds (matches Asterisk Originate timeout)
    pub fn timeout_ms(mut self, ms: u64) -> Self {
        self.timeout_ms = Some(ms);
        self
    }

    /// set one channel variable, keeping any variables added earlier
    ///
    /// setting the same `key` twice keeps the last value
    pub fn variable(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.variables
            .get_or_insert_with(Default::default)
            .insert(key.into(), value.into());
        self
    }

    /// add several channel variables at once, keeping any added earlier
    ///
    /// later pairs overwrite earlier ones with the same key
    pub fn variables<I, K, V>(mut self, vars: I) -> Self
    where
        I: IntoIterator<Item = (K, V)>,
        K: Into<String>,
        V: Into<String>,
    {
        self.variables
            .get_or_insert_with(Default::default)
            .extend(vars.into_iter().map(|(k, v)| (k.into(), v.into())));
        self
    }
}
115
116/// errors from PBX operations
117#[derive(Debug, thiserror::Error)]
118#[non_exhaustive]
119pub enum PbxError {
120    #[error("AMI error: {0}")]
121    Ami(#[from] asterisk_rs_ami::AmiError),
122
123    #[error("call failed: {cause} ({cause_txt})")]
124    CallFailed { cause: u32, cause_txt: String },
125
126    #[error("operation timed out")]
127    Timeout,
128
129    #[error("client disconnected")]
130    Disconnected,
131}
132
133/// high-level PBX abstraction wrapping an AMI client
134///
135/// provides convenient methods for common telephony operations
136/// with built-in call tracking via [`CallTracker`]
137#[derive(Debug)]
138pub struct Pbx {
139    client: AmiClient,
140    tracker: CallTracker,
141    completed_rx: mpsc::Receiver<CompletedCall>,
142}
143
144impl Pbx {
145    /// create a new PBX abstraction wrapping an AMI client
146    pub fn new(client: AmiClient) -> Self {
147        let (tracker, completed_rx) = client.call_tracker();
148        Self {
149            client,
150            tracker,
151            completed_rx,
152        }
153    }
154
155    /// originate a call from one endpoint to another
156    ///
157    /// uses async originate so the call is queued immediately.
158    /// waits for the OriginateResponse event to get the actual
159    /// channel name and unique_id.
160    pub async fn dial(
161        &self,
162        from: impl Into<String>,
163        to: impl Into<String>,
164        options: Option<DialOptions>,
165    ) -> Result<Call, PbxError> {
166        let from = from.into();
167        let to = to.into();
168        let opts = options.unwrap_or_default();
169
170        let mut action = OriginateAction::new(&from)
171            .extension(&to)
172            .context("default")
173            .priority(1)
174            .async_originate(true);
175
176        if let Some(ref cid) = opts.caller_id {
177            action = action.caller_id(cid);
178        }
179        if let Some(ms) = opts.timeout_ms {
180            action = action.timeout_ms(ms);
181        }
182        if let Some(ref vars) = opts.variables {
183            for (k, v) in vars {
184                action = action.variable(k, v);
185            }
186        }
187
188        // subscribe to answer-state events BEFORE sending the originate action;
189        // events arriving between originate and wait_for_answer are buffered
190        // in the broadcast channel and will not be missed
191        let answer_sub = Arc::new(Mutex::new(self.client.subscribe()));
192
193        // subscribe to OriginateResponse before sending so we don't miss a fast
194        // response; we don't know action_id yet, so filter by type here and match
195        // by action_id in the loop below
196        let mut orig_sub = self
197            .client
198            .subscribe_filtered(move |e| matches!(e, AmiEvent::OriginateResponse { .. }));
199
200        let orig_response = self.client.originate(action).await?;
201        let expected_action_id = orig_response.action_id;
202
203        // wait for the OriginateResponse event with a timeout
204        let originate_timeout =
205            Duration::from_secs(opts.timeout_ms.map(|ms| ms / 1000 + 5).unwrap_or(35));
206
207        let event = tokio::time::timeout(originate_timeout, async {
208            loop {
209                let Some(event) = orig_sub.recv().await else {
210                    return Err(PbxError::Disconnected);
211                };
212                if let AmiEvent::OriginateResponse {
213                    action_id,
214                    channel,
215                    unique_id,
216                    response,
217                    ..
218                } = event
219                {
220                    if action_id == expected_action_id {
221                        return Ok((channel, unique_id, response));
222                    }
223                }
224            }
225        })
226        .await
227        .map_err(|_| PbxError::Timeout)??;
228
229        let (channel, unique_id, response) = event;
230
231        if response.eq_ignore_ascii_case("failure") {
232            return Err(PbxError::CallFailed {
233                cause: 0,
234                cause_txt: "originate failed".to_owned(),
235            });
236        }
237
238        Ok(Call {
239            channel,
240            unique_id,
241            client: self.client.clone(),
242            answer_sub,
243        })
244    }
245
246    /// receive the next completed call record
247    pub async fn next_completed_call(&mut self) -> Option<CompletedCall> {
248        self.completed_rx.recv().await
249    }
250
251    /// access the underlying AMI client
252    pub fn client(&self) -> &AmiClient {
253        &self.client
254    }
255
256    /// shut down the call tracker
257    pub fn shutdown(self) {
258        self.tracker.shutdown();
259    }
260}