Skip to main content

clasp_router/
router.rs

1//! Main router implementation
2//!
3//! The router is transport-agnostic - it can accept connections from any transport
4//! that implements the `TransportServer` trait (WebSocket, QUIC, TCP, etc.).
5//!
6//! # Transport Support
7//!
8//! - **WebSocket** (default): Works everywhere, including browsers and DO App Platform
9//! - **QUIC**: High-performance for native apps. Requires UDP - NOT supported on DO App Platform
10//! - **TCP**: Simple fallback, works everywhere
11//!
12//! # Example
13//!
14//! ```no_run
15//! use clasp_router::{Router, RouterConfig};
16//!
17//! #[tokio::main]
18//! async fn main() {
19//!     let router = Router::new(RouterConfig::default());
20//!
21//!     // WebSocket (most common)
22//!     router.serve_websocket("0.0.0.0:7330").await.unwrap();
23//!
24//!     // Or use any TransportServer implementation
25//!     // router.serve_on(my_custom_server).await.unwrap();
26//! }
27//! ```
28
29use clasp_core::{
30    codec, CpskValidator, ErrorMessage, Message, SecurityMode, SignalType, TokenValidator,
31};
32#[cfg(feature = "rules")]
33use clasp_core::{PublishMessage, SetMessage};
34
35#[cfg(feature = "journal")]
36use clasp_journal::Journal;
37#[cfg(feature = "rules")]
38use clasp_rules::RulesEngine;
39use clasp_transport::{TransportEvent, TransportReceiver, TransportSender, TransportServer};
40use dashmap::DashMap;
41use parking_lot::RwLock;
42use std::net::SocketAddr;
43use std::sync::Arc;
44use tracing::{debug, error, info, warn, Instrument};
45
46#[cfg(feature = "websocket")]
47use clasp_transport::WebSocketServer;
48
49#[cfg(feature = "quic")]
50use clasp_transport::{QuicConfig, QuicTransport};
51
52use crate::{
53    error::{Result, RouterError},
54    gesture::GestureRegistry,
55    handlers,
56    p2p::P2PCapabilities,
57    session::{Session, SessionId},
58    state::{RouterState, RouterStateConfig},
59    subscription::SubscriptionManager,
60};
61use std::time::Duration;
62
63/// Application-specific write validation callback.
64///
65/// Called after scope checks but before `state.apply_set()` for SET operations,
66/// and before broadcast for PUBLISH operations. Allows the application to enforce
67/// semantic authorization rules (e.g., "only room creators can modify admin paths").
68pub trait WriteValidator: Send + Sync {
69    /// Validate a write operation.
70    ///
71    /// - `address`: the CLASP address being written to
72    /// - `value`: the value being written
73    /// - `session`: the session performing the write
74    /// - `state`: the current router state (for looking up existing values)
75    ///
76    /// Returns `Ok(())` to allow the write, or `Err(message)` to reject it.
77    fn validate_write(
78        &self,
79        address: &str,
80        value: &clasp_core::Value,
81        session: &Session,
82        state: &RouterState,
83    ) -> std::result::Result<(), String>;
84}
85
86/// Application-specific snapshot filtering callback.
87///
88/// Called before sending the initial SNAPSHOT after WELCOME, and before sending
89/// subscription snapshots. Allows the application to strip sensitive fields
90/// or restrict visibility of certain paths.
91pub trait SnapshotFilter: Send + Sync {
92    /// Filter a snapshot before delivery to a session.
93    ///
94    /// - `params`: the snapshot parameters to filter
95    /// - `session`: the session receiving the snapshot
96    /// - `state`: the current router state
97    ///
98    /// Returns the filtered list of parameters.
99    fn filter_snapshot(
100        &self,
101        params: Vec<clasp_core::ParamValue>,
102        session: &Session,
103        state: &RouterState,
104    ) -> Vec<clasp_core::ParamValue>;
105}
106
107/// Signal transform applied to SET values before storage.
108///
109/// Transforms are matched by address pattern and applied in order.
110/// Used by LensVM to run WASM transforms on the router's hot path.
111pub trait SignalTransform: Send + Sync {
112    /// Transform a value for the given address. Return None to pass through unchanged.
113    fn transform(&self, address: &str, value: &clasp_core::Value) -> Option<clasp_core::Value>;
114}
115
/// How long a client has to complete the handshake (i.e. send its Hello message).
const HANDSHAKE_TIMEOUT: Duration = Duration::from_millis(10_000);
118
/// Transport configuration for multi-transport serving.
///
/// Use with `Router::serve_multi()` to run multiple transports simultaneously.
///
/// `Debug` is implemented by hand (instead of derived) so that the QUIC TLS
/// private key can never leak into logs or panic messages.
#[derive(Clone)]
pub enum TransportConfig {
    /// WebSocket transport (default, works everywhere)
    #[cfg(feature = "websocket")]
    WebSocket {
        /// Listen address, e.g., "0.0.0.0:7330"
        addr: String,
    },

    /// QUIC transport (high-performance, requires UDP)
    ///
    /// **WARNING**: Not supported on DigitalOcean App Platform or most PaaS.
    /// Use a VPS/Droplet for QUIC support.
    #[cfg(feature = "quic")]
    Quic {
        /// Listen address
        addr: SocketAddr,
        /// TLS certificate (DER format)
        cert: Vec<u8>,
        /// TLS private key (DER format)
        key: Vec<u8>,
    },
}

impl std::fmt::Debug for TransportConfig {
    // `f` is unused when every transport feature is disabled (the enum is then empty).
    #[allow(unused_variables)]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            #[cfg(feature = "websocket")]
            Self::WebSocket { ref addr } => {
                f.debug_struct("WebSocket").field("addr", addr).finish()
            }
            #[cfg(feature = "quic")]
            Self::Quic {
                ref addr, ref cert, ..
            } => f
                .debug_struct("Quic")
                .field("addr", addr)
                .field("cert", cert)
                // Secret material: never print the private key bytes.
                .field("key", &"<redacted>")
                .finish(),
        }
    }
}
145
/// Configuration for serving several protocols from one router.
///
/// Every field selects one protocol; a field left as `None` means that
/// protocol is not started. All configured protocols share the same router
/// state.
///
/// # Example
///
/// ```no_run
/// use clasp_router::{Router, MultiProtocolConfig};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let router = Router::default();
/// let config = MultiProtocolConfig {
///     websocket_addr: Some("0.0.0.0:7330".into()),
///     #[cfg(feature = "mqtt-server")]
///     mqtt: None,
///     #[cfg(feature = "osc-server")]
///     osc: None,
///     ..Default::default()
/// };
/// router.serve_all(config).await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug, Default)]
pub struct MultiProtocolConfig {
    /// Address the WebSocket server listens on (e.g., "0.0.0.0:7330")
    #[cfg(feature = "websocket")]
    pub websocket_addr: Option<String>,

    /// QUIC listener settings (address plus TLS material)
    #[cfg(feature = "quic")]
    pub quic: Option<QuicServerConfig>,

    /// MQTT server adapter settings
    #[cfg(feature = "mqtt-server")]
    pub mqtt: Option<crate::adapters::MqttServerConfig>,

    /// OSC server adapter settings
    #[cfg(feature = "osc-server")]
    pub osc: Option<crate::adapters::OscServerConfig>,
}
188
/// QUIC server configuration
///
/// `Debug` is implemented by hand (instead of derived) so that the TLS private
/// key can never leak into logs or panic messages.
#[cfg(feature = "quic")]
#[derive(Clone)]
pub struct QuicServerConfig {
    /// Listen address
    pub addr: SocketAddr,
    /// TLS certificate (DER format)
    pub cert: Vec<u8>,
    /// TLS private key (DER format)
    pub key: Vec<u8>,
}

