1use crate::core::GunCore;
15use crate::dam::{Mesh, Peer};
16use crate::error::GunResult;
17use futures_util::{SinkExt, StreamExt};
18use std::sync::Arc;
19use std::time::Duration;
20use tokio::net::{TcpListener, TcpStream};
21use tokio::sync::mpsc;
22use tokio_tungstenite::accept_async;
23use tokio_tungstenite::{connect_async, tungstenite::Message};
24use url::Url;
25
26pub struct WebSocketClient {
48 core: Arc<GunCore>,
49 mesh: Arc<Mesh>,
50}
51
52impl WebSocketClient {
53 pub fn new(core: Arc<GunCore>, mesh: Arc<Mesh>) -> Self {
54 Self { core, mesh }
55 }
56
57 pub async fn connect(&self, url: &str) -> GunResult<()> {
60 let url_str = url.to_string();
61 let core = self.core.clone();
62 let mesh = self.mesh.clone();
63
64 let public_url = Self::ensure_public_url(&url_str)?;
66
67 let mut retry_count = 0;
68 let max_retries = 10;
69 let base_wait = Duration::from_millis(500);
70
71 loop {
72 match Self::connect_once(&public_url, core.clone(), mesh.clone()).await {
73 Ok(_) => {
74 tokio::time::sleep(Duration::from_millis(500)).await;
76 return Ok(());
77 }
78 Err(e) => {
79 retry_count += 1;
80 if retry_count >= max_retries {
81 return Err(crate::error::GunError::Network(format!(
82 "Max reconnection attempts reached for {}: {}",
83 public_url, e
84 )));
85 }
86
87 let wait_time = base_wait * (1 << retry_count.min(4));
89 eprintln!(
90 "Connection failed for {}, retrying in {:?}... (attempt {})",
91 public_url, wait_time, retry_count
92 );
93 tokio::time::sleep(wait_time).await;
94 }
95 }
96 }
97 }
98
99 fn ensure_public_url(url: &str) -> GunResult<String> {
102 let parsed = Url::parse(url)?;
103
104 if let Some(host) = parsed.host_str() {
106 if host == "localhost" || host == "127.0.0.1" || host == "::1" {
107 eprintln!("Warning: Connecting to localhost URL: {}. This should use a public IP for NAT traversal.", url);
111 }
112 }
113
114 Ok(url.to_string())
115 }
116
117 async fn connect_once(url: &str, _core: Arc<GunCore>, mesh: Arc<Mesh>) -> GunResult<()> {
118 let ws_url = url
120 .replace("http://", "ws://")
121 .replace("https://", "wss://");
122 let url = Url::parse(&ws_url).map_err(|e| {
123 crate::error::GunError::Network(format!("Invalid URL {}: {}", ws_url, e))
124 })?;
125
126 let (ws_stream, response) = connect_async(url.clone()).await.map_err(|e| {
128 crate::error::GunError::Network(format!(
129 "WebSocket connection failed to {}: {}",
130 ws_url, e
131 ))
132 })?;
133
134 if response.status() != 101 {
136 return Err(crate::error::GunError::Network(format!(
137 "WebSocket handshake failed with status: {}",
138 response.status()
139 )));
140 }
141
142 let peer = Peer::new(ws_url.clone());
143 let peer_id = peer.id.clone();
144
145 let (tx, mut rx) = mpsc::unbounded_channel();
147
148 mesh.hi(peer.clone()).await?;
150
151 mesh.set_peer_sender(&peer_id, tx.clone()).await?;
153
154 let (mut write, mut read) = ws_stream.split();
156
157 let mesh_clone = mesh.clone();
159 let peer_for_read = peer.clone();
160 tokio::spawn(async move {
161 while let Some(msg) = read.next().await {
162 match msg {
163 Ok(Message::Text(text)) => {
164 if let Err(e) = mesh_clone.hear(&text, Some(&peer_for_read)).await {
165 eprintln!("Error handling message from {}: {}", peer_for_read.id, e);
166 }
167 }
168 Ok(Message::Close(_)) => {
169 break;
170 }
171 Err(e) => {
172 eprintln!("WebSocket error from {}: {}", peer_for_read.id, e);
173 break;
174 }
175 _ => {}
176 }
177 }
178 let _ = mesh_clone.bye(&peer_for_read.id).await;
180 });
181
182 tokio::spawn(async move {
184 while let Some(message) = rx.recv().await {
185 if write.send(Message::Text(message)).await.is_err() {
186 break;
187 }
188 }
189 });
190
191 Ok(())
192 }
193}
194
195pub struct WebSocketServer {
217 core: Arc<GunCore>,
218 mesh: Arc<Mesh>,
219 port: u16,
220}
221
222impl WebSocketServer {
223 pub fn new(core: Arc<GunCore>, mesh: Arc<Mesh>, port: u16) -> Self {
224 Self { core, mesh, port }
225 }
226
227 pub async fn start(&self) -> GunResult<()> {
229 let addr = format!("0.0.0.0:{}", self.port);
230 let listener = TcpListener::bind(&addr).await?;
231 println!("Gun.rs WebSocket server listening on ws://{}", addr);
232
233 while let Ok((stream, addr)) = listener.accept().await {
234 let core = self.core.clone();
235 let mesh = self.mesh.clone();
236 tokio::spawn(Self::handle_connection(stream, addr, core, mesh));
237 }
238
239 Ok(())
240 }
241
242 async fn handle_connection(
243 stream: TcpStream,
244 addr: std::net::SocketAddr,
245 _core: Arc<GunCore>,
246 mesh: Arc<Mesh>,
247 ) {
248 let ws_stream = match accept_async(stream).await {
249 Ok(ws) => ws,
250 Err(e) => {
251 eprintln!("Error accepting WebSocket connection: {}", e);
252 return;
253 }
254 };
255
256 let peer_url = format!("ws://{}", addr);
257 let mut peer = Peer::new(peer_url.clone());
258 let peer_id = peer.id.clone();
259
260 let (tx, mut rx) = mpsc::unbounded_channel();
262 peer.set_sender(tx.clone());
263
264 if let Err(e) = mesh.hi(peer.clone()).await {
265 eprintln!("Error adding peer: {}", e);
266 return;
267 }
268
269 if let Err(e) = mesh.set_peer_sender(&peer_id, tx.clone()).await {
271 eprintln!("Error setting peer sender: {}", e);
272 }
273
274 let (mut write, mut read) = ws_stream.split();
275
276 let mesh_clone = mesh.clone();
278 let peer_for_read = peer.clone();
279 let handle_task = tokio::spawn(async move {
280 while let Some(msg) = read.next().await {
281 match msg {
282 Ok(Message::Text(text)) => {
283 if let Err(e) = mesh_clone.hear(&text, Some(&peer_for_read)).await {
284 eprintln!("Error handling message: {}", e);
285 }
286 }
287 Ok(Message::Close(_)) => {
288 break;
289 }
290 Err(e) => {
291 eprintln!("WebSocket error: {}", e);
292 break;
293 }
294 _ => {}
295 }
296 }
297 });
298
299 let send_task = tokio::spawn(async move {
301 while let Some(message) = rx.recv().await {
302 if write.send(Message::Text(message)).await.is_err() {
303 break;
304 }
305 }
306 });
307
308 tokio::select! {
310 _ = handle_task => {},
311 _ = send_task => {},
312 }
313
314 if let Err(e) = mesh.bye(&peer_id).await {
316 eprintln!("Error removing peer: {}", e);
317 }
318 }
319}