Skip to main content

asterisk_rs_ari/
client.rs

1//! Main ARI client combining REST API and WebSocket events.
2
3use std::sync::Arc;
4
5use asterisk_rs_core::event::{EventBus, EventSubscription};
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8
9use crate::config::{AriConfig, TransportMode};
10use crate::error::{AriError, Result};
11use crate::event::AriMessage;
12use crate::transport::{HttpTransport, TransportInner};
13use crate::ws_transport::WsTransport;
14
15/// async client for the Asterisk REST Interface
16///
17/// combines REST operations with a background websocket listener for
18/// receiving Stasis events. supports both HTTP and unified WebSocket
19/// transport modes.
20#[derive(Clone)]
21pub struct AriClient {
22    transport: Arc<TransportInner>,
23    config: Arc<AriConfig>,
24    event_bus: EventBus<AriMessage>,
25}
26
27impl std::fmt::Debug for AriClient {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("AriClient")
30            .field("base_url", &self.config.base_url)
31            .field("transport_mode", &self.config.transport_mode)
32            .finish_non_exhaustive()
33    }
34}
35
36impl AriClient {
37    /// connect to an ARI server
38    ///
39    /// builds the transport layer and spawns the websocket event listener.
40    /// the transport mode is determined by [`AriConfig::transport_mode`].
41    pub async fn connect(config: AriConfig) -> Result<Self> {
42        let event_bus = EventBus::new(256);
43
44        let transport = match config.transport_mode {
45            TransportMode::Http => {
46                let http = HttpTransport::new(
47                    config.base_url.as_str(),
48                    config.username.clone(),
49                    config.password.clone(),
50                    config.ws_url.to_string(),
51                    event_bus.clone(),
52                    config.reconnect_policy.clone(),
53                )?;
54                TransportInner::Http(http)
55            }
56            TransportMode::WebSocket => {
57                let ws = WsTransport::spawn(
58                    config.ws_url.to_string(),
59                    event_bus.clone(),
60                    config.reconnect_policy.clone(),
61                );
62                TransportInner::WebSocket(ws)
63            }
64        };
65
66        Ok(Self {
67            transport: Arc::new(transport),
68            config: Arc::new(config),
69            event_bus,
70        })
71    }
72
73    /// send a GET request to the given ARI path
74    pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
75        let resp = self.transport.request("GET", path, None).await?;
76        let body = resp.body.ok_or_else(|| AriError::Api {
77            status: resp.status,
78            message: "expected response body".into(),
79        })?;
80        serde_json::from_str(&body).map_err(AriError::Json)
81    }
82
83    /// send a POST request with a JSON body to the given ARI path
84    pub async fn post<T: DeserializeOwned>(&self, path: &str, body: &impl Serialize) -> Result<T> {
85        let json = serde_json::to_string(body).map_err(AriError::Json)?;
86        let resp = self.transport.request("POST", path, Some(json)).await?;
87        let body = resp.body.ok_or_else(|| AriError::Api {
88            status: resp.status,
89            message: "expected response body".into(),
90        })?;
91        serde_json::from_str(&body).map_err(AriError::Json)
92    }
93
94    /// send a POST request with no body to the given ARI path
95    pub async fn post_empty(&self, path: &str) -> Result<()> {
96        self.transport.request("POST", path, None).await?;
97        Ok(())
98    }
99
100    /// send a PUT request with a JSON body to the given ARI path
101    pub async fn put<T: DeserializeOwned>(&self, path: &str, body: &impl Serialize) -> Result<T> {
102        let json = serde_json::to_string(body).map_err(AriError::Json)?;
103        let resp = self.transport.request("PUT", path, Some(json)).await?;
104        let body = resp.body.ok_or_else(|| AriError::Api {
105            status: resp.status,
106            message: "expected response body".into(),
107        })?;
108        serde_json::from_str(&body).map_err(AriError::Json)
109    }
110
111    /// send a PUT request with no body to the given ARI path
112    pub async fn put_empty(&self, path: &str) -> Result<()> {
113        self.transport.request("PUT", path, None).await?;
114        Ok(())
115    }
116
117    /// send a DELETE request to the given ARI path
118    pub async fn delete(&self, path: &str) -> Result<()> {
119        self.transport.request("DELETE", path, None).await?;
120        Ok(())
121    }
122
123    /// send a DELETE request and deserialize the response body
124    pub async fn delete_with_response<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
125        let resp = self.transport.request("DELETE", path, None).await?;
126        let body = resp.body.ok_or_else(|| AriError::Api {
127            status: resp.status,
128            message: "expected response body".into(),
129        })?;
130        serde_json::from_str(&body).map_err(AriError::Json)
131    }
132
133    /// subscribe to ARI events from the websocket stream
134    pub fn subscribe(&self) -> EventSubscription<AriMessage> {
135        self.event_bus.subscribe()
136    }
137
138    /// subscribe to events matching a filter predicate
139    pub fn subscribe_filtered(
140        &self,
141        predicate: impl Fn(&AriMessage) -> bool + Send + 'static,
142    ) -> asterisk_rs_core::event::FilteredSubscription<AriMessage> {
143        self.event_bus.subscribe_filtered(predicate)
144    }
145
146    /// access the underlying event bus
147    pub fn events(&self) -> &EventBus<AriMessage> {
148        &self.event_bus
149    }
150
151    /// access the underlying config
152    pub fn config(&self) -> &AriConfig {
153        &self.config
154    }
155
156    /// create a pending channel with a pre-generated ID for race-free origination
157    ///
158    /// the returned PendingChannel subscribes to events for its ID immediately,
159    /// so no StasisStart events are missed between originate and subscribe.
160    pub fn channel(&self) -> crate::pending::PendingChannel {
161        crate::pending::PendingChannel::new(self.clone())
162    }
163
164    /// create a pending bridge with a pre-generated ID
165    pub fn bridge(&self) -> crate::pending::PendingBridge {
166        crate::pending::PendingBridge::new(self.clone())
167    }
168
169    /// create a pending playback with a pre-generated ID
170    pub fn playback(&self) -> crate::pending::PendingPlayback {
171        crate::pending::PendingPlayback::new(self)
172    }
173
174    /// shut down the websocket listener and transport
175    pub fn disconnect(&self) {
176        self.transport.shutdown();
177    }
178}
179
180/// percent-encode a string for use in URL path segments or query values
181pub fn url_encode(input: &str) -> String {
182    let mut encoded = String::with_capacity(input.len());
183    for byte in input.bytes() {
184        match byte {
185            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
186                encoded.push(byte as char);
187            }
188            _ => {
189                encoded.push_str(&format!("%{byte:02X}"));
190            }
191        }
192    }
193    encoded
194}