videocall_cli/consumers/
quic.rs

1use std::sync::Arc;
2
3use anyhow::Error;
4use protobuf::Message;
5use quinn::Connection;
6use std::time::{SystemTime, UNIX_EPOCH};
7use tokio::{
8    sync::mpsc::{self, Sender},
9    time::{self, Duration},
10};
11use tracing::info;
12use videocall_types::protos::{
13    connection_packet::ConnectionPacket,
14    media_packet::{media_packet::MediaType, MediaPacket},
15    packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
16};
17
18use crate::cli_args::Stream;
19
20use super::camera_synk::CameraSynk;
21
22pub struct QUICClient {
23    options: Stream,
24    sender: Option<Sender<Vec<u8>>>,
25}
26
27impl QUICClient {
28    pub fn new(options: Stream) -> Self {
29        Self {
30            options,
31            sender: None,
32        }
33    }
34
35    async fn send_connection_packet(&self) -> anyhow::Result<()> {
36        let connection_packet = ConnectionPacket {
37            meeting_id: self.options.meeting_id.clone(),
38            ..Default::default()
39        };
40        let packet = PacketWrapper {
41            packet_type: PacketType::CONNECTION.into(),
42            email: self.options.user_id.clone(),
43            data: connection_packet.write_to_bytes()?,
44            ..Default::default()
45        };
46        self.queue_message(packet.write_to_bytes()?).await?;
47        Ok(())
48    }
49
50    pub async fn send(conn: Connection, data: Vec<u8>) -> anyhow::Result<()> {
51        let mut stream = conn.open_uni().await?;
52        stream.write_all(&data).await?;
53        stream.finish().await?;
54        Ok(())
55    }
56
57    async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
58        if let Some(sender) = &self.sender {
59            sender
60                .send(message)
61                .await
62                .map_err(|_| Error::msg("Failed to send message to queue"))
63        } else {
64            Err(Error::msg("No sender available"))
65        }
66    }
67
68    async fn start_heartbeat(&self, conn: Connection, options: &Stream) {
69        let interval = time::interval(Duration::from_secs(1));
70        let email = options.user_id.clone();
71        tokio::spawn(async move {
72            let mut interval = interval;
73            loop {
74                let now_ms = SystemTime::now()
75                    .duration_since(UNIX_EPOCH)
76                    .expect("Time went backwards")
77                    .as_millis(); // Get milliseconds since Unix epoch
78                interval.tick().await;
79                let actual_heartbeat = MediaPacket {
80                    media_type: MediaType::HEARTBEAT.into(),
81                    email: email.clone(),
82                    timestamp: now_ms as f64,
83                    ..Default::default()
84                };
85
86                let packet = PacketWrapper {
87                    email: email.clone(),
88                    packet_type: PacketType::MEDIA.into(),
89                    data: actual_heartbeat.write_to_bytes().unwrap(),
90                    ..Default::default()
91                };
92                let data = packet.write_to_bytes().unwrap();
93                if let Err(e) = Self::send(conn.clone(), data).await {
94                    tracing::error!("Failed to send heartbeat: {}", e);
95                }
96            }
97        });
98    }
99}
100
101async fn connect_to_server(options: &Stream) -> anyhow::Result<Connection> {
102    loop {
103        info!("Attempting to connect to {}", options.url);
104        let addrs = options
105            .url
106            .socket_addrs(|| Some(443))
107            .expect("couldn't resolve the address provided");
108        let remote = addrs.first().to_owned();
109        let remote = remote.unwrap();
110        let mut root_store = rustls::RootCertStore::empty();
111        root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
112            rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
113                ta.subject,
114                ta.spki,
115                ta.name_constraints,
116            )
117        }));
118        let mut client_crypto = rustls::ClientConfig::builder()
119            .with_safe_defaults()
120            .with_root_certificates(root_store)
121            .with_no_client_auth();
122
123        let alpn = vec![b"hq-29".to_vec()];
124        client_crypto.alpn_protocols = alpn;
125        if options.keylog {
126            client_crypto.key_log = Arc::new(rustls::KeyLogFile::new());
127        }
128        let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
129        let host = options.url.host_str();
130
131        match quinn::Endpoint::client("[::]:0".parse().unwrap()) {
132            Ok(mut endpoint) => {
133                endpoint.set_default_client_config(client_config);
134                match endpoint.connect(*remote, host.unwrap()) {
135                    Ok(conn) => {
136                        let conn = conn.await?;
137                        info!("Connected successfully");
138                        return Ok(conn);
139                    }
140                    Err(e) => {
141                        tracing::error!("Connection failed: {}. Retrying in 5 seconds...", e);
142                        time::sleep(Duration::from_secs(5)).await;
143                    }
144                }
145            }
146            Err(e) => {
147                tracing::error!("Endpoint creation failed: {}. Retrying in 5 seconds...", e);
148                time::sleep(Duration::from_secs(5)).await;
149            }
150        }
151    }
152}
153
154impl CameraSynk for QUICClient {
155    async fn connect(&mut self) -> anyhow::Result<()> {
156        let conn = connect_to_server(&self.options).await?;
157        let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
158        self.sender = Some(tx);
159
160        // Spawn a task to handle sending messages via the connection
161        let cloned_conn = conn.clone();
162        tokio::spawn(async move {
163            while let Some(message) = rx.recv().await {
164                let cloned_conn = cloned_conn.clone();
165                tokio::spawn(async move {
166                    if let Err(e) = Self::send(cloned_conn.clone(), message).await {
167                        tracing::error!("Failed to send message: {}", e);
168                    }
169                });
170            }
171        });
172
173        // Spawn a separate task for heartbeat
174        self.start_heartbeat(conn.clone(), &self.options).await;
175
176        self.send_connection_packet().await?;
177        Ok(())
178    }
179
180    async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
181        self.queue_message(data).await
182    }
183}