#[cfg(feature = "quic")]
impl std::fmt::Debug for QuicServerConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QuicServerConfig")
            .field("addr", &self.addr)
            .field("cert", &self.cert)
            // Secret material: never print the private key bytes.
            .field("key", &"<redacted>")
            .finish()
    }
}
200
201/// Router configuration
202#[derive(Debug, Clone)]
203pub struct RouterConfig {
204    /// Server name
205    pub name: String,
206    /// Supported features
207    pub features: Vec<String>,
208    /// Maximum sessions
209    pub max_sessions: usize,
210    /// Session timeout (seconds)
211    pub session_timeout: u64,
212    /// Security mode (Open or Authenticated)
213    pub security_mode: SecurityMode,
214    /// Maximum subscriptions per session (0 = unlimited)
215    pub max_subscriptions_per_session: usize,
216    /// Enable gesture move coalescing (reduces bandwidth for high-frequency touch input)
217    pub gesture_coalescing: bool,
218    /// Gesture move coalesce interval in milliseconds (default: 16ms = 60fps)
219    pub gesture_coalesce_interval_ms: u64,
220    /// Maximum messages per second per client (0 = unlimited)
221    pub max_messages_per_second: u32,
222    /// Enable rate limiting
223    pub rate_limiting_enabled: bool,
224    /// State store configuration (TTL, limits)
225    pub state_config: RouterStateConfig,
226}
227
228impl Default for RouterConfig {
229    fn default() -> Self {
230        Self {
231            name: "Clasp Router".to_string(),
232            features: vec![
233                "param".to_string(),
234                "event".to_string(),
235                "stream".to_string(),
236                "timeline".to_string(),
237                "gesture".to_string(),
238            ],
239            max_sessions: 100,
240            session_timeout: 300,
241            security_mode: SecurityMode::Open,
242            max_subscriptions_per_session: 1000, // 0 = unlimited
243            gesture_coalescing: true,
244            gesture_coalesce_interval_ms: 16,
245            max_messages_per_second: 1000, // 1000 msgs/sec default
246            rate_limiting_enabled: true,
247            state_config: RouterStateConfig::default(), // 1 hour TTL by default
248        }
249    }
250}
251
252/// Builder for RouterConfig
253#[derive(Debug, Clone, Default)]
254pub struct RouterConfigBuilder {
255    config: RouterConfig,
256}
257
258impl RouterConfigBuilder {
259    pub fn new() -> Self {
260        Self::default()
261    }
262
263    pub fn name(mut self, name: impl Into<String>) -> Self {
264        self.config.name = name.into();
265        self
266    }
267
268    pub fn max_sessions(mut self, max: usize) -> Self {
269        self.config.max_sessions = max;
270        self
271    }
272
273    pub fn session_timeout(mut self, secs: u64) -> Self {
274        self.config.session_timeout = secs;
275        self
276    }
277
278    pub fn security_mode(mut self, mode: SecurityMode) -> Self {
279        self.config.security_mode = mode;
280        self
281    }
282
283    pub fn gesture_coalescing(mut self, enabled: bool) -> Self {
284        self.config.gesture_coalescing = enabled;
285        self
286    }
287
288    pub fn gesture_coalesce_interval_ms(mut self, ms: u64) -> Self {
289        self.config.gesture_coalesce_interval_ms = ms;
290        self
291    }
292
293    pub fn build(self) -> RouterConfig {
294        self.config
295    }
296}
297
298/// Clasp router
299pub struct Router {
300    config: RouterConfig,
301    /// Active sessions
302    sessions: Arc<DashMap<SessionId, Arc<Session>>>,
303    /// Subscription manager
304    subscriptions: Arc<SubscriptionManager>,
305    /// Global state
306    state: Arc<RouterState>,
307    /// Running flag
308    running: Arc<RwLock<bool>>,
309    /// Token validator (None = always reject in authenticated mode)
310    token_validator: Option<Arc<dyn TokenValidator>>,
311    /// P2P capabilities tracker
312    p2p_capabilities: Arc<P2PCapabilities>,
313    /// Gesture registry for move coalescing
314    gesture_registry: Option<Arc<GestureRegistry>>,
315    /// Application-specific write validator
316    write_validator: Option<Arc<dyn WriteValidator>>,
317    /// Application-specific snapshot filter
318    snapshot_filter: Option<Arc<dyn SnapshotFilter>>,
319    /// Signal transform pipeline for SET values
320    transforms: Option<Arc<dyn SignalTransform>>,
321    /// Rules engine for server-side automation
322    #[cfg(feature = "rules")]
323    rules_engine: Option<Arc<parking_lot::Mutex<RulesEngine>>>,
324}
325
326impl Router {
327    /// Create a new router with the given configuration
328    pub fn new(config: RouterConfig) -> Self {
329        let gesture_registry = if config.gesture_coalescing {
330            Some(Arc::new(GestureRegistry::new(Duration::from_millis(
331                config.gesture_coalesce_interval_ms,
332            ))))
333        } else {
334            None
335        };
336
337        let state = Arc::new(RouterState::with_config(config.state_config.clone()));
338
339        Self {
340            config,
341            sessions: Arc::new(DashMap::new()),
342            subscriptions: Arc::new(SubscriptionManager::new()),
343            state,
344            running: Arc::new(RwLock::new(false)),
345            token_validator: None,
346            p2p_capabilities: Arc::new(P2PCapabilities::new()),
347            gesture_registry,
348            write_validator: None,
349            snapshot_filter: None,
350            transforms: None,
351            #[cfg(feature = "rules")]
352            rules_engine: None,
353        }
354    }
355
356    /// Create a router with a token validator for authenticated mode
357    pub fn with_validator<V: TokenValidator + 'static>(mut self, validator: V) -> Self {
358        self.token_validator = Some(Arc::new(validator));
359        self
360    }
361
362    /// Set the token validator
363    pub fn set_validator<V: TokenValidator + 'static>(&mut self, validator: V) {
364        self.token_validator = Some(Arc::new(validator));
365    }
366
367    /// Set the write validator for application-specific authorization
368    pub fn set_write_validator<V: WriteValidator + 'static>(&mut self, validator: V) {
369        self.write_validator = Some(Arc::new(validator));
370    }
371
372    /// Set the write validator from a pre-wrapped `Arc` (for library embedding).
373    pub fn set_write_validator_arc(&mut self, validator: Arc<dyn WriteValidator>) {
374        self.write_validator = Some(validator);
375    }
376
377    /// Set the snapshot filter for application-specific data redaction
378    pub fn set_snapshot_filter<F: SnapshotFilter + 'static>(&mut self, filter: F) {
379        self.snapshot_filter = Some(Arc::new(filter));
380    }
381
382    /// Set the snapshot filter from a pre-wrapped `Arc` (for library embedding).
383    pub fn set_snapshot_filter_arc(&mut self, filter: Arc<dyn SnapshotFilter>) {
384        self.snapshot_filter = Some(filter);
385    }
386
387    /// Add a signal transform pipeline for processing SET values.
388    ///
389    /// Transforms run after write validation and before state storage.
390    /// Used by LensVM to apply WASM transforms on the router's hot path.
391    pub fn with_transforms(mut self, transforms: Arc<dyn SignalTransform>) -> Self {
392        self.transforms = Some(transforms);
393        self
394    }
395
396    /// Create a router with a journal for state persistence.
397    ///
398    /// The journal records all state mutations, enabling crash recovery
399    /// and REPLAY message support.
400    #[cfg(feature = "journal")]
401    pub fn with_journal(mut self, journal: Arc<dyn Journal>) -> Self {
402        // We need to recreate the state with journal support
403        let mut state = RouterState::with_config(self.config.state_config.clone());
404        state.set_journal(journal);
405        self.state = Arc::new(state);
406        self
407    }
408
409    /// Create a router with a rules engine for server-side automation.
410    ///
411    /// Rules are evaluated after SET and PUBLISH operations, allowing
412    /// automatic responses like "when motion detected, turn on lights".
413    #[cfg(feature = "rules")]
414    pub fn with_rules(mut self, engine: RulesEngine) -> Self {
415        self.rules_engine = Some(Arc::new(parking_lot::Mutex::new(engine)));
416        self
417    }
418
419    /// Get the rules engine interval rules for spawning timer tasks.
420    #[cfg(feature = "rules")]
421    pub fn rules_engine(&self) -> Option<&Arc<parking_lot::Mutex<RulesEngine>>> {
422        self.rules_engine.as_ref()
423    }
424
425    /// Get a reference to the CPSK validator if one is configured
426    /// This allows adding tokens at runtime
427    pub fn cpsk_validator(&self) -> Option<&CpskValidator> {
428        self.token_validator
429            .as_ref()
430            .and_then(|v| v.as_any().downcast_ref::<CpskValidator>())
431    }
432
433    /// Get the security mode
434    pub fn security_mode(&self) -> SecurityMode {
435        self.config.security_mode
436    }
437
438    // =========================================================================
439    // Transport-Agnostic Methods
440    // =========================================================================
441
442    /// Serve using any TransportServer implementation.
443    ///
444    /// This is the core method that all transport-specific methods use internally.
445    /// Use this when you have a custom transport or want full control.
446    ///
447    /// # Example
448    ///
449    /// ```no_run
450    /// use clasp_router::Router;
451    /// use clasp_transport::WebSocketServer;
452    ///
453    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
454    /// let router = Router::default();
455    /// let server = WebSocketServer::bind("0.0.0.0:7330").await?;
456    /// router.serve_on(server).await?;
457    /// # Ok(())
458    /// # }
459    /// ```
460    pub async fn serve_on<S>(&self, mut server: S) -> Result<()>
461    where
462        S: TransportServer + 'static,
463        S::Sender: 'static,
464        S::Receiver: 'static,
465    {
466        info!("Router accepting connections");
467        *self.running.write() = true;
468
469        // Start session cleanup task if timeout is configured
470        if self.config.session_timeout > 0 {
471            self.start_session_cleanup_task();
472        }
473
474        // Start gesture flush task if coalescing is enabled
475        if let Some(ref registry) = self.gesture_registry {
476            self.start_gesture_flush_task(Arc::clone(registry));
477        }
478
479        // Start state cleanup task (removes stale params and signals)
480        self.start_state_cleanup_task();
481
482        while *self.running.read() {
483            match server.accept().await {
484                Ok((sender, receiver, addr)) => {
485                    // Enforce max_sessions limit
486                    let current_sessions = self.sessions.len();
487                    if current_sessions >= self.config.max_sessions {
488                        warn!(
489                            "Rejecting connection from {}: max sessions reached ({}/{})",
490                            addr, current_sessions, self.config.max_sessions
491                        );
492                        // Connection will be closed when sender/receiver are dropped
493                        continue;
494                    }
495
496                    info!("New connection from {}", addr);
497                    #[cfg(feature = "metrics")]
498                    metrics::gauge!("clasp_sessions_active").increment(1.0);
499                    self.handle_connection(Arc::new(sender), receiver, addr);
500                }
501                Err(e) => {
502                    warn!("Accept error: {}", e);
503                }
504            }
505        }
506
507        Ok(())
508    }
509
510    /// Start background task to flush stale gesture moves
511    fn start_gesture_flush_task(&self, registry: Arc<GestureRegistry>) {
512        // Skip gesture flush task if coalescing is disabled (interval is 0)
513        if self.config.gesture_coalesce_interval_ms == 0 {
514            return;
515        }
516
517        let sessions = Arc::clone(&self.sessions);
518        let subscriptions = Arc::clone(&self.subscriptions);
519        let running = Arc::clone(&self.running);
520        let flush_interval = Duration::from_millis(self.config.gesture_coalesce_interval_ms);
521
522        tokio::spawn(async move {
523            let mut ticker = tokio::time::interval(flush_interval);
524
525            loop {
526                ticker.tick().await;
527
528                if !*running.read() {
529                    break;
530                }
531
532                // Flush any stale buffered moves
533                let to_flush = registry.flush_stale();
534                for pub_msg in to_flush {
535                    let msg = Message::Publish(pub_msg.clone());
536                    let subscribers =
537                        subscriptions.find_subscribers(&pub_msg.address, Some(SignalType::Gesture));
538
539                    if let Ok(bytes) = codec::encode(&msg) {
540                        for sub_session_id in subscribers {
541                            if let Some(sub_session) = sessions.get(&sub_session_id) {
542                                crate::handlers::try_send_with_drop_tracking_sync(
543                                    sub_session.value(),
544                                    bytes.clone(),
545                                    &sub_session_id,
546                                );
547                            }
548                        }
549                    }
550                }
551
552                // Cleanup very old gestures (> 5 minutes with no end)
553                registry.cleanup_stale(Duration::from_secs(300));
554            }
555
556            debug!("Gesture flush task stopped");
557        });
558    }
559
560    /// Start background task to clean up timed-out sessions
561    fn start_session_cleanup_task(&self) {
562        let sessions = Arc::clone(&self.sessions);
563        let subscriptions = Arc::clone(&self.subscriptions);
564        let running = Arc::clone(&self.running);
565        let timeout_secs = self.config.session_timeout;
566
567        tokio::spawn(async move {
568            let check_interval = std::time::Duration::from_secs(timeout_secs / 4)
569                .max(std::time::Duration::from_secs(10));
570            let timeout = std::time::Duration::from_secs(timeout_secs);
571
572            loop {
573                tokio::time::sleep(check_interval).await;
574
575                if !*running.read() {
576                    break;
577                }
578
579                // Find and remove timed-out sessions
580                let timed_out: Vec<SessionId> = sessions
581                    .iter()
582                    .filter(|entry| entry.value().idle_duration() > timeout)
583                    .map(|entry| entry.key().clone())
584                    .collect();
585
586                for session_id in timed_out {
587                    if let Some((id, session)) = sessions.remove(&session_id) {
588                        info!(
589                            "Session {} timed out after {:?} idle",
590                            id,
591                            session.idle_duration()
592                        );
593                        subscriptions.remove_session(&id);
594                    }
595                }
596            }
597
598            debug!("Session cleanup task stopped");
599        });
600    }
601
602    /// Start background task to clean up stale state entries
603    fn start_state_cleanup_task(&self) {
604        let state = Arc::clone(&self.state);
605        let running = Arc::clone(&self.running);
606        #[cfg(feature = "metrics")]
607        let sessions = Arc::clone(&self.sessions);
608        #[cfg(feature = "metrics")]
609        let subscriptions = Arc::clone(&self.subscriptions);
610
611        tokio::spawn(async move {
612            // Clean up every 60 seconds
613            let cleanup_interval = std::time::Duration::from_secs(60);
614
615            loop {
616                tokio::time::sleep(cleanup_interval).await;
617
618                if !*running.read() {
619                    break;
620                }
621
622                // Run cleanup on state store
623                let (params_removed, signals_removed) = state.cleanup_stale();
624
625                if params_removed > 0 || signals_removed > 0 {
626                    debug!(
627                        "State cleanup: removed {} stale params, {} stale signals",
628                        params_removed, signals_removed
629                    );
630                }
631
632                // Update absolute gauge values periodically
633                #[cfg(feature = "metrics")]
634                {
635                    metrics::gauge!("clasp_state_params_active").set(state.len() as f64);
636                    metrics::gauge!("clasp_sessions_active").set(sessions.len() as f64);
637                    metrics::gauge!("clasp_subscriptions_active").set(subscriptions.len() as f64);
638                }
639            }
640
641            debug!("State cleanup task stopped");
642        });
643    }
644
645    // =========================================================================
646    // WebSocket Transport
647    // =========================================================================
648
649    /// Start the router on WebSocket (default, recommended).
650    ///
651    /// WebSocket is the universal baseline transport:
652    /// - Works in browsers
653    /// - Works on all hosting platforms (including DO App Platform)
654    /// - Easy firewall/proxy traversal
655    ///
656    /// Default port: 7330
657    #[cfg(feature = "websocket")]
658    pub async fn serve_websocket(&self, addr: &str) -> Result<()> {
659        let server = WebSocketServer::bind(addr).await?;
660        info!("WebSocket server listening on {}", addr);
661        self.serve_on(server).await
662    }
663
664    /// Backward-compatible alias for `serve_websocket`.
665    #[cfg(feature = "websocket")]
666    pub async fn serve(&self, addr: &str) -> Result<()> {
667        self.serve_websocket(addr).await
668    }
669
670    // =========================================================================
671    // QUIC Transport (feature-gated)
672    // =========================================================================
673
674    /// Start the router on QUIC.
675    ///
676    /// QUIC is ideal for native applications:
677    /// - 0-RTT connection establishment
678    /// - Connection migration (mobile networks)
679    /// - Built-in encryption (TLS 1.3)
680    /// - Lower latency than WebSocket
681    ///
682    /// **WARNING**: QUIC requires UDP, which is NOT supported on:
683    /// - DigitalOcean App Platform
684    /// - Many PaaS providers
685    /// - Some corporate firewalls
686    ///
687    /// Use a VPS/Droplet for QUIC support.
688    ///
689    /// Default port: 7331 (to avoid conflict with WebSocket on 7330)
690    #[cfg(feature = "quic")]
691    pub async fn serve_quic(
692        &self,
693        addr: SocketAddr,
694        cert_der: Vec<u8>,
695        key_der: Vec<u8>,
696    ) -> Result<()> {
697        let server = QuicTransport::new_server(addr, cert_der, key_der)
698            .map_err(|e| RouterError::Transport(e))?;
699        info!("QUIC server listening on {}", addr);
700        self.serve_quic_transport(server).await
701    }
702
703    /// Internal: Serve using a QuicTransport server.
704    ///
705    /// QUIC has a different accept pattern (connection then stream),
706    /// so we need special handling.
707    #[cfg(feature = "quic")]
708    async fn serve_quic_transport(&self, server: QuicTransport) -> Result<()> {
709        *self.running.write() = true;
710
711        while *self.running.read() {
712            match server.accept().await {
713                Ok(connection) => {
714                    let addr = connection.remote_address();
715                    info!("QUIC connection from {}", addr);
716
717                    // Accept bidirectional stream for CLASP protocol
718                    match connection.accept_bi().await {
719                        Ok((sender, receiver)) => {
720                            self.handle_connection(Arc::new(sender), receiver, addr);
721                        }
722                        Err(e) => {
723                            error!("QUIC stream accept error: {}", e);
724                        }
725                    }
726                }
727                Err(e) => {
728                    error!("QUIC accept error: {}", e);
729                }
730            }
731        }
732
733        Ok(())
734    }
735
736    // =========================================================================
737    // Multi-Transport Support
738    // =========================================================================
739
740    /// Serve on multiple transports simultaneously.
741    ///
742    /// All transports share the same router state, so a client connected via
743    /// WebSocket can communicate with a client connected via QUIC.
744    ///
745    /// # Example
746    ///
747    /// ```no_run
748    /// use clasp_router::{Router, TransportConfig};
749    ///
750    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
751    /// let router = Router::default();
752    /// router.serve_multi(vec![
753    ///     TransportConfig::WebSocket { addr: "0.0.0.0:7330".into() },
754    ///     // QUIC requires feature and UDP support
755    ///     // TransportConfig::Quic { addr: "0.0.0.0:7331".parse()?, cert, key },
756    /// ]).await?;
757    /// # Ok(())
758    /// # }
759    /// ```
760    pub async fn serve_multi(&self, transports: Vec<TransportConfig>) -> Result<()> {
761        use futures::future::try_join_all;
762
763        if transports.is_empty() {
764            return Err(RouterError::Config("No transports configured".into()));
765        }
766
767        let mut handles = vec![];
768
769        for config in transports {
770            let router = self.clone_internal();
771            let handle = tokio::spawn(async move {
772                match config {
773                    #[cfg(feature = "websocket")]
774                    TransportConfig::WebSocket { addr } => router.serve_websocket(&addr).await,
775                    #[cfg(feature = "quic")]
776                    TransportConfig::Quic { addr, cert, key } => {
777                        router.serve_quic(addr, cert, key).await
778                    }
779                    #[allow(unreachable_patterns)]
780                    _ => Err(RouterError::Config(
781                        "Transport not enabled at compile time".into(),
782                    )),
783                }
784            });
785            handles.push(handle);
786        }
787
788        // Wait for all transports (or first error)
789        let results = try_join_all(handles)
790            .await
791            .map_err(|e| RouterError::Config(format!("Transport task failed: {}", e)))?;
792
793        // Check for errors from any transport
794        for result in results {
795            result?;
796        }
797
798        Ok(())
799    }
800
801    /// Serve all configured protocols simultaneously.
802    ///
803    /// This is the recommended way to run a multi-protocol CLASP server.
804    /// All protocols share the same router state, so clients connected via
805    /// different protocols can communicate seamlessly.
806    ///
807    /// # Example
808    ///
809    /// ```no_run
810    /// use clasp_router::{Router, MultiProtocolConfig};
811    ///
812    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
813    /// let router = Router::default();
814    /// let config = MultiProtocolConfig {
815    ///     websocket_addr: Some("0.0.0.0:7330".into()),
816    ///     ..Default::default()
817    /// };
818    /// router.serve_all(config).await?;
819    /// # Ok(())
820    /// # }
821    /// ```
822    pub async fn serve_all(&self, config: MultiProtocolConfig) -> Result<()> {
823        use futures::future::select_all;
824
825        let mut handles: Vec<tokio::task::JoinHandle<Result<()>>> = vec![];
826        let mut protocol_names: Vec<&str> = vec![];
827
828        // WebSocket server
829        #[cfg(feature = "websocket")]
830        if let Some(ref addr) = config.websocket_addr {
831            info!("Starting WebSocket server on {}", addr);
832            protocol_names.push("WebSocket");
833            let router = self.clone_internal();
834            let addr = addr.clone();
835            handles.push(tokio::spawn(
836                async move { router.serve_websocket(&addr).await },
837            ));
838        }
839
840        // QUIC server
841        #[cfg(feature = "quic")]
842        if let Some(ref quic_config) = config.quic {
843            info!("Starting QUIC server on {}", quic_config.addr);
844            protocol_names.push("QUIC");
845            let router = self.clone_internal();
846            let addr = quic_config.addr;
847            let cert = quic_config.cert.clone();
848            let key = quic_config.key.clone();
849            handles.push(tokio::spawn(async move {
850                router.serve_quic(addr, cert, key).await
851            }));
852        }
853
854        // MQTT server adapter
855        #[cfg(feature = "mqtt-server")]
856        if let Some(mqtt_config) = config.mqtt {
857            info!("Starting MQTT server on {}", mqtt_config.bind_addr);
858            protocol_names.push("MQTT");
859            let adapter = crate::adapters::MqttServerAdapter::new(
860                mqtt_config,
861                Arc::clone(&self.sessions),
862                Arc::clone(&self.subscriptions),
863                Arc::clone(&self.state),
864            );
865            handles.push(tokio::spawn(async move { adapter.serve().await }));
866        }
867
868        // OSC server adapter
869        #[cfg(feature = "osc-server")]
870        if let Some(osc_config) = config.osc {
871            info!("Starting OSC server on {}", osc_config.bind_addr);
872            protocol_names.push("OSC");
873            let adapter = crate::adapters::OscServerAdapter::new(
874                osc_config,
875                Arc::clone(&self.sessions),
876                Arc::clone(&self.subscriptions),
877                Arc::clone(&self.state),
878            );
879            handles.push(tokio::spawn(async move { adapter.serve().await }));
880        }
881
882        if handles.is_empty() {
883            return Err(RouterError::Config("No protocols configured".into()));
884        }
885
886        info!(
887            "Multi-protocol server running with {} protocols: {}",
888            handles.len(),
889            protocol_names.join(", ")
890        );
891
892        *self.running.write() = true;
893
894        // Start session cleanup task
895        if self.config.session_timeout > 0 {
896            self.start_session_cleanup_task();
897        }
898
899        // Start gesture flush task if coalescing is enabled
900        if let Some(ref registry) = self.gesture_registry {
901            self.start_gesture_flush_task(Arc::clone(registry));
902        }
903
904        // Start state cleanup task (removes stale params and signals)
905        self.start_state_cleanup_task();
906
907        // Wait for any server to complete (usually due to error or shutdown)
908        loop {
909            if handles.is_empty() {
910                break;
911            }
912
913            let (result, _index, remaining) = select_all(handles).await;
914            handles = remaining;
915
916            match result {
917                Ok(Ok(())) => {
918                    // Server completed normally (shutdown)
919                    debug!("Protocol server completed normally");
920                }
921                Ok(Err(e)) => {
922                    error!("Protocol server error: {}", e);
923                    // Continue running other servers
924                }
925                Err(e) => {
926                    error!("Protocol server task panicked: {}", e);
927                    // Continue running other servers
928                }
929            }
930        }
931
932        Ok(())
933    }
934
935    /// Get shared state references for use by adapters
936    #[allow(clippy::type_complexity)]
937    pub fn shared_state(
938        &self,
939    ) -> (
940        Arc<DashMap<SessionId, Arc<Session>>>,
941        Arc<SubscriptionManager>,
942        Arc<RouterState>,
943    ) {
944        (
945            Arc::clone(&self.sessions),
946            Arc::clone(&self.subscriptions),
947            Arc::clone(&self.state),
948        )
949    }
950
951    /// Internal clone for spawning transport tasks.
952    /// Shares all Arc state with the original.
953    fn clone_internal(&self) -> Self {
954        Self {
955            config: self.config.clone(),
956            sessions: Arc::clone(&self.sessions),
957            subscriptions: Arc::clone(&self.subscriptions),
958            state: Arc::clone(&self.state),
959            running: Arc::clone(&self.running),
960            token_validator: self.token_validator.clone(),
961            p2p_capabilities: Arc::clone(&self.p2p_capabilities),
962            gesture_registry: self.gesture_registry.clone(),
963            write_validator: self.write_validator.clone(),
964            snapshot_filter: self.snapshot_filter.clone(),
965            transforms: self.transforms.clone(),
966            #[cfg(feature = "rules")]
967            rules_engine: self.rules_engine.clone(),
968        }
969    }
970
971    /// Get active gesture count (for diagnostics)
972    pub fn active_gesture_count(&self) -> usize {
973        self.gesture_registry
974            .as_ref()
975            .map(|r| r.active_count())
976            .unwrap_or(0)
977    }
978
979    /// Handle a new connection
980    fn handle_connection(
981        &self,
982        sender: Arc<dyn TransportSender>,
983        mut receiver: impl TransportReceiver + 'static,
984        addr: SocketAddr,
985    ) {
986        let sessions = Arc::clone(&self.sessions);
987        let subscriptions = Arc::clone(&self.subscriptions);
988        let state = Arc::clone(&self.state);
989        let config = self.config.clone();
990        let running = Arc::clone(&self.running);
991        let token_validator = self.token_validator.clone();
992        let security_mode = self.config.security_mode;
993        let p2p_capabilities = Arc::clone(&self.p2p_capabilities);
994        let gesture_registry = self.gesture_registry.clone();
995        let write_validator = self.write_validator.clone();
996        let snapshot_filter = self.snapshot_filter.clone();
997        let transforms = self.transforms.clone();
998        #[cfg(feature = "rules")]
999        let rules_engine = self.rules_engine.clone();
1000
1001        let conn_span =
1002            tracing::info_span!("connection", session_id = tracing::field::Empty, remote = %addr);
1003
1004        tokio::spawn(
1005            async move {
1006                let mut session: Option<Arc<Session>> = None;
1007                let mut handshake_complete = false;
1008
1009                // Phase 1: Wait for Hello message with timeout
1010                let handshake_result = tokio::time::timeout(HANDSHAKE_TIMEOUT, async {
1011                    loop {
1012                        match receiver.recv().await {
1013                            Some(TransportEvent::Data(data)) => {
1014                                // Decode and check if it's a Hello message
1015                                match codec::decode(&data) {
1016                                    Ok((msg, _)) => {
1017                                        if matches!(msg, Message::Hello(_)) {
1018                                            return Some(data);
1019                                        } else {
1020                                            // Non-Hello message before handshake
1021                                            warn!(
1022                                            "Received non-Hello message before handshake from {}",
1023                                            addr
1024                                        );
1025                                            return None;
1026                                        }
1027                                    }
1028                                    Err(e) => {
1029                                        warn!("Decode error during handshake from {}: {}", addr, e);
1030                                        return None;
1031                                    }
1032                                }
1033                            }
1034                            Some(TransportEvent::Disconnected { .. }) | None => {
1035                                return None;
1036                            }
1037                            Some(TransportEvent::Error(e)) => {
1038                                error!("Transport error during handshake from {}: {}", addr, e);
1039                                return None;
1040                            }
1041                            _ => {}
1042                        }
1043                    }
1044                })
1045                .await;
1046
1047                // Check handshake result
1048                let hello_data = match handshake_result {
1049                    Ok(Some(data)) => data,
1050                    Ok(None) => {
1051                        info!("Handshake failed for {}", addr);
1052                        return;
1053                    }
1054                    Err(_) => {
1055                        warn!(
1056                            "Handshake timeout for {} after {:?}",
1057                            addr, HANDSHAKE_TIMEOUT
1058                        );
1059                        return;
1060                    }
1061                };
1062
1063                // Process the Hello message
1064                if let Ok((msg, frame)) = codec::decode(&hello_data) {
1065                    let ctx = handlers::HandlerContext {
1066                        session: &session,
1067                        sender: &sender,
1068                        sessions: &sessions,
1069                        subscriptions: &subscriptions,
1070                        state: &state,
1071                        config: &config,
1072                        security_mode,
1073                        token_validator: &token_validator,
1074                        p2p_capabilities: &p2p_capabilities,
1075                        gesture_registry: &gesture_registry,
1076                        write_validator: &write_validator,
1077                        snapshot_filter: &snapshot_filter,
1078                        transforms: &transforms,
1079                        #[cfg(feature = "rules")]
1080                        rules_engine: &rules_engine,
1081                    };
1082                    if let Some(response) = handlers::handle_message(&msg, &frame, &ctx).await {
1083                        match response {
1084                            handlers::MessageResult::NewSession(s) => {
1085                                tracing::Span::current()
1086                                    .record("session_id", tracing::field::display(&s.id));
1087                                session = Some(s);
1088                                handshake_complete = true;
1089                            }
1090                            handlers::MessageResult::Send(bytes) => {
1091                                let _ = sender.send(bytes).await;
1092                            }
1093                            handlers::MessageResult::Disconnect => {
1094                                info!(
1095                                    "Disconnecting client {} due to auth failure during handshake",
1096                                    addr
1097                                );
1098                                return;
1099                            }
1100                            _ => {}
1101                        }
1102                    }
1103                }
1104
1105                if !handshake_complete {
1106                    debug!("Handshake incomplete for {}", addr);
1107                    return;
1108                }
1109
1110                // Phase 2: Main message loop (after successful handshake)
1111                while *running.read() {
1112                    match receiver.recv().await {
1113                        Some(TransportEvent::Data(data)) => {
1114                            // Check rate limit before processing
1115                            if config.rate_limiting_enabled {
1116                                if let Some(ref s) = session {
1117                                    if !s.check_rate_limit(config.max_messages_per_second) {
1118                                        warn!(
1119                                            "Rate limit exceeded for session {} ({} msgs/sec > {})",
1120                                            s.id,
1121                                            s.messages_per_second(),
1122                                            config.max_messages_per_second
1123                                        );
1124                                        // Send error and continue (don't disconnect for rate limiting)
1125                                        let error = Message::Error(ErrorMessage {
1126                                            code: 429, // Too Many Requests
1127                                            message: format!(
1128                                                "Rate limit exceeded: {} messages/second",
1129                                                config.max_messages_per_second
1130                                            ),
1131                                            address: None,
1132                                            correlation_id: None,
1133                                        });
1134                                        if let Ok(bytes) = codec::encode(&error) {
1135                                            let _ = sender.send(bytes).await;
1136                                        }
1137                                        continue;
1138                                    }
1139                                }
1140                            }
1141
1142                            // Decode message
1143                            match codec::decode(&data) {
1144                                Ok((msg, frame)) => {
1145                                    let ctx = handlers::HandlerContext {
1146                                        session: &session,
1147                                        sender: &sender,
1148                                        sessions: &sessions,
1149                                        subscriptions: &subscriptions,
1150                                        state: &state,
1151                                        config: &config,
1152                                        security_mode,
1153                                        token_validator: &token_validator,
1154                                        p2p_capabilities: &p2p_capabilities,
1155                                        gesture_registry: &gesture_registry,
1156                                        write_validator: &write_validator,
1157                                        snapshot_filter: &snapshot_filter,
1158                                        transforms: &transforms,
1159                                        #[cfg(feature = "rules")]
1160                                        rules_engine: &rules_engine,
1161                                    };
1162                                    if let Some(response) =
1163                                        handlers::handle_message(&msg, &frame, &ctx).await
1164                                    {
1165                                        match response {
1166                                            handlers::MessageResult::NewSession(s) => {
1167                                                session = Some(s);
1168                                            }
1169                                            handlers::MessageResult::Send(bytes) => {
1170                                                if let Err(e) = sender.send(bytes).await {
1171                                                    error!("Send error: {}", e);
1172                                                    break;
1173                                                }
1174                                            }
1175                                            handlers::MessageResult::Broadcast(bytes, exclude) => {
1176                                                handlers::broadcast_to_subscribers(
1177                                                    &bytes, &sessions, &exclude,
1178                                                );
1179                                            }
1180                                            handlers::MessageResult::Disconnect => {
1181                                                info!(
1182                                                    "Disconnecting client {} due to auth failure",
1183                                                    addr
1184                                                );
1185                                                break;
1186                                            }
1187                                            handlers::MessageResult::None => {}
1188                                        }
1189                                    }
1190                                }
1191                                Err(e) => {
1192                                    warn!("Decode error from {}: {}", addr, e);
1193                                }
1194                            }
1195                        }
1196                        Some(TransportEvent::Disconnected { reason }) => {
1197                            info!("Client {} disconnected: {:?}", addr, reason);
1198                            break;
1199                        }
1200                        Some(TransportEvent::Error(e)) => {
1201                            error!("Transport error from {}: {}", addr, e);
1202                            break;
1203                        }
1204                        None => {
1205                            break;
1206                        }
1207                        _ => {}
1208                    }
1209                }
1210
1211                // Cleanup session
1212                if let Some(s) = session {
1213                    info!("Removing session {}", s.id);
1214                    sessions.remove(&s.id);
1215                    subscriptions.remove_session(&s.id);
1216                    p2p_capabilities.unregister(&s.id);
1217                    #[cfg(feature = "metrics")]
1218                    metrics::gauge!("clasp_sessions_active").decrement(1.0);
1219                }
1220            }
1221            .instrument(conn_span),
1222        );
1223    }
1224
1225    /// Stop the router
1226    pub fn stop(&self) {
1227        *self.running.write() = false;
1228    }
1229
1230    /// Get session count
1231    pub fn session_count(&self) -> usize {
1232        self.sessions.len()
1233    }
1234
1235    /// Get state
1236    pub fn state(&self) -> &RouterState {
1237        &self.state
1238    }
1239
1240    /// Get subscription count
1241    pub fn subscription_count(&self) -> usize {
1242        self.subscriptions.len()
1243    }
1244}
1245
1246impl Default for Router {
1247    fn default() -> Self {
1248        Self::new(RouterConfig::default())
1249    }
1250}
1251
/// Execute pending actions produced by the rules engine.
///
/// - `Set` writes the value to `state` under the action's origin and, on
///   success, broadcasts a `SET` carrying the new revision to the sessions
///   subscribed to the address as a param.
/// - `Publish` encodes a `PUBLISH` and sends it to the sessions subscribed to
///   the address for the action's signal type; nothing is written to `state`.
/// - `SetFromTrigger` reads the current value at the address, applies the
///   action's transform and then behaves like `Set`. It is skipped when the
///   address has no current value.
/// - `Delay` is a no-op here; it is handled at a higher level.
///
/// Actions carry an origin like "rule:my_rule_id" to prevent re-triggering.
///
/// Failures never abort the batch: a rejected state write or a message that
/// cannot be encoded is logged as a warning and the next action is processed.
#[cfg(feature = "rules")]
pub fn execute_rule_actions(
    actions: Vec<clasp_rules::PendingAction>,
    state: &Arc<RouterState>,
    sessions: &Arc<DashMap<SessionId, Arc<Session>>>,
    subscriptions: &Arc<SubscriptionManager>,
) {
    // Encode `message` once and hand a copy to every session subscribed to
    // `address` for `signal`. The encode error is returned to the caller so
    // it can be reported with the rule that caused it instead of being dropped.
    let deliver = |message: &Message, address: &String, signal: SignalType| {
        codec::encode(message).map(|bytes| {
            let subscribers = subscriptions.find_subscribers(address, Some(signal));
            for sub_session_id in subscribers {
                // A subscriber may have disconnected since it subscribed.
                if let Some(sub_session) = sessions.get(&sub_session_id) {
                    crate::handlers::try_send_with_drop_tracking_sync(
                        sub_session.value(),
                        bytes.clone(),
                        &sub_session_id,
                    );
                }
            }
        })
    };

    for action in actions {
        match action.action {
            clasp_rules::RuleAction::Set { address, value } => {
                match state.set(
                    &address,
                    value.clone(),
                    &action.origin,
                    None,
                    false,
                    false,
                    None,
                ) {
                    Ok(revision) => {
                        let set_msg = Message::Set(SetMessage {
                            address: address.clone(),
                            value,
                            revision: Some(revision),
                            lock: false,
                            unlock: false,
                            ttl: None,
                        });
                        if let Err(e) = deliver(&set_msg, &address, SignalType::Param) {
                            warn!(
                                "Rule {} SET to {} could not be encoded for broadcast: {:?}",
                                action.rule_id, address, e
                            );
                        }
                        // The state write itself succeeded either way.
                        debug!("Rule {} applied SET to {}", action.rule_id, address);
                    }
                    Err(e) => {
                        warn!("Rule {} SET to {} failed: {:?}", action.rule_id, address, e);
                    }
                }
            }
            clasp_rules::RuleAction::Publish {
                address,
                signal,
                value,
            } => {
                let pub_msg = Message::Publish(PublishMessage {
                    address: address.clone(),
                    signal: Some(signal),
                    value,
                    payload: None,
                    samples: None,
                    rate: None,
                    id: None,
                    phase: None,
                    timestamp: None,
                    timeline: None,
                });
                // A publish has no state side effect, so it only counts as
                // applied when the message could actually be encoded.
                match deliver(&pub_msg, &address, signal) {
                    Ok(()) => debug!("Rule {} applied PUBLISH to {}", action.rule_id, address),
                    Err(e) => warn!(
                        "Rule {} PUBLISH to {} could not be encoded: {:?}",
                        action.rule_id, address, e
                    ),
                }
            }
            clasp_rules::RuleAction::SetFromTrigger { address, transform } => {
                let Some(current) = state.get(&address) else {
                    debug!(
                        "Rule {} SetFromTrigger skipped: no current value at {}",
                        action.rule_id, address
                    );
                    continue;
                };
                let transformed = transform.apply(&current);
                match state.set(
                    &address,
                    transformed.clone(),
                    &action.origin,
                    None,
                    false,
                    false,
                    None,
                ) {
                    Ok(revision) => {
                        let set_msg = Message::Set(SetMessage {
                            address: address.clone(),
                            value: transformed,
                            revision: Some(revision),
                            lock: false,
                            unlock: false,
                            ttl: None,
                        });
                        if let Err(e) = deliver(&set_msg, &address, SignalType::Param) {
                            warn!(
                                "Rule {} SetFromTrigger to {} could not be encoded for broadcast: {:?}",
                                action.rule_id, address, e
                            );
                        }
                        debug!(
                            "Rule {} applied SetFromTrigger to {}",
                            action.rule_id, address
                        );
                    }
                    Err(e) => {
                        warn!(
                            "Rule {} SetFromTrigger to {} failed: {:?}",
                            action.rule_id, address, e
                        );
                    }
                }
            }
            clasp_rules::RuleAction::Delay { .. } => {
                // Delay actions are handled at a higher level (relay timer tasks)
            }
        }
    }
}
1390
/// Check if a federation `request` pattern is covered by a `declared` namespace pattern.
///
/// A request is covered if every address it could match is also matched by the declared
/// namespace. This handles:
/// - Exact match: `/sensors/temp` covered by `/sensors/temp`
/// - Concrete within glob: `/sensors/temp/1` covered by `/sensors/**`
/// - Sub-pattern within glob: `/sensors/temp/**` covered by `/sensors/**`
///
/// Patterns are compared segment by segment (empty segments are ignored):
/// - a declared `**` in **final** position covers whatever remains of the request,
///   including nothing at all;
/// - a declared `**` followed by further segments absorbs one or more request
///   segments, and the segments after it must still cover the rest of the request.
///   It is *not* a blanket grant: `/a/**/z` does not cover `/a/b/c` or `/a/**`;
/// - a declared `*` covers one literal or `*` request segment;
/// - a request `*` or `**` is never covered by a declared literal.
///
/// The segment comparison is deliberately conservative: when it cannot show that the
/// request stays inside the declared namespace, the request is rejected.
#[cfg(feature = "federation")]
pub(crate) fn federation_pattern_covered_by(request: &str, declared: &str) -> bool {
    // Exact match
    if request == declared {
        return true;
    }

    // A request without wildcards is a plain address, so the declared pattern
    // can simply be matched against it. This must NOT be done when the request
    // contains wildcards, because glob_match would treat `**` in the request
    // as literal characters.
    if !request.contains('*') && clasp_core::address::glob_match(declared, request) {
        return true;
    }

    let decl_parts: Vec<&str> = declared.split('/').filter(|s| !s.is_empty()).collect();
    let req_parts: Vec<&str> = request.split('/').filter(|s| !s.is_empty()).collect();
    let req_len = req_parts.len();

    // Work backwards through the declared segments. While processing declared
    // segment `di`, `below[ri]` says whether the declared segments *after* `di`
    // cover `req_parts[ri..]`, and `row[ri]` is being filled in with the answer
    // for the declared segments from `di` onwards. One row per declared segment
    // keeps this linear in (declared segments x request segments) even for
    // patterns with several `**`.
    let mut below = vec![false; req_len + 1];
    // No declared segments left covers exactly "no request segments left".
    below[req_len] = true;

    for (di, &dp) in decl_parts.iter().enumerate().rev() {
        let is_last_declared = di + 1 == decl_parts.len();
        let mut row = vec![false; req_len + 1];

        for ri in (0..=req_len).rev() {
            row[ri] = if dp == "**" {
                // Trailing `**`: covers everything below this prefix.
                // Otherwise it must swallow at least one request segment and
                // then either stop (`below`) or keep swallowing (`row`).
                is_last_declared || (ri < req_len && (below[ri + 1] || row[ri + 1]))
            } else if ri == req_len {
                // Request exhausted but the declared pattern still demands a segment.
                false
            } else {
                let rp = req_parts[ri];
                let segment_covered = if dp == "*" {
                    // `*` covers a literal or another `*`, but never a `**`.
                    rp != "**"
                } else {
                    // A declared literal only covers the identical segment; a
                    // request wildcard here would be wider than the declaration.
                    dp == rp
                };
                segment_covered && below[ri + 1]
            };
        }

        below = row;
    }

    below[0]
}
1473
1474#[cfg(all(test, feature = "federation"))]
1475mod federation_tests {
1476    use super::*;
1477
1478    // --- federation_pattern_covered_by tests ---
1479
1480    #[test]
1481    fn test_exact_match() {
1482        assert!(federation_pattern_covered_by(
1483            "/sensors/temp",
1484            "/sensors/temp"
1485        ));
1486    }
1487
1488    #[test]
1489    fn test_concrete_within_globstar() {
1490        assert!(federation_pattern_covered_by(
1491            "/sensors/temp/1",
1492            "/sensors/**"
1493        ));
1494        assert!(federation_pattern_covered_by(
1495            "/sensors/temp",
1496            "/sensors/**"
1497        ));
1498    }
1499
1500    #[test]
1501    fn test_sub_pattern_within_globstar() {
1502        assert!(federation_pattern_covered_by(
1503            "/sensors/temp/**",
1504            "/sensors/**"
1505        ));
1506        assert!(federation_pattern_covered_by(
1507            "/sensors/temp/*",
1508            "/sensors/**"
1509        ));
1510    }
1511
1512    #[test]
1513    fn test_globstar_root_covers_all() {
1514        assert!(federation_pattern_covered_by("/sensors/**", "/**"));
1515        assert!(federation_pattern_covered_by("/anything/deep/path", "/**"));
1516    }
1517
1518    #[test]
1519    fn test_disjoint_namespaces_rejected() {
1520        assert!(!federation_pattern_covered_by("/audio/**", "/sensors/**"));
1521        assert!(!federation_pattern_covered_by(
1522            "/audio/mixer",
1523            "/sensors/**"
1524        ));
1525    }
1526
1527    #[test]
1528    fn test_wider_pattern_rejected() {
1529        // Request for /** but declared only /sensors/**
1530        assert!(!federation_pattern_covered_by("/**", "/sensors/**"));
1531    }
1532
1533    #[test]
1534    fn test_wildcard_in_request_wider_than_literal() {
1535        // /sensors/* is wider than /sensors/temp (declared)
1536        assert!(!federation_pattern_covered_by(
1537            "/sensors/*",
1538            "/sensors/temp"
1539        ));
1540    }
1541
1542    #[test]
1543    fn test_declared_single_wildcard() {
1544        // Declared /sensors/*, request /sensors/temp — covered
1545        assert!(federation_pattern_covered_by("/sensors/temp", "/sensors/*"));
1546    }
1547
1548    // --- Session federation feature tests ---
1549
1550    #[test]
1551    fn test_federation_peer_detection() {
1552        let fed_session = Session::stub_federation("hub-peer");
1553        assert!(fed_session.is_federation_peer());
1554
1555        let normal_session = Session::stub(None);
1556        assert!(!normal_session.is_federation_peer());
1557    }
1558
1559    #[test]
1560    fn test_federation_namespaces_lifecycle() {
1561        let session = Session::stub_federation("peer");
1562        assert!(session.federation_namespaces().is_empty());
1563
1564        session
1565            .set_federation_namespaces(vec!["/sensors/**".to_string(), "/lights/**".to_string()]);
1566        let ns = session.federation_namespaces();
1567        assert_eq!(ns.len(), 2);
1568        assert!(ns.contains(&"/sensors/**".to_string()));
1569        assert!(ns.contains(&"/lights/**".to_string()));
1570
1571        // Re-declare replaces
1572        session.set_federation_namespaces(vec!["/audio/**".to_string()]);
1573        let ns = session.federation_namespaces();
1574        assert_eq!(ns.len(), 1);
1575        assert_eq!(ns[0], "/audio/**");
1576    }
1577
1578    #[test]
1579    fn test_federation_router_id() {
1580        let session = Session::stub_federation("peer");
1581        assert!(session.federation_router_id().is_none());
1582
1583        session.set_federation_router_id("hub-alpha".to_string());
1584        assert_eq!(session.federation_router_id().unwrap(), "hub-alpha");
1585    }
1586
1587    #[test]
1588    fn test_federation_subscription_id_range() {
1589        // Federation subscriptions use IDs starting at 50000
1590        // User subscriptions typically use small sequential IDs
1591        // Verify the ranges don't overlap with typical usage
1592        let session = Session::stub_federation("peer");
1593        session.add_subscription(1); // user sub
1594        session.add_subscription(50000); // federation sub
1595        session.add_subscription(50001); // federation sub
1596
1597        let subs = session.subscriptions();
1598        assert_eq!(subs.len(), 3);
1599        assert!(subs.contains(&1));
1600        assert!(subs.contains(&50000));
1601        assert!(subs.contains(&50001));
1602
1603        // Remove federation sub, user sub remains
1604        session.remove_subscription(50000);
1605        let subs = session.subscriptions();
1606        assert_eq!(subs.len(), 2);
1607        assert!(subs.contains(&1));
1608        assert!(!subs.contains(&50000));
1609    }
1610
1611    // --- Resource limit constant tests ---
1612
1613    #[test]
1614    fn test_resource_limits_are_sane() {
1615        // Verify the constants are within reasonable bounds
1616        // (these are compile-time checks essentially)
1617        const MAX_PATTERNS: usize = 1000;
1618        const MAX_REVISIONS: usize = 10_000;
1619        assert!(MAX_PATTERNS > 0 && MAX_PATTERNS <= 10_000);
1620        assert!(MAX_REVISIONS > 0 && MAX_REVISIONS <= 100_000);
1621    }
1622
1623    // --- Pattern matcher edge case / fuzz tests ---
1624
1625    #[test]
1626    fn test_empty_strings() {
1627        // Empty patterns should not match anything useful
1628        assert!(federation_pattern_covered_by("", ""));
1629        assert!(!federation_pattern_covered_by("/a", ""));
1630        assert!(!federation_pattern_covered_by("", "/a"));
1631    }
1632
1633    #[test]
1634    fn test_root_slash_only() {
1635        // Root path edge cases
1636        assert!(federation_pattern_covered_by("/", "/"));
1637        assert!(federation_pattern_covered_by("/", "/**"));
1638    }
1639
1640    #[test]
1641    fn test_trailing_slash() {
1642        // Trailing slash creates an empty segment that gets filtered
1643        assert!(federation_pattern_covered_by("/sensors/", "/sensors/**"));
1644        assert!(federation_pattern_covered_by(
1645            "/sensors/temp/",
1646            "/sensors/**"
1647        ));
1648    }
1649
1650    #[test]
1651    fn test_double_slashes() {
1652        // Double slashes create empty segments that get filtered
1653        assert!(federation_pattern_covered_by(
1654            "//sensors//temp",
1655            "/sensors/**"
1656        ));
1657    }
1658
1659    #[test]
1660    fn test_deep_nesting_under_globstar() {
1661        assert!(federation_pattern_covered_by("/a/b/c/d/e/f/g", "/**"));
1662        assert!(federation_pattern_covered_by("/a/b/c/d/e/f/g/**", "/**"));
1663        assert!(federation_pattern_covered_by("/a/b/c/d/e", "/a/**"));
1664        assert!(!federation_pattern_covered_by("/a/b/c/d/e", "/b/**"));
1665    }
1666
1667    #[test]
1668    fn test_single_wildcard_depth_mismatch() {
1669        // /a/* covers one level under /a/; request for deeper path is NOT covered
1670        assert!(federation_pattern_covered_by("/a/b", "/a/*"));
1671        assert!(!federation_pattern_covered_by("/a/b/c", "/a/*"));
1672    }
1673
1674    #[test]
1675    fn test_wildcard_request_vs_literal_declared() {
1676        // Request with wildcard is wider than literal — should be rejected
1677        assert!(!federation_pattern_covered_by("/a/*", "/a/b"));
1678        assert!(!federation_pattern_covered_by("/a/**", "/a/b"));
1679        assert!(!federation_pattern_covered_by("/a/**", "/a/b/c"));
1680    }
1681
1682    #[test]
1683    fn test_request_globstar_vs_declared_single_wildcard() {
1684        // /a/** is wider than /a/* — should be rejected
1685        assert!(!federation_pattern_covered_by("/a/**", "/a/*"));
1686    }
1687
1688    #[test]
1689    fn test_mixed_wildcards_in_declared() {
1690        // Declared /a/*/c/** should cover /a/x/c/d
1691        assert!(federation_pattern_covered_by("/a/x/c/d", "/a/*/c/**"));
1692        // But not /a/x/y/d (wrong segment at position 2)
1693        assert!(!federation_pattern_covered_by("/a/x/y/d", "/a/*/c/**"));
1694    }
1695
1696    #[test]
1697    fn test_request_pattern_with_wildcards_in_middle() {
1698        // Request /a/*/c is wider at position 1 than declared /a/b/**
1699        // even though declared covers deeper paths under /a/b
1700        assert!(!federation_pattern_covered_by("/a/*/c", "/a/b/**"));
1701    }
1702
1703    #[test]
1704    fn test_identical_wildcard_patterns() {
1705        assert!(federation_pattern_covered_by("/**", "/**"));
1706        assert!(federation_pattern_covered_by("/a/**", "/a/**"));
1707        assert!(federation_pattern_covered_by("/a/*", "/a/*"));
1708    }
1709
1710    #[test]
1711    fn test_path_traversal_segments() {
1712        // ".." is just a literal segment in CLASP, not filesystem traversal
1713        assert!(!federation_pattern_covered_by(
1714            "/../sensors/temp",
1715            "/sensors/**"
1716        ));
1717        assert!(federation_pattern_covered_by("/../sensors/temp", "/**"));
1718    }
1719
1720    #[test]
1721    fn test_single_segment_patterns() {
1722        assert!(federation_pattern_covered_by("/a", "/a"));
1723        assert!(!federation_pattern_covered_by("/a", "/b"));
1724        assert!(federation_pattern_covered_by("/a", "/*"));
1725        assert!(federation_pattern_covered_by("/a", "/**"));
1726    }
1727
1728    #[test]
1729    fn test_declared_shorter_than_request_no_wildcard() {
1730        // Declared /a/b does not cover /a/b/c — no wildcard means exact depth only
1731        assert!(!federation_pattern_covered_by("/a/b/c", "/a/b"));
1732    }
1733
1734    #[test]
1735    fn test_request_shorter_than_declared() {
1736        // Request /a doesn't match declared /a/b (request must be within declared scope)
1737        assert!(!federation_pattern_covered_by("/a", "/a/b"));
1738    }
1739}
1740
#[cfg(test)]
mod transform_tests {
    use super::*;
    use clasp_core::Value;

