Skip to main content

vox_rtc_server/
client.rs

1use crate::error::{Result, VoxRtcError};
2use crate::session::VoxRtcControlSession;
3use crate::socket::RawSocketClient;
4use crate::types::{ConnectionState, EventData, SessionBootstrap};
5use serde_json::Value;
6use std::env;
7use std::time::Duration;
8
/// Configuration consumed by [`VoxRtcServerClient::with_options`].
#[derive(Debug, Clone)]
pub struct VoxRtcServerClientOptions {
    /// Base URL of the HTTP API. Trailing `/` characters are stripped when the client is built.
    pub http_base: String,
    /// API key. When `None`, `with_options` falls back to the `VOX_API_KEY` environment
    /// variable. The chosen value is trimmed, and an empty result means "no key".
    pub api_key: Option<String>,
    /// Base URL of the socket endpoint. When `None`, `with_options` derives
    /// `{http_base}/v1/socket`.
    pub socket_base: Option<String>,
    /// Parameters handed to the socket client. `with_options` adds an `api_key` entry
    /// when a key is configured.
    pub socket_params: EventData,
    /// Handed to the socket client, and also used by `connect` as the deadline for
    /// observing the connected state.
    pub connection_timeout: Duration,
    /// Handed to the socket client. Presumably the upper bound for its reconnect
    /// back-off; confirm against `RawSocketClient`.
    pub max_reconnect_delay: Duration,
    /// Timeout configured on the `reqwest` HTTP client.
    pub request_timeout: Duration,
    /// Handed to every `VoxRtcControlSession` created by `attach_session`.
    pub join_timeout: Duration,
}
20
21impl VoxRtcServerClientOptions {
22    pub fn new(http_base: impl Into<String>) -> Self {
23        Self {
24            http_base: http_base.into(),
25            api_key: None,
26            socket_base: None,
27            socket_params: EventData::new(),
28            connection_timeout: Duration::from_secs(10),
29            max_reconnect_delay: Duration::from_secs(30),
30            request_timeout: Duration::from_secs(15),
31            join_timeout: Duration::from_secs(10),
32        }
33    }
34}
35
36#[derive(Clone)]
37pub struct VoxRtcServerClient {
38    http_base: String,
39    api_key: Option<String>,
40    socket_base: String,
41    socket_params: EventData,
42    http: reqwest::Client,
43    socket: RawSocketClient,
44    connection_timeout: Duration,
45    join_timeout: Duration,
46}
47
/// Value returned by [`VoxRtcServerClient::create_controlled_session`]: the bootstrap
/// data of a newly created session together with the control session attached to it.
#[derive(Clone)]
pub struct ControlledSession {
    /// Bootstrap payload deserialized from the session-creation response.
    pub bootstrap: SessionBootstrap,
    /// Control session joined for `bootstrap.session_id`.
    pub session: VoxRtcControlSession,
}
53
54impl VoxRtcServerClient {
55    pub fn new(http_base: impl Into<String>) -> Result<Self> {
56        Self::with_options(VoxRtcServerClientOptions::new(http_base))
57    }
58
59    pub fn with_options(mut options: VoxRtcServerClientOptions) -> Result<Self> {
60        let http_base = normalize_base(&options.http_base);
61        let api_key = options
62            .api_key
63            .take()
64            .or_else(|| env::var("VOX_API_KEY").ok())
65            .map(|value| value.trim().to_owned())
66            .filter(|value| !value.is_empty());
67        let socket_base = options
68            .socket_base
69            .take()
70            .map(|base| normalize_base(&base))
71            .unwrap_or_else(|| default_socket_base(&http_base));
72        if let Some(api_key) = &api_key {
73            options
74                .socket_params
75                .insert("api_key".to_owned(), Value::String(api_key.clone()));
76        }
77        let http = reqwest::Client::builder()
78            .timeout(options.request_timeout)
79            .build()?;
80        let socket = RawSocketClient::new(
81            &socket_base,
82            options.socket_params.clone(),
83            options.connection_timeout,
84            options.max_reconnect_delay,
85        )?;
86        Ok(Self {
87            http_base,
88            api_key,
89            socket_base,
90            socket_params: options.socket_params,
91            http,
92            socket,
93            connection_timeout: options.connection_timeout,
94            join_timeout: options.join_timeout,
95        })
96    }
97
98    pub fn http_base(&self) -> &str {
99        &self.http_base
100    }
101
102    pub fn socket_base(&self) -> &str {
103        &self.socket_base
104    }
105
106    pub fn connection_state(&self) -> ConnectionState {
107        self.socket.state()
108    }
109
110    pub async fn connect(&self) -> Result<()> {
111        if self.socket.state() == ConnectionState::Connected {
112            return Ok(());
113        }
114        self.socket.connect().await?;
115        let mut states = self.socket.subscribe_state();
116        tokio::time::timeout(self.connection_timeout, async move {
117            loop {
118                if *states.borrow_and_update() == ConnectionState::Connected {
119                    return Ok(());
120                }
121                if states.changed().await.is_err() {
122                    return Err(VoxRtcError::Disconnected);
123                }
124            }
125        })
126        .await
127        .map_err(|_| VoxRtcError::Timeout("PondSocket connection"))?
128    }
129
130    pub async fn disconnect(&self) {
131        self.socket.disconnect().await;
132    }
133
134    pub async fn create_session(&self) -> Result<SessionBootstrap> {
135        let mut request = self
136            .http
137            .post(format!("{}/v1/rtc/sessions", self.http_base))
138            .json(&serde_json::json!({}));
139        if let Some(api_key) = &self.api_key {
140            request = request.bearer_auth(api_key);
141        }
142        let response = request.send().await?;
143        let status = response.status();
144        let body = response.text().await?;
145        if !status.is_success() {
146            return Err(VoxRtcError::CreateSessionFailed { status, body });
147        }
148        Ok(serde_json::from_str(&body)?)
149    }
150
151    pub async fn attach_session(
152        &self,
153        session_id: impl Into<String>,
154    ) -> Result<VoxRtcControlSession> {
155        let session_id = session_id.into();
156        self.connect().await?;
157        let channel = self
158            .socket
159            .create_channel(format!("/rtc/{session_id}"), EventData::new())
160            .await;
161        let session = VoxRtcControlSession::new(channel, session_id, self.join_timeout);
162        session.join().await?;
163        Ok(session)
164    }
165
166    pub async fn create_controlled_session(&self) -> Result<ControlledSession> {
167        let bootstrap = self.create_session().await?;
168        let session = self.attach_session(bootstrap.session_id.clone()).await?;
169        Ok(ControlledSession { bootstrap, session })
170    }
171
172    #[allow(dead_code)]
173    pub fn socket_params(&self) -> &EventData {
174        &self.socket_params
175    }
176}
177
/// Canonicalizes a base URL so paths can be appended with `format!("{base}/…")`.
///
/// Surrounding whitespace is removed first, then every trailing `/` is stripped.
/// Trimming matters because base URLs often come from environment variables or config
/// files. Without it a value such as `"https://host/ "` keeps both its slash and its
/// space, and the appended path ends up after an embedded space.
fn normalize_base(base: &str) -> String {
    base.trim().trim_end_matches('/').to_owned()
}
181
182fn default_socket_base(http_base: &str) -> String {
183    format!("{}/v1/socket", normalize_base(http_base))
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// A trailing slash on the HTTP base is dropped, and the socket base is derived
    /// from the normalized value.
    #[test]
    fn defaults_socket_base_from_http_base() {
        let client = VoxRtcServerClient::new("https://vox.example.com/").unwrap();

        let http_base = client.http_base();
        let socket_base = client.socket_base();

        assert_eq!(http_base, "https://vox.example.com");
        assert_eq!(socket_base, "https://vox.example.com/v1/socket");
    }

    /// A malformed base must surface as `InvalidUrl` rather than a panic.
    #[test]
    fn new_returns_error_on_bad_url_instead_of_panicking() {
        let outcome = VoxRtcServerClient::new("not a url");
        match outcome {
            Ok(_) => panic!("expected an error for a malformed URL"),
            Err(VoxRtcError::InvalidUrl(_)) => {}
            Err(other) => panic!("expected InvalidUrl, got {other:?}"),
        }
    }

    /// Custom timeouts are accepted and the connection timeout is retained.
    // NOTE(review): `max_reconnect_delay` is set here but the client keeps no copy of
    // it, so only the connection timeout can be asserted on.
    #[test]
    fn forwards_connection_and_reconnect_timeouts() {
        let options = VoxRtcServerClientOptions {
            connection_timeout: Duration::from_secs(3),
            max_reconnect_delay: Duration::from_secs(45),
            ..VoxRtcServerClientOptions::new("https://vox.example.com")
        };

        let client = VoxRtcServerClient::with_options(options).unwrap();

        assert_eq!(client.connection_timeout, Duration::from_secs(3));
    }

    /// An explicit API key is copied into the socket parameters under `api_key`.
    #[test]
    fn injects_api_key_into_socket_params() {
        let options = VoxRtcServerClientOptions {
            api_key: Some("secret".to_owned()),
            ..VoxRtcServerClientOptions::new("https://vox.example.com")
        };

        let client = VoxRtcServerClient::with_options(options).unwrap();
        let injected = client
            .socket_params()
            .get("api_key")
            .and_then(Value::as_str);

        assert_eq!(injected, Some("secret"));
    }
}
228}