asterisk_rs_ari/
client.rs1use std::sync::Arc;
4
5use asterisk_rs_core::event::{EventBus, EventSubscription};
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8
9use crate::config::{AriConfig, TransportMode};
10use crate::error::{AriError, Result};
11use crate::event::AriMessage;
12use crate::transport::{HttpTransport, TransportInner};
13use crate::ws_transport::WsTransport;
14
15#[derive(Clone)]
21pub struct AriClient {
22 transport: Arc<TransportInner>,
23 config: Arc<AriConfig>,
24 event_bus: EventBus<AriMessage>,
25}
26
27impl std::fmt::Debug for AriClient {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("AriClient")
30 .field("base_url", &self.config.base_url)
31 .field("transport_mode", &self.config.transport_mode)
32 .finish_non_exhaustive()
33 }
34}
35
36impl AriClient {
37 pub async fn connect(config: AriConfig) -> Result<Self> {
42 let event_bus = EventBus::new(256);
43
44 let transport = match config.transport_mode {
45 TransportMode::Http => {
46 let http = HttpTransport::new(
47 config.base_url.as_str(),
48 config.username.clone(),
49 config.password.clone(),
50 config.ws_url.to_string(),
51 event_bus.clone(),
52 config.reconnect_policy.clone(),
53 )?;
54 TransportInner::Http(http)
55 }
56 TransportMode::WebSocket => {
57 let ws = WsTransport::spawn(
58 config.ws_url.to_string(),
59 event_bus.clone(),
60 config.reconnect_policy.clone(),
61 );
62 TransportInner::WebSocket(ws)
63 }
64 };
65
66 Ok(Self {
67 transport: Arc::new(transport),
68 config: Arc::new(config),
69 event_bus,
70 })
71 }
72
73 pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
75 let resp = self.transport.request("GET", path, None).await?;
76 let body = resp.body.ok_or_else(|| AriError::Api {
77 status: resp.status,
78 message: "expected response body".into(),
79 })?;
80 serde_json::from_str(&body).map_err(AriError::Json)
81 }
82
83 pub async fn post<T: DeserializeOwned>(&self, path: &str, body: &impl Serialize) -> Result<T> {
85 let json = serde_json::to_string(body).map_err(AriError::Json)?;
86 let resp = self.transport.request("POST", path, Some(json)).await?;
87 let body = resp.body.ok_or_else(|| AriError::Api {
88 status: resp.status,
89 message: "expected response body".into(),
90 })?;
91 serde_json::from_str(&body).map_err(AriError::Json)
92 }
93
94 pub async fn post_empty(&self, path: &str) -> Result<()> {
96 self.transport.request("POST", path, None).await?;
97 Ok(())
98 }
99
100 pub async fn put<T: DeserializeOwned>(&self, path: &str, body: &impl Serialize) -> Result<T> {
102 let json = serde_json::to_string(body).map_err(AriError::Json)?;
103 let resp = self.transport.request("PUT", path, Some(json)).await?;
104 let body = resp.body.ok_or_else(|| AriError::Api {
105 status: resp.status,
106 message: "expected response body".into(),
107 })?;
108 serde_json::from_str(&body).map_err(AriError::Json)
109 }
110
111 pub async fn put_empty(&self, path: &str) -> Result<()> {
113 self.transport.request("PUT", path, None).await?;
114 Ok(())
115 }
116
117 pub async fn delete(&self, path: &str) -> Result<()> {
119 self.transport.request("DELETE", path, None).await?;
120 Ok(())
121 }
122
123 pub async fn delete_with_response<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
125 let resp = self.transport.request("DELETE", path, None).await?;
126 let body = resp.body.ok_or_else(|| AriError::Api {
127 status: resp.status,
128 message: "expected response body".into(),
129 })?;
130 serde_json::from_str(&body).map_err(AriError::Json)
131 }
132
133 pub fn subscribe(&self) -> EventSubscription<AriMessage> {
135 self.event_bus.subscribe()
136 }
137
138 pub fn subscribe_filtered(
140 &self,
141 predicate: impl Fn(&AriMessage) -> bool + Send + 'static,
142 ) -> asterisk_rs_core::event::FilteredSubscription<AriMessage> {
143 self.event_bus.subscribe_filtered(predicate)
144 }
145
146 pub fn events(&self) -> &EventBus<AriMessage> {
148 &self.event_bus
149 }
150
151 pub fn config(&self) -> &AriConfig {
153 &self.config
154 }
155
156 pub fn channel(&self) -> crate::pending::PendingChannel {
161 crate::pending::PendingChannel::new(self.clone())
162 }
163
164 pub fn bridge(&self) -> crate::pending::PendingBridge {
166 crate::pending::PendingBridge::new(self.clone())
167 }
168
169 pub fn playback(&self) -> crate::pending::PendingPlayback {
171 crate::pending::PendingPlayback::new(self)
172 }
173
174 pub fn disconnect(&self) {
176 self.transport.shutdown();
177 }
178}
179
/// Percent-encodes `input` byte by byte.
///
/// ASCII letters, digits, and `-`, `_`, `.`, `~` are copied through
/// unchanged; every other byte of the UTF-8 representation is written as
/// `%XX` with two uppercase hexadecimal digits. Multi-byte characters
/// therefore expand to one escape per byte.
///
/// Returns a newly allocated `String`; an empty input yields an empty string.
pub fn url_encode(input: &str) -> String {
    // Lookup table for the two hex digits of an escaped byte. Pushing the
    // digits directly avoids allocating a temporary `String` through
    // `format!` for every escaped byte.
    const HEX_UPPER: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(input.len());
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                encoded.push(char::from(byte));
            }
            _ => {
                encoded.push('%');
                encoded.push(char::from(HEX_UPPER[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX_UPPER[usize::from(byte & 0x0F)]));
            }
        }
    }
    encoded
}