    /// Test transform: doubles `Float` and `Int` values whose address matches
    /// `/sensors/**`. Any other address or value type yields `None`.
    struct DoubleTransform;

    impl SignalTransform for DoubleTransform {
        fn transform(&self, address: &str, value: &Value) -> Option<Value> {
            // Guard first: addresses outside the sensor namespace are never rewritten.
            if !clasp_core::address::glob_match("/sensors/**", address) {
                return None;
            }

            match value {
                Value::Float(f) => Some(Value::Float(f * 2.0)),
                Value::Int(i) => Some(Value::Int(i * 2)),
                _ => None,
            }
        }
    }

    /// Test transform that never rewrites anything: always `None`.
    struct PassthroughTransform;

    impl SignalTransform for PassthroughTransform {
        fn transform(&self, _address: &str, _value: &Value) -> Option<Value> {
            None
        }
    }

    /// Test transform that clamps `Float` values into `[0.0, 1.0]`, regardless
    /// of address. Returns `None` when clamping leaves the value unchanged
    /// (within `f64::EPSILON`) or when the value is not a `Float`.
    struct ClampTransform;

    impl SignalTransform for ClampTransform {
        fn transform(&self, _address: &str, value: &Value) -> Option<Value> {
            if let Value::Float(raw) = value {
                let clamped = raw.clamp(0.0, 1.0);
                // Only report a new value when clamping actually moved it.
                let changed = (clamped - raw).abs() > f64::EPSILON;
                changed.then(|| Value::Float(clamped))
            } else {
                None
            }
        }
    }

