videocall_cli/consumers/
webtransport.rs1use anyhow::Error;
20use protobuf::Message;
21use std::time::{SystemTime, UNIX_EPOCH};
22use tokio::{
23 sync::mpsc::{self, Sender},
24 time::{self, Duration},
25};
26use tracing::info;
27use videocall_types::protos::{
28 connection_packet::ConnectionPacket,
29 media_packet::{media_packet::MediaType, HeartbeatMetadata, MediaPacket},
30 packet_wrapper::{packet_wrapper::PacketType, PacketWrapper},
31};
32
33use crate::cli_args::Stream;
34
35use super::camera_synk::CameraSynk;
36
37pub struct WebTransportClient {
38 options: Stream,
39 sender: Option<Sender<Vec<u8>>>,
40}
41
42impl WebTransportClient {
43 pub fn new(options: Stream) -> Self {
44 Self {
45 options,
46 sender: None,
47 }
48 }
49
50 async fn send_connection_packet(&self) -> anyhow::Result<()> {
51 let connection_packet = ConnectionPacket {
52 meeting_id: self.options.meeting_id.clone(),
53 ..Default::default()
54 };
55 let packet = PacketWrapper {
56 packet_type: PacketType::CONNECTION.into(),
57 email: self.options.user_id.clone(),
58 data: connection_packet.write_to_bytes()?,
59 ..Default::default()
60 };
61 self.queue_message(packet.write_to_bytes()?).await?;
62 Ok(())
63 }
64
65 pub async fn send(session: &web_transport_quinn::Session, data: Vec<u8>) -> anyhow::Result<()> {
66 let mut stream = session.open_uni().await?;
67 stream.write_all(&data).await?;
68 stream.finish()?;
69 Ok(())
70 }
71
72 async fn queue_message(&self, message: Vec<u8>) -> anyhow::Result<()> {
73 if let Some(sender) = &self.sender {
74 sender
75 .send(message)
76 .await
77 .map_err(|_| Error::msg("Failed to send message to queue"))
78 } else {
79 Err(Error::msg("No sender available"))
80 }
81 }
82
83 async fn start_heartbeat(&self, session: web_transport_quinn::Session, options: &Stream) {
84 let interval = time::interval(Duration::from_secs(1));
85 let email = options.user_id.clone();
86 tokio::spawn(async move {
87 let mut interval = interval;
88 loop {
89 let now_ms = SystemTime::now()
90 .duration_since(UNIX_EPOCH)
91 .expect("Time went backwards")
92 .as_millis(); interval.tick().await;
94 let actual_heartbeat = MediaPacket {
95 media_type: MediaType::HEARTBEAT.into(),
96 email: email.clone(),
97 timestamp: now_ms as f64,
98 heartbeat_metadata: Some(HeartbeatMetadata {
99 video_enabled: true,
100 ..Default::default()
101 })
102 .into(),
103 ..Default::default()
104 };
105
106 let packet = PacketWrapper {
107 email: email.clone(),
108 packet_type: PacketType::MEDIA.into(),
109 data: actual_heartbeat.write_to_bytes().unwrap(),
110 ..Default::default()
111 };
112 let data = packet.write_to_bytes().unwrap();
113 if let Err(e) = Self::send(&session, data).await {
114 tracing::error!("Failed to send heartbeat: {}", e);
115 }
116 }
117 });
118 }
119}
120
121async fn connect_to_server(options: &Stream) -> anyhow::Result<web_transport_quinn::Session> {
122 loop {
123 info!("Attempting to connect to {}", options.url);
124
125 let mut url = options.url.clone();
127 url.set_path(&format!(
128 "/lobby/{}/{}",
129 options.user_id, options.meeting_id
130 ));
131
132 let client = if options.insecure_skip_verify {
134 info!("WARNING: Skipping TLS certificate verification - connection is insecure!");
135 unsafe { web_transport_quinn::ClientBuilder::new().with_no_certificate_verification()? }
136 } else {
137 web_transport_quinn::ClientBuilder::new().with_system_roots()?
138 };
139
140 match client.connect(url).await {
141 Ok(session) => {
142 info!("WebTransport session established successfully");
143 return Ok(session);
144 }
145 Err(e) => {
146 tracing::error!(
147 "WebTransport connection failed: {}. Retrying in 5 seconds...",
148 e
149 );
150 time::sleep(Duration::from_secs(5)).await;
151 }
152 }
153 }
154}
155
156impl CameraSynk for WebTransportClient {
157 async fn connect(&mut self) -> anyhow::Result<()> {
158 let session = connect_to_server(&self.options).await?;
159 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(100);
160 self.sender = Some(tx);
161
162 let session_clone = session.clone();
164 tokio::spawn(async move {
165 while let Some(message) = rx.recv().await {
166 let session_clone_inner = session_clone.clone();
167 tokio::spawn(async move {
168 if let Err(e) = WebTransportClient::send(&session_clone_inner, message).await {
169 tracing::error!("Failed to send message: {}", e);
170 }
171 });
172 }
173 });
174
175 self.start_heartbeat(session.clone(), &self.options).await;
177
178 self.send_connection_packet().await?;
179 Ok(())
180 }
181
182 async fn send_packet(&self, data: Vec<u8>) -> anyhow::Result<()> {
183 self.queue_message(data).await
184 }
185}