Skip to main content

videocall_uniffi/
lib.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19// Include the generated bindings
20uniffi::include_scaffolding!("videocall");
21
22use bytes::Bytes;
23use log::{debug, error, info, LevelFilter};
24use rustls::{ClientConfig, RootCertStore};
25use rustls_native_certs::load_native_certs;
26use std::collections::VecDeque;
27use std::sync::Arc;
28use std::sync::Mutex;
29use thiserror::Error;
30use tokio::runtime::Runtime;
31use tokio::task;
32use url::Url;
33use web_transport_quinn::ClientBuilder;
34use web_transport_quinn::Session;
35
36// A simple function that returns a greeting
37pub fn hello_world() -> String {
38    "Hello from Rust!".to_string()
39}
40
41// A function that returns the version of the library
42pub fn get_version() -> String {
43    env!("CARGO_PKG_VERSION").to_string()
44}
45
46#[derive(Error, Debug)]
47pub enum WebTransportError {
48    #[error("Connection error: {0}")]
49    ConnectionError(String),
50    #[error("TLS error: {0}")]
51    TlsError(String),
52    #[error("Stream error")]
53    StreamError,
54    #[error("Invalid URL: {0}")]
55    InvalidUrl(String),
56    #[error("Runtime error: {0}")]
57    RuntimeError(String),
58    #[error("Certificate error: {0}")]
59    CertificateError(String),
60    #[error("Failed to create client: {0}")]
61    ClientError(String),
62    #[error("Queue error: {0}")]
63    QueueError(String),
64}
65
66pub struct DatagramQueue {
67    queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
68}
69
70impl Default for DatagramQueue {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl DatagramQueue {
77    pub fn new() -> Self {
78        Self {
79            queue: Arc::new(Mutex::new(VecDeque::new())),
80        }
81    }
82
83    pub fn add_datagram(&self, data: Vec<u8>) -> Result<(), WebTransportError> {
84        let mut queue = self
85            .queue
86            .lock()
87            .map_err(|e| WebTransportError::QueueError(format!("Failed to acquire lock: {e}")))?;
88
89        queue.push_back(data);
90        info!("Added datagram to queue, queue size: {}", queue.len());
91        Ok(())
92    }
93
94    pub fn receive_datagram(&self) -> Result<Vec<u8>, WebTransportError> {
95        let mut queue = self
96            .queue
97            .lock()
98            .map_err(|e| WebTransportError::QueueError(format!("Failed to acquire lock: {e}")))?;
99
100        match queue.pop_front() {
101            Some(data) => {
102                info!("Received datagram from queue, remaining: {}", queue.len());
103                Ok(data)
104            }
105            None => Err(WebTransportError::QueueError(
106                "No datagrams available".to_string(),
107            )),
108        }
109    }
110
111    pub fn has_datagrams(&self) -> Result<bool, WebTransportError> {
112        let queue = self
113            .queue
114            .lock()
115            .map_err(|e| WebTransportError::QueueError(format!("Failed to acquire lock: {e}")))?;
116
117        Ok(!queue.is_empty())
118    }
119}
120
121pub struct WebTransportClient {
122    runtime: Arc<Runtime>,
123    session: Arc<Mutex<Option<Session>>>,
124    datagram_listener: Arc<Mutex<Option<task::JoinHandle<()>>>>,
125}
126
127impl Default for WebTransportClient {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
133impl WebTransportClient {
134    pub fn new() -> Self {
135        // Initialize logger with debug level
136        let _ = env_logger::Builder::new()
137            .filter_level(LevelFilter::Debug)
138            .try_init();
139
140        if let Err(e) = rustls::crypto::ring::default_provider().install_default() {
141            error!("Failed to install default provider: {e:?}");
142        }
143        // Create a multi-threaded Tokio runtime with all features enabled
144        let runtime = tokio::runtime::Builder::new_multi_thread()
145            .enable_all()
146            .build()
147            .expect("Failed to create Tokio runtime");
148
149        Self {
150            runtime: Arc::new(runtime),
151            session: Arc::new(Mutex::new(None)),
152            datagram_listener: Arc::new(Mutex::new(None)),
153        }
154    }
155
156    pub fn connect(&self, url: String) -> Result<(), WebTransportError> {
157        info!("Connecting to WebTransport server at {url}");
158
159        // Parse the URL
160        let url = Url::parse(&url)
161            .map_err(|e| WebTransportError::InvalidUrl(format!("Invalid URL: {e}")))?;
162
163        // Clone Arc for move into async block
164        let session_mutex = Arc::clone(&self.session);
165
166        // Create a WebTransport session
167        self.runtime.block_on(async move {
168            // Load native certificates
169            let mut root_store = RootCertStore::empty();
170            let cert_count = match load_native_certs() {
171                Ok(certs) => {
172                    let count = certs.len();
173                    for cert in certs {
174                        root_store.add(cert).map_err(|e| {
175                            WebTransportError::CertificateError(format!(
176                                "Failed to add certificate: {e}"
177                            ))
178                        })?;
179                    }
180                    count
181                }
182                Err(e) => {
183                    error!("Failed to load native certificates: {e}");
184                    return Err(WebTransportError::CertificateError(format!(
185                        "Failed to load native certificates: {e}"
186                    )));
187                }
188            };
189            info!("Loaded {cert_count} native certificates");
190
191            // Create a rustls ClientConfig with the root store
192            let _client_config = ClientConfig::builder()
193                .with_root_certificates(root_store)
194                .with_no_client_auth();
195
196            // Create a WebTransport client with the custom rustls config for now disable certificate verification
197            let client = unsafe {
198                ClientBuilder::new()
199                    .with_no_certificate_verification()
200                    .map_err(|e| {
201                        WebTransportError::TlsError(format!("Failed to create client: {e}"))
202                    })?
203            };
204
205            // Connect to the server
206            let session = client.connect(&url).await.map_err(|e| {
207                WebTransportError::ConnectionError(format!("Failed to connect: {e}"))
208            })?;
209
210            // Store the session
211            let mut session_guard = session_mutex.lock().map_err(|e| {
212                WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}"))
213            })?;
214            *session_guard = Some(session);
215
216            info!("Connected to WebTransport server");
217            Ok(())
218        })
219    }
220
221    pub fn send_datagram(&self, data: Vec<u8>) -> Result<(), WebTransportError> {
222        info!("Sending datagram of size {} bytes", data.len());
223
224        // Clone Arc for move into async block
225        let session_mutex = Arc::clone(&self.session);
226
227        self.runtime.block_on(async move {
228            let session_guard = session_mutex.lock().map_err(|e| {
229                WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}"))
230            })?;
231
232            let session = session_guard
233                .as_ref()
234                .ok_or_else(|| WebTransportError::ConnectionError("Not connected".to_string()))?;
235
236            // Send the datagram
237            session
238                .send_datagram(Bytes::from(data))
239                .map_err(|_| WebTransportError::StreamError)?;
240
241            info!("Datagram sent successfully");
242            Ok(())
243        })
244    }
245
246    pub fn subscribe_to_datagrams(
247        &self,
248        queue: Arc<DatagramQueue>,
249    ) -> Result<(), WebTransportError> {
250        info!("Subscribing to inbound datagrams");
251
252        // Clone Arc for move into async block
253        let session_mutex = Arc::clone(&self.session);
254        let datagram_listener_mutex = Arc::clone(&self.datagram_listener);
255
256        // Stop any existing listener
257        self.stop_datagram_listener()?;
258
259        // Create a new listener
260        let handle = self.runtime.spawn(async move {
261            // Get a clone of the session outside the async block
262            let session = {
263                let session_guard = match session_mutex.lock() {
264                    Ok(guard) => guard,
265                    Err(e) => {
266                        error!("Failed to acquire lock: {e}");
267                        return;
268                    }
269                };
270
271                match session_guard.as_ref() {
272                    Some(session) => session.clone(),
273                    None => {
274                        error!("Not connected");
275                        return;
276                    }
277                }
278            }; // session_guard is dropped here
279
280            info!("Starting to listen for datagrams");
281
282            loop {
283                match session.read_datagram().await {
284                    Ok(datagram) => {
285                        let data = datagram.to_vec();
286                        info!("Received datagram of size {} bytes", data.len());
287                        debug!("Datagram content: {data:?}");
288
289                        // Add the datagram to the queue
290                        if let Err(e) = queue.add_datagram(data) {
291                            error!("Failed to add datagram to queue: {e}");
292                        }
293                    }
294                    Err(e) => {
295                        error!("Error receiving datagram: {e}");
296                        break;
297                    }
298                }
299            }
300        });
301
302        // Store the listener handle
303        let mut listener_guard = datagram_listener_mutex
304            .lock()
305            .map_err(|e| WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}")))?;
306        *listener_guard = Some(handle);
307
308        info!("Successfully subscribed to inbound datagrams");
309        Ok(())
310    }
311
312    pub fn stop_datagram_listener(&self) -> Result<(), WebTransportError> {
313        let mut listener_guard = self
314            .datagram_listener
315            .lock()
316            .map_err(|e| WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}")))?;
317
318        if let Some(handle) = listener_guard.take() {
319            handle.abort();
320            info!("Stopped datagram listener");
321        }
322
323        Ok(())
324    }
325}
326
327impl Drop for WebTransportClient {
328    fn drop(&mut self) {
329        info!("Shutting down WebTransportClient");
330        // Stop the datagram listener if it's running
331        if let Err(e) = self.stop_datagram_listener() {
332            error!("Failed to stop datagram listener: {e}");
333        }
334        // The runtime will be dropped automatically when the Arc's reference count reaches zero
335    }
336}