    // -- Transform trait logic tests --

    /// A float under `/sensors/` is doubled.
    #[test]
    fn transform_applied_to_matching_address() {
        let doubled = DoubleTransform.transform("/sensors/temp", &Value::Float(22.5));
        assert_eq!(doubled, Some(Value::Float(45.0)));
    }

    /// An integer under `/sensors/` is doubled and stays an integer.
    #[test]
    fn transform_applied_to_int_value() {
        let doubled = DoubleTransform.transform("/sensors/pressure", &Value::Int(50));
        assert_eq!(doubled, Some(Value::Int(100)));
    }

    /// Addresses outside `/sensors/**` are left alone.
    #[test]
    fn transform_skips_non_matching_address() {
        let untouched = DoubleTransform.transform("/lights/brightness", &Value::Float(0.5));
        assert_eq!(untouched, None);
    }

    /// `**` in the transform's pattern reaches addresses several segments deep.
    #[test]
    fn transform_handles_nested_glob_pattern() {
        let doubled = DoubleTransform.transform("/sensors/room1/temp", &Value::Int(20));
        assert_eq!(doubled, Some(Value::Int(40)));
    }

    /// A matching address with a non-numeric value produces `None`.
    #[test]
    fn transform_returns_none_for_non_numeric_on_match() {
        let label = Value::String("probe-1".into());
        let untouched = DoubleTransform.transform("/sensors/name", &label);
        assert_eq!(untouched, None);
    }

