1use crate::core::GunCore;
25use crate::dam::Mesh;
26use crate::error::{GunError, GunResult};
27use serde::{Deserialize, Serialize};
28use serde_json::Value;
29use std::collections::HashMap;
30use std::sync::Arc;
31use tokio::sync::RwLock;
32use webrtc::api::interceptor_registry::register_default_interceptors;
33use webrtc::api::media_engine::MediaEngine;
34use webrtc::api::APIBuilder;
35use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
36use webrtc::data_channel::data_channel_message::DataChannelMessage;
37use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
38use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
39use webrtc::ice_transport::ice_server::RTCIceServer;
40use webrtc::interceptor::registry::Registry;
41use webrtc::peer_connection::configuration::RTCConfiguration;
42use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
43use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
44use webrtc::peer_connection::RTCPeerConnection;
45
46#[derive(Clone, Debug)]
68pub struct WebRTCOptions {
69 pub ice_servers: Vec<RTCIceServer>,
71
72 pub data_channel: RTCDataChannelInit,
74
75 pub max_connections: usize,
77
78 pub room: Option<String>,
80
81 pub enabled: bool,
83}
84
85impl Default for WebRTCOptions {
86 fn default() -> Self {
87 let ice_servers = vec![
89 RTCIceServer {
90 urls: vec!["stun:stun.l.google.com:19302".to_string()],
91 username: String::new(),
92 credential: String::new(),
93 credential_type: RTCIceCredentialType::Password,
94 },
95 RTCIceServer {
96 urls: vec!["stun:stun.cloudflare.com:3478".to_string()],
97 username: String::new(),
98 credential: String::new(),
99 credential_type: RTCIceCredentialType::Password,
100 },
101 ];
102
103 let data_channel = RTCDataChannelInit {
104 ordered: Some(false),
105 max_retransmits: Some(2u16),
106 ..Default::default()
107 };
108
109 Self {
110 ice_servers,
111 data_channel,
112 max_connections: 55, room: None,
114 enabled: true,
115 }
116 }
117}
118
119pub struct WebRTCPeer {
129 pub peer_id: String,
130 pc: Arc<RTCPeerConnection>,
131 data_channel: Arc<webrtc::data_channel::RTCDataChannel>,
132 #[allow(dead_code)] message_sender: tokio::sync::mpsc::UnboundedSender<String>,
134}
135
136impl std::fmt::Debug for WebRTCPeer {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("WebRTCPeer")
139 .field("peer_id", &self.peer_id)
140 .finish_non_exhaustive()
141 }
142}
143
144impl WebRTCPeer {
145 pub async fn new(
147 peer_id: String,
148 config: &WebRTCOptions,
149 ) -> GunResult<(Self, tokio::sync::mpsc::UnboundedReceiver<String>)> {
150 let mut m = MediaEngine::default();
152 m.register_default_codecs()
153 .map_err(|e| GunError::WebRTC(format!("Failed to register codecs: {}", e)))?;
154 let mut registry = Registry::new();
155 registry = register_default_interceptors(registry, &mut m)
156 .map_err(|e| GunError::WebRTC(format!("Failed to register interceptors: {}", e)))?;
157
158 let api = APIBuilder::new()
159 .with_media_engine(m)
160 .with_interceptor_registry(registry)
161 .build();
162
163 let rtc_config = RTCConfiguration {
165 ice_servers: config.ice_servers.clone(),
166 ..Default::default()
167 };
168
169 let pc = Arc::new(api.new_peer_connection(rtc_config).await.map_err(|e| {
171 GunError::Network(format!("Failed to create RTCPeerConnection: {}", e))
172 })?);
173
174 let data_channel = pc
176 .create_data_channel("dc", Some(config.data_channel.clone()))
177 .await
178 .map_err(|e| GunError::Network(format!("Failed to create data channel: {}", e)))?;
179
180 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
182
183 let tx_clone = tx.clone();
185 data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
186 if msg.is_string {
187 if let Ok(text) = String::from_utf8(msg.data.to_vec()) {
188 let _ = tx_clone.send(text);
189 }
190 } else {
191 if let Ok(text) = String::from_utf8(msg.data.to_vec()) {
193 let _ = tx_clone.send(text);
194 }
195 }
196 Box::pin(async {})
197 }));
198
199 let peer_id_clone = peer_id.clone();
201 pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
202 tracing::info!("WebRTC peer {} connection state: {:?}", peer_id_clone, s);
203 Box::pin(async {})
204 }));
205
206 let peer_id_for_candidates = peer_id.clone();
208 pc.on_ice_candidate(Box::new(
209 move |candidate: Option<webrtc::ice_transport::ice_candidate::RTCIceCandidate>| {
210 let peer_id_clone = peer_id_for_candidates.clone();
211 Box::pin(async move {
212 if let Some(candidate) = candidate {
213 tracing::debug!(
214 "ICE candidate for peer {}: {:?}",
215 peer_id_clone,
216 candidate
217 );
218 }
220 })
221 },
222 ));
223
224 Ok((
225 Self {
226 peer_id,
227 pc,
228 data_channel,
229 message_sender: tx,
230 },
231 rx,
232 ))
233 }
234
235 pub async fn send(&self, message: &str) -> GunResult<()> {
237 let data: bytes::Bytes = message.as_bytes().to_vec().into();
238 self.data_channel
239 .send(&data)
240 .await
241 .map_err(|e| GunError::Network(format!("Failed to send WebRTC message: {}", e)))?;
242 Ok(())
243 }
244
245 pub async fn create_offer(&self) -> GunResult<RTCSessionDescription> {
247 let offer = self
248 .pc
249 .create_offer(None)
250 .await
251 .map_err(|e| GunError::Network(format!("Failed to create offer: {}", e)))?;
252
253 self.pc
254 .set_local_description(offer.clone())
255 .await
256 .map_err(|e| GunError::Network(format!("Failed to set local description: {}", e)))?;
257
258 Ok(offer)
259 }
260
261 pub async fn create_answer(&self) -> GunResult<RTCSessionDescription> {
263 let answer = self
264 .pc
265 .create_answer(None)
266 .await
267 .map_err(|e| GunError::Network(format!("Failed to create answer: {}", e)))?;
268
269 self.pc
270 .set_local_description(answer.clone())
271 .await
272 .map_err(|e| GunError::Network(format!("Failed to set local description: {}", e)))?;
273
274 Ok(answer)
275 }
276
277 pub async fn set_remote_description(&self, desc: RTCSessionDescription) -> GunResult<()> {
279 self.pc
280 .set_remote_description(desc)
281 .await
282 .map_err(|e| GunError::Network(format!("Failed to set remote description: {}", e)))?;
283 Ok(())
284 }
285
286 pub async fn add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> GunResult<()> {
288 self.pc
289 .add_ice_candidate(candidate)
290 .await
291 .map_err(|e| GunError::Network(format!("Failed to add ICE candidate: {}", e)))?;
292 Ok(())
293 }
294
295 pub async fn close(&self) -> GunResult<()> {
297 self.data_channel
298 .close()
299 .await
300 .map_err(|e| GunError::Network(format!("Failed to close data channel: {}", e)))?;
301 self.pc
302 .close()
303 .await
304 .map_err(|e| GunError::Network(format!("Failed to close peer connection: {}", e)))?;
305 Ok(())
306 }
307
308 pub async fn connection_state(&self) -> RTCPeerConnectionState {
310 self.pc.connection_state()
311 }
312}
313
314pub struct WebRTCManager {
338 #[allow(dead_code)] core: Arc<GunCore>,
340 mesh: Arc<Mesh>,
341 options: WebRTCOptions,
342 peers: Arc<RwLock<HashMap<String, Arc<WebRTCPeer>>>>, pub(crate) pid: String, }
345
346impl WebRTCManager {
347 pub fn new(core: Arc<GunCore>, mesh: Arc<Mesh>, options: WebRTCOptions) -> Self {
349 let pid = core.random_id(9);
350 Self {
351 core,
352 mesh,
353 options,
354 peers: Arc::new(RwLock::new(HashMap::new())),
355 pid,
356 }
357 }
358
359 pub fn pid(&self) -> &str {
361 &self.pid
362 }
363
364 pub async fn handle_rtc_message(&self, msg: &Value) -> GunResult<()> {
367 let rtc = match msg.get("ok").and_then(|v| v.get("rtc")) {
368 Some(rtc) => rtc,
369 None => return Ok(()),
370 };
371 let peer_id = rtc
372 .get("id")
373 .and_then(|v| v.as_str())
374 .ok_or_else(|| GunError::InvalidData("Missing RTC peer ID".to_string()))?;
375
376 if peer_id == self.pid {
378 return Ok(());
379 }
380
381 if rtc.get("candidate").is_some() {
383 self.handle_ice_candidate(peer_id, rtc).await?;
385 } else if rtc.get("answer").is_some() {
386 self.handle_answer(peer_id, rtc).await?;
388 } else if rtc.get("offer").is_some() {
389 self.handle_offer(peer_id, rtc).await?;
391 } else if rtc.get("id").is_some() {
392 self.initiate_connection(peer_id).await?;
394 }
395
396 Ok(())
397 }
398
399 async fn handle_ice_candidate(&self, peer_id: &str, rtc: &Value) -> GunResult<()> {
401 let peers = self.peers.read().await;
402 if peers.get(peer_id).is_some() {
403 if let Some(_candidate_json) = rtc.get("candidate") {
404 tracing::debug!("Received ICE candidate for peer {}", peer_id);
409 }
410 }
411 Ok(())
412 }
413
414 async fn handle_answer(&self, peer_id: &str, rtc: &Value) -> GunResult<()> {
416 let peer_arc = {
418 let peers = self.peers.read().await;
419 peers.get(peer_id).cloned()
420 }; if let Some(peer) = peer_arc {
423 let answer_json = rtc.get("answer")
424 .ok_or_else(|| GunError::InvalidData("Missing answer in RTC message".to_string()))?;
425 let sdp_str = answer_json
427 .get("sdp")
428 .and_then(|v| v.as_str())
429 .ok_or_else(|| GunError::InvalidData("Missing SDP in answer".to_string()))?;
430 let _sdp_type = answer_json
431 .get("type")
432 .and_then(|v| v.as_str())
433 .unwrap_or("answer");
434
435 let desc = RTCSessionDescription::answer(sdp_str.to_string())
436 .map_err(|e| GunError::WebRTC(format!("Failed to parse answer SDP: {}", e)))?;
437
438 peer.set_remote_description(desc).await?;
440 }
441 Ok(())
442 }
443
444 async fn handle_offer(&self, peer_id: &str, rtc: &Value) -> GunResult<()> {
446 let should_create = {
448 let peers = self.peers.read().await;
449 !peers.contains_key(peer_id)
450 };
451
452 if should_create {
453 let peer_id_for_task = peer_id.to_string();
454 let options_clone = self.options.clone();
455 let (peer, mut rx) = WebRTCPeer::new(peer_id_for_task.clone(), &options_clone).await?;
456
457 let mesh_clone = self.mesh.clone();
460 tokio::spawn(async move {
461 while let Some(msg) = rx.recv().await {
462 if let Err(e) = mesh_clone.hear(&msg, None).await {
466 tracing::error!("Error forwarding WebRTC message to mesh: {}", e);
467 }
468 }
469 });
470
471 let mut peers = self.peers.write().await;
473 peers.insert(peer_id_for_task, Arc::new(peer));
474 }
475
476 let peer_exists = {
477 let peers = self.peers.read().await;
478 peers.get(peer_id).is_some()
479 };
480
481 if peer_exists {
482 let offer_json = rtc.get("offer")
483 .ok_or_else(|| GunError::InvalidData("Missing offer in RTC message".to_string()))?;
484 let sdp_str = offer_json
485 .get("sdp")
486 .and_then(|v| v.as_str())
487 .ok_or_else(|| GunError::InvalidData("Missing SDP in offer".to_string()))?
488 .replace("\\r\\n", "\r\n");
489
490 let desc = RTCSessionDescription::offer(sdp_str)
491 .map_err(|e| GunError::WebRTC(format!("Failed to parse offer SDP: {}", e)))?;
492
493 let peer_id_clone = peer_id.to_string();
495 let peer_arc = {
496 let peers = self.peers.read().await;
497 peers.get(peer_id).cloned() }; if let Some(peer) = peer_arc {
501 peer.set_remote_description(desc).await?;
503 let answer = peer.create_answer().await?;
504 self.send_rtc_message(&peer_id_clone, "answer", &answer)
506 .await?;
507 }
508 }
509
510 Ok(())
511 }
512
513 async fn initiate_connection(&self, peer_id: &str) -> GunResult<()> {
515 let should_create = {
517 let peers = self.peers.read().await;
518 !peers.contains_key(peer_id) && peers.len() < self.options.max_connections
519 };
520
521 if !should_create {
522 let peers = self.peers.read().await;
523 if peers.contains_key(peer_id) {
524 return Ok(());
525 }
526 if peers.len() >= self.options.max_connections {
527 tracing::warn!("WebRTC connection limit reached, skipping peer {}", peer_id);
528 return Ok(());
529 }
530 }
531
532 let (peer, mut rx) = WebRTCPeer::new(peer_id.to_string(), &self.options).await?;
534
535 let mesh_clone = self.mesh.clone();
537 tokio::spawn(async move {
538 while let Some(msg) = rx.recv().await {
539 if let Err(e) = mesh_clone.hear(&msg, None).await {
541 tracing::error!("Error forwarding WebRTC message to mesh: {}", e);
542 }
543 }
544 });
545
546 let offer = peer.create_offer().await?;
548
549 {
551 let mut peers = self.peers.write().await;
552 peers.insert(peer_id.to_string(), Arc::new(peer));
553 } self.send_rtc_message(peer_id, "offer", &offer).await?;
556
557 Ok(())
558 }
559
560 async fn send_rtc_message(
562 &self,
563 peer_id: &str,
564 msg_type: &str,
565 sdp: &RTCSessionDescription,
566 ) -> GunResult<()> {
567 let mut rtc_msg = serde_json::json!({
569 "ok": {
570 "rtc": {
571 "id": self.pid,
572 }
573 }
574 });
575
576 match msg_type {
578 "offer" => {
579 rtc_msg["ok"]["rtc"]["offer"] = serde_json::json!({
580 "type": "offer",
581 "sdp": sdp.sdp
582 });
583 }
584 "answer" => {
585 rtc_msg["ok"]["rtc"]["answer"] = serde_json::json!({
586 "type": "answer",
587 "sdp": sdp.sdp
588 });
589 }
590 _ => {
591 return Err(GunError::InvalidData(format!(
592 "Unknown RTC message type: {}",
593 msg_type
594 )))
595 }
596 }
597
598 let msg_str = serde_json::to_string(&rtc_msg).map_err(GunError::Serialization)?;
600
601 if self.mesh.get_peer(peer_id).await.is_some() {
605 self.mesh.send_to_peer_by_id(&msg_str, peer_id).await?;
606 }
607
608 Ok(())
609 }
610
611 pub async fn send_message(&self, peer_id: &str, message: &str) -> GunResult<()> {
613 let peers = self.peers.read().await;
614 if let Some(peer) = peers.get(peer_id) {
615 if matches!(
617 peer.connection_state().await,
618 RTCPeerConnectionState::Connected
619 ) {
620 return peer.send(message).await;
621 }
622 }
623
624 self.mesh.send_to_peer_by_id(message, peer_id).await
626 }
627}
628
629#[derive(Serialize, Deserialize, Debug)]
631pub struct RTCMessage {
632 pub ok: RTCMessageOk,
633}
634
635#[derive(Serialize, Deserialize, Debug)]
636pub struct RTCMessageOk {
637 pub rtc: RTCMessageRTC,
638}
639
640#[derive(Serialize, Deserialize, Debug)]
641pub struct RTCMessageRTC {
642 pub id: String,
643 pub offer: Option<RTCMessageSDP>,
644 pub answer: Option<RTCMessageSDP>,
645 pub candidate: Option<RTCMessageCandidate>,
646}
647
648#[derive(Serialize, Deserialize, Debug)]
649pub struct RTCMessageSDP {
650 #[serde(rename = "type")]
651 pub sdp_type: String,
652 pub sdp: String,
653}
654
655#[derive(Serialize, Deserialize, Debug)]
656pub struct RTCMessageCandidate {
657 pub candidate: String,
658 pub sdp_mid: Option<String>,
659 pub sdp_m_line_index: Option<u16>,
660}