wsforge_core/connection.rs
1//! WebSocket connection management and message handling.
2//!
3//! This module provides the core functionality for managing WebSocket connections,
4//! including connection lifecycle, message routing, and broadcasting capabilities.
5//!
6//! # Overview
7//!
8//! The connection module consists of three main components:
9//!
10//! - [`Connection`]: Represents an individual WebSocket connection
11//! - [`ConnectionManager`]: Manages multiple connections with thread-safe operations
12//! - [`handle_websocket`]: Async function that handles the WebSocket lifecycle
13//!
14//! # Architecture
15//!
16//! Each WebSocket connection runs two concurrent tasks:
17//! - **Read task**: Receives messages from the client
18//! - **Write task**: Sends messages to the client via an unbounded channel
19//!
20//! This architecture ensures that slow clients don't block message processing.
21//!
22//! # Examples
23//!
24//! ## Creating and Using a ConnectionManager
25//!
26//! ```
27//! use wsforge::prelude::*;
28//! use std::sync::Arc;
29//!
30//! let manager = Arc::new(ConnectionManager::new());
31//!
32//! // Check connection count
33//! println!("Active connections: {}", manager.count());
34//!
35//! // Broadcast a message to all connections
36//! manager.broadcast(Message::text("Hello everyone!"));
37//! ```
38//!
39//! ## Broadcasting Messages
40//!
41//! ```
42//! use wsforge::prelude::*;
43//! use std::sync::Arc;
44//!
45//! # let manager = Arc::new(ConnectionManager::new());
46//! # let conn_id = "conn_0".to_string();
47//! // Broadcast to all connections
48//! manager.broadcast(Message::text("System announcement"));
49//!
50//! // Broadcast to all except one
51//! manager.broadcast_except(&conn_id, Message::text("User joined"));
52//!
53//! // Broadcast to specific connections
54//! let target_ids = vec!["conn_1".to_string(), "conn_2".to_string()];
55//! manager.broadcast_to(&target_ids, Message::text("Private message"));
56//! ```
57
58use crate::error::{Error, Result};
59use crate::message::Message;
60use dashmap::DashMap;
61use futures_util::{SinkExt, StreamExt};
62use serde::{Deserialize, Serialize};
63use std::net::SocketAddr;
64use std::sync::Arc;
65use tokio::net::TcpStream;
66use tokio::sync::mpsc;
67use tokio_tungstenite::WebSocketStream;
68use tracing::{debug, error, info, warn};
69
/// A unique identifier for a WebSocket connection.
///
/// This is a plain alias for `String`. Nothing in this module generates IDs:
/// they are passed in by the caller of [`Connection::new`] and
/// [`handle_websocket`] (presumably the framework's router, which is not
/// visible from this file), and that caller is responsible for keeping them
/// unique. [`ConnectionManager`] uses the ID as the key of its connection map.
pub type ConnectionId = String;
75
/// Metadata about a WebSocket connection.
///
/// Records the connection's ID, the client's socket address, the time the
/// connection object was created, and optional protocol information.
/// The struct is serializable so it can be logged or sent to clients.
///
/// # Examples
///
/// ```
/// use wsforge::connection::ConnectionInfo;
/// use std::net::SocketAddr;
///
/// let info = ConnectionInfo {
///     id: "conn_0".to_string(),
///     addr: "127.0.0.1:8080".parse().unwrap(),
///     connected_at: 1634567890,
///     protocol: Some("websocket".to_string()),
/// };
///
/// println!("Connection {} from {}", info.id, info.addr);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
    /// Unique identifier for this connection (same value as `Connection::id`)
    pub id: ConnectionId,
    /// Socket address of the connected client
    pub addr: SocketAddr,
    /// Unix timestamp, in whole seconds, taken when the `Connection` was created
    pub connected_at: u64,
    /// Optional protocol information (e.g., "websocket", "wss").
    /// `Connection::new` always initializes this to `None`.
    pub protocol: Option<String>,
}
107
/// Represents an active WebSocket connection.
///
/// A `Connection` is a handle for sending messages to one connected client.
/// Messages are pushed onto an unbounded channel and the call returns
/// immediately, so a slow client never blocks the sender. Because the channel
/// is unbounded, messages for a client that drains slowly accumulate in memory.
///
/// # Cloning and Thread Safety
///
/// `Connection` implements `Clone`. A clone copies `id` and `info` and clones
/// the channel sender, so every clone feeds the same outgoing queue. Clones
/// can be handed to other tasks and threads.
///
/// # Examples
///
/// ## Sending Text Messages
///
/// ```
/// use wsforge::prelude::*;
///
/// # async fn example(conn: Connection) -> Result<()> {
/// // Send a text message
/// conn.send_text("Hello, client!")?;
///
/// // Send JSON data
/// #[derive(serde::Serialize)]
/// struct Response {
///     status: String,
///     data: i32,
/// }
///
/// conn.send_json(&Response {
///     status: "ok".to_string(),
///     data: 42,
/// })?;
/// # Ok(())
/// # }
/// ```
///
/// ## Sending Binary Data
///
/// ```
/// use wsforge::prelude::*;
///
/// # async fn example(conn: Connection) -> Result<()> {
/// let data = vec![0x01, 0x02, 0x03, 0x04];
/// conn.send_binary(data)?;
/// # Ok(())
/// # }
/// ```
pub struct Connection {
    /// Unique identifier for this connection (duplicated in `info.id`)
    pub id: ConnectionId,
    /// Connection metadata
    pub info: ConnectionInfo,
    /// Sending half of the outgoing-message channel; the receiving half is
    /// drained by the connection's write task in `handle_websocket`
    sender: mpsc::UnboundedSender<Message>,
}
164
165impl Connection {
166 /// Creates a new `Connection` instance.
167 ///
168 /// This is typically called internally by the framework when a new
169 /// WebSocket connection is established.
170 ///
171 /// # Arguments
172 ///
173 /// * `id` - Unique identifier for the connection
174 /// * `addr` - Socket address of the client
175 /// * `sender` - Channel sender for outgoing messages
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use wsforge::connection::Connection;
181 /// use tokio::sync::mpsc;
182 /// use std::net::SocketAddr;
183 ///
184 /// let (tx, rx) = mpsc::unbounded_channel();
185 /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
186 /// let conn = Connection::new("conn_0".to_string(), addr, tx);
187 ///
188 /// assert_eq!(conn.id(), "conn_0");
189 /// ```
190 pub fn new(id: ConnectionId, addr: SocketAddr, sender: mpsc::UnboundedSender<Message>) -> Self {
191 let info = ConnectionInfo {
192 id: id.clone(),
193 addr,
194 connected_at: std::time::SystemTime::now()
195 .duration_since(std::time::UNIX_EPOCH)
196 .unwrap()
197 .as_secs(),
198 protocol: None,
199 };
200
201 Self { id, info, sender }
202 }
203
204 /// Sends a message to the connected client.
205 ///
206 /// Messages are queued in an unbounded channel and sent asynchronously.
207 /// This method returns immediately without waiting for the message to be sent.
208 ///
209 /// # Errors
210 ///
211 /// Returns an error if the connection has been closed and the channel
212 /// receiver has been dropped.
213 ///
214 /// # Examples
215 ///
216 /// ```
217 /// use wsforge::prelude::*;
218 ///
219 /// # async fn example(conn: Connection) -> Result<()> {
220 /// let msg = Message::text("Hello!");
221 /// conn.send(msg)?;
222 /// # Ok(())
223 /// # }
224 /// ```
225 pub fn send(&self, message: Message) -> Result<()> {
226 self.sender
227 .send(message)
228 .map_err(|e| Error::custom(format!("Failed to send message: {}", e)))
229 }
230
231 /// Sends a text message to the connected client.
232 ///
233 /// This is a convenience method that creates a text [`Message`] and sends it.
234 ///
235 /// # Errors
236 ///
237 /// Returns an error if the connection has been closed.
238 ///
239 /// # Examples
240 ///
241 /// ```
242 /// use wsforge::prelude::*;
243 ///
244 /// # async fn example(conn: Connection) -> Result<()> {
245 /// conn.send_text("Welcome to the chat!")?;
246 /// # Ok(())
247 /// # }
248 /// ```
249 pub fn send_text(&self, text: impl Into<String>) -> Result<()> {
250 self.send(Message::text(text.into()))
251 }
252
253 /// Sends binary data to the connected client.
254 ///
255 /// This is a convenience method that creates a binary [`Message`] and sends it.
256 ///
257 /// # Errors
258 ///
259 /// Returns an error if the connection has been closed.
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// use wsforge::prelude::*;
265 ///
266 /// # async fn example(conn: Connection) -> Result<()> {
267 /// let data = vec![0xFF, 0xFE, 0xFD];
268 /// conn.send_binary(data)?;
269 /// # Ok(())
270 /// # }
271 /// ```
272 pub fn send_binary(&self, data: Vec<u8>) -> Result<()> {
273 self.send(Message::binary(data))
274 }
275
276 /// Serializes data to JSON and sends it as a text message.
277 ///
278 /// This is a convenience method for sending structured data. The data
279 /// is serialized using `serde_json` and sent as a text message.
280 ///
281 /// # Errors
282 ///
283 /// Returns an error if:
284 /// - Serialization fails
285 /// - The connection has been closed
286 ///
287 /// # Examples
288 ///
289 /// ```
290 /// use wsforge::prelude::*;
291 /// use serde::Serialize;
292 ///
293 /// #[derive(Serialize)]
294 /// struct GameState {
295 /// score: u32,
296 /// level: u8,
297 /// }
298 ///
299 /// # async fn example(conn: Connection) -> Result<()> {
300 /// let state = GameState { score: 1000, level: 5 };
301 /// conn.send_json(&state)?;
302 /// # Ok(())
303 /// # }
304 /// ```
305 pub fn send_json<T: Serialize>(&self, data: &T) -> Result<()> {
306 let json = serde_json::to_string(data)?;
307 self.send_text(json)
308 }
309
310 /// Returns the unique identifier for this connection.
311 ///
312 /// # Examples
313 ///
314 /// ```
315 /// use wsforge::prelude::*;
316 ///
317 /// # fn example(conn: Connection) {
318 /// println!("Connection ID: {}", conn.id());
319 /// # }
320 /// ```
321 pub fn id(&self) -> &ConnectionId {
322 &self.id
323 }
324
325 /// Returns the connection metadata.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use wsforge::prelude::*;
331 ///
332 /// # fn example(conn: Connection) {
333 /// let info = conn.info();
334 /// println!("Client address: {}", info.addr);
335 /// println!("Connected at: {}", info.connected_at);
336 /// # }
337 /// ```
338 pub fn info(&self) -> &ConnectionInfo {
339 &self.info
340 }
341}
342
/// Manages a collection of active WebSocket connections.
///
/// `ConnectionManager` provides operations for adding, removing, looking up,
/// and broadcasting to connections. Connections are stored in a [`DashMap`]
/// (a concurrent hash map) keyed by [`ConnectionId`], so every method takes
/// `&self` and callers need no external lock.
///
/// # Thread Safety
///
/// All operations can be called from multiple threads or tasks concurrently
/// without additional synchronization. Share the manager by wrapping it in an
/// `Arc`, as the examples below do.
///
/// # Examples
///
/// ## Basic Usage
///
/// ```
/// use wsforge::prelude::*;
/// use std::sync::Arc;
///
/// let manager = Arc::new(ConnectionManager::new());
///
/// // Get connection count
/// let count = manager.count();
/// println!("Active connections: {}", count);
///
/// // Get all connection IDs
/// let ids = manager.all_ids();
/// for id in ids {
///     println!("Connection: {}", id);
/// }
/// ```
///
/// ## Broadcasting
///
/// ```
/// use wsforge::prelude::*;
/// use std::sync::Arc;
///
/// # let manager = Arc::new(ConnectionManager::new());
/// // Broadcast system announcement
/// manager.broadcast(Message::text("Server maintenance in 5 minutes"));
///
/// // Notify everyone except the sender
/// let sender_id = "conn_42";
/// manager.broadcast_except(&sender_id.to_string(),
///     Message::text("New user joined the chat"));
/// ```
pub struct ConnectionManager {
    /// Concurrent map of active connections, keyed by connection ID
    connections: Arc<DashMap<ConnectionId, Connection>>,
}
394
395impl ConnectionManager {
396 /// Creates a new empty `ConnectionManager`.
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// use wsforge::prelude::*;
402 /// use std::sync::Arc;
403 ///
404 /// let manager = Arc::new(ConnectionManager::new());
405 /// assert_eq!(manager.count(), 0);
406 /// ```
407 pub fn new() -> Self {
408 Self {
409 connections: Arc::new(DashMap::new()),
410 }
411 }
412
413 /// Adds a connection to the manager.
414 ///
415 /// Returns the total number of connections after adding.
416 ///
417 /// # Examples
418 ///
419 /// ```
420 /// use wsforge::prelude::*;
421 /// use tokio::sync::mpsc;
422 /// use std::net::SocketAddr;
423 ///
424 /// # fn example() {
425 /// let manager = ConnectionManager::new();
426 /// let (tx, rx) = mpsc::unbounded_channel();
427 /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
428 /// let conn = Connection::new("conn_0".to_string(), addr, tx);
429 ///
430 /// let count = manager.add(conn);
431 /// assert_eq!(count, 1);
432 /// # }
433 /// ```
434 pub fn add(&self, conn: Connection) -> usize {
435 let id = conn.id.clone();
436 self.connections.insert(id.clone(), conn);
437 let count = self.connections.len();
438 info!("Added connection: {} (Total: {})", id, count);
439 count
440 }
441
442 /// Removes a connection from the manager.
443 ///
444 /// Returns the removed connection if it existed, or `None` if not found.
445 ///
446 /// # Examples
447 ///
448 /// ```
449 /// use wsforge::prelude::*;
450 ///
451 /// # fn example(manager: &ConnectionManager) {
452 /// let conn_id = "conn_42".to_string();
453 /// if let Some(conn) = manager.remove(&conn_id) {
454 /// println!("Removed connection: {}", conn.id());
455 /// }
456 /// # }
457 /// ```
458 pub fn remove(&self, id: &ConnectionId) -> Option<Connection> {
459 let result = self.connections.remove(id).map(|(_, conn)| conn);
460 let count = self.connections.len();
461 info!("Removed connection: {} (Total: {})", id, count);
462 result
463 }
464
465 /// Retrieves a connection by its ID.
466 ///
467 /// Returns a clone of the connection if found, or `None` if not found.
468 ///
469 /// # Performance
470 ///
471 /// This operation is O(1) and does not block other concurrent operations.
472 ///
473 /// # Examples
474 ///
475 /// ```
476 /// use wsforge::prelude::*;
477 ///
478 /// # async fn example(manager: &ConnectionManager) -> Result<()> {
479 /// let conn_id = "conn_0".to_string();
480 /// if let Some(conn) = manager.get(&conn_id) {
481 /// conn.send_text("Hello!")?;
482 /// }
483 /// # Ok(())
484 /// # }
485 /// ```
486 pub fn get(&self, id: &ConnectionId) -> Option<Connection> {
487 self.connections.get(id).map(|entry| entry.value().clone())
488 }
489
490 /// Broadcasts a message to all active connections.
491 ///
492 /// This method iterates through all connections and sends the message
493 /// to each one. Failed sends are logged but do not stop the broadcast.
494 ///
495 /// # Performance
496 ///
497 /// Broadcasts are performed synchronously but send operations are async,
498 /// so messages are queued immediately and sent in the background.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use wsforge::prelude::*;
504 ///
505 /// # fn example(manager: &ConnectionManager) {
506 /// manager.broadcast(Message::text("Server announcement!"));
507 /// # }
508 /// ```
509 pub fn broadcast(&self, message: Message) {
510 let count = self.connections.len();
511 debug!("Broadcasting message to {} connections", count);
512
513 let mut success = 0;
514 let mut failed = 0;
515
516 for entry in self.connections.iter() {
517 match entry.value().send(message.clone()) {
518 Ok(_) => {
519 success += 1;
520 debug!("✅ Broadcast sent to {}", entry.key());
521 }
522 Err(e) => {
523 failed += 1;
524 error!("❌ Failed to broadcast to {}: {}", entry.key(), e);
525 }
526 }
527 }
528
529 info!(
530 "Broadcast complete: {} success, {} failed out of {} total",
531 success, failed, count
532 );
533 }
534
535 /// Broadcasts a message to all connections except one.
536 ///
537 /// This is useful for notifying all users about an action taken by one user,
538 /// without sending the notification back to the actor.
539 ///
540 /// # Examples
541 ///
542 /// ```
543 /// use wsforge::prelude::*;
544 ///
545 /// # fn example(manager: &ConnectionManager) {
546 /// let sender_id = "conn_42".to_string();
547 /// manager.broadcast_except(&sender_id,
548 /// Message::text("User 42 sent a message"));
549 /// # }
550 /// ```
551 pub fn broadcast_except(&self, except_id: &ConnectionId, message: Message) {
552 debug!(
553 "Broadcasting message to {} connections (except {})",
554 self.connections.len() - 1,
555 except_id
556 );
557 for entry in self.connections.iter() {
558 if entry.key() != except_id {
559 if let Err(e) = entry.value().send(message.clone()) {
560 error!("Failed to broadcast to {}: {}", entry.key(), e);
561 }
562 }
563 }
564 }
565
566 /// Broadcasts a message to specific connections.
567 ///
568 /// Only connections whose IDs are in the provided list will receive the message.
569 /// Non-existent connection IDs are silently ignored.
570 ///
571 /// # Examples
572 ///
573 /// ```
574 /// use wsforge::prelude::*;
575 ///
576 /// # fn example(manager: &ConnectionManager) {
577 /// let vip_users = vec![
578 /// "conn_1".to_string(),
579 /// "conn_5".to_string(),
580 /// "conn_10".to_string(),
581 /// ];
582 /// manager.broadcast_to(&vip_users, Message::text("VIP announcement"));
583 /// # }
584 /// ```
585 pub fn broadcast_to(&self, ids: &[ConnectionId], message: Message) {
586 for id in ids {
587 if let Some(conn) = self.get(id) {
588 if let Err(e) = conn.send(message.clone()) {
589 error!("Failed to send to {}: {}", id, e);
590 }
591 }
592 }
593 }
594
595 /// Returns the number of active connections.
596 ///
597 /// # Examples
598 ///
599 /// ```
600 /// use wsforge::prelude::*;
601 ///
602 /// # fn example(manager: &ConnectionManager) {
603 /// let count = manager.count();
604 /// println!("Active connections: {}", count);
605 /// # }
606 /// ```
607 pub fn count(&self) -> usize {
608 self.connections.len()
609 }
610
611 /// Returns a list of all connection IDs.
612 ///
613 /// The order of IDs is not guaranteed.
614 ///
615 /// # Examples
616 ///
617 /// ```
618 /// use wsforge::prelude::*;
619 ///
620 /// # fn example(manager: &ConnectionManager) {
621 /// for id in manager.all_ids() {
622 /// println!("Connection ID: {}", id);
623 /// }
624 /// # }
625 /// ```
626 pub fn all_ids(&self) -> Vec<ConnectionId> {
627 self.connections.iter().map(|e| e.key().clone()).collect()
628 }
629
630 /// Returns clones of all active connections.
631 ///
632 /// This is useful for batch operations on all connections.
633 ///
634 /// # Performance
635 ///
636 /// Since connections are lightweight (they contain Arc internally),
637 /// cloning is cheap.
638 ///
639 /// # Examples
640 ///
641 /// ```
642 /// use wsforge::prelude::*;
643 ///
644 /// # async fn example(manager: &ConnectionManager) -> Result<()> {
645 /// for conn in manager.all_connections() {
646 /// conn.send_text("Shutdown in 1 minute")?;
647 /// }
648 /// # Ok(())
649 /// # }
650 /// ```
651 pub fn all_connections(&self) -> Vec<Connection> {
652 self.connections.iter().map(|e| e.value().clone()).collect()
653 }
654}
655
656impl Clone for Connection {
657 fn clone(&self) -> Self {
658 Self {
659 id: self.id.clone(),
660 info: self.info.clone(),
661 sender: self.sender.clone(),
662 }
663 }
664}
665
666impl Default for ConnectionManager {
667 fn default() -> Self {
668 Self::new()
669 }
670}
671
672/// Handles the lifecycle of a WebSocket connection.
673///
674/// This function manages the entire lifecycle of a WebSocket connection from
675/// establishment to termination. It spawns two concurrent tasks:
676/// - A read task that receives messages from the client
677/// - A write task that sends messages to the client
678///
679/// # Architecture
680///
681/// The function uses a split WebSocket stream and an unbounded channel to
682/// decouple reading and writing operations. This ensures that slow clients
683/// don't block message processing and allows for efficient broadcasting.
684///
685/// # Lifecycle Events
686///
687/// 1. Connection is added to the manager
688/// 2. `on_connect` callback is invoked
689/// 3. Read and write tasks run concurrently
690/// 4. When either task completes, both are terminated
691/// 5. Connection is removed from the manager
692/// 6. `on_disconnect` callback is invoked
693///
694/// # Arguments
695///
696/// * `stream` - The WebSocket stream
697/// * `conn_id` - Unique identifier for this connection
698/// * `peer_addr` - Socket address of the connected client
699/// * `manager` - Shared connection manager
700/// * `on_message` - Callback invoked when a message is received
701/// * `on_connect` - Callback invoked when the connection is established
702/// * `on_disconnect` - Callback invoked when the connection is closed
703///
704/// # Examples
705///
706/// This function is typically called by the router and not directly by users.
707/// However, for custom implementations:
708///
709/// ```
710/// use wsforge::prelude::*;
711/// use std::sync::Arc;
712/// use tokio_tungstenite::accept_async;
713///
714/// # async fn example(stream: tokio::net::TcpStream, peer_addr: std::net::SocketAddr) -> Result<()> {
715/// let ws_stream = accept_async(stream).await?;
716/// let conn_id = "conn_0".to_string();
717/// let manager = Arc::new(ConnectionManager::new());
718///
719/// let on_message = Arc::new(|id: ConnectionId, msg: Message| {
720/// println!("Received from {}: {:?}", id, msg);
721/// });
722///
723/// let on_connect = Arc::new(|id: ConnectionId| {
724/// println!("Connected: {}", id);
725/// });
726///
727/// let on_disconnect = Arc::new(|id: ConnectionId| {
728/// println!("Disconnected: {}", id);
729/// });
730///
731/// handle_websocket(
732/// ws_stream,
733/// conn_id,
734/// peer_addr,
735/// manager,
736/// on_message,
737/// on_connect,
738/// on_disconnect,
739/// ).await;
740/// # Ok(())
741/// # }
742/// ```
743pub async fn handle_websocket(
744 stream: WebSocketStream<TcpStream>,
745 conn_id: ConnectionId,
746 peer_addr: SocketAddr,
747 manager: Arc<ConnectionManager>,
748 on_message: Arc<dyn Fn(ConnectionId, Message) + Send + Sync>,
749 on_connect: Arc<dyn Fn(ConnectionId) + Send + Sync>,
750 on_disconnect: Arc<dyn Fn(ConnectionId) + Send + Sync>,
751) {
752 info!(
753 "WebSocket connection established: {} from {}",
754 conn_id, peer_addr
755 );
756
757 let (mut ws_sender, mut ws_receiver) = stream.split();
758 let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
759
760 // Create connection with actual peer address
761 let conn = Connection::new(conn_id.clone(), peer_addr, tx);
762
763 // Add connection to manager and get the count
764 let _count = manager.add(conn);
765
766 // Verify connection is actually in the map
767 let verify_count = manager.count();
768 debug!(
769 "Connection {} added. Verified count: {}",
770 conn_id, verify_count
771 );
772
773 // NOW call on_connect AFTER we've verified the connection is added
774 on_connect(conn_id.clone());
775
776 // Write task - sends messages to WebSocket
777 let conn_id_write = conn_id.clone();
778 let write_task = tokio::spawn(async move {
779 debug!("Write task started for {}", conn_id_write);
780
781 while let Some(message) = rx.recv().await {
782 debug!("📤 Sending message to {}", conn_id_write);
783
784 let msg = message.into_tungstenite();
785 if let Err(e) = ws_sender.send(msg).await {
786 error!("Failed to send message to {}: {}", conn_id_write, e);
787 break;
788 }
789
790 debug!("✅ Message sent to {}", conn_id_write);
791 }
792
793 info!("Write task ended for {}", conn_id_write);
794 });
795
796 // Read task - receives messages from WebSocket
797 let conn_id_read = conn_id.clone();
798 let read_task = tokio::spawn(async move {
799 debug!("Read task started for {}", conn_id_read);
800
801 while let Some(result) = ws_receiver.next().await {
802 match result {
803 Ok(msg) => {
804 if msg.is_close() {
805 info!("Close message received from {}", conn_id_read);
806 break;
807 }
808 debug!("📨 Received message from {}", conn_id_read);
809 let message = Message::from_tungstenite(msg);
810 on_message(conn_id_read.clone(), message);
811 }
812 Err(e) => {
813 warn!("WebSocket error for {}: {}", conn_id_read, e);
814 break;
815 }
816 }
817 }
818 debug!("Read task ended for {}", conn_id_read);
819 });
820
821 // Wait for either task to complete
822 tokio::select! {
823 _ = write_task => {
824 debug!("Write task finished first for {}", conn_id);
825 },
826 _ = read_task => {
827 debug!("Read task finished first for {}", conn_id);
828 },
829 }
830
831 // Remove connection and call disconnect
832 manager.remove(&conn_id);
833 on_disconnect(conn_id);
834}