    /// The passthrough transform yields `None` for every address/value pair tried.
    #[test]
    fn passthrough_transform_always_returns_none() {
        let passthrough = PassthroughTransform;
        let inputs = [
            ("/anything", Value::Float(1.0)),
            ("/sensors/temp", Value::Int(42)),
            ("/a/b/c", Value::String("hello".into())),
        ];

        for (address, value) in inputs.iter() {
            assert_eq!(passthrough.transform(address, value), None);
        }
    }

    /// Values above the range are pulled down to `1.0`.
    #[test]
    fn clamp_transform_caps_high_value() {
        let capped = ClampTransform.transform("/vol", &Value::Float(1.5));
        assert_eq!(capped, Some(Value::Float(1.0)));
    }

    /// Values below the range are raised to `0.0`.
    #[test]
    fn clamp_transform_floors_low_value() {
        let floored = ClampTransform.transform("/vol", &Value::Float(-0.3));
        assert_eq!(floored, Some(Value::Float(0.0)));
    }

    /// A value already inside `[0, 1]` is reported as unchanged (`None`).
    #[test]
    fn clamp_transform_passes_through_in_range() {
        let unchanged = ClampTransform.transform("/vol", &Value::Float(0.5));
        assert_eq!(unchanged, None);
    }

    /// Non-float values are not clamped.
    #[test]
    fn clamp_transform_ignores_non_float() {
        let unchanged = ClampTransform.transform("/vol", &Value::Int(5));
        assert_eq!(unchanged, None);
    }

