1use crate::error::{Result, VoxRtcError};
2use crate::session::VoxRtcControlSession;
3use crate::socket::RawSocketClient;
4use crate::types::{ConnectionState, EventData, SessionBootstrap};
5use serde_json::Value;
6use std::env;
7use std::time::Duration;
8
9#[derive(Debug, Clone)]
10pub struct VoxRtcServerClientOptions {
11 pub http_base: String,
12 pub api_key: Option<String>,
13 pub socket_base: Option<String>,
14 pub socket_params: EventData,
15 pub connection_timeout: Duration,
16 pub max_reconnect_delay: Duration,
17 pub request_timeout: Duration,
18 pub join_timeout: Duration,
19}
20
21impl VoxRtcServerClientOptions {
22 pub fn new(http_base: impl Into<String>) -> Self {
23 Self {
24 http_base: http_base.into(),
25 api_key: None,
26 socket_base: None,
27 socket_params: EventData::new(),
28 connection_timeout: Duration::from_secs(10),
29 max_reconnect_delay: Duration::from_secs(30),
30 request_timeout: Duration::from_secs(15),
31 join_timeout: Duration::from_secs(10),
32 }
33 }
34}
35
36#[derive(Clone)]
37pub struct VoxRtcServerClient {
38 http_base: String,
39 api_key: Option<String>,
40 socket_base: String,
41 socket_params: EventData,
42 http: reqwest::Client,
43 socket: RawSocketClient,
44 connection_timeout: Duration,
45 join_timeout: Duration,
46}
47
48#[derive(Clone)]
49pub struct ControlledSession {
50 pub bootstrap: SessionBootstrap,
51 pub session: VoxRtcControlSession,
52}
53
54impl VoxRtcServerClient {
55 pub fn new(http_base: impl Into<String>) -> Result<Self> {
56 Self::with_options(VoxRtcServerClientOptions::new(http_base))
57 }
58
59 pub fn with_options(mut options: VoxRtcServerClientOptions) -> Result<Self> {
60 let http_base = normalize_base(&options.http_base);
61 let api_key = options
62 .api_key
63 .take()
64 .or_else(|| env::var("VOX_API_KEY").ok())
65 .map(|value| value.trim().to_owned())
66 .filter(|value| !value.is_empty());
67 let socket_base = options
68 .socket_base
69 .take()
70 .map(|base| normalize_base(&base))
71 .unwrap_or_else(|| default_socket_base(&http_base));
72 if let Some(api_key) = &api_key {
73 options
74 .socket_params
75 .insert("api_key".to_owned(), Value::String(api_key.clone()));
76 }
77 let http = reqwest::Client::builder()
78 .timeout(options.request_timeout)
79 .build()?;
80 let socket = RawSocketClient::new(
81 &socket_base,
82 options.socket_params.clone(),
83 options.connection_timeout,
84 options.max_reconnect_delay,
85 )?;
86 Ok(Self {
87 http_base,
88 api_key,
89 socket_base,
90 socket_params: options.socket_params,
91 http,
92 socket,
93 connection_timeout: options.connection_timeout,
94 join_timeout: options.join_timeout,
95 })
96 }
97
98 pub fn http_base(&self) -> &str {
99 &self.http_base
100 }
101
102 pub fn socket_base(&self) -> &str {
103 &self.socket_base
104 }
105
106 pub fn connection_state(&self) -> ConnectionState {
107 self.socket.state()
108 }
109
110 pub async fn connect(&self) -> Result<()> {
111 if self.socket.state() == ConnectionState::Connected {
112 return Ok(());
113 }
114 self.socket.connect().await?;
115 let mut states = self.socket.subscribe_state();
116 tokio::time::timeout(self.connection_timeout, async move {
117 loop {
118 if *states.borrow_and_update() == ConnectionState::Connected {
119 return Ok(());
120 }
121 if states.changed().await.is_err() {
122 return Err(VoxRtcError::Disconnected);
123 }
124 }
125 })
126 .await
127 .map_err(|_| VoxRtcError::Timeout("PondSocket connection"))?
128 }
129
130 pub async fn disconnect(&self) {
131 self.socket.disconnect().await;
132 }
133
134 pub async fn create_session(&self) -> Result<SessionBootstrap> {
135 let mut request = self
136 .http
137 .post(format!("{}/v1/rtc/sessions", self.http_base))
138 .json(&serde_json::json!({}));
139 if let Some(api_key) = &self.api_key {
140 request = request.bearer_auth(api_key);
141 }
142 let response = request.send().await?;
143 let status = response.status();
144 let body = response.text().await?;
145 if !status.is_success() {
146 return Err(VoxRtcError::CreateSessionFailed { status, body });
147 }
148 Ok(serde_json::from_str(&body)?)
149 }
150
151 pub async fn attach_session(
152 &self,
153 session_id: impl Into<String>,
154 ) -> Result<VoxRtcControlSession> {
155 let session_id = session_id.into();
156 self.connect().await?;
157 let channel = self
158 .socket
159 .create_channel(format!("/rtc/{session_id}"), EventData::new())
160 .await;
161 let session = VoxRtcControlSession::new(channel, session_id, self.join_timeout);
162 session.join().await?;
163 Ok(session)
164 }
165
166 pub async fn create_controlled_session(&self) -> Result<ControlledSession> {
167 let bootstrap = self.create_session().await?;
168 let session = self.attach_session(bootstrap.session_id.clone()).await?;
169 Ok(ControlledSession { bootstrap, session })
170 }
171
172 #[allow(dead_code)]
173 pub fn socket_params(&self) -> &EventData {
174 &self.socket_params
175 }
176}
177
/// Canonicalizes a base URL so that paths can be appended with `format!`.
///
/// Surrounding whitespace is removed first and then every trailing `/` is
/// stripped. Trimming whitespace matters because the result is concatenated
/// with path suffixes: a stray trailing space or newline (common in values
/// read from environment variables or config files) would otherwise end up in
/// the middle of the final URL and would also hide a trailing slash from the
/// slash-stripping step.
fn normalize_base(base: &str) -> String {
    base.trim().trim_end_matches('/').to_owned()
}
181
182fn default_socket_base(http_base: &str) -> String {
183 format!("{}/v1/socket", normalize_base(http_base))
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads the `"api_key"` socket parameter as a string, if present.
    fn socket_api_key(client: &VoxRtcServerClient) -> Option<&str> {
        client
            .socket_params()
            .get("api_key")
            .and_then(Value::as_str)
    }

    #[test]
    fn defaults_socket_base_from_http_base() {
        let client = VoxRtcServerClient::new("https://vox.example.com/").unwrap();
        assert_eq!(client.http_base(), "https://vox.example.com");
        assert_eq!(client.socket_base(), "https://vox.example.com/v1/socket");
    }

    #[test]
    fn normalizes_explicit_socket_base() {
        let mut options = VoxRtcServerClientOptions::new("https://vox.example.com");
        options.socket_base = Some("https://sockets.example.com/v1/socket/".to_owned());
        let client = VoxRtcServerClient::with_options(options).unwrap();
        assert_eq!(client.socket_base(), "https://sockets.example.com/v1/socket");
    }

    #[test]
    fn new_returns_error_on_bad_url_instead_of_panicking() {
        match VoxRtcServerClient::new("not a url") {
            Err(VoxRtcError::InvalidUrl(_)) => {}
            Err(other) => panic!("expected InvalidUrl, got {other:?}"),
            Ok(_) => panic!("expected an error for a malformed URL"),
        }
    }

    #[test]
    fn forwards_connection_and_reconnect_timeouts() {
        let mut options = VoxRtcServerClientOptions::new("https://vox.example.com");
        options.connection_timeout = Duration::from_secs(3);
        options.max_reconnect_delay = Duration::from_secs(45);
        options.join_timeout = Duration::from_secs(7);
        let client = VoxRtcServerClient::with_options(options).unwrap();
        // `max_reconnect_delay` is only handed to the socket client and is not
        // stored on `VoxRtcServerClient`, so this test can only check that a
        // custom value is accepted, not where it ends up.
        assert_eq!(client.connection_timeout, Duration::from_secs(3));
        assert_eq!(client.join_timeout, Duration::from_secs(7));
    }

    #[test]
    fn injects_api_key_into_socket_params() {
        let mut options = VoxRtcServerClientOptions::new("https://vox.example.com");
        options.api_key = Some("secret".to_owned());
        let client = VoxRtcServerClient::with_options(options).unwrap();
        assert_eq!(socket_api_key(&client), Some("secret"));
    }

    #[test]
    fn trims_api_key_before_injecting_it() {
        let mut options = VoxRtcServerClientOptions::new("https://vox.example.com");
        options.api_key = Some("  secret \n".to_owned());
        let client = VoxRtcServerClient::with_options(options).unwrap();
        assert_eq!(client.api_key.as_deref(), Some("secret"));
        assert_eq!(socket_api_key(&client), Some("secret"));
    }

    #[test]
    fn blank_api_key_is_not_injected() {
        // An explicit key short-circuits the environment fallback, so this is
        // deterministic even when `VOX_API_KEY` is set in the test environment.
        let mut options = VoxRtcServerClientOptions::new("https://vox.example.com");
        options.api_key = Some("   ".to_owned());
        let client = VoxRtcServerClient::with_options(options).unwrap();
        assert!(client.api_key.is_none());
        assert!(client.socket_params().get("api_key").is_none());
    }
}