wsforge_core/router.rs
1//! Routing and server management for WebSocket connections.
2//!
3//! This module provides the core routing infrastructure for WsForge, allowing you to
4//! define handlers for different message patterns, manage application state, and configure
5//! server behavior. The router handles both WebSocket connections and static file serving
6//! on the same port.
7//!
8//! # Overview
9//!
10//! The [`Router`] is the main entry point for building a WebSocket server. It provides
11//! a builder-style API for:
12//! - Registering message handlers for specific routes
13//! - Managing shared application state
14//! - Serving static files (HTML, CSS, JS)
15//! - Configuring connection lifecycle callbacks
16//! - Managing WebSocket connections
17//!
18//! # Architecture
19//!
//! ```text
//! ┌─────────────────┐
//! │  TCP Listener   │
//! └────────┬────────┘
//!          │
//!          ├──→ HTTP Request → Static File Handler
//!          │
//!          └──→ WebSocket Upgrade → Connection Manager
//!                                          │
//!                                          ├──→ on_connect callback
//!                                          │
//!                                          ├──→ Message Router → Handler
//!                                          │
//!                                          └──→ on_disconnect callback
//! ```
35//!
36//! # Examples
37//!
38//! ## Simple Echo Server
39//!
40//! ```
41//! use wsforge::prelude::*;
42//!
43//! async fn echo(msg: Message) -> Result<Message> {
44//! Ok(msg)
45//! }
46//!
47//! # async fn example() -> Result<()> {
48//! let router = Router::new()
49//! .default_handler(handler(echo));
50//!
51//! router.listen("127.0.0.1:8080").await?;
52//! # Ok(())
53//! # }
54//! ```
55//!
56//! ## Multiple Routes
57//!
58//! ```
59//! use wsforge::prelude::*;
60//!
61//! async fn echo(msg: Message) -> Result<Message> {
62//! Ok(msg)
63//! }
64//!
65//! async fn stats(State(manager): State<Arc<ConnectionManager>>) -> Result<String> {
66//! Ok(format!("Active connections: {}", manager.count()))
67//! }
68//!
69//! # async fn example() -> Result<()> {
70//! # use std::sync::Arc;
71//! let router = Router::new()
72//! .route("/echo", handler(echo))
73//! .route("/stats", handler(stats))
74//! .default_handler(handler(echo));
75//!
76//! router.listen("127.0.0.1:8080").await?;
77//! # Ok(())
78//! # }
79//! ```
80//!
81//! ## With State and Callbacks
82//!
83//! ```
84//! use wsforge::prelude::*;
85//! use std::sync::Arc;
86//!
87//! async fn broadcast(msg: Message, State(manager): State<Arc<ConnectionManager>>) -> Result<()> {
88//! manager.broadcast(msg);
89//! Ok(())
90//! }
91//!
92//! # async fn example() -> Result<()> {
93//! let router = Router::new()
94//! .default_handler(handler(broadcast))
95//! .on_connect(|manager, conn_id| {
96//! println!("✅ User {} connected (Total: {})", conn_id, manager.count());
97//! })
98//! .on_disconnect(|manager, conn_id| {
99//! println!("❌ User {} disconnected", conn_id);
100//! });
101//!
102//! router.listen("127.0.0.1:8080").await?;
103//! # Ok(())
104//! # }
105//! ```
106//!
107//! ## Hybrid HTTP/WebSocket Server
108//!
109//! ```
110//! use wsforge::prelude::*;
111//!
112//! async fn ws_handler(msg: Message) -> Result<Message> {
113//! Ok(msg)
114//! }
115//!
116//! # async fn example() -> Result<()> {
117//! let router = Router::new()
118//! .serve_static("public") // Serve HTML/CSS/JS from 'public' folder
119//! .default_handler(handler(ws_handler));
120//!
121//! // Handles both HTTP (for files) and WebSocket on same port
122//! router.listen("127.0.0.1:8080").await?;
123//! # Ok(())
124//! # }
125//! ```
126
127use crate::connection::{ConnectionId, ConnectionManager, handle_websocket};
128use crate::error::{Error, Result};
129use crate::extractor::Extensions;
130use crate::handler::Handler;
131use crate::message::Message;
132use crate::state::AppState;
133use dashmap::DashMap;
134use std::net::SocketAddr;
135use std::path::PathBuf;
136use std::sync::Arc;
137use tokio::net::{TcpListener, TcpStream};
138use tokio_tungstenite::accept_async;
139use tracing::{error, info};
140
141/// Represents a single route with its path and handler.
142///
143/// Routes map message patterns (paths) to handler functions.
144/// This is typically used internally by the [`Router`].
145///
146/// # Examples
147///
148/// ```
149/// use wsforge::prelude::*;
150///
151/// async fn my_handler() -> Result<String> {
152/// Ok("response".to_string())
153/// }
154///
155/// # fn example() {
156/// let route = Route {
157/// path: "/api/message".to_string(),
158/// handler: handler(my_handler),
159/// };
160/// # }
161/// ```
162pub struct Route {
163 /// The route path (e.g., "/chat", "/api/users")
164 pub path: String,
165 /// The handler for this route
166 pub handler: Arc<dyn Handler>,
167}
168
169/// The main router for WebSocket servers.
170///
171/// `Router` is the central component that manages routing, state, connections,
172/// and server lifecycle. It uses a builder pattern for configuration and
173/// supports both WebSocket and HTTP static file serving on the same port.
174///
175/// # Thread Safety
176///
177/// Router is thread-safe and can be cloned cheaply (uses `Arc` internally).
178/// All connections share the same router instance.
179///
180/// # Lifecycle
181///
182/// 1. Create router with `Router::new()`
183/// 2. Configure routes, state, handlers, callbacks
184/// 3. Call `listen()` to start the server
185/// 4. Router handles incoming connections automatically
186///
187/// # Examples
188///
189/// ## Basic Setup
190///
191/// ```
192/// use wsforge::prelude::*;
193///
194/// async fn handler(msg: Message) -> Result<String> {
195/// Ok("received".to_string())
196/// }
197///
198/// # async fn example() -> Result<()> {
199/// let router = Router::new()
200/// .default_handler(handler(handler));
201///
202/// router.listen("0.0.0.0:8080").await?;
203/// # Ok(())
204/// # }
205/// ```
206///
207/// ## With Shared State
208///
209/// ```
210/// use wsforge::prelude::*;
211/// use std::sync::Arc;
212///
213/// struct AppConfig {
214/// max_connections: usize,
215/// }
216///
217/// async fn handler(State(config): State<Arc<AppConfig>>) -> Result<String> {
218/// Ok(format!("Max connections: {}", config.max_connections))
219/// }
220///
221/// # async fn example() -> Result<()> {
222/// let config = Arc::new(AppConfig { max_connections: 100 });
223///
224/// let router = Router::new()
225/// .with_state(config)
226/// .default_handler(handler(handler));
227///
228/// router.listen("127.0.0.1:8080").await?;
229/// # Ok(())
230/// # }
231/// ```
232pub struct Router {
233 routes: Arc<DashMap<String, Arc<dyn Handler>>>,
234 state: AppState,
235 connection_manager: Arc<ConnectionManager>,
236 on_connect: Option<Arc<dyn Fn(&Arc<ConnectionManager>, ConnectionId) + Send + Sync>>,
237 on_disconnect: Option<Arc<dyn Fn(&Arc<ConnectionManager>, ConnectionId) + Send + Sync>>,
238 default_handler: Option<Arc<dyn Handler>>,
239 static_handler: Option<crate::static_files::StaticFileHandler>,
240}
241
242impl Router {
243 /// Creates a new empty router.
244 ///
245 /// The router starts with no routes, no state, and no handlers.
246 /// Use the builder methods to configure it.
247 ///
248 /// # Examples
249 ///
250 /// ```
251 /// use wsforge::prelude::*;
252 ///
253 /// let router = Router::new();
254 /// ```
255 pub fn new() -> Self {
256 Self {
257 routes: Arc::new(DashMap::new()),
258 state: AppState::new(),
259 connection_manager: Arc::new(ConnectionManager::new()),
260 on_connect: None,
261 on_disconnect: None,
262 default_handler: None,
263 static_handler: None,
264 }
265 }
266
267 /// Registers a handler for a specific route.
268 ///
    /// A text message that starts with `/` is routed by its first space-delimited
    /// token, which must equal a registered path exactly. For example, the message
    /// `/chat hello` is dispatched to route `/chat`, while `/chatroom hello` is not.
271 ///
272 /// # Arguments
273 ///
274 /// * `path` - The route path (e.g., "/chat", "/api/users")
275 /// * `handler` - The handler function wrapped with `handler()`
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// use wsforge::prelude::*;
281 ///
282 /// async fn chat_handler(msg: Message) -> Result<String> {
283 /// Ok("chat response".to_string())
284 /// }
285 ///
286 /// async fn api_handler(msg: Message) -> Result<String> {
287 /// Ok("api response".to_string())
288 /// }
289 ///
290 /// # fn example() {
291 /// let router = Router::new()
292 /// .route("/chat", handler(chat_handler))
293 /// .route("/api", handler(api_handler));
294 /// # }
295 /// ```
296 pub fn route(self, path: impl Into<String>, handler: Arc<dyn Handler>) -> Self {
297 self.routes.insert(path.into(), handler);
298 self
299 }
300
301 /// Adds shared state to the router.
302 ///
303 /// State is shared across all connections and can be extracted in handlers
304 /// using the [`State`] extractor. Any type that is `Send + Sync + 'static`
305 /// can be used as state.
306 ///
307 /// # Type Safety
308 ///
309 /// Multiple different types can be added as state. Each type is stored
310 /// separately and retrieved by type.
311 ///
312 /// # Arguments
313 ///
314 /// * `data` - The state data to share
315 ///
316 /// # Examples
317 ///
318 /// ## Single State
319 ///
320 /// ```
321 /// use wsforge::prelude::*;
322 /// use std::sync::Arc;
323 ///
324 /// struct Database {
325 /// // database fields
326 /// }
327 ///
328 /// async fn handler(State(db): State<Arc<Database>>) -> Result<String> {
329 /// Ok("query result".to_string())
330 /// }
331 ///
332 /// # fn example() {
333 /// let db = Arc::new(Database {});
334 ///
335 /// let router = Router::new()
336 /// .with_state(db)
337 /// .default_handler(handler(handler));
338 /// # }
339 /// ```
340 ///
341 /// ## Multiple States
342 ///
343 /// ```
344 /// use wsforge::prelude::*;
345 /// use std::sync::Arc;
346 ///
347 /// struct Config {
348 /// port: u16,
349 /// }
350 ///
351 /// struct Database {
352 /// // database fields
353 /// }
354 ///
355 /// # fn example() {
356 /// let router = Router::new()
357 /// .with_state(Arc::new(Config { port: 8080 }))
358 /// .with_state(Arc::new(Database {}));
359 /// # }
360 /// ```
361 pub fn with_state<T: Send + Sync + 'static>(self, data: Arc<T>) -> Self {
362 self.state.insert(data);
363 self
364 }
365
366 /// Sets a callback to be called when a new connection is established.
367 ///
368 /// The callback receives a reference to the connection manager and the
369 /// connection ID. This is useful for logging, sending welcome messages,
370 /// or updating user lists.
371 ///
372 /// # Arguments
373 ///
374 /// * `f` - Callback function with signature `Fn(&Arc<ConnectionManager>, ConnectionId)`
375 ///
376 /// # Examples
377 ///
378 /// ## Simple Logging
379 ///
380 /// ```
381 /// use wsforge::prelude::*;
382 ///
383 /// # fn example() {
384 /// let router = Router::new()
385 /// .on_connect(|manager, conn_id| {
386 /// println!("New connection: {} (Total: {})", conn_id, manager.count());
387 /// });
388 /// # }
389 /// ```
390 ///
391 /// ## Send Welcome Message
392 ///
393 /// ```
394 /// use wsforge::prelude::*;
395 ///
396 /// # fn example() {
397 /// let router = Router::new()
398 /// .on_connect(|manager, conn_id| {
399 /// if let Some(conn) = manager.get(&conn_id) {
400 /// let _ = conn.send_text("Welcome to the server!");
401 /// }
402 /// });
403 /// # }
404 /// ```
405 ///
406 /// ## Broadcast Join Notification
407 ///
408 /// ```
409 /// use wsforge::prelude::*;
410 ///
411 /// # fn example() {
412 /// let router = Router::new()
413 /// .on_connect(|manager, conn_id| {
414 /// let msg = format!("User {} joined", conn_id);
415 /// manager.broadcast(Message::text(msg));
416 /// });
417 /// # }
418 /// ```
419 pub fn on_connect<F>(mut self, f: F) -> Self
420 where
421 F: Fn(&Arc<ConnectionManager>, ConnectionId) + Send + Sync + 'static,
422 {
423 self.on_connect = Some(Arc::new(f));
424 self
425 }
426
427 /// Sets a callback to be called when a connection is closed.
428 ///
429 /// The callback receives a reference to the connection manager and the
430 /// connection ID. Note that the connection is already removed from the
431 /// manager when this is called.
432 ///
433 /// # Arguments
434 ///
435 /// * `f` - Callback function with signature `Fn(&Arc<ConnectionManager>, ConnectionId)`
436 ///
437 /// # Examples
438 ///
439 /// ## Logging Disconnections
440 ///
441 /// ```
442 /// use wsforge::prelude::*;
443 ///
444 /// # fn example() {
445 /// let router = Router::new()
446 /// .on_disconnect(|manager, conn_id| {
447 /// println!("Connection {} closed (Remaining: {})", conn_id, manager.count());
448 /// });
449 /// # }
450 /// ```
451 ///
452 /// ## Broadcast Leave Notification
453 ///
454 /// ```
455 /// use wsforge::prelude::*;
456 ///
457 /// # fn example() {
458 /// let router = Router::new()
459 /// .on_disconnect(|manager, conn_id| {
460 /// let msg = format!("User {} left", conn_id);
461 /// manager.broadcast(Message::text(msg));
462 /// });
463 /// # }
464 /// ```
465 pub fn on_disconnect<F>(mut self, f: F) -> Self
466 where
467 F: Fn(&Arc<ConnectionManager>, ConnectionId) + Send + Sync + 'static,
468 {
469 self.on_disconnect = Some(Arc::new(f));
470 self
471 }
472
473 /// Sets the default handler for messages that don't match any route.
474 ///
475 /// This handler is called when no route matches the incoming message.
476 /// Use this for catch-all behavior or when you don't need routing.
477 ///
478 /// # Arguments
479 ///
480 /// * `handler` - The default handler wrapped with `handler()`
481 ///
482 /// # Examples
483 ///
484 /// ## Echo Server
485 ///
486 /// ```
487 /// use wsforge::prelude::*;
488 ///
489 /// async fn echo(msg: Message) -> Result<Message> {
490 /// Ok(msg)
491 /// }
492 ///
493 /// # fn example() {
494 /// let router = Router::new()
495 /// .default_handler(handler(echo));
496 /// # }
497 /// ```
498 ///
499 /// ## Error Handler
500 ///
501 /// ```
502 /// use wsforge::prelude::*;
503 ///
504 /// async fn not_found() -> Result<String> {
505 /// Ok("Unknown command".to_string())
506 /// }
507 ///
508 /// # fn example() {
509 /// let router = Router::new()
510 /// .route("/known", handler(not_found))
511 /// .default_handler(handler(not_found));
512 /// # }
513 /// ```
514 pub fn default_handler(mut self, handler: Arc<dyn Handler>) -> Self {
515 self.default_handler = Some(handler);
516 self
517 }
518
519 /// Enables static file serving from a directory.
520 ///
521 /// When enabled, the router will serve static files (HTML, CSS, JavaScript, images)
522 /// from the specified directory for HTTP requests, while still handling WebSocket
523 /// connections on the same port.
524 ///
525 /// # Path Resolution
526 ///
527 /// - Requests to `/` serve `index.html` from the directory
528 /// - Other requests map directly to files (e.g., `/style.css` → `directory/style.css`)
529 /// - MIME types are automatically detected
530 ///
531 /// # Security
532 ///
533 /// Path traversal attempts (e.g., `../../etc/passwd`) are automatically blocked.
534 ///
535 /// # Arguments
536 ///
537 /// * `path` - Path to the directory containing static files
538 ///
539 /// # Examples
540 ///
541 /// ## Serve Static Files
542 ///
543 /// ```
544 /// use wsforge::prelude::*;
545 ///
546 /// async fn ws_handler(msg: Message) -> Result<Message> {
547 /// Ok(msg)
548 /// }
549 ///
550 /// # fn example() {
551 /// let router = Router::new()
552 /// .serve_static("public") // Serve files from ./public
553 /// .default_handler(handler(ws_handler));
554 ///
555 /// // Now you can access:
556 /// // http://localhost:8080/ -> public/index.html
557 /// // http://localhost:8080/app.js -> public/app.js
558 /// // ws://localhost:8080 -> WebSocket handler
559 /// # }
560 /// ```
561 ///
562 /// ## Web Chat Application
563 ///
564 /// ```
565 /// use wsforge::prelude::*;
566 ///
567 /// async fn chat_handler(msg: Message, State(manager): State<Arc<ConnectionManager>>) -> Result<()> {
568 /// manager.broadcast(msg);
569 /// Ok(())
570 /// }
571 ///
572 /// # fn example() {
573 /// # use std::sync::Arc;
574 /// let router = Router::new()
575 /// .serve_static("chat-ui") // HTML/CSS/JS for chat interface
576 /// .default_handler(handler(chat_handler));
577 /// # }
578 /// ```
579 pub fn serve_static(mut self, path: impl Into<PathBuf>) -> Self {
580 self.static_handler = Some(crate::static_files::StaticFileHandler::new(path.into()));
581 self
582 }
583
584 /// Returns a reference to the connection manager.
585 ///
586 /// The connection manager is automatically created with the router.
587 /// Use this to get access to it for storing in state or elsewhere.
588 ///
589 /// # Examples
590 ///
591 /// ```
592 /// use wsforge::prelude::*;
593 ///
594 /// # fn example() {
595 /// let router = Router::new();
596 /// let manager = router.connection_manager();
597 ///
598 /// // Now you can use the manager
599 /// println!("Active connections: {}", manager.count());
600 /// # }
601 /// ```
602 pub fn connection_manager(&self) -> Arc<ConnectionManager> {
603 self.connection_manager.clone()
604 }
605
606 /// Starts the WebSocket server and listens for connections.
607 ///
608 /// This method consumes the router and starts the server loop. It will
609 /// run indefinitely until the process is terminated or an error occurs.
610 ///
611 /// # Arguments
612 ///
613 /// * `addr` - The address to bind to (e.g., "127.0.0.1:8080", "0.0.0.0:3000")
614 ///
615 /// # Errors
616 ///
617 /// Returns an error if:
618 /// - The address format is invalid
619 /// - The port is already in use
620 /// - Permission is denied (e.g., ports < 1024 on Unix)
621 ///
622 /// # Examples
623 ///
624 /// ## Basic Usage
625 ///
626 /// ```
627 /// use wsforge::prelude::*;
628 ///
629 /// # async fn example() -> Result<()> {
630 /// let router = Router::new();
631 /// router.listen("127.0.0.1:8080").await?;
632 /// # Ok(())
633 /// # }
634 /// ```
635 ///
636 /// ## All Interfaces
637 ///
638 /// ```
639 /// use wsforge::prelude::*;
640 ///
641 /// # async fn example() -> Result<()> {
642 /// let router = Router::new();
643 /// router.listen("0.0.0.0:8080").await?; // Accept connections from anywhere
644 /// # Ok(())
645 /// # }
646 /// ```
647 ///
648 /// ## With Error Handling
649 ///
650 /// ```
651 /// use wsforge::prelude::*;
652 ///
653 /// # async fn example() {
654 /// let router = Router::new();
655 ///
656 /// match router.listen("127.0.0.1:8080").await {
657 /// Ok(_) => println!("Server stopped"),
658 /// Err(e) => eprintln!("Server error: {}", e),
659 /// }
660 /// # }
661 /// ```
662 pub async fn listen(self, addr: impl AsRef<str>) -> Result<()> {
663 let addr: SocketAddr = addr
664 .as_ref()
665 .parse()
666 .map_err(|e| Error::custom(format!("Invalid address: {}", e)))?;
667
668 // Insert connection manager into state BEFORE wrapping in Arc
669 self.state.insert(self.connection_manager.clone());
670
671 let listener = TcpListener::bind(addr).await?;
672 info!("WebSocket server listening on {}", addr);
673
674 let router = Arc::new(self);
675
676 loop {
677 let (stream, peer_addr) = listener.accept().await?;
678 let router = router.clone();
679
680 tokio::spawn(async move {
681 if let Err(e) = router.handle_connection(stream, peer_addr).await {
682 error!("Connection error: {}", e);
683 }
684 });
685 }
686 }
687
688 async fn handle_connection(&self, stream: TcpStream, peer_addr: SocketAddr) -> Result<()> {
689 let mut buffer = [0u8; 1024];
690
691 let n = tokio::time::timeout(std::time::Duration::from_secs(5), stream.peek(&mut buffer))
692 .await
693 .map_err(|_| Error::custom("Connection timeout"))?
694 .map_err(|e| Error::custom(format!("Failed to read: {}", e)))?;
695
696 let header = String::from_utf8_lossy(&buffer[..n]);
697
698 if header.contains("Upgrade: websocket") || header.contains("upgrade: websocket") {
699 self.handle_websocket_connection(stream, peer_addr).await
700 } else if let Some(ref static_handler) = self.static_handler {
701 self.handle_http_request(stream, static_handler, &header)
702 .await
703 } else {
704 Err(Error::custom("No handler for HTTP requests"))
705 }
706 }
707
708 async fn handle_http_request(
709 &self,
710 mut stream: TcpStream,
711 static_handler: &crate::static_files::StaticFileHandler,
712 header: &str,
713 ) -> Result<()> {
714 use crate::static_files::http_response;
715 use tokio::io::AsyncWriteExt;
716
717 let path = header
718 .lines()
719 .next()
720 .and_then(|line| {
721 let parts: Vec<&str> = line.split_whitespace().collect();
722 if parts.len() >= 2 && (parts[0] == "GET" || parts[0] == "HEAD") {
723 Some(parts[1])
724 } else {
725 None
726 }
727 })
728 .unwrap_or("/");
729
730 let response = match static_handler.serve(path).await {
731 Ok((content, mime_type)) => {
732 info!("Served: {} ({} bytes)", path, content.len());
733 http_response(200, &mime_type, content)
734 }
735 Err(e) => {
736 tracing::warn!("File not found: {} - {}", path, e);
737 let html = b"<html><body><h1>404 Not Found</h1></body></html>".to_vec();
738 http_response(404, "text/html", html)
739 }
740 };
741
742 stream.write_all(&response).await?;
743 stream.flush().await?;
744 Ok(())
745 }
746
747 async fn handle_websocket_connection(
748 &self,
749 stream: TcpStream,
750 peer_addr: SocketAddr,
751 ) -> Result<()> {
752 let ws_stream = accept_async(stream).await?;
753 let conn_id = Self::generate_connection_id();
754
755 let router = self.clone();
756 let manager = self.connection_manager.clone();
757
758 let on_message = Arc::new(move |conn_id: ConnectionId, message: Message| {
759 let router = router.clone();
760 tokio::spawn(async move {
761 if let Err(e) = router.handle_message(conn_id, message).await {
762 error!("Message handling error: {}", e);
763 }
764 });
765 });
766
767 let manager_ref = manager.clone();
768 let on_connect = self
769 .on_connect
770 .clone()
771 .map(move |cb| {
772 let manager = manager_ref.clone();
773 Arc::new(move |conn_id: ConnectionId| {
774 cb(&manager, conn_id);
775 }) as Arc<dyn Fn(ConnectionId) + Send + Sync>
776 })
777 .unwrap_or_else(|| {
778 Arc::new(|conn_id: ConnectionId| {
779 info!("Client connected: {}", conn_id);
780 })
781 });
782
783 let manager_ref = manager.clone();
784 let on_disconnect = self
785 .on_disconnect
786 .clone()
787 .map(move |cb| {
788 let manager = manager_ref.clone();
789 Arc::new(move |conn_id: ConnectionId| {
790 cb(&manager, conn_id);
791 }) as Arc<dyn Fn(ConnectionId) + Send + Sync>
792 })
793 .unwrap_or_else(|| {
794 Arc::new(|conn_id: ConnectionId| {
795 info!("Client disconnected: {}", conn_id);
796 })
797 });
798
799 handle_websocket(
800 ws_stream,
801 conn_id,
802 peer_addr,
803 manager,
804 on_message,
805 on_connect,
806 on_disconnect,
807 )
808 .await;
809
810 Ok(())
811 }
812
813 async fn handle_message(&self, conn_id: ConnectionId, message: Message) -> Result<()> {
814 let conn = self
815 .connection_manager
816 .get(&conn_id)
817 .ok_or_else(|| Error::ConnectionNotFound(conn_id.clone()))?;
818
819 let extensions = Extensions::new();
820
821 let handler = if let Some(text) = message.as_text() {
822 if text.starts_with('/') {
823 if let Some((route, _)) = text.split_once(' ') {
824 self.routes.get(route).map(|h| h.value().clone())
825 } else {
826 self.routes.get(text).map(|h| h.value().clone())
827 }
828 } else {
829 None
830 }
831 } else {
832 None
833 };
834
835 let handler = handler.or_else(|| self.default_handler.clone());
836
837 if let Some(handler) = handler {
838 match handler
839 .call(message, conn.clone(), self.state.clone(), extensions)
840 .await
841 {
842 Ok(Some(response)) => {
843 if let Err(e) = conn.send(response) {
844 error!("Failed to send response to {}: {}", conn_id, e);
845 }
846 }
847 Ok(None) => {
848 tracing::debug!("Handler processed message without response");
849 }
850 Err(e) => {
851 error!("Handler error for {}: {}", conn_id, e);
852 }
853 }
854 } else {
855 tracing::warn!("No handler found for message from {}", conn_id);
856 }
857
858 Ok(())
859 }
860
861 fn generate_connection_id() -> ConnectionId {
862 use std::sync::atomic::{AtomicU64, Ordering};
863 static COUNTER: AtomicU64 = AtomicU64::new(0);
864 format!("conn_{}", COUNTER.fetch_add(1, Ordering::SeqCst))
865 }
866}
867
868impl Clone for Router {
869 fn clone(&self) -> Self {
870 Self {
871 routes: self.routes.clone(),
872 state: self.state.clone(),
873 connection_manager: self.connection_manager.clone(),
874 on_connect: self.on_connect.clone(),
875 on_disconnect: self.on_disconnect.clone(),
876 default_handler: self.default_handler.clone(),
877 static_handler: self.static_handler.clone(),
878 }
879 }
880}
881
882impl Default for Router {
883 fn default() -> Self {
884 Self::new()
885 }
886}