    // -- First-match-wins: simulates the SET handler's transform selection --

    /// Composite transform that tries each inner transform in order and returns
    /// the first `Some` result, or `None` when every inner transform declines.
    ///
    /// Intended to mirror how the SET handler consumes a transform result
    /// (use the new value when `Some`, otherwise keep the original), per the
    /// original author's note; that handler is not visible from this module.
    struct ChainTransform {
        inner: Vec<Arc<dyn SignalTransform>>,
    }

    impl SignalTransform for ChainTransform {
        fn transform(&self, address: &str, value: &Value) -> Option<Value> {
            self.inner
                .iter()
                .find_map(|candidate| candidate.transform(address, value))
        }
    }

    /// With the clamp first, 5.0 is clamped to 1.0 and the doubling transform
    /// behind it is never consulted.
    #[test]
    fn chain_first_match_wins() {
        let ordered: Vec<Arc<dyn SignalTransform>> =
            vec![Arc::new(ClampTransform), Arc::new(DoubleTransform)];
        let chain = ChainTransform { inner: ordered };

        let outcome = chain.transform("/sensors/level", &Value::Float(5.0));
        assert_eq!(outcome, Some(Value::Float(1.0)));
    }

    /// With the doubling transform first, a `/lights` address is declined by it
    /// and the clamp behind it produces the result.
    #[test]
    fn chain_falls_through_to_second() {
        let ordered: Vec<Arc<dyn SignalTransform>> =
            vec![Arc::new(DoubleTransform), Arc::new(ClampTransform)];
        let chain = ChainTransform { inner: ordered };

        let outcome = chain.transform("/lights/dim", &Value::Float(2.0));
        assert_eq!(outcome, Some(Value::Float(1.0)));
    }

