asterisk_ari/ws/
client.rs

1use crate::config::Config;
2use crate::errors::AriError;
3use crate::ws::{models, params};
4use futures_util::{SinkExt, StreamExt as _};
5use rand::random;
6use std::time::Duration;
7use tokio::time::interval;
8use tokio_stream::wrappers::ReceiverStream;
9use tokio_stream::Stream;
10use tokio_tungstenite::connect_async;
11use tokio_tungstenite::tungstenite::Message;
12use tokio_util::sync::CancellationToken;
13use tracing::{debug, trace, warn};
14use url::Url;
15
16/// WebSocket client for ARI.
17///
18/// This struct manages the WebSocket connection to the ARI server.
19#[derive(Clone, Debug)]
20pub struct Client {
21    config: Config,
22    stop_signal: CancellationToken,
23}
24
25impl Drop for Client {
26    /// Cancels the stop signal when the client is dropped.
27    fn drop(&mut self) {
28        self.stop_signal.cancel();
29    }
30}
31
32impl Client {
33    /// Creates a new `Client` with the given configuration.
34    ///
35    /// # Arguments
36    ///
37    /// * `config` - The configuration for the ARI client.
38    ///
39    /// # Returns
40    ///
41    /// A new instance of `Client`.
42    pub fn with_config(config: Config) -> Self {
43        Self {
44            config,
45            stop_signal: CancellationToken::new(),
46        }
47    }
48
49    /// Disconnects the WebSocket client.
50    pub fn disconnect(&self) {
51        self.stop_signal.cancel()
52    }
53
54    /// Connects to the ARI WebSocket and starts listening for events.
55    ///
56    /// # Arguments
57    ///
58    /// * `request` - The parameters for the listen request.
59    ///
60    /// # Returns
61    ///
62    /// A `Result` containing a stream of ARI events or an `AriError`.
63    pub async fn connect(
64        &self,
65        request: params::ListenRequest,
66    ) -> Result<impl Stream<Item = models::Event>, AriError> {
67        let mut url = Url::parse(self.config.api_base.clone().as_str())?;
68
69        url.set_scheme(if url.scheme().starts_with("https://") {
70            "wss"
71        } else {
72            "ws"
73        })
74        .unwrap();
75
76        url.set_path("/ari/events");
77
78        url.query_pairs_mut()
79            .append_pair(
80                "api_key",
81                &format!("{}:{}", self.config.username, self.config.password),
82            )
83            .append_pair("app", request.app.as_str())
84            .append_pair(
85                "subscribeAll",
86                request.subscribe_all.unwrap_or(true).to_string().as_str(),
87            );
88
89        debug!("connecting to ws_url: {}", url);
90
91        // if not connect, retry!
92        let ws_stream = match connect_async(url.to_string()).await {
93            Ok((ws_stream, _)) => ws_stream,
94            Err(e) => {
95                warn!("error when connecting to the websocket: {:#?}", e);
96                return Err(AriError::from(e));
97            }
98        };
99        debug!("websocket connected");
100
101        let (mut ws_sender, mut ws_receiver) = ws_stream.split();
102
103        let mut interval = interval(Duration::from_millis(5000));
104        let cancel_token = self.stop_signal.child_token();
105        let (tx, rx) = tokio::sync::mpsc::channel(100);
106
107        let mut closed = false;
108        tokio::spawn(async move {
109            loop {
110                tokio::select! {
111                    _ = cancel_token.cancelled()  => if !closed {
112                        //debug!("Stop signal received, leaving the loop!");
113                        match ws_sender.close().await{
114                            Ok(_) => {
115                                debug!("WS connection closed");
116                                closed = true;
117                            },
118                            Err(e) => warn!("error when closing ws connection: {:#?}", e),
119                        }
120                    },
121                    msg = ws_receiver.next() => {
122                        match msg {
123                            Some(msg) => {
124                                match msg {
125                                        Ok(Message::Close(close_frame)) => {
126                                            debug!(
127                                                "Close message received, leaving the loop! {:#?}",
128                                                close_frame
129                                            );
130                                            break;
131                                        }
132                                        Ok(Message::Pong(_)) => {}
133                                        Ok(Message::Ping(data)) => {
134                                            let _ = ws_sender.send(Message::Pong(data)).await;
135                                        }
136                                        Ok(Message::Text(string_msg)) => {
137
138                                            trace!("WS Ari Event: {:#?}", string_msg);
139                                            match serde_json::from_str::<models::Event>(&string_msg){
140                                                Ok(event ) => {
141                                                    if tx.send(event).await.is_err() {
142                                                        warn!("error when sending ARI event to the channel");
143                                                        break;
144                                                    }
145                                                }
146                                                Err(e) => warn!(
147                                                        "error when deserializing ARI event: {:#?}. Event: {:#?}",
148                                                        e, string_msg
149                                                    ),
150                                            }
151
152                                        }
153                                        Err(e) => {
154                                            warn!("Error when receiving websocket message: {:#?}", e);
155                                            break;
156                                        }
157                                        _ => {
158                                            warn!(
159                                                "Unknown websocket message received: {:#?}",
160                                                msg
161                                            );
162                                        }
163                                    }
164                            }
165                            None => break,
166                        }
167                    }
168                    _ = interval.tick() => {
169                        // every 5 seconds we are sending ping to keep connection alive
170                        // https://rust-lang-nursery.github.io/rust-cookbook/algorithms/randomness.html
171                        let _ = ws_sender.send(Message::Ping(random::<[u8; 32]>().to_vec().into())).await;
172                        debug!("ari connection ping sent");
173                    }
174                }
175            }
176        });
177
178        Ok(ReceiverStream::new(rx))
179    }
180}