videocall_cli/consumers/
quic.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19use std::sync::Arc;
20
21use anyhow::Error;
22use protobuf::Message;
23use quinn::Connection;
24use std::time::{SystemTime, UNIX_EPOCH};
25use tokio::{
26    sync::mpsc::{self, Sender},
27    time::{self, Duration},
28};
29use tracing::info;
30use videocall_types::protos::{
31    connection_packet::ConnectionPacket,
32    media_packet::{media_packet::MediaType, HeartbeatMetadata, MediaPacket},
33    packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
34};
35
36use crate::cli_args::Stream;
37use crate::fake_cert_verifier::NoVerification;
38
39use super::camera_synk::CameraSynk;
40
41pub struct QUICClient {
42    options: Stream,
43    sender: Option<Sender<Vec<u8>>>,
44}
45
46impl QUICClient {
47    pub fn new(options: Stream) -> Self {
48        Self {
49            options,
50            sender: None,
51        }
52    }
53
54    async fn send_connection_packet(&self) -> anyhow::Result<()> {
55        let connection_packet = ConnectionPacket {
56            meeting_id: self.options.meeting_id.clone(),
57            ..Default::default()
58        };
59        let packet = PacketWrapper {
60            packet_type: PacketType::CONNECTION.into(),
61            email: self.options.user_id.clone(),
62            data: connection_packet.write_to_bytes()?,
63            ..Default::default()
64        };
65        self.queue_message(packet.write_to_bytes()?).await?;
66        Ok(())
67    }
68
69    pub async fn send(conn: Connection, data: Vec<u8>) -> anyhow::Result<()> {
70        let mut stream = conn.open_uni().await?;
71        stream.write_all(&data).await?;
72        stream.finish().await?;
73        Ok(())
74    }
75
76    async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
77        if let Some(sender) = &self.sender {
78            sender
79                .send(message)
80                .await
81                .map_err(|_| Error::msg("Failed to send message to queue"))
82        } else {
83            Err(Error::msg("No sender available"))
84        }
85    }
86
87    async fn start_heartbeat(&self, conn: Connection, options: &Stream) {
88        let interval = time::interval(Duration::from_secs(1));
89        let email = options.user_id.clone();
90        tokio::spawn(async move {
91            let mut interval = interval;
92            loop {
93                let now_ms = SystemTime::now()
94                    .duration_since(UNIX_EPOCH)
95                    .expect("Time went backwards")
96                    .as_millis(); // Get milliseconds since Unix epoch
97                interval.tick().await;
98                let actual_heartbeat = MediaPacket {
99                    media_type: MediaType::HEARTBEAT.into(),
100                    email: email.clone(),
101                    timestamp: now_ms as f64,
102                    heartbeat_metadata: Some(HeartbeatMetadata {
103                        video_enabled: true,
104                        ..Default::default()
105                    })
106                    .into(),
107                    ..Default::default()
108                };
109
110                let packet = PacketWrapper {
111                    email: email.clone(),
112                    packet_type: PacketType::MEDIA.into(),
113                    data: actual_heartbeat.write_to_bytes().unwrap(),
114                    ..Default::default()
115                };
116                let data = packet.write_to_bytes().unwrap();
117                if let Err(e) = Self::send(conn.clone(), data).await {
118                    tracing::error!("Failed to send heartbeat: {}", e);
119                }
120            }
121        });
122    }
123}
124
125async fn connect_to_server(options: &Stream) -> anyhow::Result<Connection> {
126    loop {
127        info!("Attempting to connect to {}", options.url);
128        let addrs = options
129            .url
130            .socket_addrs(|| Some(443))
131            .expect("couldn't resolve the address provided");
132        let remote = addrs.first().to_owned();
133        let remote = remote.unwrap();
134        let mut client_crypto = if options.insecure_skip_verify {
135            info!("WARNING: Skipping TLS certificate verification - connection is insecure!");
136            rustls::ClientConfig::builder()
137                .with_safe_defaults()
138                .with_custom_certificate_verifier(Arc::new(NoVerification))
139                .with_no_client_auth()
140        } else {
141            let mut root_store = rustls::RootCertStore::empty();
142            root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
143                rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
144                    ta.subject,
145                    ta.spki,
146                    ta.name_constraints,
147                )
148            }));
149            rustls::ClientConfig::builder()
150                .with_safe_defaults()
151                .with_root_certificates(root_store)
152                .with_no_client_auth()
153        };
154
155        let alpn = vec![b"hq-29".to_vec()];
156        client_crypto.alpn_protocols = alpn;
157        if options.keylog {
158            client_crypto.key_log = Arc::new(rustls::KeyLogFile::new());
159        }
160        let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
161        let host = options.url.host_str();
162
163        match quinn::Endpoint::client("[::]:0".parse().unwrap()) {
164            Ok(mut endpoint) => {
165                endpoint.set_default_client_config(client_config);
166                match endpoint.connect(*remote, host.unwrap()) {
167                    Ok(conn) => {
168                        let conn = conn.await?;
169                        info!("Connected successfully");
170                        return Ok(conn);
171                    }
172                    Err(e) => {
173                        tracing::error!("Connection failed: {}. Retrying in 5 seconds...", e);
174                        time::sleep(Duration::from_secs(5)).await;
175                    }
176                }
177            }
178            Err(e) => {
179                tracing::error!("Endpoint creation failed: {}. Retrying in 5 seconds...", e);
180                time::sleep(Duration::from_secs(5)).await;
181            }
182        }
183    }
184}
185
186impl CameraSynk for QUICClient {
187    async fn connect(&mut self) -> anyhow::Result<()> {
188        let conn = connect_to_server(&self.options).await?;
189        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
190        self.sender = Some(tx);
191
192        // Spawn a task to handle sending messages via the connection
193        let cloned_conn = conn.clone();
194        tokio::spawn(async move {
195            while let Some(message) = rx.recv().await {
196                let cloned_conn = cloned_conn.clone();
197                tokio::spawn(async move {
198                    if let Err(e) = Self::send(cloned_conn.clone(), message).await {
199                        tracing::error!("Failed to send message: {}", e);
200                    }
201                });
202            }
203        });
204
205        // Spawn a separate task for heartbeat
206        self.start_heartbeat(conn.clone(), &self.options).await;
207
208        self.send_connection_packet().await?;
209        Ok(())
210    }
211
212    async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
213        self.queue_message(data).await
214    }
215}