videocall_cli/consumers/
quic.rs1use std::sync::Arc;
20
21use anyhow::Error;
22use protobuf::Message;
23use quinn::Connection;
24use std::time::{SystemTime, UNIX_EPOCH};
25use tokio::{
26 sync::mpsc::{self, Sender},
27 time::{self, Duration},
28};
29use tracing::info;
30use videocall_types::protos::{
31 connection_packet::ConnectionPacket,
32 media_packet::{media_packet::MediaType, HeartbeatMetadata, MediaPacket},
33 packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
34};
35
36use crate::cli_args::Stream;
37use crate::fake_cert_verifier::NoVerification;
38
39use super::camera_synk::CameraSynk;
40
41pub struct QUICClient {
42 options: Stream,
43 sender: Option<Sender<Vec<u8>>>,
44}
45
46impl QUICClient {
47 pub fn new(options: Stream) -> Self {
48 Self {
49 options,
50 sender: None,
51 }
52 }
53
54 async fn send_connection_packet(&self) -> anyhow::Result<()> {
55 let connection_packet = ConnectionPacket {
56 meeting_id: self.options.meeting_id.clone(),
57 ..Default::default()
58 };
59 let packet = PacketWrapper {
60 packet_type: PacketType::CONNECTION.into(),
61 email: self.options.user_id.clone(),
62 data: connection_packet.write_to_bytes()?,
63 ..Default::default()
64 };
65 self.queue_message(packet.write_to_bytes()?).await?;
66 Ok(())
67 }
68
69 pub async fn send(conn: Connection, data: Vec<u8>) -> anyhow::Result<()> {
70 let mut stream = conn.open_uni().await?;
71 stream.write_all(&data).await?;
72 stream.finish().await?;
73 Ok(())
74 }
75
76 async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
77 if let Some(sender) = &self.sender {
78 sender
79 .send(message)
80 .await
81 .map_err(|_| Error::msg("Failed to send message to queue"))
82 } else {
83 Err(Error::msg("No sender available"))
84 }
85 }
86
87 async fn start_heartbeat(&self, conn: Connection, options: &Stream) {
88 let interval = time::interval(Duration::from_secs(1));
89 let email = options.user_id.clone();
90 tokio::spawn(async move {
91 let mut interval = interval;
92 loop {
93 let now_ms = SystemTime::now()
94 .duration_since(UNIX_EPOCH)
95 .expect("Time went backwards")
96 .as_millis(); interval.tick().await;
98 let actual_heartbeat = MediaPacket {
99 media_type: MediaType::HEARTBEAT.into(),
100 email: email.clone(),
101 timestamp: now_ms as f64,
102 heartbeat_metadata: Some(HeartbeatMetadata {
103 video_enabled: true,
104 ..Default::default()
105 })
106 .into(),
107 ..Default::default()
108 };
109
110 let packet = PacketWrapper {
111 email: email.clone(),
112 packet_type: PacketType::MEDIA.into(),
113 data: actual_heartbeat.write_to_bytes().unwrap(),
114 ..Default::default()
115 };
116 let data = packet.write_to_bytes().unwrap();
117 if let Err(e) = Self::send(conn.clone(), data).await {
118 tracing::error!("Failed to send heartbeat: {}", e);
119 }
120 }
121 });
122 }
123}
124
125async fn connect_to_server(options: &Stream) -> anyhow::Result<Connection> {
126 loop {
127 info!("Attempting to connect to {}", options.url);
128 let addrs = options
129 .url
130 .socket_addrs(|| Some(443))
131 .expect("couldn't resolve the address provided");
132 let remote = addrs.first().to_owned();
133 let remote = remote.unwrap();
134 let mut client_crypto = if options.insecure_skip_verify {
135 info!("WARNING: Skipping TLS certificate verification - connection is insecure!");
136 rustls::ClientConfig::builder()
137 .with_safe_defaults()
138 .with_custom_certificate_verifier(Arc::new(NoVerification))
139 .with_no_client_auth()
140 } else {
141 let mut root_store = rustls::RootCertStore::empty();
142 root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
143 rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
144 ta.subject,
145 ta.spki,
146 ta.name_constraints,
147 )
148 }));
149 rustls::ClientConfig::builder()
150 .with_safe_defaults()
151 .with_root_certificates(root_store)
152 .with_no_client_auth()
153 };
154
155 let alpn = vec![b"hq-29".to_vec()];
156 client_crypto.alpn_protocols = alpn;
157 if options.keylog {
158 client_crypto.key_log = Arc::new(rustls::KeyLogFile::new());
159 }
160 let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
161 let host = options.url.host_str();
162
163 match quinn::Endpoint::client("[::]:0".parse().unwrap()) {
164 Ok(mut endpoint) => {
165 endpoint.set_default_client_config(client_config);
166 match endpoint.connect(*remote, host.unwrap()) {
167 Ok(conn) => {
168 let conn = conn.await?;
169 info!("Connected successfully");
170 return Ok(conn);
171 }
172 Err(e) => {
173 tracing::error!("Connection failed: {}. Retrying in 5 seconds...", e);
174 time::sleep(Duration::from_secs(5)).await;
175 }
176 }
177 }
178 Err(e) => {
179 tracing::error!("Endpoint creation failed: {}. Retrying in 5 seconds...", e);
180 time::sleep(Duration::from_secs(5)).await;
181 }
182 }
183 }
184}
185
186impl CameraSynk for QUICClient {
187 async fn connect(&mut self) -> anyhow::Result<()> {
188 let conn = connect_to_server(&self.options).await?;
189 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
190 self.sender = Some(tx);
191
192 let cloned_conn = conn.clone();
194 tokio::spawn(async move {
195 while let Some(message) = rx.recv().await {
196 let cloned_conn = cloned_conn.clone();
197 tokio::spawn(async move {
198 if let Err(e) = Self::send(cloned_conn.clone(), message).await {
199 tracing::error!("Failed to send message: {}", e);
200 }
201 });
202 }
203 });
204
205 self.start_heartbeat(conn.clone(), &self.options).await;
207
208 self.send_connection_packet().await?;
209 Ok(())
210 }
211
212 async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
213 self.queue_message(data).await
214 }
215}