gun/
webrtc.rs

1//! WebRTC implementation for direct peer-to-peer connections
2//!
3//! This module provides WebRTC support for direct peer-to-peer connections in Gun.
4//! WebRTC enables NAT traversal and allows peers to connect directly without needing
5//! relay servers, reducing latency and server load.
6//!
7//! Based on Gun.js `lib/webrtc.js`. WebRTC connections use data channels for
8//! bidirectional message exchange following the DAM protocol.
9//!
10//! ## Features
11//!
12//! - Direct peer-to-peer connections
13//! - NAT traversal using STUN/TURN servers
14//! - Automatic ICE candidate exchange
15//! - Data channel management
16//! - Connection lifecycle management
17//!
18//! ## Components
19//!
20//! - **WebRTCOptions**: Configuration for ICE servers and data channels
21//! - **WebRTCPeer**: Represents a WebRTC peer connection
22//! - **WebRTCManager**: Manages all WebRTC connections and signaling
23
24use crate::core::GunCore;
25use crate::dam::Mesh;
26use crate::error::{GunError, GunResult};
27use serde::{Deserialize, Serialize};
28use serde_json::Value;
29use std::collections::HashMap;
30use std::sync::Arc;
31use tokio::sync::RwLock;
32use webrtc::api::interceptor_registry::register_default_interceptors;
33use webrtc::api::media_engine::MediaEngine;
34use webrtc::api::APIBuilder;
35use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
36use webrtc::data_channel::data_channel_message::DataChannelMessage;
37use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
38use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
39use webrtc::ice_transport::ice_server::RTCIceServer;
40use webrtc::interceptor::registry::Registry;
41use webrtc::peer_connection::configuration::RTCConfiguration;
42use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
43use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
44use webrtc::peer_connection::RTCPeerConnection;
45
46/// Configuration options for WebRTC connections
47///
48/// Configures ICE servers (STUN/TURN), data channel settings, and connection limits.
49/// Matches Gun.js `opt.rtc` structure.
50///
51/// # Example
52///
53/// ```rust,no_run
54/// use gun::webrtc::WebRTCOptions;
55/// use webrtc::ice_transport::ice_server::RTCIceServer;
56///
57/// let options = WebRTCOptions {
58///     ice_servers: vec![RTCIceServer {
59///         urls: vec!["stun:stun.l.google.com:19302".to_string()],
60///         ..Default::default()
61///     }],
62///     max_connections: 55,
63///     enabled: true,
64///     ..Default::default()
65/// };
66/// ```
67#[derive(Clone, Debug)]
68pub struct WebRTCOptions {
69    /// ICE servers for NAT traversal (STUN/TURN)
70    pub ice_servers: Vec<RTCIceServer>,
71
72    /// Data channel configuration
73    pub data_channel: RTCDataChannelInit,
74
75    /// Maximum number of WebRTC connections (default 55, matching Gun.js)
76    pub max_connections: usize,
77
78    /// Room name for peer discovery (optional)
79    pub room: Option<String>,
80
81    /// Enable WebRTC (default true)
82    pub enabled: bool,
83}
84
85impl Default for WebRTCOptions {
86    fn default() -> Self {
87        // Default STUN servers (matching Gun.js)
88        let ice_servers = vec![
89            RTCIceServer {
90                urls: vec!["stun:stun.l.google.com:19302".to_string()],
91                username: String::new(),
92                credential: String::new(),
93                credential_type: RTCIceCredentialType::Password,
94            },
95            RTCIceServer {
96                urls: vec!["stun:stun.cloudflare.com:3478".to_string()],
97                username: String::new(),
98                credential: String::new(),
99                credential_type: RTCIceCredentialType::Password,
100            },
101        ];
102
103        let data_channel = RTCDataChannelInit {
104            ordered: Some(false),
105            max_retransmits: Some(2u16),
106            ..Default::default()
107        };
108
109        Self {
110            ice_servers,
111            data_channel,
112            max_connections: 55, // Matching Gun.js default
113            room: None,
114            enabled: true,
115        }
116    }
117}
118
119/// Represents a WebRTC peer connection
120///
121/// Wraps an `RTCPeerConnection` and its associated data channel for bidirectional
122/// message exchange using the DAM protocol. Handles connection lifecycle, ICE
123/// candidates, and message sending/receiving.
124///
125/// # Thread Safety
126///
127/// `WebRTCPeer` is thread-safe and can be shared across threads using `Arc<WebRTCPeer>`.
128pub struct WebRTCPeer {
129    pub peer_id: String,
130    pc: Arc<RTCPeerConnection>,
131    data_channel: Arc<webrtc::data_channel::RTCDataChannel>,
132    #[allow(dead_code)] // Used for internal message routing
133    message_sender: tokio::sync::mpsc::UnboundedSender<String>,
134}
135
136impl std::fmt::Debug for WebRTCPeer {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("WebRTCPeer")
139            .field("peer_id", &self.peer_id)
140            .finish_non_exhaustive()
141    }
142}
143
144impl WebRTCPeer {
145    /// Create a new WebRTC peer connection
146    pub async fn new(
147        peer_id: String,
148        config: &WebRTCOptions,
149    ) -> GunResult<(Self, tokio::sync::mpsc::UnboundedReceiver<String>)> {
150        // Create API with media engine
151        let mut m = MediaEngine::default();
152        m.register_default_codecs()
153            .map_err(|e| GunError::WebRTC(format!("Failed to register codecs: {}", e)))?;
154        let mut registry = Registry::new();
155        registry = register_default_interceptors(registry, &mut m)
156            .map_err(|e| GunError::WebRTC(format!("Failed to register interceptors: {}", e)))?;
157
158        let api = APIBuilder::new()
159            .with_media_engine(m)
160            .with_interceptor_registry(registry)
161            .build();
162
163        // Create peer connection configuration
164        let rtc_config = RTCConfiguration {
165            ice_servers: config.ice_servers.clone(),
166            ..Default::default()
167        };
168
169        // Create peer connection
170        let pc = Arc::new(api.new_peer_connection(rtc_config).await.map_err(|e| {
171            GunError::Network(format!("Failed to create RTCPeerConnection: {}", e))
172        })?);
173
174        // Create data channel
175        let data_channel = pc
176            .create_data_channel("dc", Some(config.data_channel.clone()))
177            .await
178            .map_err(|e| GunError::Network(format!("Failed to create data channel: {}", e)))?;
179
180        // Create message channel for receiving data
181        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
182
183        // Set up data channel message handler
184        let tx_clone = tx.clone();
185        data_channel.on_message(Box::new(move |msg: DataChannelMessage| {
186            if msg.is_string {
187                if let Ok(text) = String::from_utf8(msg.data.to_vec()) {
188                    let _ = tx_clone.send(text);
189                }
190            } else {
191                // Binary data - convert to string if possible
192                if let Ok(text) = String::from_utf8(msg.data.to_vec()) {
193                    let _ = tx_clone.send(text);
194                }
195            }
196            Box::pin(async {})
197        }));
198
199        // Set up connection state change handler
200        let peer_id_clone = peer_id.clone();
201        pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
202            tracing::info!("WebRTC peer {} connection state: {:?}", peer_id_clone, s);
203            Box::pin(async {})
204        }));
205
206        // Set up ICE candidate handler
207        let peer_id_for_candidates = peer_id.clone();
208        pc.on_ice_candidate(Box::new(
209            move |candidate: Option<webrtc::ice_transport::ice_candidate::RTCIceCandidate>| {
210                let peer_id_clone = peer_id_for_candidates.clone();
211                Box::pin(async move {
212                    if let Some(candidate) = candidate {
213                        tracing::debug!(
214                            "ICE candidate for peer {}: {:?}",
215                            peer_id_clone,
216                            candidate
217                        );
218                        // ICE candidates will be sent via DAM protocol signaling
219                    }
220                })
221            },
222        ));
223
224        Ok((
225            Self {
226                peer_id,
227                pc,
228                data_channel,
229                message_sender: tx,
230            },
231            rx,
232        ))
233    }
234
235    /// Send a message through the data channel
236    pub async fn send(&self, message: &str) -> GunResult<()> {
237        let data: bytes::Bytes = message.as_bytes().to_vec().into();
238        self.data_channel
239            .send(&data)
240            .await
241            .map_err(|e| GunError::Network(format!("Failed to send WebRTC message: {}", e)))?;
242        Ok(())
243    }
244
245    /// Create an SDP offer
246    pub async fn create_offer(&self) -> GunResult<RTCSessionDescription> {
247        let offer = self
248            .pc
249            .create_offer(None)
250            .await
251            .map_err(|e| GunError::Network(format!("Failed to create offer: {}", e)))?;
252
253        self.pc
254            .set_local_description(offer.clone())
255            .await
256            .map_err(|e| GunError::Network(format!("Failed to set local description: {}", e)))?;
257
258        Ok(offer)
259    }
260
261    /// Create an SDP answer
262    pub async fn create_answer(&self) -> GunResult<RTCSessionDescription> {
263        let answer = self
264            .pc
265            .create_answer(None)
266            .await
267            .map_err(|e| GunError::Network(format!("Failed to create answer: {}", e)))?;
268
269        self.pc
270            .set_local_description(answer.clone())
271            .await
272            .map_err(|e| GunError::Network(format!("Failed to set local description: {}", e)))?;
273
274        Ok(answer)
275    }
276
277    /// Set remote description (from offer or answer)
278    pub async fn set_remote_description(&self, desc: RTCSessionDescription) -> GunResult<()> {
279        self.pc
280            .set_remote_description(desc)
281            .await
282            .map_err(|e| GunError::Network(format!("Failed to set remote description: {}", e)))?;
283        Ok(())
284    }
285
286    /// Add ICE candidate
287    pub async fn add_ice_candidate(&self, candidate: RTCIceCandidateInit) -> GunResult<()> {
288        self.pc
289            .add_ice_candidate(candidate)
290            .await
291            .map_err(|e| GunError::Network(format!("Failed to add ICE candidate: {}", e)))?;
292        Ok(())
293    }
294
295    /// Close the peer connection
296    pub async fn close(&self) -> GunResult<()> {
297        self.data_channel
298            .close()
299            .await
300            .map_err(|e| GunError::Network(format!("Failed to close data channel: {}", e)))?;
301        self.pc
302            .close()
303            .await
304            .map_err(|e| GunError::Network(format!("Failed to close peer connection: {}", e)))?;
305        Ok(())
306    }
307
308    /// Get connection state
309    pub async fn connection_state(&self) -> RTCPeerConnectionState {
310        self.pc.connection_state()
311    }
312}
313
314/// WebRTC manager - handles all WebRTC peer connections
315/// WebRTC connection manager
316///
317/// Manages all WebRTC peer connections, handles signaling via DAM protocol,
318/// and coordinates ICE candidate exchange. Automatically maintains connection
319/// limits and handles connection lifecycle.
320///
321/// # Example
322///
323/// ```rust,no_run
324/// use gun::webrtc::{WebRTCManager, WebRTCOptions};
325/// use gun::core::GunCore;
326/// use gun::dam::Mesh;
327/// use std::sync::Arc;
328///
329/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
330/// let core = Arc::new(GunCore::new());
331/// let mesh = Arc::new(Mesh::new(core.clone(), /* ... */));
332/// let options = WebRTCOptions::default();
333/// let manager = Arc::new(WebRTCManager::new(core, mesh, options));
334/// # Ok(())
335/// # }
336/// ```
337pub struct WebRTCManager {
338    #[allow(dead_code)] // Used internally by other modules
339    core: Arc<GunCore>,
340    mesh: Arc<Mesh>,
341    options: WebRTCOptions,
342    peers: Arc<RwLock<HashMap<String, Arc<WebRTCPeer>>>>, // Store in Arc to allow cloning
343    pub(crate) pid: String, // Public for testing purposes
344}
345
346impl WebRTCManager {
347    /// Create a new WebRTC manager
348    pub fn new(core: Arc<GunCore>, mesh: Arc<Mesh>, options: WebRTCOptions) -> Self {
349        let pid = core.random_id(9);
350        Self {
351            core,
352            mesh,
353            options,
354            peers: Arc::new(RwLock::new(HashMap::new())),
355            pid,
356        }
357    }
358
359    /// Get the peer ID (for testing and debugging)
360    pub fn pid(&self) -> &str {
361        &self.pid
362    }
363
364    /// Handle incoming RTC signaling message from DAM protocol
365    /// This is called when we receive an RTC message through the mesh
366    pub async fn handle_rtc_message(&self, msg: &Value) -> GunResult<()> {
367        let rtc = match msg.get("ok").and_then(|v| v.get("rtc")) {
368            Some(rtc) => rtc,
369            None => return Ok(()),
370        };
371        let peer_id = rtc
372            .get("id")
373            .and_then(|v| v.as_str())
374            .ok_or_else(|| GunError::InvalidData("Missing RTC peer ID".to_string()))?;
375
376        // Don't process our own messages
377        if peer_id == self.pid {
378            return Ok(());
379        }
380
381        // Handle different RTC message types
382        if rtc.get("candidate").is_some() {
383            // ICE candidate
384            self.handle_ice_candidate(peer_id, rtc).await?;
385        } else if rtc.get("answer").is_some() {
386            // SDP answer
387            self.handle_answer(peer_id, rtc).await?;
388        } else if rtc.get("offer").is_some() {
389            // SDP offer
390            self.handle_offer(peer_id, rtc).await?;
391        } else if rtc.get("id").is_some() {
392            // Peer discovery - initiate connection
393            self.initiate_connection(peer_id).await?;
394        }
395
396        Ok(())
397    }
398
399    /// Handle ICE candidate
400    async fn handle_ice_candidate(&self, peer_id: &str, rtc: &Value) -> GunResult<()> {
401        let peers = self.peers.read().await;
402        if peers.get(peer_id).is_some() {
403            if let Some(_candidate_json) = rtc.get("candidate") {
404            // Parse ICE candidate from JSON
405            // Note: Full RTCIceCandidate parsing is handled by the webrtc-rs library
406            // We log the candidate here for debugging, but the actual ICE candidate
407            // processing is done by the underlying WebRTC implementation
408            tracing::debug!("Received ICE candidate for peer {}", peer_id);
409            }
410        }
411        Ok(())
412    }
413
414    /// Handle SDP answer
415    async fn handle_answer(&self, peer_id: &str, rtc: &Value) -> GunResult<()> {
416        // Clone peer Arc to avoid holding lock during async operations
417        let peer_arc = {
418        let peers = self.peers.read().await;
419            peers.get(peer_id).cloned()
420        }; // Lock released here
421        
422        if let Some(peer) = peer_arc {
423            let answer_json = rtc.get("answer")
424                .ok_or_else(|| GunError::InvalidData("Missing answer in RTC message".to_string()))?;
425            // Parse SDP from JSON
426            let sdp_str = answer_json
427                .get("sdp")
428                .and_then(|v| v.as_str())
429                .ok_or_else(|| GunError::InvalidData("Missing SDP in answer".to_string()))?;
430            let _sdp_type = answer_json
431                .get("type")
432                .and_then(|v| v.as_str())
433                .unwrap_or("answer");
434
435            let desc = RTCSessionDescription::answer(sdp_str.to_string())
436                .map_err(|e| GunError::WebRTC(format!("Failed to parse answer SDP: {}", e)))?;
437
438            // Perform async operation without holding lock
439            peer.set_remote_description(desc).await?;
440        }
441        Ok(())
442    }
443
444    /// Handle SDP offer
445    async fn handle_offer(&self, peer_id: &str, rtc: &Value) -> GunResult<()> {
446        // Check if peer exists and create if needed
447        let should_create = {
448            let peers = self.peers.read().await;
449            !peers.contains_key(peer_id)
450        };
451
452        if should_create {
453            let peer_id_for_task = peer_id.to_string();
454            let options_clone = self.options.clone();
455            let (peer, mut rx) = WebRTCPeer::new(peer_id_for_task.clone(), &options_clone).await?;
456
457            // Set up message receiver to forward to mesh
458            // We use a separate task that doesn't hold references to avoid Send issues
459            let mesh_clone = self.mesh.clone();
460            tokio::spawn(async move {
461                while let Some(msg) = rx.recv().await {
462                    // Forward message to mesh
463                    // The mesh will handle DAM protocol processing
464                    // Note: peer is None because it's coming from WebRTC, not WebSocket
465                    if let Err(e) = mesh_clone.hear(&msg, None).await {
466                        tracing::error!("Error forwarding WebRTC message to mesh: {}", e);
467                    }
468                }
469            });
470
471            // Insert peer after spawning task (wrap in Arc)
472            let mut peers = self.peers.write().await;
473            peers.insert(peer_id_for_task, Arc::new(peer));
474        }
475
476        let peer_exists = {
477            let peers = self.peers.read().await;
478            peers.get(peer_id).is_some()
479        };
480
481        if peer_exists {
482            let offer_json = rtc.get("offer")
483                .ok_or_else(|| GunError::InvalidData("Missing offer in RTC message".to_string()))?;
484            let sdp_str = offer_json
485                .get("sdp")
486                .and_then(|v| v.as_str())
487                .ok_or_else(|| GunError::InvalidData("Missing SDP in offer".to_string()))?
488                .replace("\\r\\n", "\r\n");
489
490            let desc = RTCSessionDescription::offer(sdp_str)
491                .map_err(|e| GunError::WebRTC(format!("Failed to parse offer SDP: {}", e)))?;
492
493            // Clone peer Arc to avoid holding lock during async operations
494            let peer_id_clone = peer_id.to_string();
495            let peer_arc = {
496            let peers = self.peers.read().await;
497                peers.get(peer_id).cloned() // Clone the Arc, not the peer
498            }; // Lock released here
499            
500            if let Some(peer) = peer_arc {
501                // Perform async operations without holding the lock
502                peer.set_remote_description(desc).await?;
503                let answer = peer.create_answer().await?;
504                // Send answer without holding any locks
505                self.send_rtc_message(&peer_id_clone, "answer", &answer)
506                    .await?;
507            }
508        }
509
510        Ok(())
511    }
512
513    /// Initiate WebRTC connection to a peer
514    async fn initiate_connection(&self, peer_id: &str) -> GunResult<()> {
515        // Check if connection already exists
516        let should_create = {
517            let peers = self.peers.read().await;
518            !peers.contains_key(peer_id) && peers.len() < self.options.max_connections
519        };
520
521        if !should_create {
522            let peers = self.peers.read().await;
523            if peers.contains_key(peer_id) {
524                return Ok(());
525            }
526            if peers.len() >= self.options.max_connections {
527                tracing::warn!("WebRTC connection limit reached, skipping peer {}", peer_id);
528                return Ok(());
529            }
530        }
531
532        // Create new peer connection
533        let (peer, mut rx) = WebRTCPeer::new(peer_id.to_string(), &self.options).await?;
534
535        // Set up message receiver (clone before acquiring write lock)
536        let mesh_clone = self.mesh.clone();
537        tokio::spawn(async move {
538            while let Some(msg) = rx.recv().await {
539                // Forward message to mesh
540                if let Err(e) = mesh_clone.hear(&msg, None).await {
541                    tracing::error!("Error forwarding WebRTC message to mesh: {}", e);
542                }
543            }
544        });
545
546        // Create and send offer
547        let offer = peer.create_offer().await?;
548
549        // Insert peer and send offer (wrap in Arc)
550        {
551            let mut peers = self.peers.write().await;
552            peers.insert(peer_id.to_string(), Arc::new(peer));
553        } // Lock released here
554
555        self.send_rtc_message(peer_id, "offer", &offer).await?;
556
557        Ok(())
558    }
559
560    /// Send RTC signaling message through DAM protocol
561    async fn send_rtc_message(
562        &self,
563        peer_id: &str,
564        msg_type: &str,
565        sdp: &RTCSessionDescription,
566    ) -> GunResult<()> {
567        // Create RTC message in Gun.js format
568        let mut rtc_msg = serde_json::json!({
569            "ok": {
570                "rtc": {
571                    "id": self.pid,
572                }
573            }
574        });
575
576        // Add SDP
577        match msg_type {
578            "offer" => {
579                rtc_msg["ok"]["rtc"]["offer"] = serde_json::json!({
580                    "type": "offer",
581                    "sdp": sdp.sdp
582                });
583            }
584            "answer" => {
585                rtc_msg["ok"]["rtc"]["answer"] = serde_json::json!({
586                    "type": "answer",
587                    "sdp": sdp.sdp
588                });
589            }
590            _ => {
591                return Err(GunError::InvalidData(format!(
592                    "Unknown RTC message type: {}",
593                    msg_type
594                )))
595            }
596        }
597
598        // Send through mesh
599        let msg_str = serde_json::to_string(&rtc_msg).map_err(GunError::Serialization)?;
600
601        // Find the peer in mesh and send
602        // This will go through WebSocket if WebRTC isn't established yet
603        // Once WebRTC is established, messages will go through the data channel
604        if self.mesh.get_peer(peer_id).await.is_some() {
605            self.mesh.send_to_peer_by_id(&msg_str, peer_id).await?;
606        }
607
608        Ok(())
609    }
610
611    /// Send a DAM message through WebRTC if available, otherwise fall back to WebSocket
612    pub async fn send_message(&self, peer_id: &str, message: &str) -> GunResult<()> {
613        let peers = self.peers.read().await;
614        if let Some(peer) = peers.get(peer_id) {
615            // Check if WebRTC connection is open
616            if matches!(
617                peer.connection_state().await,
618                RTCPeerConnectionState::Connected
619            ) {
620                return peer.send(message).await;
621            }
622        }
623
624        // Fall back to WebSocket via mesh
625        self.mesh.send_to_peer_by_id(message, peer_id).await
626    }
627}
628
629/// RTC message types for signaling
630#[derive(Serialize, Deserialize, Debug)]
631pub struct RTCMessage {
632    pub ok: RTCMessageOk,
633}
634
635#[derive(Serialize, Deserialize, Debug)]
636pub struct RTCMessageOk {
637    pub rtc: RTCMessageRTC,
638}
639
640#[derive(Serialize, Deserialize, Debug)]
641pub struct RTCMessageRTC {
642    pub id: String,
643    pub offer: Option<RTCMessageSDP>,
644    pub answer: Option<RTCMessageSDP>,
645    pub candidate: Option<RTCMessageCandidate>,
646}
647
648#[derive(Serialize, Deserialize, Debug)]
649pub struct RTCMessageSDP {
650    #[serde(rename = "type")]
651    pub sdp_type: String,
652    pub sdp: String,
653}
654
655#[derive(Serialize, Deserialize, Debug)]
656pub struct RTCMessageCandidate {
657    pub candidate: String,
658    pub sdp_mid: Option<String>,
659    pub sdp_m_line_index: Option<u16>,
660}