asterisk_ari/ws/client.rs
1use crate::config::Config;
2use crate::errors::AriError;
3use crate::ws::{models, params};
4use futures_util::{SinkExt, StreamExt as _};
5use rand::random;
6use std::time::Duration;
7use tokio::time::interval;
8use tokio_stream::wrappers::ReceiverStream;
9use tokio_stream::Stream;
10use tokio_tungstenite::connect_async;
11use tokio_tungstenite::tungstenite::Message;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, trace, warn};
14use url::Url;
15
16/// WebSocket client for ARI.
17///
18/// This struct manages the WebSocket connection to the ARI server.
19#[derive(Clone, Debug)]
20pub struct Client {
21 config: Config,
22 stop_signal: CancellationToken,
23}
24
25impl Drop for Client {
26 /// Cancels the stop signal when the client is dropped.
27 fn drop(&mut self) {
28 self.stop_signal.cancel();
29 }
30}
31
32impl Client {
33 /// Creates a new `Client` with the given configuration.
34 ///
35 /// # Arguments
36 ///
37 /// * `config` - The configuration for the ARI client.
38 ///
39 /// # Returns
40 ///
41 /// A new instance of `Client`.
42 pub fn with_config(config: Config) -> Self {
43 Self {
44 config,
45 stop_signal: CancellationToken::new(),
46 }
47 }
48
49 /// Disconnects the WebSocket client.
50 pub fn disconnect(&self) {
51 self.stop_signal.cancel()
52 }
53
54 /// Connects to the ARI WebSocket and starts listening for events.
55 ///
56 /// # Arguments
57 ///
58 /// * `request` - The parameters for the listen request.
59 ///
60 /// # Returns
61 ///
62 /// A `Result` containing a stream of ARI events or an `AriError`.
63 pub async fn connect(
64 &self,
65 request: params::ListenRequest,
66 ) -> Result<impl Stream<Item = models::Event>, AriError> {
67 let mut url = Url::parse(self.config.api_base.clone().as_str())?;
68
69 url.set_scheme(if url.scheme().starts_with("https://") {
70 "wss"
71 } else {
72 "ws"
73 })
74 .unwrap();
75
76 url.set_path("/ari/events");
77
78 url.query_pairs_mut()
79 .append_pair(
80 "api_key",
81 &format!("{}:{}", self.config.username, self.config.password),
82 )
83 .append_pair("app", request.app.as_str())
84 .append_pair(
85 "subscribeAll",
86 request.subscribe_all.unwrap_or(true).to_string().as_str(),
87 );
88
89 debug!("connecting to ws_url: {}", url);
90
91 // if not connect, retry!
92 let ws_stream = match connect_async(url.to_string()).await {
93 Ok((ws_stream, _)) => ws_stream,
94 Err(e) => {
95 warn!("error when connecting to the websocket: {:#?}", e);
96 return Err(AriError::from(e));
97 }
98 };
99 debug!("websocket connected");
100
101 let (mut ws_sender, mut ws_receiver) = ws_stream.split();
102
103 let mut interval = interval(Duration::from_millis(5000));
104 let cancel_token = self.stop_signal.child_token();
105 let (tx, rx) = tokio::sync::mpsc::channel(100);
106
107 let mut closed = false;
108 tokio::spawn(async move {
109 loop {
110 tokio::select! {
111 _ = cancel_token.cancelled() => if !closed {
112 //debug!("Stop signal received, leaving the loop!");
113 match ws_sender.close().await{
114 Ok(_) => {
115 debug!("WS connection closed");
116 closed = true;
117 },
118 Err(e) => warn!("error when closing ws connection: {:#?}", e),
119 }
120 },
121 msg = ws_receiver.next() => {
122 match msg {
123 Some(msg) => {
124 match msg {
125 Ok(Message::Close(close_frame)) => {
126 debug!(
127 "Close message received, leaving the loop! {:#?}",
128 close_frame
129 );
130 break;
131 }
132 Ok(Message::Pong(_)) => {}
133 Ok(Message::Ping(data)) => {
134 let _ = ws_sender.send(Message::Pong(data)).await;
135 }
136 Ok(Message::Text(string_msg)) => {
137
138 trace!("WS Ari Event: {:#?}", string_msg);
139 match serde_json::from_str::<models::Event>(&string_msg){
140 Ok(event ) => {
141 if tx.send(event).await.is_err() {
142 warn!("error when sending ARI event to the channel");
143 break;
144 }
145 }
146 Err(e) => warn!(
147 "error when deserializing ARI event: {:#?}. Event: {:#?}",
148 e, string_msg
149 ),
150 }
151
152 }
153 Err(e) => {
154 warn!("Error when receiving websocket message: {:#?}", e);
155 break;
156 }
157 _ => {
158 warn!(
159 "Unknown websocket message received: {:#?}",
160 msg
161 );
162 }
163 }
164 }
165 None => break,
166 }
167 }
168 _ = interval.tick() => {
169 // every 5 seconds we are sending ping to keep connection alive
170 // https://rust-lang-nursery.github.io/rust-cookbook/algorithms/randomness.html
171 let _ = ws_sender.send(Message::Ping(random::<[u8; 32]>().to_vec().into())).await;
172 debug!("ari connection ping sent");
173 }
174 }
175 }
176 });
177
178 Ok(ReceiverStream::new(rx))
179 }
180}