videocall_cli/consumers/
quic.rs1use std::sync::Arc;
20
21use anyhow::Error;
22use protobuf::Message;
23use quinn::Connection;
24use std::time::{SystemTime, UNIX_EPOCH};
25use tokio::{
26 sync::mpsc::{self, Sender},
27 time::{self, Duration},
28};
29use tracing::info;
30use videocall_types::protos::{
31 connection_packet::ConnectionPacket,
32 media_packet::{media_packet::MediaType, HeartbeatMetadata, MediaPacket},
33 packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
34};
35
36use crate::cli_args::Stream;
37
38use super::camera_synk::CameraSynk;
39
40pub struct QUICClient {
41 options: Stream,
42 sender: Option<Sender<Vec<u8>>>,
43}
44
45impl QUICClient {
46 pub fn new(options: Stream) -> Self {
47 Self {
48 options,
49 sender: None,
50 }
51 }
52
53 async fn send_connection_packet(&self) -> anyhow::Result<()> {
54 let connection_packet = ConnectionPacket {
55 meeting_id: self.options.meeting_id.clone(),
56 ..Default::default()
57 };
58 let packet = PacketWrapper {
59 packet_type: PacketType::CONNECTION.into(),
60 email: self.options.user_id.clone(),
61 data: connection_packet.write_to_bytes()?,
62 ..Default::default()
63 };
64 self.queue_message(packet.write_to_bytes()?).await?;
65 Ok(())
66 }
67
68 pub async fn send(conn: Connection, data: Vec<u8>) -> anyhow::Result<()> {
69 let mut stream = conn.open_uni().await?;
70 stream.write_all(&data).await?;
71 stream.finish().await?;
72 Ok(())
73 }
74
75 async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
76 if let Some(sender) = &self.sender {
77 sender
78 .send(message)
79 .await
80 .map_err(|_| Error::msg("Failed to send message to queue"))
81 } else {
82 Err(Error::msg("No sender available"))
83 }
84 }
85
86 async fn start_heartbeat(&self, conn: Connection, options: &Stream) {
87 let interval = time::interval(Duration::from_secs(1));
88 let email = options.user_id.clone();
89 tokio::spawn(async move {
90 let mut interval = interval;
91 loop {
92 let now_ms = SystemTime::now()
93 .duration_since(UNIX_EPOCH)
94 .expect("Time went backwards")
95 .as_millis(); interval.tick().await;
97 let actual_heartbeat = MediaPacket {
98 media_type: MediaType::HEARTBEAT.into(),
99 email: email.clone(),
100 timestamp: now_ms as f64,
101 heartbeat_metadata: Some(HeartbeatMetadata {
102 video_enabled: true,
103 ..Default::default()
104 })
105 .into(),
106 ..Default::default()
107 };
108
109 let packet = PacketWrapper {
110 email: email.clone(),
111 packet_type: PacketType::MEDIA.into(),
112 data: actual_heartbeat.write_to_bytes().unwrap(),
113 ..Default::default()
114 };
115 let data = packet.write_to_bytes().unwrap();
116 if let Err(e) = Self::send(conn.clone(), data).await {
117 tracing::error!("Failed to send heartbeat: {}", e);
118 }
119 }
120 });
121 }
122}
123
124async fn connect_to_server(options: &Stream) -> anyhow::Result<Connection> {
125 loop {
126 info!("Attempting to connect to {}", options.url);
127 let addrs = options
128 .url
129 .socket_addrs(|| Some(443))
130 .expect("couldn't resolve the address provided");
131 let remote = addrs.first().to_owned();
132 let remote = remote.unwrap();
133 let mut root_store = rustls::RootCertStore::empty();
134 root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
135 rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
136 ta.subject,
137 ta.spki,
138 ta.name_constraints,
139 )
140 }));
141 let mut client_crypto = rustls::ClientConfig::builder()
142 .with_safe_defaults()
143 .with_root_certificates(root_store)
144 .with_no_client_auth();
145
146 let alpn = vec![b"hq-29".to_vec()];
147 client_crypto.alpn_protocols = alpn;
148 if options.keylog {
149 client_crypto.key_log = Arc::new(rustls::KeyLogFile::new());
150 }
151 let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
152 let host = options.url.host_str();
153
154 match quinn::Endpoint::client("[::]:0".parse().unwrap()) {
155 Ok(mut endpoint) => {
156 endpoint.set_default_client_config(client_config);
157 match endpoint.connect(*remote, host.unwrap()) {
158 Ok(conn) => {
159 let conn = conn.await?;
160 info!("Connected successfully");
161 return Ok(conn);
162 }
163 Err(e) => {
164 tracing::error!("Connection failed: {}. Retrying in 5 seconds...", e);
165 time::sleep(Duration::from_secs(5)).await;
166 }
167 }
168 }
169 Err(e) => {
170 tracing::error!("Endpoint creation failed: {}. Retrying in 5 seconds...", e);
171 time::sleep(Duration::from_secs(5)).await;
172 }
173 }
174 }
175}
176
177impl CameraSynk for QUICClient {
178 async fn connect(&mut self) -> anyhow::Result<()> {
179 let conn = connect_to_server(&self.options).await?;
180 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
181 self.sender = Some(tx);
182
183 let cloned_conn = conn.clone();
185 tokio::spawn(async move {
186 while let Some(message) = rx.recv().await {
187 let cloned_conn = cloned_conn.clone();
188 tokio::spawn(async move {
189 if let Err(e) = Self::send(cloned_conn.clone(), message).await {
190 tracing::error!("Failed to send message: {}", e);
191 }
192 });
193 }
194 });
195
196 self.start_heartbeat(conn.clone(), &self.options).await;
198
199 self.send_connection_packet().await?;
200 Ok(())
201 }
202
203 async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
204 self.queue_message(data).await
205 }
206}