    /// A chain made only of passthrough transforms yields `None`.
    #[test]
    fn chain_all_passthrough() {
        let ordered: Vec<Arc<dyn SignalTransform>> =
            vec![Arc::new(PassthroughTransform), Arc::new(PassthroughTransform)];
        let chain = ChainTransform { inner: ordered };

        let outcome = chain.transform("/any", &Value::Float(42.0));
        assert_eq!(outcome, None);
    }

    // -- Router structural tests --

    /// `with_transforms` populates the router's `transforms` field.
    #[test]
    fn router_accepts_transform() {
        let router =
            Router::new(RouterConfig::default()).with_transforms(Arc::new(DoubleTransform));
        assert!(router.transforms.is_some());
    }

    /// A router built without `with_transforms` has no transform installed.
    #[test]
    fn router_without_transform_has_none() {
        let router = Router::new(RouterConfig::default());
        assert!(router.transforms.is_none());
    }

    /// Writing through `state().set()` stores the value as given, even when a
    /// transform is installed on the router: 22.5 is read back as 22.5, not the
    /// 45.0 that `DoubleTransform` would produce.
    ///
    /// This pins the design fact recorded by the original author that the
    /// transform pipeline runs in the SET message handler, not in the state store.
    #[test]
    fn router_state_set_bypasses_transform() {
        let router =
            Router::new(RouterConfig::default()).with_transforms(Arc::new(DoubleTransform));
        let writer = String::from("test-session");

        let state = router.state();
        state
            .set("/sensors/temp", Value::Float(22.5), &writer, None, false, false, None)
            .unwrap();

        let stored = state.get("/sensors/temp").unwrap();
        assert_eq!(stored, Value::Float(22.5));
    }
}