1use crate::apis::{
2 applications::ApplicationsAPI, channels::ChannelsAPI, recordings::RecordingsAPI,
3};
4use crate::errors::{Error, Result};
5use crate::models::applications::Application;
6use crate::models::channels::{Channel, Direction, Variable};
7use crate::models::events::*;
8use crate::models::playbacks::Playback;
9use async_trait::async_trait;
10use futures_util::SinkExt;
11use lazy_static::lazy_static;
12use log::*;
13use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
14use rand::Rng;
15use reqwest::StatusCode;
16use reqwest::{
17 self,
18 header::{HeaderMap, HeaderValue},
19};
20use tokio::sync::mpsc::Sender;
21use tokio::time::{interval, Duration};
22use tokio_tungstenite::{connect_async, tungstenite::Message as WSMessage};
23use url::Url;
24use urlencoding::encode;
25
lazy_static! {
    /// Process-wide `reqwest` client, created on first use and shared by every
    /// REST call in this file instead of building a new client per request.
    pub static ref HTTP_CLIENT: reqwest::Client = reqwest::Client::new();
}
29
30use futures_util::StreamExt; #[derive(Clone)]
/// Client for an Asterisk ARI server.
///
/// Holds the connection settings used for REST requests and for the event
/// websocket, plus one optional channel sender per supported event type.
/// Events read by `ari_processing_loop` are forwarded to the matching sender
/// when one is set, and dropped otherwise.
pub struct AriClient {
    /// Base URL of the ARI REST interface; resource paths such as
    /// `/channels/...` are appended to it verbatim.
    pub url: String,
    /// User name used for HTTP basic auth and for the websocket `api_key`.
    pub user: String,
    /// Password used for HTTP basic auth and for the websocket `api_key`.
    pub password: String,
    /// Receives `StasisStart` events.
    stasis_start_sender: Option<Sender<StasisStart>>,
    /// Receives `ChannelDtmfReceived` events.
    channel_dtmf_received_sender: Option<Sender<ChannelDtmfReceived>>,
    /// Receives `ChannelHangupRequest` events.
    channel_hangup_request_sender: Option<Sender<ChannelHangupRequest>>,
    /// Receives `StasisEnd` events.
    stasis_end_sender: Option<Sender<StasisEnd>>,
    /// Receives `ChannelTalkingFinished` events.
    channel_talking_finished_sender: Option<Sender<ChannelTalkingFinished>>,
    /// Receives `ChannelTalkingStarted` events.
    channel_talking_started_sender: Option<Sender<ChannelTalkingStarted>>,
    /// Receives `ChannelDestroyed` events.
    channel_destroyed_sender: Option<Sender<ChannelDestroyed>>,
    /// Receives `PlaybackStarted` events.
    playback_started_sender: Option<Sender<PlaybackStarted>>,
    /// Receives `PlaybackFinished` events.
    playback_finished_sender: Option<Sender<PlaybackFinished>>,
    /// Receives `ChannelStateChange` events.
    channel_state_change_sender: Option<Sender<ChannelStateChange>>,
    /// Receives `ChannelVarset` events.
    channel_var_set_sender: Option<Sender<ChannelVarset>>,
    /// Receives `RecordingStarted` events.
    recording_started_sender: Option<Sender<RecordingStarted>>,
    /// Receives `RecordingFinished` events.
    recording_finished_sender: Option<Sender<RecordingFinished>>,
}
53
54impl AriClient {
55 pub fn new(url: String, user: String, password: String) -> Self {
56 AriClient {
57 url,
58 user,
59 password,
60 stasis_start_sender: None,
61 channel_dtmf_received_sender: None,
62 channel_hangup_request_sender: None,
63 stasis_end_sender: None,
64 channel_talking_finished_sender: None,
65 channel_talking_started_sender: None,
66 channel_destroyed_sender: None,
67 playback_started_sender: None,
68 playback_finished_sender: None,
69 channel_state_change_sender: None,
70 channel_var_set_sender: None,
71 recording_started_sender: None,
72 recording_finished_sender: None,
73 }
74 }
75
76 pub fn set_stasis_start_sender(&mut self, sender: Option<Sender<StasisStart>>) {
77 self.stasis_start_sender = sender;
78 }
79
80 pub fn set_channel_dtmf_received_sender(
81 &mut self,
82 sender: Option<Sender<ChannelDtmfReceived>>,
83 ) {
84 self.channel_dtmf_received_sender = sender;
85 }
86
87 pub fn set_channel_hangup_request_sender(
88 &mut self,
89 sender: Option<Sender<ChannelHangupRequest>>,
90 ) {
91 self.channel_hangup_request_sender = sender;
92 }
93
94 pub fn set_stasis_end_sender(&mut self, sender: Option<Sender<StasisEnd>>) {
95 self.stasis_end_sender = sender;
96 }
97
98 pub fn set_channel_talking_finished_sender(
99 &mut self,
100 sender: Option<Sender<ChannelTalkingFinished>>,
101 ) {
102 self.channel_talking_finished_sender = sender;
103 }
104
105 pub fn set_channel_talking_started_sender(
106 &mut self,
107 sender: Option<Sender<ChannelTalkingStarted>>,
108 ) {
109 self.channel_talking_started_sender = sender;
110 }
111
112 pub fn set_channel_destroyed_sender(&mut self, sender: Option<Sender<ChannelDestroyed>>) {
113 self.channel_destroyed_sender = sender;
114 }
115
116 pub fn set_playback_started_sender(&mut self, sender: Option<Sender<PlaybackStarted>>) {
117 self.playback_started_sender = sender;
118 }
119
120 pub fn set_playback_finished_sender(&mut self, sender: Option<Sender<PlaybackFinished>>) {
121 self.playback_finished_sender = sender;
122 }
123
124 pub fn set_channel_state_change_sender(&mut self, sender: Option<Sender<ChannelStateChange>>) {
125 self.channel_state_change_sender = sender;
126 }
127
128 pub fn set_channel_var_set_sender(&mut self, sender: Option<Sender<ChannelVarset>>) {
129 self.channel_var_set_sender = sender;
130 }
131
132 pub fn set_recording_started_sender(&mut self, sender: Option<Sender<RecordingStarted>>) {
133 self.recording_started_sender = sender;
134 }
135
136 pub fn set_recording_finished_sender(&mut self, sender: Option<Sender<RecordingFinished>>) {
137 self.recording_finished_sender = sender;
138 }
139
140 pub async fn ari_processing_loop(&self, asterisk_apps: Vec<String>) -> Result<()> {
142 let ws_protocol = if self.url.starts_with("https://") {
143 "wss"
144 } else {
145 "ws"
146 };
147
148 let url = Url::parse(&self.url)?;
149 let hostname;
150 if let Some(host) = url.host_str() {
151 hostname = host;
152 } else {
153 return Err(Error::new(
154 StatusCode::BAD_REQUEST,
155 Some("unable to parse hostname".into()),
156 ));
157 }
158
159 let portno;
160 if let Some(por) = url.port() {
161 portno = por;
162 } else {
163 return Err(Error::new(
164 StatusCode::BAD_REQUEST,
165 Some("unable to parse port".into()),
166 ));
167 }
168
169 let app_str = asterisk_apps.join(",");
170
171 let ws_url_str = format!(
174 "{}://{}:{}/ari/events?app={}&api_key={}:{}",
175 ws_protocol,
176 hostname,
177 portno,
178 app_str,
179 encode(&self.user),
180 encode(&self.password)
181 );
182
183 let ws_url = Url::parse(&ws_url_str)?;
184
185 debug!("connecting to ws_url: {}", ws_url);
186 let (ws_stream, _) = connect_async(ws_url).await?;
187
188 let (mut ws_sender, mut ws_receiver) = ws_stream.split();
189
190 debug!("websocket connected");
191
192 let mut interval = interval(Duration::from_millis(5000));
193
194 loop {
195 tokio::select! {
196 msg = ws_receiver.next() => {
197 match msg {
198 Some(msg) => {
199 let msg = msg?;
200 match msg {
201 WSMessage::Close(close_frame) => {
202 info!(
203 "close message received, leaving the loop! {:#?}",
204 close_frame
205 );
206 break;
207 }
208 WSMessage::Pong(_) => {}
209 WSMessage::Ping(data) => {
210 let _ = ws_sender.send(WSMessage::Pong(data)).await;
211 }
212 WSMessage::Text(string_msg) => {
213 debug!(
214 "asterisk signal event received: {:#?}",
215 string_msg
216 );
217 let ari_event =
218 serde_json::from_str::<AriEvent>(&string_msg);
219 if let Err(deser_err) = ari_event {
220 warn!(
221 "error when deserializing ARI event: {:#?}. Event: {:#?}",
222 deser_err, string_msg
223 );
224 } else {
225 let ari_event = ari_event.unwrap();
226 trace!("ari_event: {:#?}", ari_event);
227 match ari_event {
228 AriEvent::StasisStart(event) => {
229 if let Some(sender) = &self.stasis_start_sender {
230 if let Err(send_err) = sender.send(event.clone()).await {
231 error!("ari_processing_loop StasisStart sending error {:?}: ", send_err);
232 }
233 }
234 }
235 AriEvent::ChannelDtmfReceived(event) => {
236 if let Some(sender) = &self.channel_dtmf_received_sender {
237 if let Err(send_err) = sender.send(event.clone()).await {
238 error!("ari_processing_loop ChannelDtmfReceived sending error {:?}: ", send_err);
239 }
240 }
241 }
242 AriEvent::ChannelHangupRequest(event) => {
243 if let Some(sender) = &self.channel_hangup_request_sender {
244 if let Err(send_err) = sender.send(event.clone()).await {
245 error!("ari_processing_loop ChannelHangupRequest sending error {:?}: ", send_err);
246 }
247 }
248 }
249 AriEvent::StasisEnd(event) => {
250 if let Some(sender) = &self.stasis_end_sender {
251 if let Err(send_err) = sender.send(event.clone()).await {
252 error!("ari_processing_loop StasisEnd sending error {:?}: ", send_err);
253 }
254 }
255 }
256 AriEvent::ChannelTalkingFinished(event) => {
257 if let Some(sender) = &self.channel_talking_finished_sender {
258 if let Err(send_err) = sender.send(event.clone()).await {
259 error!("ari_processing_loop ChannelTalkingFinished sending error {:?}: ", send_err);
260 }
261 }
262 }
263 AriEvent::ChannelTalkingStarted(event) => {
264 if let Some(sender) = &self.channel_talking_started_sender {
265 if let Err(send_err) = sender.send(event.clone()).await {
266 error!("ari_processing_loop ChannelTalkingStarted sending error {:?}: ", send_err);
267 }
268 }
269 }
270 AriEvent::ChannelDestroyed(event) => {
271 if let Some(sender) = &self.channel_destroyed_sender {
272 if let Err(send_err) = sender.send(event.clone()).await {
273 error!("ari_processing_loop ChannelDestroyed sending error {:?}: ", send_err);
274 }
275 }
276 }
277 AriEvent::PlaybackStarted(event) => {
278 if let Some(sender) = &self.playback_started_sender {
279 if let Err(send_err) = sender.send(event.clone()).await {
280 error!("ari_processing_loop PlaybackStarted sending error {:?}: ", send_err);
281 }
282 }
283 }
284 AriEvent::PlaybackFinished(event) => {
285 if let Some(sender) = &self.playback_finished_sender {
286 if let Err(send_err) = sender.send(event.clone()).await {
287 error!("ari_processing_loop PlaybackFinished sending error {:?}: ", send_err);
288 }
289 }
290 }
291 AriEvent::ChannelStateChange(event) => {
292 if let Some(sender) = &self.channel_state_change_sender {
293 if let Err(send_err) = sender.send(event.clone()).await {
294 error!("ari_processing_loop ChannelStateChange sending error {:?}: ", send_err);
295 }
296 }
297 }
298 AriEvent::ChannelVarset(event) => {
299 if let Some(sender) = &self.channel_var_set_sender {
300 if let Err(send_err) = sender.send(event.clone()).await {
301 error!("ari_processing_loop ChannelVarset sending error {:?}: ", send_err);
302 }
303 }
304 }
305 AriEvent::RecordingStarted(event) => {
306 if let Some(sender) = &self.recording_started_sender {
307 if let Err(send_err) = sender.send(event.clone()).await {
308 error!("ari_processing_loop RecordingStarted sending error {:?}: ", send_err);
309 }
310 }
311 }
312 AriEvent::RecordingFinished(event) => {
313 if let Some(sender) = &self.recording_finished_sender {
314 if let Err(send_err) = sender.send(event.clone()).await {
315 error!("ari_processing_loop RecordingFinished sending error {:?}: ", send_err);
316 }
317 }
318 }
319 }
320 }
321 }
322 _ => {
323 warn!(
324 "unknown websocket message received: {:#?}",
325 msg
326 );
327 }
328 }
329 }
330 None => break,
331 }
332 }
333 _ = interval.tick() => {
334 let random_bytes = rand::thread_rng().gen::<[u8; 32]>().to_vec();
337 let _ = ws_sender.send(WSMessage::Ping(random_bytes)).await;
338 debug!("ari connection ping sent");
339 }
340 }
341 }
342
343 Ok(())
344 }
345
346 #[allow(deprecated)]
347 fn get_auth_header(&self) -> String {
348 format!(
349 "Basic {}",
350 base64::encode(format!("{}:{}", self.user, self.password))
351 )
352 }
353
354 fn get_common_headers(&self) -> Result<HeaderMap> {
355 let mut headers = HeaderMap::new();
356
357 headers.insert("Content-Type", HeaderValue::from_str("application/json")?);
358 headers.insert(
359 "Authorization",
360 HeaderValue::from_str(&self.get_auth_header())?,
361 );
362
363 Ok(headers)
364 }
365}
366
/// Returns early from the enclosing function with an `Error` built from the
/// actual status and the given optional body whenever the actual status
/// differs from the expected one; otherwise execution continues.
macro_rules! eval_status_code {
    ($status_real:ident, $status_expected:expr, $body_str:expr) => {
        if $status_real != $status_expected {
            // The optional body is handed to the error unchanged.
            return Err(Error::new($status_real, $body_str));
        }
    };
}
378
379#[async_trait]
380impl ApplicationsAPI for AriClient {
381 #[allow(unused_variables)]
383 async fn filter(
384 &self,
385 application_name: &str,
386 filter: Option<serde_json::Value>,
387 ) -> Result<String> {
388 Err(Error::new(StatusCode::NOT_IMPLEMENTED, None))
389 }
390
391 async fn get(&self, application_name: &str) -> Result<Application> {
393 let resp = HTTP_CLIENT
394 .get(format!("{}/applications/{}", self.url, application_name))
395 .headers(self.get_common_headers()?)
396 .send()
397 .await?;
398
399 let status = resp.status();
400 let body_str = resp.text().await?;
401 eval_status_code!(status, StatusCode::OK, Some(body_str));
402 Ok(serde_json::from_str(&body_str)?)
403 }
404
405 async fn list(&self) -> Result<Vec<Application>> {
407 let resp = HTTP_CLIENT
408 .get(format!("{}/applications", self.url))
409 .headers(self.get_common_headers()?)
410 .send()
411 .await?;
412
413 let status = resp.status();
414 let body_str = resp.text().await?;
415 eval_status_code!(status, StatusCode::OK, Some(body_str));
416 Ok(serde_json::from_str(&body_str)?)
417 }
418
419 #[allow(unused_variables)]
421 async fn subscribe(&self, application_name: &str, event_source: Vec<String>) -> Result<String> {
422 Err(Error::new(StatusCode::NOT_IMPLEMENTED, None))
423 }
424
425 #[allow(unused_variables)]
427 async fn unsubscribe(
428 &self,
429 application_name: &str,
430 event_source: Vec<String>,
431 ) -> Result<String> {
432 Err(Error::new(StatusCode::NOT_IMPLEMENTED, None))
433 }
434}
435
436#[async_trait]
437impl ChannelsAPI for AriClient {
438 async fn answer(&self, channel_id: &str) -> Result<()> {
439 let resp = HTTP_CLIENT
440 .post(format!("{}/channels/{}/answer", self.url, channel_id))
441 .headers(self.get_common_headers()?)
442 .send()
443 .await?;
444
445 let status = resp.status();
446 eval_status_code!(status, StatusCode::NO_CONTENT, None);
447 Ok(())
448 }
449
450 async fn play(
451 &self,
452 channel_id: &str,
453 media: &str,
454 _playback_id: Option<String>,
455 _lang: Option<String>,
456 _offsetms: Option<usize>,
457 _skipms: Option<usize>,
458 ) -> Result<Playback> {
459 let req_body = format!(
461 r#"
462 {{
463 "channelId": "{_channel_id_}",
464 "media": "{_media_}"
465 }}
466 "#,
467 _channel_id_ = channel_id,
468 _media_ = media,
469 );
470
471 let resp = HTTP_CLIENT
472 .post(format!("{}/channels/{}/play", self.url, channel_id))
473 .headers(self.get_common_headers()?)
474 .body(req_body)
475 .send()
476 .await?;
477
478 let status = resp.status();
479 let body_str = resp.text().await?;
480 eval_status_code!(status, StatusCode::CREATED, Some(body_str));
481 Ok(serde_json::from_str(&body_str)?)
482 }
483
484 async fn stop_play(&self, playback_id: &str) -> Result<()> {
485 let resp = HTTP_CLIENT
486 .delete(format!("{}/playbacks/{}", self.url, playback_id))
487 .headers(self.get_common_headers()?)
488 .send()
489 .await?;
490
491 let status = resp.status();
492 eval_status_code!(status, StatusCode::NO_CONTENT, None);
493 Ok(())
494 }
495
496 async fn get_variable(&self, channel_id: &str, var_name: &str) -> Result<String> {
497 let resp = HTTP_CLIENT
498 .get(format!(
499 "{}/channels/{}/variable?variable={}",
500 self.url, channel_id, var_name
501 ))
502 .headers(self.get_common_headers()?)
503 .send()
504 .await?;
505
506 let status = resp.status();
507 let body_str = resp.text().await?;
508 eval_status_code!(status, StatusCode::OK, Some(body_str));
509
510 let variable = serde_json::from_str::<Variable>(&body_str)?;
511 Ok(variable.value)
512 }
513
514 async fn set_variable(&self, channel_id: &str, var_name: &str, var_value: &str) -> Result<()> {
515 let resp = HTTP_CLIENT
516 .post(format!(
517 "{}/channels/{}/variable?variable={}&value={}",
518 self.url, channel_id, var_name, var_value
519 ))
520 .headers(self.get_common_headers()?)
521 .send()
522 .await?;
523
524 let status = resp.status();
525 let body_str = resp.text().await?;
526
527 eval_status_code!(status, StatusCode::NO_CONTENT, Some(body_str));
528 Ok(())
529 }
530
531 async fn hangup(&self, channel_id: &str) -> Result<()> {
532 let resp = HTTP_CLIENT
533 .delete(format!("{}/channels/{}", self.url, channel_id))
534 .headers(self.get_common_headers()?)
535 .send()
536 .await?;
537
538 let status = resp.status();
539 let body_str = resp.text().await?;
540 eval_status_code!(status, StatusCode::NO_CONTENT, Some(body_str));
541 Ok(())
542 }
543
544 async fn continue_in_dialplan(&self, channel_id: &str) -> Result<()> {
545 let resp = HTTP_CLIENT
546 .post(format!("{}/channels/{}/continue", self.url, channel_id))
547 .headers(self.get_common_headers()?)
548 .send()
549 .await?;
550
551 let status = resp.status();
552 let body_str = resp.text().await?;
553 eval_status_code!(status, StatusCode::NO_CONTENT, Some(body_str));
554 Ok(())
555 }
556
557 async fn snoop(
558 &self,
559 channel_id: &str,
560 app: &str,
561 spy: Option<Direction>,
562 whisper: Option<Direction>,
563 ) -> Result<Channel> {
564 let req_body = format!(
565 r#"
566 {{
567 "app": "{_app_name_}",
568 "spy": "{_spy_}",
569 "whisper": "{_whisper_}"
570 }}
571 "#,
572 _app_name_ = app,
573 _spy_ = spy.unwrap_or_default(),
574 _whisper_ = whisper.unwrap_or_default()
575 );
576
577 let req = HTTP_CLIENT
578 .post(format!("{}/channels/{}/snoop", self.url, channel_id))
579 .headers(self.get_common_headers()?)
580 .body(req_body.clone());
581 trace!("req: {req:#?}");
582 trace!("req body: {}", req_body);
583 trace!("url: {:#?}", self.url);
584
585 let resp = req.send().await?;
586
587 trace!("response: {:#?}", resp);
588
589 let status = resp.status();
590 trace!("status: {:#?}", status);
591
592 let body_str = resp.text().await?;
593 trace!("text: {:#?}", body_str);
594
595 eval_status_code!(status, StatusCode::OK, Some(body_str));
596
597 let res_chan = serde_json::from_str(&body_str)?;
598 Ok(res_chan)
599 }
600
601 async fn record(
602 &self,
603 channel_id: &str,
604 filepath: Option<&str>,
605 audio_format: Option<&str>,
606 terminate_on: Option<&str>,
607 max_duration: Option<usize>,
608 max_silence: Option<usize>,
609 if_exists: Option<&str>,
610 beep: Option<bool>,
611 ) -> Result<()> {
612 let req_body = format!(
613 r#"
614 {{
615 "name": "{_filepath_}",
616 "format": "{_audio_format_}",
617 "terminateOn": "{_terminate_on_}",
618 "maxDuration": {_max_duration_},
619 "maxSilence": {_max_silence_},
620 "ifExists": "{_if_exists_}",
621 "beep": {_beep_}
622 }}
623 "#,
624 _filepath_ = filepath.unwrap_or(channel_id),
625 _audio_format_ = audio_format.unwrap_or("wav"),
626 _terminate_on_ = terminate_on.unwrap_or("none"),
627 _max_duration_ = max_duration.unwrap_or(0),
628 _max_silence_ = max_silence.unwrap_or(0),
629 _if_exists_ = if_exists.unwrap_or("fail"),
630 _beep_ = beep.unwrap_or(false),
631 );
632 let resp = HTTP_CLIENT
633 .post(format!("{}/channels/{}/record", self.url, channel_id))
634 .headers(self.get_common_headers()?)
635 .body(req_body)
636 .send()
637 .await?;
638
639 let status = resp.status();
640 let body_str = resp.text().await?;
641 eval_status_code!(status, StatusCode::CREATED, Some(body_str));
642 Ok(())
643 }
644}
645
646#[async_trait]
647impl RecordingsAPI for AriClient {
648 async fn get_recording(&self, recording_name: &str) -> Result<Vec<u8>> {
649 let recording_name = utf8_percent_encode(recording_name, NON_ALPHANUMERIC);
650 let resp = HTTP_CLIENT
651 .get(format!(
652 "{}/recordings/stored/{}/file",
653 self.url, recording_name
654 ))
655 .headers(self.get_common_headers()?)
656 .send()
657 .await?;
658 let status = resp.status();
659 let body_bytes = resp.bytes().await?;
660
661 eval_status_code!(status, StatusCode::OK, Some(format!("{body_bytes:#?}")));
662 Ok(body_bytes.to_vec())
663 }
664 async fn stop_recording(&self, recording_name: &str) -> Result<()> {
665 let resp = HTTP_CLIENT
666 .post(format!(
667 "{}/recordings/live/{}/stop",
668 self.url, recording_name
669 ))
670 .headers(self.get_common_headers()?)
671 .send()
672 .await?;
673
674 let status = resp.status();
675 let body_str = resp.text().await?;
676 eval_status_code!(status, StatusCode::NO_CONTENT, Some(body_str));
677 Ok(())
678 }
679
680 async fn pause_recording(&self, recording_name: &str) -> Result<()> {
681 let resp = HTTP_CLIENT
682 .post(format!(
683 "{}/recordings/live/{}/pause",
684 self.url, recording_name
685 ))
686 .headers(self.get_common_headers()?)
687 .send()
688 .await?;
689
690 let status = resp.status();
691 eval_status_code!(status, StatusCode::NO_CONTENT, None);
692 Ok(())
693 }
694
695 async fn unpause_recording(&self, recording_name: &str) -> Result<()> {
696 let resp = HTTP_CLIENT
697 .delete(format!(
698 "{}/recordings/live/{}/pause",
699 self.url, recording_name
700 ))
701 .headers(self.get_common_headers()?)
702 .send()
703 .await?;
704
705 let status = resp.status();
706 eval_status_code!(status, StatusCode::NO_CONTENT, None);
707 Ok(())
708 }
709
710 async fn mute_recording(&self, recording_name: &str) -> Result<()> {
711 let resp = HTTP_CLIENT
712 .post(format!(
713 "{}/recordings/live/{}/mute",
714 self.url, recording_name
715 ))
716 .headers(self.get_common_headers()?)
717 .send()
718 .await?;
719
720 let status = resp.status();
721 eval_status_code!(status, StatusCode::NO_CONTENT, None);
722 Ok(())
723 }
724
725 async fn unmute_recording(&self, recording_name: &str) -> Result<()> {
726 let resp = HTTP_CLIENT
727 .delete(format!(
728 "{}/recordings/live/{}/mute",
729 self.url, recording_name
730 ))
731 .headers(self.get_common_headers()?)
732 .send()
733 .await?;
734
735 let status = resp.status();
736 eval_status_code!(status, StatusCode::NO_CONTENT, None);
737 Ok(())
738 }
739
740 async fn delete_recording(&self, recording_name: &str) -> Result<()> {
741 let resp = HTTP_CLIENT
742 .delete(format!("{}/recordings/live/{}", self.url, recording_name))
743 .headers(self.get_common_headers()?)
744 .send()
745 .await?;
746
747 let status = resp.status();
748 eval_status_code!(status, StatusCode::NO_CONTENT, None);
749 Ok(())
750 }
751}