gun/
websocket.rs

1//! WebSocket support for peer connections
2//!
3//! This module provides WebSocket client and server implementations for Gun's
4//! peer-to-peer networking. WebSockets are used to connect to relay servers
5//! and other peers for message exchange.
6//!
7//! ## Components
8//!
9//! - **WebSocketClient**: Connects to peer URLs (relay servers)
10//! - **WebSocketServer**: Listens for incoming peer connections (relay mode)
11//!
12//! Both client and server handle the DAM protocol message exchange over WebSocket.
13
14use crate::core::GunCore;
15use crate::dam::{Mesh, Peer};
16use crate::error::GunResult;
17use futures_util::{SinkExt, StreamExt};
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::net::{TcpListener, TcpStream};
21use tokio::sync::mpsc;
22use tokio_tungstenite::accept_async;
23use tokio_tungstenite::{connect_async, tungstenite::Message};
24use url::Url;
25
26/// WebSocket client for connecting to peers
27///
28/// Handles outgoing WebSocket connections to peer URLs (typically relay servers).
29/// Provides automatic reconnection with exponential backoff.
30///
31/// # Example
32///
33/// ```rust,no_run
34/// use gun::websocket::WebSocketClient;
35/// use gun::core::GunCore;
36/// use gun::dam::Mesh;
37/// use std::sync::Arc;
38///
39/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
40/// let core = Arc::new(GunCore::new());
41/// let mesh = Arc::new(Mesh::new(core.clone(), /* ... */));
42/// let client = WebSocketClient::new(core, mesh);
43/// client.connect("ws://relay.example.com/gun").await?;
44/// # Ok(())
45/// # }
46/// ```
47pub struct WebSocketClient {
48    core: Arc<GunCore>,
49    mesh: Arc<Mesh>,
50}
51
52impl WebSocketClient {
53    pub fn new(core: Arc<GunCore>, mesh: Arc<Mesh>) -> Self {
54        Self { core, mesh }
55    }
56
57    /// Connect to a peer URL with automatic reconnection
58    /// Returns when connection is established or fails
59    pub async fn connect(&self, url: &str) -> GunResult<()> {
60        let url_str = url.to_string();
61        let core = self.core.clone();
62        let mesh = self.mesh.clone();
63
64        // Ensure we use the public URL (convert localhost/127.0.0.1 to public IP if needed)
65        let public_url = Self::ensure_public_url(&url_str)?;
66
67        let mut retry_count = 0;
68        let max_retries = 10;
69        let base_wait = Duration::from_millis(500);
70
71        loop {
72            match Self::connect_once(&public_url, core.clone(), mesh.clone()).await {
73                Ok(_) => {
74                    // Connection successful, wait a bit for handshake and peer registration to complete
75                    tokio::time::sleep(Duration::from_millis(500)).await;
76                    return Ok(());
77                }
78                Err(e) => {
79                    retry_count += 1;
80                    if retry_count >= max_retries {
81                        return Err(crate::error::GunError::Network(format!(
82                            "Max reconnection attempts reached for {}: {}",
83                            public_url, e
84                        )));
85                    }
86
87                    // Exponential backoff: wait = base_wait * 2^retry_count (capped at 2 seconds)
88                    let wait_time = base_wait * (1 << retry_count.min(4));
89                    eprintln!(
90                        "Connection failed for {}, retrying in {:?}... (attempt {})",
91                        public_url, wait_time, retry_count
92                    );
93                    tokio::time::sleep(wait_time).await;
94                }
95            }
96        }
97    }
98
99    /// Ensure URL uses public IP (not localhost/127.0.0.1)
100    /// For relay servers, we should always connect via their public IP
101    fn ensure_public_url(url: &str) -> GunResult<String> {
102        let parsed = Url::parse(url)?;
103
104        // Check if host is localhost or 127.0.0.1
105        if let Some(host) = parsed.host_str() {
106            if host == "localhost" || host == "127.0.0.1" || host == "::1" {
107                // This is a localhost URL - for testing, we might want to allow it
108                // but in production, we should fail or resolve to public IP
109                // For now, we'll allow it but warn
110                eprintln!("Warning: Connecting to localhost URL: {}. This should use a public IP for NAT traversal.", url);
111            }
112        }
113
114        Ok(url.to_string())
115    }
116
117    async fn connect_once(url: &str, _core: Arc<GunCore>, mesh: Arc<Mesh>) -> GunResult<()> {
118        // Convert http/https to ws/wss
119        let ws_url = url
120            .replace("http://", "ws://")
121            .replace("https://", "wss://");
122        let url = Url::parse(&ws_url).map_err(|e| {
123            crate::error::GunError::Network(format!("Invalid URL {}: {}", ws_url, e))
124        })?;
125
126        // Connect to the WebSocket server (always uses public IP since we're connecting to a remote URL)
127        let (ws_stream, response) = connect_async(url.clone()).await.map_err(|e| {
128            crate::error::GunError::Network(format!(
129                "WebSocket connection failed to {}: {}",
130                ws_url, e
131            ))
132        })?;
133
134        // Verify connection was successful (status 101 Switching Protocols)
135        if response.status() != 101 {
136            return Err(crate::error::GunError::Network(format!(
137                "WebSocket handshake failed with status: {}",
138                response.status()
139            )));
140        }
141
142        let peer = Peer::new(ws_url.clone());
143        let peer_id = peer.id.clone();
144
145        // Create channel for sending messages
146        let (tx, mut rx) = mpsc::unbounded_channel();
147
148        // Add peer to mesh FIRST (this registers the peer for message routing)
149        mesh.hi(peer.clone()).await?;
150
151        // Set sender in mesh AFTER adding peer (so peer exists in the map)
152        mesh.set_peer_sender(&peer_id, tx.clone()).await?;
153
154        // Split WebSocket stream
155        let (mut write, mut read) = ws_stream.split();
156
157        // Spawn task to handle incoming messages
158        let mesh_clone = mesh.clone();
159        let peer_for_read = peer.clone();
160        tokio::spawn(async move {
161            while let Some(msg) = read.next().await {
162                match msg {
163                    Ok(Message::Text(text)) => {
164                        if let Err(e) = mesh_clone.hear(&text, Some(&peer_for_read)).await {
165                            eprintln!("Error handling message from {}: {}", peer_for_read.id, e);
166                        }
167                    }
168                    Ok(Message::Close(_)) => {
169                        break;
170                    }
171                    Err(e) => {
172                        eprintln!("WebSocket error from {}: {}", peer_for_read.id, e);
173                        break;
174                    }
175                    _ => {}
176                }
177            }
178            // Cleanup on disconnect
179            let _ = mesh_clone.bye(&peer_for_read.id).await;
180        });
181
182        // Spawn task to send outgoing messages
183        tokio::spawn(async move {
184            while let Some(message) = rx.recv().await {
185                if write.send(Message::Text(message)).await.is_err() {
186                    break;
187                }
188            }
189        });
190
191        Ok(())
192    }
193}
194
195/// WebSocket server for accepting incoming peer connections
196///
197/// Runs a WebSocket server that listens for peer connections. Used when Gun is
198/// running in "super peer" (relay) mode to help other peers with NAT traversal.
199///
200/// # Example
201///
202/// ```rust,no_run
203/// use gun::websocket::WebSocketServer;
204/// use gun::core::GunCore;
205/// use gun::dam::Mesh;
206/// use std::sync::Arc;
207///
208/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
209/// let core = Arc::new(GunCore::new());
210/// let mesh = Arc::new(Mesh::new(core.clone(), /* ... */));
211/// let server = WebSocketServer::new(core, mesh, 8080);
212/// server.start().await?;
213/// # Ok(())
214/// # }
215/// ```
216pub struct WebSocketServer {
217    core: Arc<GunCore>,
218    mesh: Arc<Mesh>,
219    port: u16,
220}
221
222impl WebSocketServer {
223    pub fn new(core: Arc<GunCore>, mesh: Arc<Mesh>, port: u16) -> Self {
224        Self { core, mesh, port }
225    }
226
227    /// Start the WebSocket server
228    pub async fn start(&self) -> GunResult<()> {
229        let addr = format!("0.0.0.0:{}", self.port);
230        let listener = TcpListener::bind(&addr).await?;
231        println!("Gun.rs WebSocket server listening on ws://{}", addr);
232
233        while let Ok((stream, addr)) = listener.accept().await {
234            let core = self.core.clone();
235            let mesh = self.mesh.clone();
236            tokio::spawn(Self::handle_connection(stream, addr, core, mesh));
237        }
238
239        Ok(())
240    }
241
242    async fn handle_connection(
243        stream: TcpStream,
244        addr: std::net::SocketAddr,
245        _core: Arc<GunCore>,
246        mesh: Arc<Mesh>,
247    ) {
248        let ws_stream = match accept_async(stream).await {
249            Ok(ws) => ws,
250            Err(e) => {
251                eprintln!("Error accepting WebSocket connection: {}", e);
252                return;
253            }
254        };
255
256        let peer_url = format!("ws://{}", addr);
257        let mut peer = Peer::new(peer_url.clone());
258        let peer_id = peer.id.clone();
259
260        // Create channel for sending messages
261        let (tx, mut rx) = mpsc::unbounded_channel();
262        peer.set_sender(tx.clone());
263
264        if let Err(e) = mesh.hi(peer.clone()).await {
265            eprintln!("Error adding peer: {}", e);
266            return;
267        }
268
269        // Set sender in mesh
270        if let Err(e) = mesh.set_peer_sender(&peer_id, tx.clone()).await {
271            eprintln!("Error setting peer sender: {}", e);
272        }
273
274        let (mut write, mut read) = ws_stream.split();
275
276        // Spawn task to handle incoming messages
277        let mesh_clone = mesh.clone();
278        let peer_for_read = peer.clone();
279        let handle_task = tokio::spawn(async move {
280            while let Some(msg) = read.next().await {
281                match msg {
282                    Ok(Message::Text(text)) => {
283                        if let Err(e) = mesh_clone.hear(&text, Some(&peer_for_read)).await {
284                            eprintln!("Error handling message: {}", e);
285                        }
286                    }
287                    Ok(Message::Close(_)) => {
288                        break;
289                    }
290                    Err(e) => {
291                        eprintln!("WebSocket error: {}", e);
292                        break;
293                    }
294                    _ => {}
295                }
296            }
297        });
298
299        // Spawn task to send outgoing messages
300        let send_task = tokio::spawn(async move {
301            while let Some(message) = rx.recv().await {
302                if write.send(Message::Text(message)).await.is_err() {
303                    break;
304                }
305            }
306        });
307
308        // Wait for either task to complete (connection closed)
309        tokio::select! {
310            _ = handle_task => {},
311            _ = send_task => {},
312        }
313
314        // Cleanup on disconnect
315        if let Err(e) = mesh.bye(&peer_id).await {
316            eprintln!("Error removing peer: {}", e);
317        }
318    }
319}