1uniffi::include_scaffolding!("videocall");
21
22use bytes::Bytes;
23use log::{debug, error, info, LevelFilter};
24use rustls::{ClientConfig, RootCertStore};
25use rustls_native_certs::load_native_certs;
26use std::collections::VecDeque;
27use std::sync::Arc;
28use std::sync::Mutex;
29use thiserror::Error;
30use tokio::runtime::Runtime;
31use tokio::task;
32use url::Url;
33use web_transport_quinn::ClientBuilder;
34use web_transport_quinn::Session;
35
36pub fn hello_world() -> String {
38 "Hello from Rust!".to_string()
39}
40
41pub fn get_version() -> String {
43 env!("CARGO_PKG_VERSION").to_string()
44}
45
46#[derive(Error, Debug)]
47pub enum WebTransportError {
48 #[error("Connection error: {0}")]
49 ConnectionError(String),
50 #[error("TLS error: {0}")]
51 TlsError(String),
52 #[error("Stream error")]
53 StreamError,
54 #[error("Invalid URL: {0}")]
55 InvalidUrl(String),
56 #[error("Runtime error: {0}")]
57 RuntimeError(String),
58 #[error("Certificate error: {0}")]
59 CertificateError(String),
60 #[error("Failed to create client: {0}")]
61 ClientError(String),
62 #[error("Queue error: {0}")]
63 QueueError(String),
64}
65
66pub struct DatagramQueue {
67 queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
68}
69
70impl Default for DatagramQueue {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76impl DatagramQueue {
77 pub fn new() -> Self {
78 Self {
79 queue: Arc::new(Mutex::new(VecDeque::new())),
80 }
81 }
82
83 pub fn add_datagram(&self, data: Vec<u8>) -> Result<(), WebTransportError> {
84 let mut queue = self
85 .queue
86 .lock()
87 .map_err(|e| WebTransportError::QueueError(format!("Failed to acquire lock: {e}")))?;
88
89 queue.push_back(data);
90 info!("Added datagram to queue, queue size: {}", queue.len());
91 Ok(())
92 }
93
94 pub fn receive_datagram(&self) -> Result<Vec<u8>, WebTransportError> {
95 let mut queue = self
96 .queue
97 .lock()
98 .map_err(|e| WebTransportError::QueueError(format!("Failed to acquire lock: {e}")))?;
99
100 match queue.pop_front() {
101 Some(data) => {
102 info!("Received datagram from queue, remaining: {}", queue.len());
103 Ok(data)
104 }
105 None => Err(WebTransportError::QueueError(
106 "No datagrams available".to_string(),
107 )),
108 }
109 }
110
111 pub fn has_datagrams(&self) -> Result<bool, WebTransportError> {
112 let queue = self
113 .queue
114 .lock()
115 .map_err(|e| WebTransportError::QueueError(format!("Failed to acquire lock: {e}")))?;
116
117 Ok(!queue.is_empty())
118 }
119}
120
121pub struct WebTransportClient {
122 runtime: Arc<Runtime>,
123 session: Arc<Mutex<Option<Session>>>,
124 datagram_listener: Arc<Mutex<Option<task::JoinHandle<()>>>>,
125}
126
127impl Default for WebTransportClient {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133impl WebTransportClient {
134 pub fn new() -> Self {
135 let _ = env_logger::Builder::new()
137 .filter_level(LevelFilter::Debug)
138 .try_init();
139
140 if let Err(e) = rustls::crypto::ring::default_provider().install_default() {
141 error!("Failed to install default provider: {e:?}");
142 }
143 let runtime = tokio::runtime::Builder::new_multi_thread()
145 .enable_all()
146 .build()
147 .expect("Failed to create Tokio runtime");
148
149 Self {
150 runtime: Arc::new(runtime),
151 session: Arc::new(Mutex::new(None)),
152 datagram_listener: Arc::new(Mutex::new(None)),
153 }
154 }
155
156 pub fn connect(&self, url: String) -> Result<(), WebTransportError> {
157 info!("Connecting to WebTransport server at {url}");
158
159 let url = Url::parse(&url)
161 .map_err(|e| WebTransportError::InvalidUrl(format!("Invalid URL: {e}")))?;
162
163 let session_mutex = Arc::clone(&self.session);
165
166 self.runtime.block_on(async move {
168 let mut root_store = RootCertStore::empty();
170 let cert_count = match load_native_certs() {
171 Ok(certs) => {
172 let count = certs.len();
173 for cert in certs {
174 root_store.add(cert).map_err(|e| {
175 WebTransportError::CertificateError(format!(
176 "Failed to add certificate: {e}"
177 ))
178 })?;
179 }
180 count
181 }
182 Err(e) => {
183 error!("Failed to load native certificates: {e}");
184 return Err(WebTransportError::CertificateError(format!(
185 "Failed to load native certificates: {e}"
186 )));
187 }
188 };
189 info!("Loaded {cert_count} native certificates");
190
191 let _client_config = ClientConfig::builder()
193 .with_root_certificates(root_store)
194 .with_no_client_auth();
195
196 let client = unsafe {
198 ClientBuilder::new()
199 .with_no_certificate_verification()
200 .map_err(|e| {
201 WebTransportError::TlsError(format!("Failed to create client: {e}"))
202 })?
203 };
204
205 let session = client.connect(&url).await.map_err(|e| {
207 WebTransportError::ConnectionError(format!("Failed to connect: {e}"))
208 })?;
209
210 let mut session_guard = session_mutex.lock().map_err(|e| {
212 WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}"))
213 })?;
214 *session_guard = Some(session);
215
216 info!("Connected to WebTransport server");
217 Ok(())
218 })
219 }
220
221 pub fn send_datagram(&self, data: Vec<u8>) -> Result<(), WebTransportError> {
222 info!("Sending datagram of size {} bytes", data.len());
223
224 let session_mutex = Arc::clone(&self.session);
226
227 self.runtime.block_on(async move {
228 let session_guard = session_mutex.lock().map_err(|e| {
229 WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}"))
230 })?;
231
232 let session = session_guard
233 .as_ref()
234 .ok_or_else(|| WebTransportError::ConnectionError("Not connected".to_string()))?;
235
236 session
238 .send_datagram(Bytes::from(data))
239 .map_err(|_| WebTransportError::StreamError)?;
240
241 info!("Datagram sent successfully");
242 Ok(())
243 })
244 }
245
246 pub fn subscribe_to_datagrams(
247 &self,
248 queue: Arc<DatagramQueue>,
249 ) -> Result<(), WebTransportError> {
250 info!("Subscribing to inbound datagrams");
251
252 let session_mutex = Arc::clone(&self.session);
254 let datagram_listener_mutex = Arc::clone(&self.datagram_listener);
255
256 self.stop_datagram_listener()?;
258
259 let handle = self.runtime.spawn(async move {
261 let session = {
263 let session_guard = match session_mutex.lock() {
264 Ok(guard) => guard,
265 Err(e) => {
266 error!("Failed to acquire lock: {e}");
267 return;
268 }
269 };
270
271 match session_guard.as_ref() {
272 Some(session) => session.clone(),
273 None => {
274 error!("Not connected");
275 return;
276 }
277 }
278 }; info!("Starting to listen for datagrams");
281
282 loop {
283 match session.read_datagram().await {
284 Ok(datagram) => {
285 let data = datagram.to_vec();
286 info!("Received datagram of size {} bytes", data.len());
287 debug!("Datagram content: {data:?}");
288
289 if let Err(e) = queue.add_datagram(data) {
291 error!("Failed to add datagram to queue: {e}");
292 }
293 }
294 Err(e) => {
295 error!("Error receiving datagram: {e}");
296 break;
297 }
298 }
299 }
300 });
301
302 let mut listener_guard = datagram_listener_mutex
304 .lock()
305 .map_err(|e| WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}")))?;
306 *listener_guard = Some(handle);
307
308 info!("Successfully subscribed to inbound datagrams");
309 Ok(())
310 }
311
312 pub fn stop_datagram_listener(&self) -> Result<(), WebTransportError> {
313 let mut listener_guard = self
314 .datagram_listener
315 .lock()
316 .map_err(|e| WebTransportError::RuntimeError(format!("Failed to acquire lock: {e}")))?;
317
318 if let Some(handle) = listener_guard.take() {
319 handle.abort();
320 info!("Stopped datagram listener");
321 }
322
323 Ok(())
324 }
325}
326
327impl Drop for WebTransportClient {
328 fn drop(&mut self) {
329 info!("Shutting down WebTransportClient");
330 if let Err(e) = self.stop_datagram_listener() {
332 error!("Failed to stop datagram listener: {e}");
333 }
334 }
336}