1use crate::error::{Result, VoxRtcError};
2use crate::session::VoxRtcControlSession;
3use crate::socket::RawSocketClient;
4use crate::types::{ConnectionState, EventData, SessionBootstrap};
5use serde_json::Value;
6use std::env;
7use std::time::Duration;
8
9#[derive(Debug, Clone)]
10pub struct VoxRtcServerClientOptions {
11 pub http_base: String,
12 pub api_key: Option<String>,
13 pub socket_base: Option<String>,
14 pub socket_params: EventData,
15 pub connection_timeout: Duration,
16 pub max_reconnect_delay: Duration,
17 pub request_timeout: Duration,
18 pub join_timeout: Duration,
19}
20
21impl VoxRtcServerClientOptions {
22 pub fn new(http_base: impl Into<String>) -> Self {
23 Self {
24 http_base: http_base.into(),
25 api_key: None,
26 socket_base: None,
27 socket_params: EventData::new(),
28 connection_timeout: Duration::from_secs(10),
29 max_reconnect_delay: Duration::from_secs(30),
30 request_timeout: Duration::from_secs(15),
31 join_timeout: Duration::from_secs(10),
32 }
33 }
34}
35
36#[derive(Clone)]
37pub struct VoxRtcServerClient {
38 http_base: String,
39 api_key: Option<String>,
40 socket_base: String,
41 socket_params: EventData,
42 http: reqwest::Client,
43 socket: RawSocketClient,
44 connection_timeout: Duration,
45 join_timeout: Duration,
46}
47
48#[derive(Clone)]
49pub struct ControlledSession {
50 pub bootstrap: SessionBootstrap,
51 pub session: VoxRtcControlSession,
52}
53
54impl VoxRtcServerClient {
55 pub fn new(http_base: impl Into<String>) -> Self {
56 Self::with_options(VoxRtcServerClientOptions::new(http_base))
57 .expect("valid Vox RTC client options")
58 }
59
60 pub fn with_options(mut options: VoxRtcServerClientOptions) -> Result<Self> {
61 let http_base = normalize_base(&options.http_base);
62 let api_key = options
63 .api_key
64 .take()
65 .or_else(|| env::var("VOX_API_KEY").ok())
66 .map(|value| value.trim().to_owned())
67 .filter(|value| !value.is_empty());
68 let socket_base = options
69 .socket_base
70 .take()
71 .map(|base| normalize_base(&base))
72 .unwrap_or_else(|| default_socket_base(&http_base));
73 if let Some(api_key) = &api_key {
74 options
75 .socket_params
76 .insert("api_key".to_owned(), Value::String(api_key.clone()));
77 }
78 let http = reqwest::Client::builder()
79 .timeout(options.request_timeout)
80 .build()?;
81 let socket = RawSocketClient::new(&socket_base, options.socket_params.clone())?;
82 Ok(Self {
83 http_base,
84 api_key,
85 socket_base,
86 socket_params: options.socket_params,
87 http,
88 socket,
89 connection_timeout: options.connection_timeout,
90 join_timeout: options.join_timeout,
91 })
92 }
93
94 pub fn http_base(&self) -> &str {
95 &self.http_base
96 }
97
98 pub fn socket_base(&self) -> &str {
99 &self.socket_base
100 }
101
102 pub fn connection_state(&self) -> ConnectionState {
103 self.socket.state()
104 }
105
106 pub async fn connect(&self) -> Result<()> {
107 if self.socket.state() == ConnectionState::Connected {
108 return Ok(());
109 }
110 self.socket.connect().await?;
111 let mut states = self.socket.subscribe_state();
112 tokio::time::timeout(self.connection_timeout, async move {
113 loop {
114 if *states.borrow_and_update() == ConnectionState::Connected {
115 return Ok(());
116 }
117 if states.changed().await.is_err() {
118 return Err(VoxRtcError::Disconnected);
119 }
120 }
121 })
122 .await
123 .map_err(|_| VoxRtcError::Timeout("PondSocket connection"))?
124 }
125
126 pub async fn disconnect(&self) {
127 self.socket.disconnect().await;
128 }
129
130 pub async fn create_session(&self) -> Result<SessionBootstrap> {
131 let mut request = self
132 .http
133 .post(format!("{}/v1/rtc/sessions", self.http_base))
134 .json(&serde_json::json!({}));
135 if let Some(api_key) = &self.api_key {
136 request = request.bearer_auth(api_key);
137 }
138 let response = request.send().await?;
139 let status = response.status();
140 let body = response.text().await?;
141 if !status.is_success() {
142 return Err(VoxRtcError::CreateSessionFailed { status, body });
143 }
144 Ok(serde_json::from_str(&body)?)
145 }
146
147 pub async fn attach_session(
148 &self,
149 session_id: impl Into<String>,
150 ) -> Result<VoxRtcControlSession> {
151 let session_id = session_id.into();
152 self.connect().await?;
153 let channel = self
154 .socket
155 .create_channel(format!("/rtc/{session_id}"), EventData::new())
156 .await;
157 let session = VoxRtcControlSession::new(channel, session_id, self.join_timeout);
158 session.join().await?;
159 Ok(session)
160 }
161
162 pub async fn create_controlled_session(&self) -> Result<ControlledSession> {
163 let bootstrap = self.create_session().await?;
164 let session = self.attach_session(bootstrap.session_id.clone()).await?;
165 Ok(ControlledSession { bootstrap, session })
166 }
167
168 #[allow(dead_code)]
169 pub fn socket_params(&self) -> &EventData {
170 &self.socket_params
171 }
172}
173
/// Normalizes a base URL so path segments can be appended with a single `/`.
///
/// Surrounding whitespace is removed first (values read from the environment or
/// a config file often carry a trailing newline, which would otherwise shield
/// the trailing slash from being stripped), then every trailing `/` is removed.
/// An empty or all-slash input yields an empty string.
fn normalize_base(base: &str) -> String {
    base.trim().trim_end_matches('/').to_owned()
}
177
178fn default_socket_base(http_base: &str) -> String {
179 format!("{}/v1/socket", normalize_base(http_base))
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// A trailing slash on the HTTP base is dropped, and the socket base is
    /// derived from the normalized HTTP base.
    #[test]
    fn defaults_socket_base_from_http_base() {
        let client = VoxRtcServerClient::new("https://vox.example.com/");
        let bases = (client.http_base(), client.socket_base());
        assert_eq!(
            bases,
            (
                "https://vox.example.com",
                "https://vox.example.com/v1/socket"
            )
        );
    }

    /// An explicitly configured API key ends up in the socket parameters.
    #[test]
    fn injects_api_key_into_socket_params() {
        let options = VoxRtcServerClientOptions {
            api_key: Some("secret".to_owned()),
            ..VoxRtcServerClientOptions::new("https://vox.example.com")
        };
        let client = VoxRtcServerClient::with_options(options).unwrap();
        let injected = client
            .socket_params()
            .get("api_key")
            .and_then(|value| value.as_str());
        assert_eq!(injected, Some("secret"));
    }
}