Skip to main content

videocall_cli/consumers/
webtransport.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19use anyhow::Error;
20use protobuf::Message;
21use std::time::{SystemTime, UNIX_EPOCH};
22use tokio::{
23    sync::mpsc::{self, Sender},
24    time::{self, Duration},
25};
26use tracing::info;
27use videocall_types::protos::{
28    connection_packet::ConnectionPacket,
29    media_packet::{media_packet::MediaType, HeartbeatMetadata, MediaPacket},
30    packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
31};
32
33use crate::cli_args::Stream;
34
35use super::camera_synk::CameraSynk;
36
37pub struct WebTransportClient {
38    options: Stream,
39    sender: Option<Sender<Vec<u8>>>,
40}
41
42impl WebTransportClient {
43    pub fn new(options: Stream) -> Self {
44        Self {
45            options,
46            sender: None,
47        }
48    }
49
50    async fn send_connection_packet(&self) -> anyhow::Result<()> {
51        let connection_packet = ConnectionPacket {
52            meeting_id: self.options.meeting_id.clone(),
53            ..Default::default()
54        };
55        let packet = PacketWrapper {
56            packet_type: PacketType::CONNECTION.into(),
57            email: self.options.user_id.clone(),
58            data: connection_packet.write_to_bytes()?,
59            ..Default::default()
60        };
61        self.queue_message(packet.write_to_bytes()?).await?;
62        Ok(())
63    }
64
65    pub async fn send(session: &web_transport_quinn::Session, data: Vec<u8>) -> anyhow::Result<()> {
66        let mut stream = session.open_uni().await?;
67        stream.write_all(&data).await?;
68        stream.finish()?;
69        Ok(())
70    }
71
72    async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
73        if let Some(sender) = &self.sender {
74            sender
75                .send(message)
76                .await
77                .map_err(|_| Error::msg("Failed to send message to queue"))
78        } else {
79            Err(Error::msg("No sender available"))
80        }
81    }
82
83    async fn start_heartbeat(&self, session: web_transport_quinn::Session, options: &Stream) {
84        let interval = time::interval(Duration::from_secs(1));
85        let email = options.user_id.clone();
86        tokio::spawn(async move {
87            let mut interval = interval;
88            loop {
89                let now_ms = SystemTime::now()
90                    .duration_since(UNIX_EPOCH)
91                    .expect("Time went backwards")
92                    .as_millis(); // Get milliseconds since Unix epoch
93                interval.tick().await;
94                let actual_heartbeat = MediaPacket {
95                    media_type: MediaType::HEARTBEAT.into(),
96                    email: email.clone(),
97                    timestamp: now_ms as f64,
98                    heartbeat_metadata: Some(HeartbeatMetadata {
99                        video_enabled: true,
100                        ..Default::default()
101                    })
102                    .into(),
103                    ..Default::default()
104                };
105
106                let packet = PacketWrapper {
107                    email: email.clone(),
108                    packet_type: PacketType::MEDIA.into(),
109                    data: actual_heartbeat.write_to_bytes().unwrap(),
110                    ..Default::default()
111                };
112                let data = packet.write_to_bytes().unwrap();
113                if let Err(e) = Self::send(&session, data).await {
114                    tracing::error!("Failed to send heartbeat: {}", e);
115                }
116            }
117        });
118    }
119}
120
121async fn connect_to_server(options: &Stream) -> anyhow::Result<web_transport_quinn::Session> {
122    loop {
123        info!("Attempting to connect to {}", options.url);
124
125        // Construct WebTransport URL
126        let mut url = options.url.clone();
127        url.set_path(&format!(
128            "/lobby/{}/{}",
129            options.user_id, options.meeting_id
130        ));
131
132        // Create WebTransport client using 0.7.3 API (same pattern as bot)
133        let client = if options.insecure_skip_verify {
134            info!("WARNING: Skipping TLS certificate verification - connection is insecure!");
135            unsafe { web_transport_quinn::ClientBuilder::new().with_no_certificate_verification()? }
136        } else {
137            web_transport_quinn::ClientBuilder::new().with_system_roots()?
138        };
139
140        match client.connect(url).await {
141            Ok(session) => {
142                info!("WebTransport session established successfully");
143                return Ok(session);
144            }
145            Err(e) => {
146                tracing::error!(
147                    "WebTransport connection failed: {}. Retrying in 5 seconds...",
148                    e
149                );
150                time::sleep(Duration::from_secs(5)).await;
151            }
152        }
153    }
154}
155
156impl CameraSynk for WebTransportClient {
157    async fn connect(&mut self) -> anyhow::Result<()> {
158        let session = connect_to_server(&self.options).await?;
159        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
160        self.sender = Some(tx);
161
162        // Spawn a task to handle sending messages via the WebTransport session
163        let session_clone = session.clone();
164        tokio::spawn(async move {
165            while let Some(message) = rx.recv().await {
166                let session_clone_inner = session_clone.clone();
167                tokio::spawn(async move {
168                    if let Err(e) = WebTransportClient::send(&session_clone_inner, message).await {
169                        tracing::error!("Failed to send message: {}", e);
170                    }
171                });
172            }
173        });
174
175        // Spawn a separate task for heartbeat
176        self.start_heartbeat(session.clone(), &self.options).await;
177
178        self.send_connection_packet().await?;
179        Ok(())
180    }
181
182    async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
183        self.queue_message(data).await
184    }
185}