videocall_cli/consumers/
quic.rs1use std::sync::Arc;
2
3use anyhow::Error;
4use protobuf::Message;
5use quinn::Connection;
6use std::time::{SystemTime, UNIX_EPOCH};
7use tokio::{
8 sync::mpsc::{self, Sender},
9 time::{self, Duration},
10};
11use tracing::info;
12use videocall_types::protos::{
13 connection_packet::ConnectionPacket,
14 media_packet::{media_packet::MediaType, MediaPacket},
15 packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
16};
17
18use crate::cli_args::Stream;
19
20use super::camera_synk::CameraSynk;
21
22pub struct QUICClient {
23 options: Stream,
24 sender: Option<Sender<Vec<u8>>>,
25}
26
27impl QUICClient {
28 pub fn new(options: Stream) -> Self {
29 Self {
30 options,
31 sender: None,
32 }
33 }
34
35 async fn send_connection_packet(&self) -> anyhow::Result<()> {
36 let connection_packet = ConnectionPacket {
37 meeting_id: self.options.meeting_id.clone(),
38 ..Default::default()
39 };
40 let packet = PacketWrapper {
41 packet_type: PacketType::CONNECTION.into(),
42 email: self.options.user_id.clone(),
43 data: connection_packet.write_to_bytes()?,
44 ..Default::default()
45 };
46 self.queue_message(packet.write_to_bytes()?).await?;
47 Ok(())
48 }
49
50 pub async fn send(conn: Connection, data: Vec<u8>) -> anyhow::Result<()> {
51 let mut stream = conn.open_uni().await?;
52 stream.write_all(&data).await?;
53 stream.finish().await?;
54 Ok(())
55 }
56
57 async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
58 if let Some(sender) = &self.sender {
59 sender
60 .send(message)
61 .await
62 .map_err(|_| Error::msg("Failed to send message to queue"))
63 } else {
64 Err(Error::msg("No sender available"))
65 }
66 }
67
68 async fn start_heartbeat(&self, conn: Connection, options: &Stream) {
69 let interval = time::interval(Duration::from_secs(1));
70 let email = options.user_id.clone();
71 tokio::spawn(async move {
72 let mut interval = interval;
73 loop {
74 let now_ms = SystemTime::now()
75 .duration_since(UNIX_EPOCH)
76 .expect("Time went backwards")
77 .as_millis(); interval.tick().await;
79 let actual_heartbeat = MediaPacket {
80 media_type: MediaType::HEARTBEAT.into(),
81 email: email.clone(),
82 timestamp: now_ms as f64,
83 ..Default::default()
84 };
85
86 let packet = PacketWrapper {
87 email: email.clone(),
88 packet_type: PacketType::MEDIA.into(),
89 data: actual_heartbeat.write_to_bytes().unwrap(),
90 ..Default::default()
91 };
92 let data = packet.write_to_bytes().unwrap();
93 if let Err(e) = Self::send(conn.clone(), data).await {
94 tracing::error!("Failed to send heartbeat: {}", e);
95 }
96 }
97 });
98 }
99}
100
101async fn connect_to_server(options: &Stream) -> anyhow::Result<Connection> {
102 loop {
103 info!("Attempting to connect to {}", options.url);
104 let addrs = options
105 .url
106 .socket_addrs(|| Some(443))
107 .expect("couldn't resolve the address provided");
108 let remote = addrs.first().to_owned();
109 let remote = remote.unwrap();
110 let mut root_store = rustls::RootCertStore::empty();
111 root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
112 rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
113 ta.subject,
114 ta.spki,
115 ta.name_constraints,
116 )
117 }));
118 let mut client_crypto = rustls::ClientConfig::builder()
119 .with_safe_defaults()
120 .with_root_certificates(root_store)
121 .with_no_client_auth();
122
123 let alpn = vec![b"hq-29".to_vec()];
124 client_crypto.alpn_protocols = alpn;
125 if options.keylog {
126 client_crypto.key_log = Arc::new(rustls::KeyLogFile::new());
127 }
128 let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
129 let host = options.url.host_str();
130
131 match quinn::Endpoint::client("[::]:0".parse().unwrap()) {
132 Ok(mut endpoint) => {
133 endpoint.set_default_client_config(client_config);
134 match endpoint.connect(*remote, host.unwrap()) {
135 Ok(conn) => {
136 let conn = conn.await?;
137 info!("Connected successfully");
138 return Ok(conn);
139 }
140 Err(e) => {
141 tracing::error!("Connection failed: {}. Retrying in 5 seconds...", e);
142 time::sleep(Duration::from_secs(5)).await;
143 }
144 }
145 }
146 Err(e) => {
147 tracing::error!("Endpoint creation failed: {}. Retrying in 5 seconds...", e);
148 time::sleep(Duration::from_secs(5)).await;
149 }
150 }
151 }
152}
153
154impl CameraSynk for QUICClient {
155 async fn connect(&mut self) -> anyhow::Result<()> {
156 let conn = connect_to_server(&self.options).await?;
157 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
158 self.sender = Some(tx);
159
160 let cloned_conn = conn.clone();
162 tokio::spawn(async move {
163 while let Some(message) = rx.recv().await {
164 let cloned_conn = cloned_conn.clone();
165 tokio::spawn(async move {
166 if let Err(e) = Self::send(cloned_conn.clone(), message).await {
167 tracing::error!("Failed to send message: {}", e);
168 }
169 });
170 }
171 });
172
173 self.start_heartbeat(conn.clone(), &self.options).await;
175
176 self.send_connection_packet().await?;
177 Ok(())
178 }
179
180 async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
181 self.queue_message(data).await
182 }
183}