1use std::sync::Arc;
7use std::time::Duration;
8
9use asterisk_rs_ami::action::{HangupAction, OriginateAction};
10use asterisk_rs_ami::event::AmiEvent;
11use asterisk_rs_ami::tracker::{CallTracker, CompletedCall};
12use asterisk_rs_ami::AmiClient;
13use asterisk_rs_core::event::EventSubscription;
14use tokio::sync::{mpsc, Mutex};
15
16#[derive(Debug, Clone)]
21pub struct Call {
22 pub channel: String,
24 pub unique_id: String,
26 client: AmiClient,
27 answer_sub: Arc<Mutex<EventSubscription<AmiEvent>>>,
31}
32
33impl Call {
34 pub async fn hangup(
36 &self,
37 ) -> asterisk_rs_ami::error::Result<asterisk_rs_ami::response::AmiResponse> {
38 self.client.hangup(HangupAction::new(&self.channel)).await
39 }
40
41 pub async fn wait_for_answer(&self, timeout: Duration) -> Result<(), PbxError> {
46 let uid = self.unique_id.clone();
47
48 let result = tokio::time::timeout(timeout, async {
49 let mut sub = self.answer_sub.lock().await;
50 loop {
51 let Some(event) = sub.recv().await else {
52 return Err(PbxError::Disconnected);
53 };
54 match event {
55 AmiEvent::Newstate {
56 unique_id,
57 channel_state_desc,
58 ..
59 } if unique_id == uid => {
60 if channel_state_desc == "Up" {
61 return Ok(());
62 }
63 }
64 AmiEvent::Hangup {
65 unique_id,
66 cause,
67 cause_txt,
68 ..
69 } if unique_id == uid => {
70 return Err(PbxError::CallFailed { cause, cause_txt });
71 }
72 _ => {}
73 }
74 }
75 })
76 .await;
77
78 match result {
79 Ok(inner) => inner,
80 Err(_) => Err(PbxError::Timeout),
81 }
82 }
83}
84
85#[derive(Debug, Clone, Default)]
87#[must_use]
88pub struct DialOptions {
89 pub caller_id: Option<String>,
91 pub timeout_ms: Option<u64>,
93 pub variables: Option<std::collections::HashMap<String, String>>,
95}
96
97impl DialOptions {
98 pub fn new() -> Self {
100 Self::default()
101 }
102
103 pub fn caller_id(mut self, cid: impl Into<String>) -> Self {
105 self.caller_id = Some(cid.into());
106 self
107 }
108
109 pub fn timeout_ms(mut self, ms: u64) -> Self {
111 self.timeout_ms = Some(ms);
112 self
113 }
114}
115
116#[derive(Debug, thiserror::Error)]
118#[non_exhaustive]
119pub enum PbxError {
120 #[error("AMI error: {0}")]
121 Ami(#[from] asterisk_rs_ami::AmiError),
122
123 #[error("call failed: {cause} ({cause_txt})")]
124 CallFailed { cause: u32, cause_txt: String },
125
126 #[error("operation timed out")]
127 Timeout,
128
129 #[error("client disconnected")]
130 Disconnected,
131}
132
133#[derive(Debug)]
138pub struct Pbx {
139 client: AmiClient,
140 tracker: CallTracker,
141 completed_rx: mpsc::Receiver<CompletedCall>,
142}
143
144impl Pbx {
145 pub fn new(client: AmiClient) -> Self {
147 let (tracker, completed_rx) = client.call_tracker();
148 Self {
149 client,
150 tracker,
151 completed_rx,
152 }
153 }
154
155 pub async fn dial(
161 &self,
162 from: impl Into<String>,
163 to: impl Into<String>,
164 options: Option<DialOptions>,
165 ) -> Result<Call, PbxError> {
166 let from = from.into();
167 let to = to.into();
168 let opts = options.unwrap_or_default();
169
170 let mut action = OriginateAction::new(&from)
171 .extension(&to)
172 .context("default")
173 .priority(1)
174 .async_originate(true);
175
176 if let Some(ref cid) = opts.caller_id {
177 action = action.caller_id(cid);
178 }
179 if let Some(ms) = opts.timeout_ms {
180 action = action.timeout_ms(ms);
181 }
182 if let Some(ref vars) = opts.variables {
183 for (k, v) in vars {
184 action = action.variable(k, v);
185 }
186 }
187
188 let answer_sub = Arc::new(Mutex::new(self.client.subscribe()));
192
193 let mut orig_sub = self
197 .client
198 .subscribe_filtered(move |e| matches!(e, AmiEvent::OriginateResponse { .. }));
199
200 let orig_response = self.client.originate(action).await?;
201 let expected_action_id = orig_response.action_id;
202
203 let originate_timeout =
205 Duration::from_secs(opts.timeout_ms.map(|ms| ms / 1000 + 5).unwrap_or(35));
206
207 let event = tokio::time::timeout(originate_timeout, async {
208 loop {
209 let Some(event) = orig_sub.recv().await else {
210 return Err(PbxError::Disconnected);
211 };
212 if let AmiEvent::OriginateResponse {
213 action_id,
214 channel,
215 unique_id,
216 response,
217 ..
218 } = event
219 {
220 if action_id == expected_action_id {
221 return Ok((channel, unique_id, response));
222 }
223 }
224 }
225 })
226 .await
227 .map_err(|_| PbxError::Timeout)??;
228
229 let (channel, unique_id, response) = event;
230
231 if response.eq_ignore_ascii_case("failure") {
232 return Err(PbxError::CallFailed {
233 cause: 0,
234 cause_txt: "originate failed".to_owned(),
235 });
236 }
237
238 Ok(Call {
239 channel,
240 unique_id,
241 client: self.client.clone(),
242 answer_sub,
243 })
244 }
245
246 pub async fn next_completed_call(&mut self) -> Option<CompletedCall> {
248 self.completed_rx.recv().await
249 }
250
251 pub fn client(&self) -> &AmiClient {
253 &self.client
254 }
255
256 pub fn shutdown(self) {
258 self.tracker.shutdown();
259 }
260}