videocall_cli/consumers/
quic.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19use std::sync::Arc;
20
21use anyhow::Error;
22use protobuf::Message;
23use quinn::Connection;
24use std::time::{SystemTime, UNIX_EPOCH};
25use tokio::{
26    sync::mpsc::{self, Sender},
27    time::{self, Duration},
28};
29use tracing::info;
30use videocall_types::protos::{
31    connection_packet::ConnectionPacket,
32    media_packet::{media_packet::MediaType, HeartbeatMetadata, MediaPacket},
33    packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
34};
35
36use crate::cli_args::Stream;
37
38use super::camera_synk::CameraSynk;
39
40pub struct QUICClient {
41    options: Stream,
42    sender: Option<Sender<Vec<u8>>>,
43}
44
45impl QUICClient {
46    pub fn new(options: Stream) -> Self {
47        Self {
48            options,
49            sender: None,
50        }
51    }
52
53    async fn send_connection_packet(&self) -> anyhow::Result<()> {
54        let connection_packet = ConnectionPacket {
55            meeting_id: self.options.meeting_id.clone(),
56            ..Default::default()
57        };
58        let packet = PacketWrapper {
59            packet_type: PacketType::CONNECTION.into(),
60            email: self.options.user_id.clone(),
61            data: connection_packet.write_to_bytes()?,
62            ..Default::default()
63        };
64        self.queue_message(packet.write_to_bytes()?).await?;
65        Ok(())
66    }
67
68    pub async fn send(conn: Connection, data: Vec<u8>) -> anyhow::Result<()> {
69        let mut stream = conn.open_uni().await?;
70        stream.write_all(&data).await?;
71        stream.finish().await?;
72        Ok(())
73    }
74
75    async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
76        if let Some(sender) = &self.sender {
77            sender
78                .send(message)
79                .await
80                .map_err(|_| Error::msg("Failed to send message to queue"))
81        } else {
82            Err(Error::msg("No sender available"))
83        }
84    }
85
86    async fn start_heartbeat(&self, conn: Connection, options: &Stream) {
87        let interval = time::interval(Duration::from_secs(1));
88        let email = options.user_id.clone();
89        tokio::spawn(async move {
90            let mut interval = interval;
91            loop {
92                let now_ms = SystemTime::now()
93                    .duration_since(UNIX_EPOCH)
94                    .expect("Time went backwards")
95                    .as_millis(); // Get milliseconds since Unix epoch
96                interval.tick().await;
97                let actual_heartbeat = MediaPacket {
98                    media_type: MediaType::HEARTBEAT.into(),
99                    email: email.clone(),
100                    timestamp: now_ms as f64,
101                    heartbeat_metadata: Some(HeartbeatMetadata {
102                        video_enabled: true,
103                        ..Default::default()
104                    })
105                    .into(),
106                    ..Default::default()
107                };
108
109                let packet = PacketWrapper {
110                    email: email.clone(),
111                    packet_type: PacketType::MEDIA.into(),
112                    data: actual_heartbeat.write_to_bytes().unwrap(),
113                    ..Default::default()
114                };
115                let data = packet.write_to_bytes().unwrap();
116                if let Err(e) = Self::send(conn.clone(), data).await {
117                    tracing::error!("Failed to send heartbeat: {}", e);
118                }
119            }
120        });
121    }
122}
123
124async fn connect_to_server(options: &Stream) -> anyhow::Result<Connection> {
125    loop {
126        info!("Attempting to connect to {}", options.url);
127        let addrs = options
128            .url
129            .socket_addrs(|| Some(443))
130            .expect("couldn't resolve the address provided");
131        let remote = addrs.first().to_owned();
132        let remote = remote.unwrap();
133        let mut root_store = rustls::RootCertStore::empty();
134        root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
135            rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
136                ta.subject,
137                ta.spki,
138                ta.name_constraints,
139            )
140        }));
141        let mut client_crypto = rustls::ClientConfig::builder()
142            .with_safe_defaults()
143            .with_root_certificates(root_store)
144            .with_no_client_auth();
145
146        let alpn = vec![b"hq-29".to_vec()];
147        client_crypto.alpn_protocols = alpn;
148        if options.keylog {
149            client_crypto.key_log = Arc::new(rustls::KeyLogFile::new());
150        }
151        let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
152        let host = options.url.host_str();
153
154        match quinn::Endpoint::client("[::]:0".parse().unwrap()) {
155            Ok(mut endpoint) => {
156                endpoint.set_default_client_config(client_config);
157                match endpoint.connect(*remote, host.unwrap()) {
158                    Ok(conn) => {
159                        let conn = conn.await?;
160                        info!("Connected successfully");
161                        return Ok(conn);
162                    }
163                    Err(e) => {
164                        tracing::error!("Connection failed: {}. Retrying in 5 seconds...", e);
165                        time::sleep(Duration::from_secs(5)).await;
166                    }
167                }
168            }
169            Err(e) => {
170                tracing::error!("Endpoint creation failed: {}. Retrying in 5 seconds...", e);
171                time::sleep(Duration::from_secs(5)).await;
172            }
173        }
174    }
175}
176
177impl CameraSynk for QUICClient {
178    async fn connect(&mut self) -> anyhow::Result<()> {
179        let conn = connect_to_server(&self.options).await?;
180        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
181        self.sender = Some(tx);
182
183        // Spawn a task to handle sending messages via the connection
184        let cloned_conn = conn.clone();
185        tokio::spawn(async move {
186            while let Some(message) = rx.recv().await {
187                let cloned_conn = cloned_conn.clone();
188                tokio::spawn(async move {
189                    if let Err(e) = Self::send(cloned_conn.clone(), message).await {
190                        tracing::error!("Failed to send message: {}", e);
191                    }
192                });
193            }
194        });
195
196        // Spawn a separate task for heartbeat
197        self.start_heartbeat(conn.clone(), &self.options).await;
198
199        self.send_connection_packet().await?;
200        Ok(())
201    }
202
203    async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
204        self.queue_message(data).await
205    }
206}