wsforge_core/
connection.rs

1//! WebSocket connection management and message handling.
2//!
3//! This module provides the core functionality for managing WebSocket connections,
4//! including connection lifecycle, message routing, and broadcasting capabilities.
5//!
6//! # Overview
7//!
8//! The connection module consists of three main components:
9//!
10//! - [`Connection`]: Represents an individual WebSocket connection
11//! - [`ConnectionManager`]: Manages multiple connections with thread-safe operations
12//! - [`handle_websocket`]: Async function that handles the WebSocket lifecycle
13//!
14//! # Architecture
15//!
16//! Each WebSocket connection runs two concurrent tasks:
17//! - **Read task**: Receives messages from the client
18//! - **Write task**: Sends messages to the client via an unbounded channel
19//!
20//! This architecture ensures that slow clients don't block message processing.
21//!
22//! # Examples
23//!
24//! ## Creating and Using a ConnectionManager
25//!
26//! ```
27//! use wsforge::prelude::*;
28//! use std::sync::Arc;
29//!
30//! let manager = Arc::new(ConnectionManager::new());
31//!
32//! // Check connection count
33//! println!("Active connections: {}", manager.count());
34//!
35//! // Broadcast a message to all connections
36//! manager.broadcast(Message::text("Hello everyone!"));
37//! ```
38//!
39//! ## Broadcasting Messages
40//!
41//! ```
42//! use wsforge::prelude::*;
43//! use std::sync::Arc;
44//!
45//! # let manager = Arc::new(ConnectionManager::new());
46//! # let conn_id = "conn_0".to_string();
47//! // Broadcast to all connections
48//! manager.broadcast(Message::text("System announcement"));
49//!
50//! // Broadcast to all except one
51//! manager.broadcast_except(&conn_id, Message::text("User joined"));
52//!
53//! // Broadcast to specific connections
54//! let target_ids = vec!["conn_1".to_string(), "conn_2".to_string()];
55//! manager.broadcast_to(&target_ids, Message::text("Private message"));
56//! ```
57
58use crate::error::{Error, Result};
59use crate::message::Message;
60use dashmap::DashMap;
61use futures_util::{SinkExt, StreamExt};
62use serde::{Deserialize, Serialize};
63use std::net::SocketAddr;
64use std::sync::Arc;
65use tokio::net::TcpStream;
66use tokio::sync::mpsc;
67use tokio_tungstenite::WebSocketStream;
68use tracing::{debug, error, info, warn};
69
70/// Identifier under which a connection is stored and looked up in [`ConnectionManager`].
71///
72/// NOTE(review): this module neither generates IDs nor enforces uniqueness; IDs are
73/// supplied by the caller, so uniqueness presumably comes from the caller — confirm.
74pub type ConnectionId = String;
75
76/// Metadata describing a single WebSocket connection.
77///
78/// Records the connection's ID, the client's socket address, the time the
79/// connection object was created, and an optional protocol label.
80///
81/// # Examples
82///
83/// ```
84/// use wsforge::connection::ConnectionInfo;
85/// use std::net::SocketAddr;
86///
87/// let info = ConnectionInfo {
88///     id: "conn_0".to_string(),
89///     addr: "127.0.0.1:8080".parse().unwrap(),
90///     connected_at: 1634567890,
91///     protocol: Some("websocket".to_string()),
92/// };
93///
94/// println!("Connection {} from {}", info.id, info.addr);
95/// ```
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ConnectionInfo {
98    /// Identifier of the connection this metadata belongs to
99    pub id: ConnectionId,
100    /// Socket address of the connected client
101    pub addr: SocketAddr,
102    /// Creation time as whole seconds since the Unix epoch
103    pub connected_at: u64,
104    /// Optional protocol label (e.g., "websocket", "wss"); `Connection::new` leaves it `None`
105    pub protocol: Option<String>,
106}
107
108/// Represents an active WebSocket connection.
109///
110/// A `Connection` provides methods to send messages to the connected client.
111/// Messages are queued on an unbounded channel, so sending never waits on the
112/// client; note that the queue itself has no size limit.
113///
114/// # Cloning
115///
116/// Cloning copies the ID and metadata and clones the channel sender, so every
117/// clone feeds the same outgoing message queue.
118///
119/// # Examples
120///
121/// ## Sending Text Messages
122///
123/// ```
124/// use wsforge::prelude::*;
125///
126/// # async fn example(conn: Connection) -> Result<()> {
127/// // Send a text message
128/// conn.send_text("Hello, client!")?;
129///
130/// // Send JSON data
131/// #[derive(serde::Serialize)]
132/// struct Response {
133///     status: String,
134///     data: i32,
135/// }
136///
137/// conn.send_json(&Response {
138///     status: "ok".to_string(),
139///     data: 42,
140/// })?;
141/// # Ok(())
142/// # }
143/// ```
144///
145/// ## Sending Binary Data
146///
147/// ```
148/// use wsforge::prelude::*;
149///
150/// # async fn example(conn: Connection) -> Result<()> {
151/// let data = vec![0x01, 0x02, 0x03, 0x04];
152/// conn.send_binary(data)?;
153/// # Ok(())
154/// # }
155/// ```
156pub struct Connection {
157    /// Identifier of this connection (same value as `info.id` when built via `new`)
158    pub id: ConnectionId,
159    /// Connection metadata
160    pub info: ConnectionInfo,
161    /// Sending half of the unbounded channel that carries outgoing messages
162    sender: mpsc::UnboundedSender<Message>,
163}
164
165impl Connection {
166    /// Creates a new `Connection` instance.
167    ///
168    /// This is typically called internally by the framework when a new
169    /// WebSocket connection is established.
170    ///
171    /// # Arguments
172    ///
173    /// * `id` - Unique identifier for the connection
174    /// * `addr` - Socket address of the client
175    /// * `sender` - Channel sender for outgoing messages
176    ///
177    /// # Examples
178    ///
179    /// ```
180    /// use wsforge::connection::Connection;
181    /// use tokio::sync::mpsc;
182    /// use std::net::SocketAddr;
183    ///
184    /// let (tx, rx) = mpsc::unbounded_channel();
185    /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
186    /// let conn = Connection::new("conn_0".to_string(), addr, tx);
187    ///
188    /// assert_eq!(conn.id(), "conn_0");
189    /// ```
190    pub fn new(id: ConnectionId, addr: SocketAddr, sender: mpsc::UnboundedSender<Message>) -> Self {
191        let info = ConnectionInfo {
192            id: id.clone(),
193            addr,
194            connected_at: std::time::SystemTime::now()
195                .duration_since(std::time::UNIX_EPOCH)
196                .unwrap()
197                .as_secs(),
198            protocol: None,
199        };
200
201        Self { id, info, sender }
202    }
203
204    /// Sends a message to the connected client.
205    ///
206    /// Messages are queued in an unbounded channel and sent asynchronously.
207    /// This method returns immediately without waiting for the message to be sent.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if the connection has been closed and the channel
212    /// receiver has been dropped.
213    ///
214    /// # Examples
215    ///
216    /// ```
217    /// use wsforge::prelude::*;
218    ///
219    /// # async fn example(conn: Connection) -> Result<()> {
220    /// let msg = Message::text("Hello!");
221    /// conn.send(msg)?;
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub fn send(&self, message: Message) -> Result<()> {
226        self.sender
227            .send(message)
228            .map_err(|e| Error::custom(format!("Failed to send message: {}", e)))
229    }
230
231    /// Sends a text message to the connected client.
232    ///
233    /// This is a convenience method that creates a text [`Message`] and sends it.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the connection has been closed.
238    ///
239    /// # Examples
240    ///
241    /// ```
242    /// use wsforge::prelude::*;
243    ///
244    /// # async fn example(conn: Connection) -> Result<()> {
245    /// conn.send_text("Welcome to the chat!")?;
246    /// # Ok(())
247    /// # }
248    /// ```
249    pub fn send_text(&self, text: impl Into<String>) -> Result<()> {
250        self.send(Message::text(text.into()))
251    }
252
253    /// Sends binary data to the connected client.
254    ///
255    /// This is a convenience method that creates a binary [`Message`] and sends it.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if the connection has been closed.
260    ///
261    /// # Examples
262    ///
263    /// ```
264    /// use wsforge::prelude::*;
265    ///
266    /// # async fn example(conn: Connection) -> Result<()> {
267    /// let data = vec![0xFF, 0xFE, 0xFD];
268    /// conn.send_binary(data)?;
269    /// # Ok(())
270    /// # }
271    /// ```
272    pub fn send_binary(&self, data: Vec<u8>) -> Result<()> {
273        self.send(Message::binary(data))
274    }
275
276    /// Serializes data to JSON and sends it as a text message.
277    ///
278    /// This is a convenience method for sending structured data. The data
279    /// is serialized using `serde_json` and sent as a text message.
280    ///
281    /// # Errors
282    ///
283    /// Returns an error if:
284    /// - Serialization fails
285    /// - The connection has been closed
286    ///
287    /// # Examples
288    ///
289    /// ```
290    /// use wsforge::prelude::*;
291    /// use serde::Serialize;
292    ///
293    /// #[derive(Serialize)]
294    /// struct GameState {
295    ///     score: u32,
296    ///     level: u8,
297    /// }
298    ///
299    /// # async fn example(conn: Connection) -> Result<()> {
300    /// let state = GameState { score: 1000, level: 5 };
301    /// conn.send_json(&state)?;
302    /// # Ok(())
303    /// # }
304    /// ```
305    pub fn send_json<T: Serialize>(&self, data: &T) -> Result<()> {
306        let json = serde_json::to_string(data)?;
307        self.send_text(json)
308    }
309
310    /// Returns the unique identifier for this connection.
311    ///
312    /// # Examples
313    ///
314    /// ```
315    /// use wsforge::prelude::*;
316    ///
317    /// # fn example(conn: Connection) {
318    /// println!("Connection ID: {}", conn.id());
319    /// # }
320    /// ```
321    pub fn id(&self) -> &ConnectionId {
322        &self.id
323    }
324
325    /// Returns the connection metadata.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use wsforge::prelude::*;
331    ///
332    /// # fn example(conn: Connection) {
333    /// let info = conn.info();
334    /// println!("Client address: {}", info.addr);
335    /// println!("Connected at: {}", info.connected_at);
336    /// # }
337    /// ```
338    pub fn info(&self) -> &ConnectionInfo {
339        &self.info
340    }
341}
342
343/// Manages a collection of active WebSocket connections.
344///
345/// `ConnectionManager` stores connections in a shared [`DashMap`] keyed by
346/// [`ConnectionId`] and offers adding, removing, lookup, and broadcasting.
347/// NOTE(review): `DashMap` synchronizes with per-shard locks; it is not lock-free.
348///
349/// # Thread Safety
350///
351/// Every method takes `&self`, so a manager can be shared (e.g. in an `Arc`)
352/// and used concurrently without an additional lock around it.
353///
354/// # Examples
355///
356/// ## Basic Usage
357///
358/// ```
359/// use wsforge::prelude::*;
360/// use std::sync::Arc;
361///
362/// let manager = Arc::new(ConnectionManager::new());
363///
364/// // Get connection count
365/// let count = manager.count();
366/// println!("Active connections: {}", count);
367///
368/// // Get all connection IDs
369/// let ids = manager.all_ids();
370/// for id in ids {
371///     println!("Connection: {}", id);
372/// }
373/// ```
374///
375/// ## Broadcasting
376///
377/// ```
378/// use wsforge::prelude::*;
379/// use std::sync::Arc;
380///
381/// # let manager = Arc::new(ConnectionManager::new());
382/// // Broadcast system announcement
383/// manager.broadcast(Message::text("Server maintenance in 5 minutes"));
384///
385/// // Notify everyone except the sender
386/// let sender_id = "conn_42";
387/// manager.broadcast_except(&sender_id.to_string(),
388///     Message::text("New user joined the chat"));
389/// ```
390pub struct ConnectionManager {
391    /// Shared concurrent map from connection ID to connection
392    connections: Arc<DashMap<ConnectionId, Connection>>,
393}
394
395impl ConnectionManager {
396    /// Creates a new empty `ConnectionManager`.
397    ///
398    /// # Examples
399    ///
400    /// ```
401    /// use wsforge::prelude::*;
402    /// use std::sync::Arc;
403    ///
404    /// let manager = Arc::new(ConnectionManager::new());
405    /// assert_eq!(manager.count(), 0);
406    /// ```
407    pub fn new() -> Self {
408        Self {
409            connections: Arc::new(DashMap::new()),
410        }
411    }
412
413    /// Adds a connection to the manager.
414    ///
415    /// Returns the total number of connections after adding.
416    ///
417    /// # Examples
418    ///
419    /// ```
420    /// use wsforge::prelude::*;
421    /// use tokio::sync::mpsc;
422    /// use std::net::SocketAddr;
423    ///
424    /// # fn example() {
425    /// let manager = ConnectionManager::new();
426    /// let (tx, rx) = mpsc::unbounded_channel();
427    /// let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
428    /// let conn = Connection::new("conn_0".to_string(), addr, tx);
429    ///
430    /// let count = manager.add(conn);
431    /// assert_eq!(count, 1);
432    /// # }
433    /// ```
434    pub fn add(&self, conn: Connection) -> usize {
435        let id = conn.id.clone();
436        self.connections.insert(id.clone(), conn);
437        let count = self.connections.len();
438        info!("Added connection: {} (Total: {})", id, count);
439        count
440    }
441
442    /// Removes a connection from the manager.
443    ///
444    /// Returns the removed connection if it existed, or `None` if not found.
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// use wsforge::prelude::*;
450    ///
451    /// # fn example(manager: &ConnectionManager) {
452    /// let conn_id = "conn_42".to_string();
453    /// if let Some(conn) = manager.remove(&conn_id) {
454    ///     println!("Removed connection: {}", conn.id());
455    /// }
456    /// # }
457    /// ```
458    pub fn remove(&self, id: &ConnectionId) -> Option<Connection> {
459        let result = self.connections.remove(id).map(|(_, conn)| conn);
460        let count = self.connections.len();
461        info!("Removed connection: {} (Total: {})", id, count);
462        result
463    }
464
465    /// Retrieves a connection by its ID.
466    ///
467    /// Returns a clone of the connection if found, or `None` if not found.
468    ///
469    /// # Performance
470    ///
471    /// This operation is O(1) and does not block other concurrent operations.
472    ///
473    /// # Examples
474    ///
475    /// ```
476    /// use wsforge::prelude::*;
477    ///
478    /// # async fn example(manager: &ConnectionManager) -> Result<()> {
479    /// let conn_id = "conn_0".to_string();
480    /// if let Some(conn) = manager.get(&conn_id) {
481    ///     conn.send_text("Hello!")?;
482    /// }
483    /// # Ok(())
484    /// # }
485    /// ```
486    pub fn get(&self, id: &ConnectionId) -> Option<Connection> {
487        self.connections.get(id).map(|entry| entry.value().clone())
488    }
489
490    /// Broadcasts a message to all active connections.
491    ///
492    /// This method iterates through all connections and sends the message
493    /// to each one. Failed sends are logged but do not stop the broadcast.
494    ///
495    /// # Performance
496    ///
497    /// Broadcasts are performed synchronously but send operations are async,
498    /// so messages are queued immediately and sent in the background.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// use wsforge::prelude::*;
504    ///
505    /// # fn example(manager: &ConnectionManager) {
506    /// manager.broadcast(Message::text("Server announcement!"));
507    /// # }
508    /// ```
509    pub fn broadcast(&self, message: Message) {
510        let count = self.connections.len();
511        debug!("Broadcasting message to {} connections", count);
512
513        let mut success = 0;
514        let mut failed = 0;
515
516        for entry in self.connections.iter() {
517            match entry.value().send(message.clone()) {
518                Ok(_) => {
519                    success += 1;
520                    debug!("✅ Broadcast sent to {}", entry.key());
521                }
522                Err(e) => {
523                    failed += 1;
524                    error!("❌ Failed to broadcast to {}: {}", entry.key(), e);
525                }
526            }
527        }
528
529        info!(
530            "Broadcast complete: {} success, {} failed out of {} total",
531            success, failed, count
532        );
533    }
534
535    /// Broadcasts a message to all connections except one.
536    ///
537    /// This is useful for notifying all users about an action taken by one user,
538    /// without sending the notification back to the actor.
539    ///
540    /// # Examples
541    ///
542    /// ```
543    /// use wsforge::prelude::*;
544    ///
545    /// # fn example(manager: &ConnectionManager) {
546    /// let sender_id = "conn_42".to_string();
547    /// manager.broadcast_except(&sender_id,
548    ///     Message::text("User 42 sent a message"));
549    /// # }
550    /// ```
551    pub fn broadcast_except(&self, except_id: &ConnectionId, message: Message) {
552        debug!(
553            "Broadcasting message to {} connections (except {})",
554            self.connections.len() - 1,
555            except_id
556        );
557        for entry in self.connections.iter() {
558            if entry.key() != except_id {
559                if let Err(e) = entry.value().send(message.clone()) {
560                    error!("Failed to broadcast to {}: {}", entry.key(), e);
561                }
562            }
563        }
564    }
565
566    /// Broadcasts a message to specific connections.
567    ///
568    /// Only connections whose IDs are in the provided list will receive the message.
569    /// Non-existent connection IDs are silently ignored.
570    ///
571    /// # Examples
572    ///
573    /// ```
574    /// use wsforge::prelude::*;
575    ///
576    /// # fn example(manager: &ConnectionManager) {
577    /// let vip_users = vec![
578    ///     "conn_1".to_string(),
579    ///     "conn_5".to_string(),
580    ///     "conn_10".to_string(),
581    /// ];
582    /// manager.broadcast_to(&vip_users, Message::text("VIP announcement"));
583    /// # }
584    /// ```
585    pub fn broadcast_to(&self, ids: &[ConnectionId], message: Message) {
586        for id in ids {
587            if let Some(conn) = self.get(id) {
588                if let Err(e) = conn.send(message.clone()) {
589                    error!("Failed to send to {}: {}", id, e);
590                }
591            }
592        }
593    }
594
595    /// Returns the number of active connections.
596    ///
597    /// # Examples
598    ///
599    /// ```
600    /// use wsforge::prelude::*;
601    ///
602    /// # fn example(manager: &ConnectionManager) {
603    /// let count = manager.count();
604    /// println!("Active connections: {}", count);
605    /// # }
606    /// ```
607    pub fn count(&self) -> usize {
608        self.connections.len()
609    }
610
611    /// Returns a list of all connection IDs.
612    ///
613    /// The order of IDs is not guaranteed.
614    ///
615    /// # Examples
616    ///
617    /// ```
618    /// use wsforge::prelude::*;
619    ///
620    /// # fn example(manager: &ConnectionManager) {
621    /// for id in manager.all_ids() {
622    ///     println!("Connection ID: {}", id);
623    /// }
624    /// # }
625    /// ```
626    pub fn all_ids(&self) -> Vec<ConnectionId> {
627        self.connections.iter().map(|e| e.key().clone()).collect()
628    }
629
630    /// Returns clones of all active connections.
631    ///
632    /// This is useful for batch operations on all connections.
633    ///
634    /// # Performance
635    ///
636    /// Since connections are lightweight (they contain Arc internally),
637    /// cloning is cheap.
638    ///
639    /// # Examples
640    ///
641    /// ```
642    /// use wsforge::prelude::*;
643    ///
644    /// # async fn example(manager: &ConnectionManager) -> Result<()> {
645    /// for conn in manager.all_connections() {
646    ///     conn.send_text("Shutdown in 1 minute")?;
647    /// }
648    /// # Ok(())
649    /// # }
650    /// ```
651    pub fn all_connections(&self) -> Vec<Connection> {
652        self.connections.iter().map(|e| e.value().clone()).collect()
653    }
654}
655
656impl Clone for Connection {
657    fn clone(&self) -> Self {
658        Self {
659            id: self.id.clone(),
660            info: self.info.clone(),
661            sender: self.sender.clone(),
662        }
663    }
664}
665
666impl Default for ConnectionManager {
667    fn default() -> Self {
668        Self::new()
669    }
670}
671
672/// Handles the lifecycle of a WebSocket connection.
673///
674/// This function manages the entire lifecycle of a WebSocket connection from
675/// establishment to termination. It spawns two concurrent tasks:
676/// - A read task that receives messages from the client
677/// - A write task that sends messages to the client
678///
679/// # Architecture
680///
681/// The function uses a split WebSocket stream and an unbounded channel to
682/// decouple reading and writing operations. This ensures that slow clients
683/// don't block message processing and allows for efficient broadcasting.
684///
685/// # Lifecycle Events
686///
687/// 1. Connection is added to the manager
688/// 2. `on_connect` callback is invoked
689/// 3. Read and write tasks run concurrently
690/// 4. When either task completes, both are terminated
691/// 5. Connection is removed from the manager
692/// 6. `on_disconnect` callback is invoked
693///
694/// # Arguments
695///
696/// * `stream` - The WebSocket stream
697/// * `conn_id` - Unique identifier for this connection
698/// * `peer_addr` - Socket address of the connected client
699/// * `manager` - Shared connection manager
700/// * `on_message` - Callback invoked when a message is received
701/// * `on_connect` - Callback invoked when the connection is established
702/// * `on_disconnect` - Callback invoked when the connection is closed
703///
704/// # Examples
705///
706/// This function is typically called by the router and not directly by users.
707/// However, for custom implementations:
708///
709/// ```
710/// use wsforge::prelude::*;
711/// use std::sync::Arc;
712/// use tokio_tungstenite::accept_async;
713///
714/// # async fn example(stream: tokio::net::TcpStream, peer_addr: std::net::SocketAddr) -> Result<()> {
715/// let ws_stream = accept_async(stream).await?;
716/// let conn_id = "conn_0".to_string();
717/// let manager = Arc::new(ConnectionManager::new());
718///
719/// let on_message = Arc::new(|id: ConnectionId, msg: Message| {
720///     println!("Received from {}: {:?}", id, msg);
721/// });
722///
723/// let on_connect = Arc::new(|id: ConnectionId| {
724///     println!("Connected: {}", id);
725/// });
726///
727/// let on_disconnect = Arc::new(|id: ConnectionId| {
728///     println!("Disconnected: {}", id);
729/// });
730///
731/// handle_websocket(
732///     ws_stream,
733///     conn_id,
734///     peer_addr,
735///     manager,
736///     on_message,
737///     on_connect,
738///     on_disconnect,
739/// ).await;
740/// # Ok(())
741/// # }
742/// ```
743pub async fn handle_websocket(
744    stream: WebSocketStream<TcpStream>,
745    conn_id: ConnectionId,
746    peer_addr: SocketAddr,
747    manager: Arc<ConnectionManager>,
748    on_message: Arc<dyn Fn(ConnectionId, Message) + Send + Sync>,
749    on_connect: Arc<dyn Fn(ConnectionId) + Send + Sync>,
750    on_disconnect: Arc<dyn Fn(ConnectionId) + Send + Sync>,
751) {
752    info!(
753        "WebSocket connection established: {} from {}",
754        conn_id, peer_addr
755    );
756
757    let (mut ws_sender, mut ws_receiver) = stream.split();
758    let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
759
760    // Create connection with actual peer address
761    let conn = Connection::new(conn_id.clone(), peer_addr, tx);
762
763    // Add connection to manager and get the count
764    let _count = manager.add(conn);
765
766    // Verify connection is actually in the map
767    let verify_count = manager.count();
768    debug!(
769        "Connection {} added. Verified count: {}",
770        conn_id, verify_count
771    );
772
773    // NOW call on_connect AFTER we've verified the connection is added
774    on_connect(conn_id.clone());
775
776    // Write task - sends messages to WebSocket
777    let conn_id_write = conn_id.clone();
778    let write_task = tokio::spawn(async move {
779        debug!("Write task started for {}", conn_id_write);
780
781        while let Some(message) = rx.recv().await {
782            debug!("📤 Sending message to {}", conn_id_write);
783
784            let msg = message.into_tungstenite();
785            if let Err(e) = ws_sender.send(msg).await {
786                error!("Failed to send message to {}: {}", conn_id_write, e);
787                break;
788            }
789
790            debug!("✅ Message sent to {}", conn_id_write);
791        }
792
793        info!("Write task ended for {}", conn_id_write);
794    });
795
796    // Read task - receives messages from WebSocket
797    let conn_id_read = conn_id.clone();
798    let read_task = tokio::spawn(async move {
799        debug!("Read task started for {}", conn_id_read);
800
801        while let Some(result) = ws_receiver.next().await {
802            match result {
803                Ok(msg) => {
804                    if msg.is_close() {
805                        info!("Close message received from {}", conn_id_read);
806                        break;
807                    }
808                    debug!("📨 Received message from {}", conn_id_read);
809                    let message = Message::from_tungstenite(msg);
810                    on_message(conn_id_read.clone(), message);
811                }
812                Err(e) => {
813                    warn!("WebSocket error for {}: {}", conn_id_read, e);
814                    break;
815                }
816            }
817        }
818        debug!("Read task ended for {}", conn_id_read);
819    });
820
821    // Wait for either task to complete
822    tokio::select! {
823        _ = write_task => {
824            debug!("Write task finished first for {}", conn_id);
825        },
826        _ = read_task => {
827            debug!("Read task finished first for {}", conn_id);
828        },
829    }
830
831    // Remove connection and call disconnect
832    manager.remove(&conn_id);
833    on_disconnect(conn_id);
834}