Skip to main content

vox_rtc_server/
client.rs

1use crate::error::{Result, VoxRtcError};
2use crate::session::VoxRtcControlSession;
3use crate::socket::RawSocketClient;
4use crate::types::{ConnectionState, EventData, SessionBootstrap};
5use serde_json::Value;
6use std::env;
7use std::time::Duration;
8
9#[derive(Debug, Clone)]
10pub struct VoxRtcServerClientOptions {
11    pub http_base: String,
12    pub api_key: Option<String>,
13    pub socket_base: Option<String>,
14    pub socket_params: EventData,
15    pub connection_timeout: Duration,
16    pub max_reconnect_delay: Duration,
17    pub request_timeout: Duration,
18    pub join_timeout: Duration,
19}
20
21impl VoxRtcServerClientOptions {
22    pub fn new(http_base: impl Into<String>) -> Self {
23        Self {
24            http_base: http_base.into(),
25            api_key: None,
26            socket_base: None,
27            socket_params: EventData::new(),
28            connection_timeout: Duration::from_secs(10),
29            max_reconnect_delay: Duration::from_secs(30),
30            request_timeout: Duration::from_secs(15),
31            join_timeout: Duration::from_secs(10),
32        }
33    }
34}
35
36#[derive(Clone)]
37pub struct VoxRtcServerClient {
38    http_base: String,
39    api_key: Option<String>,
40    socket_base: String,
41    socket_params: EventData,
42    http: reqwest::Client,
43    socket: RawSocketClient,
44    connection_timeout: Duration,
45    join_timeout: Duration,
46}
47
48#[derive(Clone)]
49pub struct ControlledSession {
50    pub bootstrap: SessionBootstrap,
51    pub session: VoxRtcControlSession,
52}
53
54impl VoxRtcServerClient {
55    pub fn new(http_base: impl Into<String>) -> Self {
56        Self::with_options(VoxRtcServerClientOptions::new(http_base))
57            .expect("valid Vox RTC client options")
58    }
59
60    pub fn with_options(mut options: VoxRtcServerClientOptions) -> Result<Self> {
61        let http_base = normalize_base(&options.http_base);
62        let api_key = options
63            .api_key
64            .take()
65            .or_else(|| env::var("VOX_API_KEY").ok())
66            .map(|value| value.trim().to_owned())
67            .filter(|value| !value.is_empty());
68        let socket_base = options
69            .socket_base
70            .take()
71            .map(|base| normalize_base(&base))
72            .unwrap_or_else(|| default_socket_base(&http_base));
73        if let Some(api_key) = &api_key {
74            options
75                .socket_params
76                .insert("api_key".to_owned(), Value::String(api_key.clone()));
77        }
78        let http = reqwest::Client::builder()
79            .timeout(options.request_timeout)
80            .build()?;
81        let socket = RawSocketClient::new(&socket_base, options.socket_params.clone())?;
82        Ok(Self {
83            http_base,
84            api_key,
85            socket_base,
86            socket_params: options.socket_params,
87            http,
88            socket,
89            connection_timeout: options.connection_timeout,
90            join_timeout: options.join_timeout,
91        })
92    }
93
94    pub fn http_base(&self) -> &str {
95        &self.http_base
96    }
97
98    pub fn socket_base(&self) -> &str {
99        &self.socket_base
100    }
101
102    pub fn connection_state(&self) -> ConnectionState {
103        self.socket.state()
104    }
105
106    pub async fn connect(&self) -> Result<()> {
107        if self.socket.state() == ConnectionState::Connected {
108            return Ok(());
109        }
110        self.socket.connect().await?;
111        let mut states = self.socket.subscribe_state();
112        tokio::time::timeout(self.connection_timeout, async move {
113            loop {
114                if *states.borrow_and_update() == ConnectionState::Connected {
115                    return Ok(());
116                }
117                if states.changed().await.is_err() {
118                    return Err(VoxRtcError::Disconnected);
119                }
120            }
121        })
122        .await
123        .map_err(|_| VoxRtcError::Timeout("PondSocket connection"))?
124    }
125
126    pub async fn disconnect(&self) {
127        self.socket.disconnect().await;
128    }
129
130    pub async fn create_session(&self) -> Result<SessionBootstrap> {
131        let mut request = self
132            .http
133            .post(format!("{}/v1/rtc/sessions", self.http_base))
134            .json(&serde_json::json!({}));
135        if let Some(api_key) = &self.api_key {
136            request = request.bearer_auth(api_key);
137        }
138        let response = request.send().await?;
139        let status = response.status();
140        let body = response.text().await?;
141        if !status.is_success() {
142            return Err(VoxRtcError::CreateSessionFailed { status, body });
143        }
144        Ok(serde_json::from_str(&body)?)
145    }
146
147    pub async fn attach_session(
148        &self,
149        session_id: impl Into<String>,
150    ) -> Result<VoxRtcControlSession> {
151        let session_id = session_id.into();
152        self.connect().await?;
153        let channel = self
154            .socket
155            .create_channel(format!("/rtc/{session_id}"), EventData::new())
156            .await;
157        let session = VoxRtcControlSession::new(channel, session_id, self.join_timeout);
158        session.join().await?;
159        Ok(session)
160    }
161
162    pub async fn create_controlled_session(&self) -> Result<ControlledSession> {
163        let bootstrap = self.create_session().await?;
164        let session = self.attach_session(bootstrap.session_id.clone()).await?;
165        Ok(ControlledSession { bootstrap, session })
166    }
167
168    #[allow(dead_code)]
169    pub fn socket_params(&self) -> &EventData {
170        &self.socket_params
171    }
172}
173
/// Canonicalizes a base URL so path segments can be appended with a single `/`.
///
/// Surrounding whitespace is removed first, then every trailing `/`.
/// Trimming whitespace matters because a value such as `"https://host/ \n"`
/// (e.g. read from an environment variable or config file) would otherwise
/// keep its trailing slash and produce malformed request URLs.
fn normalize_base(base: &str) -> String {
    base.trim().trim_end_matches('/').to_owned()
}
177
178fn default_socket_base(http_base: &str) -> String {
179    format!("{}/v1/socket", normalize_base(http_base))
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// A trailing slash on the HTTP base is dropped and the socket base is
    /// derived from the normalized value.
    #[test]
    fn defaults_socket_base_from_http_base() {
        let client = VoxRtcServerClient::new("https://vox.example.com/");
        let (http, socket) = (client.http_base(), client.socket_base());
        assert_eq!(http, "https://vox.example.com");
        assert_eq!(socket, "https://vox.example.com/v1/socket");
    }

    /// An explicit API key ends up in the socket parameters under `"api_key"`.
    #[test]
    fn injects_api_key_into_socket_params() {
        let options = VoxRtcServerClientOptions {
            api_key: Some("secret".to_owned()),
            ..VoxRtcServerClientOptions::new("https://vox.example.com")
        };
        let client = VoxRtcServerClient::with_options(options).unwrap();
        let injected = client
            .socket_params()
            .get("api_key")
            .and_then(Value::as_str);
        assert_eq!(injected, Some("secret"));
    }
}