1use clasp_core::{
30 codec, CpskValidator, ErrorMessage, Message, SecurityMode, SignalType, TokenValidator,
31};
32#[cfg(feature = "rules")]
33use clasp_core::{PublishMessage, SetMessage};
34
35#[cfg(feature = "journal")]
36use clasp_journal::Journal;
37#[cfg(feature = "rules")]
38use clasp_rules::RulesEngine;
39use clasp_transport::{TransportEvent, TransportReceiver, TransportSender, TransportServer};
40use dashmap::DashMap;
41use parking_lot::RwLock;
42use std::net::SocketAddr;
43use std::sync::Arc;
44use tracing::{debug, error, info, warn, Instrument};
45
46#[cfg(feature = "websocket")]
47use clasp_transport::WebSocketServer;
48
49#[cfg(feature = "quic")]
50use clasp_transport::{QuicConfig, QuicTransport};
51
52use crate::{
53 error::{Result, RouterError},
54 gesture::GestureRegistry,
55 handlers,
56 p2p::P2PCapabilities,
57 session::{Session, SessionId},
58 state::{RouterState, RouterStateConfig},
59 subscription::SubscriptionManager,
60};
61use std::time::Duration;
62
63pub trait WriteValidator: Send + Sync {
69 fn validate_write(
78 &self,
79 address: &str,
80 value: &clasp_core::Value,
81 session: &Session,
82 state: &RouterState,
83 ) -> std::result::Result<(), String>;
84}
85
86pub trait SnapshotFilter: Send + Sync {
92 fn filter_snapshot(
100 &self,
101 params: Vec<clasp_core::ParamValue>,
102 session: &Session,
103 state: &RouterState,
104 ) -> Vec<clasp_core::ParamValue>;
105}
106
107pub trait SignalTransform: Send + Sync {
112 fn transform(&self, address: &str, value: &clasp_core::Value) -> Option<clasp_core::Value>;
114}
115
116const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
118
119#[derive(Debug, Clone)]
123pub enum TransportConfig {
124 #[cfg(feature = "websocket")]
126 WebSocket {
127 addr: String,
129 },
130
131 #[cfg(feature = "quic")]
136 Quic {
137 addr: SocketAddr,
139 cert: Vec<u8>,
141 key: Vec<u8>,
143 },
144}
145
146#[derive(Debug, Clone, Default)]
171pub struct MultiProtocolConfig {
172 #[cfg(feature = "websocket")]
174 pub websocket_addr: Option<String>,
175
176 #[cfg(feature = "quic")]
178 pub quic: Option<QuicServerConfig>,
179
180 #[cfg(feature = "mqtt-server")]
182 pub mqtt: Option<crate::adapters::MqttServerConfig>,
183
184 #[cfg(feature = "osc-server")]
186 pub osc: Option<crate::adapters::OscServerConfig>,
187}
188
189#[cfg(feature = "quic")]
191#[derive(Debug, Clone)]
192pub struct QuicServerConfig {
193 pub addr: SocketAddr,
195 pub cert: Vec<u8>,
197 pub key: Vec<u8>,
199}
200
201#[derive(Debug, Clone)]
203pub struct RouterConfig {
204 pub name: String,
206 pub features: Vec<String>,
208 pub max_sessions: usize,
210 pub session_timeout: u64,
212 pub security_mode: SecurityMode,
214 pub max_subscriptions_per_session: usize,
216 pub gesture_coalescing: bool,
218 pub gesture_coalesce_interval_ms: u64,
220 pub max_messages_per_second: u32,
222 pub rate_limiting_enabled: bool,
224 pub state_config: RouterStateConfig,
226}
227
228impl Default for RouterConfig {
229 fn default() -> Self {
230 Self {
231 name: "Clasp Router".to_string(),
232 features: vec![
233 "param".to_string(),
234 "event".to_string(),
235 "stream".to_string(),
236 "timeline".to_string(),
237 "gesture".to_string(),
238 ],
239 max_sessions: 100,
240 session_timeout: 300,
241 security_mode: SecurityMode::Open,
242 max_subscriptions_per_session: 1000, gesture_coalescing: true,
244 gesture_coalesce_interval_ms: 16,
245 max_messages_per_second: 1000, rate_limiting_enabled: true,
247 state_config: RouterStateConfig::default(), }
249 }
250}
251
252#[derive(Debug, Clone, Default)]
254pub struct RouterConfigBuilder {
255 config: RouterConfig,
256}
257
258impl RouterConfigBuilder {
259 pub fn new() -> Self {
260 Self::default()
261 }
262
263 pub fn name(mut self, name: impl Into<String>) -> Self {
264 self.config.name = name.into();
265 self
266 }
267
268 pub fn max_sessions(mut self, max: usize) -> Self {
269 self.config.max_sessions = max;
270 self
271 }
272
273 pub fn session_timeout(mut self, secs: u64) -> Self {
274 self.config.session_timeout = secs;
275 self
276 }
277
278 pub fn security_mode(mut self, mode: SecurityMode) -> Self {
279 self.config.security_mode = mode;
280 self
281 }
282
283 pub fn gesture_coalescing(mut self, enabled: bool) -> Self {
284 self.config.gesture_coalescing = enabled;
285 self
286 }
287
288 pub fn gesture_coalesce_interval_ms(mut self, ms: u64) -> Self {
289 self.config.gesture_coalesce_interval_ms = ms;
290 self
291 }
292
293 pub fn build(self) -> RouterConfig {
294 self.config
295 }
296}
297
298pub struct Router {
300 config: RouterConfig,
301 sessions: Arc<DashMap<SessionId, Arc<Session>>>,
303 subscriptions: Arc<SubscriptionManager>,
305 state: Arc<RouterState>,
307 running: Arc<RwLock<bool>>,
309 token_validator: Option<Arc<dyn TokenValidator>>,
311 p2p_capabilities: Arc<P2PCapabilities>,
313 gesture_registry: Option<Arc<GestureRegistry>>,
315 write_validator: Option<Arc<dyn WriteValidator>>,
317 snapshot_filter: Option<Arc<dyn SnapshotFilter>>,
319 transforms: Option<Arc<dyn SignalTransform>>,
321 #[cfg(feature = "rules")]
323 rules_engine: Option<Arc<parking_lot::Mutex<RulesEngine>>>,
324}
325
326impl Router {
327 pub fn new(config: RouterConfig) -> Self {
329 let gesture_registry = if config.gesture_coalescing {
330 Some(Arc::new(GestureRegistry::new(Duration::from_millis(
331 config.gesture_coalesce_interval_ms,
332 ))))
333 } else {
334 None
335 };
336
337 let state = Arc::new(RouterState::with_config(config.state_config.clone()));
338
339 Self {
340 config,
341 sessions: Arc::new(DashMap::new()),
342 subscriptions: Arc::new(SubscriptionManager::new()),
343 state,
344 running: Arc::new(RwLock::new(false)),
345 token_validator: None,
346 p2p_capabilities: Arc::new(P2PCapabilities::new()),
347 gesture_registry,
348 write_validator: None,
349 snapshot_filter: None,
350 transforms: None,
351 #[cfg(feature = "rules")]
352 rules_engine: None,
353 }
354 }
355
356 pub fn with_validator<V: TokenValidator + 'static>(mut self, validator: V) -> Self {
358 self.token_validator = Some(Arc::new(validator));
359 self
360 }
361
362 pub fn set_validator<V: TokenValidator + 'static>(&mut self, validator: V) {
364 self.token_validator = Some(Arc::new(validator));
365 }
366
367 pub fn set_write_validator<V: WriteValidator + 'static>(&mut self, validator: V) {
369 self.write_validator = Some(Arc::new(validator));
370 }
371
372 pub fn set_write_validator_arc(&mut self, validator: Arc<dyn WriteValidator>) {
374 self.write_validator = Some(validator);
375 }
376
377 pub fn set_snapshot_filter<F: SnapshotFilter + 'static>(&mut self, filter: F) {
379 self.snapshot_filter = Some(Arc::new(filter));
380 }
381
382 pub fn set_snapshot_filter_arc(&mut self, filter: Arc<dyn SnapshotFilter>) {
384 self.snapshot_filter = Some(filter);
385 }
386
387 pub fn with_transforms(mut self, transforms: Arc<dyn SignalTransform>) -> Self {
392 self.transforms = Some(transforms);
393 self
394 }
395
396 #[cfg(feature = "journal")]
401 pub fn with_journal(mut self, journal: Arc<dyn Journal>) -> Self {
402 let mut state = RouterState::with_config(self.config.state_config.clone());
404 state.set_journal(journal);
405 self.state = Arc::new(state);
406 self
407 }
408
409 #[cfg(feature = "rules")]
414 pub fn with_rules(mut self, engine: RulesEngine) -> Self {
415 self.rules_engine = Some(Arc::new(parking_lot::Mutex::new(engine)));
416 self
417 }
418
419 #[cfg(feature = "rules")]
421 pub fn rules_engine(&self) -> Option<&Arc<parking_lot::Mutex<RulesEngine>>> {
422 self.rules_engine.as_ref()
423 }
424
425 pub fn cpsk_validator(&self) -> Option<&CpskValidator> {
428 self.token_validator
429 .as_ref()
430 .and_then(|v| v.as_any().downcast_ref::<CpskValidator>())
431 }
432
433 pub fn security_mode(&self) -> SecurityMode {
435 self.config.security_mode
436 }
437
438 pub async fn serve_on<S>(&self, mut server: S) -> Result<()>
461 where
462 S: TransportServer + 'static,
463 S::Sender: 'static,
464 S::Receiver: 'static,
465 {
466 info!("Router accepting connections");
467 *self.running.write() = true;
468
469 if self.config.session_timeout > 0 {
471 self.start_session_cleanup_task();
472 }
473
474 if let Some(ref registry) = self.gesture_registry {
476 self.start_gesture_flush_task(Arc::clone(registry));
477 }
478
479 self.start_state_cleanup_task();
481
482 while *self.running.read() {
483 match server.accept().await {
484 Ok((sender, receiver, addr)) => {
485 let current_sessions = self.sessions.len();
487 if current_sessions >= self.config.max_sessions {
488 warn!(
489 "Rejecting connection from {}: max sessions reached ({}/{})",
490 addr, current_sessions, self.config.max_sessions
491 );
492 continue;
494 }
495
496 info!("New connection from {}", addr);
497 #[cfg(feature = "metrics")]
498 metrics::gauge!("clasp_sessions_active").increment(1.0);
499 self.handle_connection(Arc::new(sender), receiver, addr);
500 }
501 Err(e) => {
502 warn!("Accept error: {}", e);
503 }
504 }
505 }
506
507 Ok(())
508 }
509
510 fn start_gesture_flush_task(&self, registry: Arc<GestureRegistry>) {
512 if self.config.gesture_coalesce_interval_ms == 0 {
514 return;
515 }
516
517 let sessions = Arc::clone(&self.sessions);
518 let subscriptions = Arc::clone(&self.subscriptions);
519 let running = Arc::clone(&self.running);
520 let flush_interval = Duration::from_millis(self.config.gesture_coalesce_interval_ms);
521
522 tokio::spawn(async move {
523 let mut ticker = tokio::time::interval(flush_interval);
524
525 loop {
526 ticker.tick().await;
527
528 if !*running.read() {
529 break;
530 }
531
532 let to_flush = registry.flush_stale();
534 for pub_msg in to_flush {
535 let msg = Message::Publish(pub_msg.clone());
536 let subscribers =
537 subscriptions.find_subscribers(&pub_msg.address, Some(SignalType::Gesture));
538
539 if let Ok(bytes) = codec::encode(&msg) {
540 for sub_session_id in subscribers {
541 if let Some(sub_session) = sessions.get(&sub_session_id) {
542 crate::handlers::try_send_with_drop_tracking_sync(
543 sub_session.value(),
544 bytes.clone(),
545 &sub_session_id,
546 );
547 }
548 }
549 }
550 }
551
552 registry.cleanup_stale(Duration::from_secs(300));
554 }
555
556 debug!("Gesture flush task stopped");
557 });
558 }
559
560 fn start_session_cleanup_task(&self) {
562 let sessions = Arc::clone(&self.sessions);
563 let subscriptions = Arc::clone(&self.subscriptions);
564 let running = Arc::clone(&self.running);
565 let timeout_secs = self.config.session_timeout;
566
567 tokio::spawn(async move {
568 let check_interval = std::time::Duration::from_secs(timeout_secs / 4)
569 .max(std::time::Duration::from_secs(10));
570 let timeout = std::time::Duration::from_secs(timeout_secs);
571
572 loop {
573 tokio::time::sleep(check_interval).await;
574
575 if !*running.read() {
576 break;
577 }
578
579 let timed_out: Vec<SessionId> = sessions
581 .iter()
582 .filter(|entry| entry.value().idle_duration() > timeout)
583 .map(|entry| entry.key().clone())
584 .collect();
585
586 for session_id in timed_out {
587 if let Some((id, session)) = sessions.remove(&session_id) {
588 info!(
589 "Session {} timed out after {:?} idle",
590 id,
591 session.idle_duration()
592 );
593 subscriptions.remove_session(&id);
594 }
595 }
596 }
597
598 debug!("Session cleanup task stopped");
599 });
600 }
601
602 fn start_state_cleanup_task(&self) {
604 let state = Arc::clone(&self.state);
605 let running = Arc::clone(&self.running);
606 #[cfg(feature = "metrics")]
607 let sessions = Arc::clone(&self.sessions);
608 #[cfg(feature = "metrics")]
609 let subscriptions = Arc::clone(&self.subscriptions);
610
611 tokio::spawn(async move {
612 let cleanup_interval = std::time::Duration::from_secs(60);
614
615 loop {
616 tokio::time::sleep(cleanup_interval).await;
617
618 if !*running.read() {
619 break;
620 }
621
622 let (params_removed, signals_removed) = state.cleanup_stale();
624
625 if params_removed > 0 || signals_removed > 0 {
626 debug!(
627 "State cleanup: removed {} stale params, {} stale signals",
628 params_removed, signals_removed
629 );
630 }
631
632 #[cfg(feature = "metrics")]
634 {
635 metrics::gauge!("clasp_state_params_active").set(state.len() as f64);
636 metrics::gauge!("clasp_sessions_active").set(sessions.len() as f64);
637 metrics::gauge!("clasp_subscriptions_active").set(subscriptions.len() as f64);
638 }
639 }
640
641 debug!("State cleanup task stopped");
642 });
643 }
644
645 #[cfg(feature = "websocket")]
658 pub async fn serve_websocket(&self, addr: &str) -> Result<()> {
659 let server = WebSocketServer::bind(addr).await?;
660 info!("WebSocket server listening on {}", addr);
661 self.serve_on(server).await
662 }
663
664 #[cfg(feature = "websocket")]
666 pub async fn serve(&self, addr: &str) -> Result<()> {
667 self.serve_websocket(addr).await
668 }
669
670 #[cfg(feature = "quic")]
691 pub async fn serve_quic(
692 &self,
693 addr: SocketAddr,
694 cert_der: Vec<u8>,
695 key_der: Vec<u8>,
696 ) -> Result<()> {
697 let server = QuicTransport::new_server(addr, cert_der, key_der)
698 .map_err(|e| RouterError::Transport(e))?;
699 info!("QUIC server listening on {}", addr);
700 self.serve_quic_transport(server).await
701 }
702
703 #[cfg(feature = "quic")]
708 async fn serve_quic_transport(&self, server: QuicTransport) -> Result<()> {
709 *self.running.write() = true;
710
711 while *self.running.read() {
712 match server.accept().await {
713 Ok(connection) => {
714 let addr = connection.remote_address();
715 info!("QUIC connection from {}", addr);
716
717 match connection.accept_bi().await {
719 Ok((sender, receiver)) => {
720 self.handle_connection(Arc::new(sender), receiver, addr);
721 }
722 Err(e) => {
723 error!("QUIC stream accept error: {}", e);
724 }
725 }
726 }
727 Err(e) => {
728 error!("QUIC accept error: {}", e);
729 }
730 }
731 }
732
733 Ok(())
734 }
735
736 pub async fn serve_multi(&self, transports: Vec<TransportConfig>) -> Result<()> {
761 use futures::future::try_join_all;
762
763 if transports.is_empty() {
764 return Err(RouterError::Config("No transports configured".into()));
765 }
766
767 let mut handles = vec![];
768
769 for config in transports {
770 let router = self.clone_internal();
771 let handle = tokio::spawn(async move {
772 match config {
773 #[cfg(feature = "websocket")]
774 TransportConfig::WebSocket { addr } => router.serve_websocket(&addr).await,
775 #[cfg(feature = "quic")]
776 TransportConfig::Quic { addr, cert, key } => {
777 router.serve_quic(addr, cert, key).await
778 }
779 #[allow(unreachable_patterns)]
780 _ => Err(RouterError::Config(
781 "Transport not enabled at compile time".into(),
782 )),
783 }
784 });
785 handles.push(handle);
786 }
787
788 let results = try_join_all(handles)
790 .await
791 .map_err(|e| RouterError::Config(format!("Transport task failed: {}", e)))?;
792
793 for result in results {
795 result?;
796 }
797
798 Ok(())
799 }
800
801 pub async fn serve_all(&self, config: MultiProtocolConfig) -> Result<()> {
823 use futures::future::select_all;
824
825 let mut handles: Vec<tokio::task::JoinHandle<Result<()>>> = vec![];
826 let mut protocol_names: Vec<&str> = vec![];
827
828 #[cfg(feature = "websocket")]
830 if let Some(ref addr) = config.websocket_addr {
831 info!("Starting WebSocket server on {}", addr);
832 protocol_names.push("WebSocket");
833 let router = self.clone_internal();
834 let addr = addr.clone();
835 handles.push(tokio::spawn(
836 async move { router.serve_websocket(&addr).await },
837 ));
838 }
839
840 #[cfg(feature = "quic")]
842 if let Some(ref quic_config) = config.quic {
843 info!("Starting QUIC server on {}", quic_config.addr);
844 protocol_names.push("QUIC");
845 let router = self.clone_internal();
846 let addr = quic_config.addr;
847 let cert = quic_config.cert.clone();
848 let key = quic_config.key.clone();
849 handles.push(tokio::spawn(async move {
850 router.serve_quic(addr, cert, key).await
851 }));
852 }
853
854 #[cfg(feature = "mqtt-server")]
856 if let Some(mqtt_config) = config.mqtt {
857 info!("Starting MQTT server on {}", mqtt_config.bind_addr);
858 protocol_names.push("MQTT");
859 let adapter = crate::adapters::MqttServerAdapter::new(
860 mqtt_config,
861 Arc::clone(&self.sessions),
862 Arc::clone(&self.subscriptions),
863 Arc::clone(&self.state),
864 );
865 handles.push(tokio::spawn(async move { adapter.serve().await }));
866 }
867
868 #[cfg(feature = "osc-server")]
870 if let Some(osc_config) = config.osc {
871 info!("Starting OSC server on {}", osc_config.bind_addr);
872 protocol_names.push("OSC");
873 let adapter = crate::adapters::OscServerAdapter::new(
874 osc_config,
875 Arc::clone(&self.sessions),
876 Arc::clone(&self.subscriptions),
877 Arc::clone(&self.state),
878 );
879 handles.push(tokio::spawn(async move { adapter.serve().await }));
880 }
881
882 if handles.is_empty() {
883 return Err(RouterError::Config("No protocols configured".into()));
884 }
885
886 info!(
887 "Multi-protocol server running with {} protocols: {}",
888 handles.len(),
889 protocol_names.join(", ")
890 );
891
892 *self.running.write() = true;
893
894 if self.config.session_timeout > 0 {
896 self.start_session_cleanup_task();
897 }
898
899 if let Some(ref registry) = self.gesture_registry {
901 self.start_gesture_flush_task(Arc::clone(registry));
902 }
903
904 self.start_state_cleanup_task();
906
907 loop {
909 if handles.is_empty() {
910 break;
911 }
912
913 let (result, _index, remaining) = select_all(handles).await;
914 handles = remaining;
915
916 match result {
917 Ok(Ok(())) => {
918 debug!("Protocol server completed normally");
920 }
921 Ok(Err(e)) => {
922 error!("Protocol server error: {}", e);
923 }
925 Err(e) => {
926 error!("Protocol server task panicked: {}", e);
927 }
929 }
930 }
931
932 Ok(())
933 }
934
935 #[allow(clippy::type_complexity)]
937 pub fn shared_state(
938 &self,
939 ) -> (
940 Arc<DashMap<SessionId, Arc<Session>>>,
941 Arc<SubscriptionManager>,
942 Arc<RouterState>,
943 ) {
944 (
945 Arc::clone(&self.sessions),
946 Arc::clone(&self.subscriptions),
947 Arc::clone(&self.state),
948 )
949 }
950
951 fn clone_internal(&self) -> Self {
954 Self {
955 config: self.config.clone(),
956 sessions: Arc::clone(&self.sessions),
957 subscriptions: Arc::clone(&self.subscriptions),
958 state: Arc::clone(&self.state),
959 running: Arc::clone(&self.running),
960 token_validator: self.token_validator.clone(),
961 p2p_capabilities: Arc::clone(&self.p2p_capabilities),
962 gesture_registry: self.gesture_registry.clone(),
963 write_validator: self.write_validator.clone(),
964 snapshot_filter: self.snapshot_filter.clone(),
965 transforms: self.transforms.clone(),
966 #[cfg(feature = "rules")]
967 rules_engine: self.rules_engine.clone(),
968 }
969 }
970
971 pub fn active_gesture_count(&self) -> usize {
973 self.gesture_registry
974 .as_ref()
975 .map(|r| r.active_count())
976 .unwrap_or(0)
977 }
978
979 fn handle_connection(
981 &self,
982 sender: Arc<dyn TransportSender>,
983 mut receiver: impl TransportReceiver + 'static,
984 addr: SocketAddr,
985 ) {
986 let sessions = Arc::clone(&self.sessions);
987 let subscriptions = Arc::clone(&self.subscriptions);
988 let state = Arc::clone(&self.state);
989 let config = self.config.clone();
990 let running = Arc::clone(&self.running);
991 let token_validator = self.token_validator.clone();
992 let security_mode = self.config.security_mode;
993 let p2p_capabilities = Arc::clone(&self.p2p_capabilities);
994 let gesture_registry = self.gesture_registry.clone();
995 let write_validator = self.write_validator.clone();
996 let snapshot_filter = self.snapshot_filter.clone();
997 let transforms = self.transforms.clone();
998 #[cfg(feature = "rules")]
999 let rules_engine = self.rules_engine.clone();
1000
1001 let conn_span =
1002 tracing::info_span!("connection", session_id = tracing::field::Empty, remote = %addr);
1003
1004 tokio::spawn(
1005 async move {
1006 let mut session: Option<Arc<Session>> = None;
1007 let mut handshake_complete = false;
1008
1009 let handshake_result = tokio::time::timeout(HANDSHAKE_TIMEOUT, async {
1011 loop {
1012 match receiver.recv().await {
1013 Some(TransportEvent::Data(data)) => {
1014 match codec::decode(&data) {
1016 Ok((msg, _)) => {
1017 if matches!(msg, Message::Hello(_)) {
1018 return Some(data);
1019 } else {
1020 warn!(
1022 "Received non-Hello message before handshake from {}",
1023 addr
1024 );
1025 return None;
1026 }
1027 }
1028 Err(e) => {
1029 warn!("Decode error during handshake from {}: {}", addr, e);
1030 return None;
1031 }
1032 }
1033 }
1034 Some(TransportEvent::Disconnected { .. }) | None => {
1035 return None;
1036 }
1037 Some(TransportEvent::Error(e)) => {
1038 error!("Transport error during handshake from {}: {}", addr, e);
1039 return None;
1040 }
1041 _ => {}
1042 }
1043 }
1044 })
1045 .await;
1046
1047 let hello_data = match handshake_result {
1049 Ok(Some(data)) => data,
1050 Ok(None) => {
1051 info!("Handshake failed for {}", addr);
1052 return;
1053 }
1054 Err(_) => {
1055 warn!(
1056 "Handshake timeout for {} after {:?}",
1057 addr, HANDSHAKE_TIMEOUT
1058 );
1059 return;
1060 }
1061 };
1062
1063 if let Ok((msg, frame)) = codec::decode(&hello_data) {
1065 let ctx = handlers::HandlerContext {
1066 session: &session,
1067 sender: &sender,
1068 sessions: &sessions,
1069 subscriptions: &subscriptions,
1070 state: &state,
1071 config: &config,
1072 security_mode,
1073 token_validator: &token_validator,
1074 p2p_capabilities: &p2p_capabilities,
1075 gesture_registry: &gesture_registry,
1076 write_validator: &write_validator,
1077 snapshot_filter: &snapshot_filter,
1078 transforms: &transforms,
1079 #[cfg(feature = "rules")]
1080 rules_engine: &rules_engine,
1081 };
1082 if let Some(response) = handlers::handle_message(&msg, &frame, &ctx).await {
1083 match response {
1084 handlers::MessageResult::NewSession(s) => {
1085 tracing::Span::current()
1086 .record("session_id", tracing::field::display(&s.id));
1087 session = Some(s);
1088 handshake_complete = true;
1089 }
1090 handlers::MessageResult::Send(bytes) => {
1091 let _ = sender.send(bytes).await;
1092 }
1093 handlers::MessageResult::Disconnect => {
1094 info!(
1095 "Disconnecting client {} due to auth failure during handshake",
1096 addr
1097 );
1098 return;
1099 }
1100 _ => {}
1101 }
1102 }
1103 }
1104
1105 if !handshake_complete {
1106 debug!("Handshake incomplete for {}", addr);
1107 return;
1108 }
1109
1110 while *running.read() {
1112 match receiver.recv().await {
1113 Some(TransportEvent::Data(data)) => {
1114 if config.rate_limiting_enabled {
1116 if let Some(ref s) = session {
1117 if !s.check_rate_limit(config.max_messages_per_second) {
1118 warn!(
1119 "Rate limit exceeded for session {} ({} msgs/sec > {})",
1120 s.id,
1121 s.messages_per_second(),
1122 config.max_messages_per_second
1123 );
1124 let error = Message::Error(ErrorMessage {
1126 code: 429, message: format!(
1128 "Rate limit exceeded: {} messages/second",
1129 config.max_messages_per_second
1130 ),
1131 address: None,
1132 correlation_id: None,
1133 });
1134 if let Ok(bytes) = codec::encode(&error) {
1135 let _ = sender.send(bytes).await;
1136 }
1137 continue;
1138 }
1139 }
1140 }
1141
1142 match codec::decode(&data) {
1144 Ok((msg, frame)) => {
1145 let ctx = handlers::HandlerContext {
1146 session: &session,
1147 sender: &sender,
1148 sessions: &sessions,
1149 subscriptions: &subscriptions,
1150 state: &state,
1151 config: &config,
1152 security_mode,
1153 token_validator: &token_validator,
1154 p2p_capabilities: &p2p_capabilities,
1155 gesture_registry: &gesture_registry,
1156 write_validator: &write_validator,
1157 snapshot_filter: &snapshot_filter,
1158 transforms: &transforms,
1159 #[cfg(feature = "rules")]
1160 rules_engine: &rules_engine,
1161 };
1162 if let Some(response) =
1163 handlers::handle_message(&msg, &frame, &ctx).await
1164 {
1165 match response {
1166 handlers::MessageResult::NewSession(s) => {
1167 session = Some(s);
1168 }
1169 handlers::MessageResult::Send(bytes) => {
1170 if let Err(e) = sender.send(bytes).await {
1171 error!("Send error: {}", e);
1172 break;
1173 }
1174 }
1175 handlers::MessageResult::Broadcast(bytes, exclude) => {
1176 handlers::broadcast_to_subscribers(
1177 &bytes, &sessions, &exclude,
1178 );
1179 }
1180 handlers::MessageResult::Disconnect => {
1181 info!(
1182 "Disconnecting client {} due to auth failure",
1183 addr
1184 );
1185 break;
1186 }
1187 handlers::MessageResult::None => {}
1188 }
1189 }
1190 }
1191 Err(e) => {
1192 warn!("Decode error from {}: {}", addr, e);
1193 }
1194 }
1195 }
1196 Some(TransportEvent::Disconnected { reason }) => {
1197 info!("Client {} disconnected: {:?}", addr, reason);
1198 break;
1199 }
1200 Some(TransportEvent::Error(e)) => {
1201 error!("Transport error from {}: {}", addr, e);
1202 break;
1203 }
1204 None => {
1205 break;
1206 }
1207 _ => {}
1208 }
1209 }
1210
1211 if let Some(s) = session {
1213 info!("Removing session {}", s.id);
1214 sessions.remove(&s.id);
1215 subscriptions.remove_session(&s.id);
1216 p2p_capabilities.unregister(&s.id);
1217 #[cfg(feature = "metrics")]
1218 metrics::gauge!("clasp_sessions_active").decrement(1.0);
1219 }
1220 }
1221 .instrument(conn_span),
1222 );
1223 }
1224
1225 pub fn stop(&self) {
1227 *self.running.write() = false;
1228 }
1229
1230 pub fn session_count(&self) -> usize {
1232 self.sessions.len()
1233 }
1234
1235 pub fn state(&self) -> &RouterState {
1237 &self.state
1238 }
1239
1240 pub fn subscription_count(&self) -> usize {
1242 self.subscriptions.len()
1243 }
1244}
1245
1246impl Default for Router {
1247 fn default() -> Self {
1248 Self::new(RouterConfig::default())
1249 }
1250}
1251
1252#[cfg(feature = "rules")]
1258pub fn execute_rule_actions(
1259 actions: Vec<clasp_rules::PendingAction>,
1260 state: &Arc<RouterState>,
1261 sessions: &Arc<DashMap<SessionId, Arc<Session>>>,
1262 subscriptions: &Arc<SubscriptionManager>,
1263) {
1264 for action in actions {
1265 match action.action {
1266 clasp_rules::RuleAction::Set { address, value } => {
1267 match state.set(
1268 &address,
1269 value.clone(),
1270 &action.origin,
1271 None,
1272 false,
1273 false,
1274 None,
1275 ) {
1276 Ok(revision) => {
1277 let subscribers =
1278 subscriptions.find_subscribers(&address, Some(SignalType::Param));
1279 let set_msg = Message::Set(SetMessage {
1280 address: address.clone(),
1281 value,
1282 revision: Some(revision),
1283 lock: false,
1284 unlock: false,
1285 ttl: None,
1286 });
1287 if let Ok(bytes) = codec::encode(&set_msg) {
1288 for sub_session_id in subscribers {
1289 if let Some(sub_session) = sessions.get(&sub_session_id) {
1290 crate::handlers::try_send_with_drop_tracking_sync(
1291 sub_session.value(),
1292 bytes.clone(),
1293 &sub_session_id,
1294 );
1295 }
1296 }
1297 }
1298 debug!("Rule {} applied SET to {}", action.rule_id, address);
1299 }
1300 Err(e) => {
1301 warn!("Rule {} SET to {} failed: {:?}", action.rule_id, address, e);
1302 }
1303 }
1304 }
1305 clasp_rules::RuleAction::Publish {
1306 address,
1307 signal,
1308 value,
1309 } => {
1310 let pub_msg = Message::Publish(PublishMessage {
1311 address: address.clone(),
1312 signal: Some(signal),
1313 value,
1314 payload: None,
1315 samples: None,
1316 rate: None,
1317 id: None,
1318 phase: None,
1319 timestamp: None,
1320 timeline: None,
1321 });
1322 let subscribers = subscriptions.find_subscribers(&address, Some(signal));
1323 if let Ok(bytes) = codec::encode(&pub_msg) {
1324 for sub_session_id in subscribers {
1325 if let Some(sub_session) = sessions.get(&sub_session_id) {
1326 crate::handlers::try_send_with_drop_tracking_sync(
1327 sub_session.value(),
1328 bytes.clone(),
1329 &sub_session_id,
1330 );
1331 }
1332 }
1333 }
1334 debug!("Rule {} applied PUBLISH to {}", action.rule_id, address);
1335 }
1336 clasp_rules::RuleAction::SetFromTrigger { address, transform } => {
1337 if let Some(current) = state.get(&address) {
1338 let transformed = transform.apply(¤t);
1339 match state.set(
1340 &address,
1341 transformed.clone(),
1342 &action.origin,
1343 None,
1344 false,
1345 false,
1346 None,
1347 ) {
1348 Ok(revision) => {
1349 let subscribers =
1350 subscriptions.find_subscribers(&address, Some(SignalType::Param));
1351 let set_msg = Message::Set(SetMessage {
1352 address: address.clone(),
1353 value: transformed,
1354 revision: Some(revision),
1355 lock: false,
1356 unlock: false,
1357 ttl: None,
1358 });
1359 if let Ok(bytes) = codec::encode(&set_msg) {
1360 for sub_session_id in subscribers {
1361 if let Some(sub_session) = sessions.get(&sub_session_id) {
1362 crate::handlers::try_send_with_drop_tracking_sync(
1363 sub_session.value(),
1364 bytes.clone(),
1365 &sub_session_id,
1366 );
1367 }
1368 }
1369 }
1370 debug!(
1371 "Rule {} applied SetFromTrigger to {}",
1372 action.rule_id, address
1373 );
1374 }
1375 Err(e) => {
1376 warn!(
1377 "Rule {} SetFromTrigger to {} failed: {:?}",
1378 action.rule_id, address, e
1379 );
1380 }
1381 }
1382 }
1383 }
1384 clasp_rules::RuleAction::Delay { .. } => {
1385 }
1387 }
1388 }
1389}
1390
1391#[cfg(feature = "federation")]
1399pub(crate) fn federation_pattern_covered_by(request: &str, declared: &str) -> bool {
1400 if request == declared {
1402 return true;
1403 }
1404
1405 let request_has_wildcards = request.contains('*');
1410 if !request_has_wildcards && clasp_core::address::glob_match(declared, request) {
1411 return true;
1412 }
1413
1414 let decl_parts: Vec<&str> = declared.split('/').filter(|s| !s.is_empty()).collect();
1417 let req_parts: Vec<&str> = request.split('/').filter(|s| !s.is_empty()).collect();
1418
1419 let mut di = 0;
1420 let mut ri = 0;
1421
1422 while di < decl_parts.len() && ri < req_parts.len() {
1423 let dp = decl_parts[di];
1424 let rp = req_parts[ri];
1425
1426 if dp == "**" {
1427 return true;
1429 }
1430
1431 if rp == "**" {
1432 return false;
1435 }
1436
1437 if dp == "*" {
1438 if rp == "*" {
1441 di += 1;
1443 ri += 1;
1444 continue;
1445 }
1446 di += 1;
1448 ri += 1;
1449 continue;
1450 }
1451
1452 if rp == "*" {
1453 return false;
1455 }
1456
1457 if dp != rp {
1458 return false;
1459 }
1460
1461 di += 1;
1462 ri += 1;
1463 }
1464
1465 if di < decl_parts.len() && decl_parts[di] == "**" {
1468 return true;
1469 }
1470
1471 di >= decl_parts.len() && ri >= req_parts.len()
1472}
1473
1474#[cfg(all(test, feature = "federation"))]
1475mod federation_tests {
1476 use super::*;
1477
1478 #[test]
1481 fn test_exact_match() {
1482 assert!(federation_pattern_covered_by(
1483 "/sensors/temp",
1484 "/sensors/temp"
1485 ));
1486 }
1487
1488 #[test]
1489 fn test_concrete_within_globstar() {
1490 assert!(federation_pattern_covered_by(
1491 "/sensors/temp/1",
1492 "/sensors/**"
1493 ));
1494 assert!(federation_pattern_covered_by(
1495 "/sensors/temp",
1496 "/sensors/**"
1497 ));
1498 }
1499
1500 #[test]
1501 fn test_sub_pattern_within_globstar() {
1502 assert!(federation_pattern_covered_by(
1503 "/sensors/temp/**",
1504 "/sensors/**"
1505 ));
1506 assert!(federation_pattern_covered_by(
1507 "/sensors/temp/*",
1508 "/sensors/**"
1509 ));
1510 }
1511
1512 #[test]
1513 fn test_globstar_root_covers_all() {
1514 assert!(federation_pattern_covered_by("/sensors/**", "/**"));
1515 assert!(federation_pattern_covered_by("/anything/deep/path", "/**"));
1516 }
1517
1518 #[test]
1519 fn test_disjoint_namespaces_rejected() {
1520 assert!(!federation_pattern_covered_by("/audio/**", "/sensors/**"));
1521 assert!(!federation_pattern_covered_by(
1522 "/audio/mixer",
1523 "/sensors/**"
1524 ));
1525 }
1526
1527 #[test]
1528 fn test_wider_pattern_rejected() {
1529 assert!(!federation_pattern_covered_by("/**", "/sensors/**"));
1531 }
1532
1533 #[test]
1534 fn test_wildcard_in_request_wider_than_literal() {
1535 assert!(!federation_pattern_covered_by(
1537 "/sensors/*",
1538 "/sensors/temp"
1539 ));
1540 }
1541
1542 #[test]
1543 fn test_declared_single_wildcard() {
1544 assert!(federation_pattern_covered_by("/sensors/temp", "/sensors/*"));
1546 }
1547
1548 #[test]
1551 fn test_federation_peer_detection() {
1552 let fed_session = Session::stub_federation("hub-peer");
1553 assert!(fed_session.is_federation_peer());
1554
1555 let normal_session = Session::stub(None);
1556 assert!(!normal_session.is_federation_peer());
1557 }
1558
1559 #[test]
1560 fn test_federation_namespaces_lifecycle() {
1561 let session = Session::stub_federation("peer");
1562 assert!(session.federation_namespaces().is_empty());
1563
1564 session
1565 .set_federation_namespaces(vec!["/sensors/**".to_string(), "/lights/**".to_string()]);
1566 let ns = session.federation_namespaces();
1567 assert_eq!(ns.len(), 2);
1568 assert!(ns.contains(&"/sensors/**".to_string()));
1569 assert!(ns.contains(&"/lights/**".to_string()));
1570
1571 session.set_federation_namespaces(vec!["/audio/**".to_string()]);
1573 let ns = session.federation_namespaces();
1574 assert_eq!(ns.len(), 1);
1575 assert_eq!(ns[0], "/audio/**");
1576 }
1577
1578 #[test]
1579 fn test_federation_router_id() {
1580 let session = Session::stub_federation("peer");
1581 assert!(session.federation_router_id().is_none());
1582
1583 session.set_federation_router_id("hub-alpha".to_string());
1584 assert_eq!(session.federation_router_id().unwrap(), "hub-alpha");
1585 }
1586
1587 #[test]
1588 fn test_federation_subscription_id_range() {
1589 let session = Session::stub_federation("peer");
1593 session.add_subscription(1); session.add_subscription(50000); session.add_subscription(50001); let subs = session.subscriptions();
1598 assert_eq!(subs.len(), 3);
1599 assert!(subs.contains(&1));
1600 assert!(subs.contains(&50000));
1601 assert!(subs.contains(&50001));
1602
1603 session.remove_subscription(50000);
1605 let subs = session.subscriptions();
1606 assert_eq!(subs.len(), 2);
1607 assert!(subs.contains(&1));
1608 assert!(!subs.contains(&50000));
1609 }
1610
1611 #[test]
1614 fn test_resource_limits_are_sane() {
1615 const MAX_PATTERNS: usize = 1000;
1618 const MAX_REVISIONS: usize = 10_000;
1619 assert!(MAX_PATTERNS > 0 && MAX_PATTERNS <= 10_000);
1620 assert!(MAX_REVISIONS > 0 && MAX_REVISIONS <= 100_000);
1621 }
1622
1623 #[test]
1626 fn test_empty_strings() {
1627 assert!(federation_pattern_covered_by("", ""));
1629 assert!(!federation_pattern_covered_by("/a", ""));
1630 assert!(!federation_pattern_covered_by("", "/a"));
1631 }
1632
1633 #[test]
1634 fn test_root_slash_only() {
1635 assert!(federation_pattern_covered_by("/", "/"));
1637 assert!(federation_pattern_covered_by("/", "/**"));
1638 }
1639
1640 #[test]
1641 fn test_trailing_slash() {
1642 assert!(federation_pattern_covered_by("/sensors/", "/sensors/**"));
1644 assert!(federation_pattern_covered_by(
1645 "/sensors/temp/",
1646 "/sensors/**"
1647 ));
1648 }
1649
1650 #[test]
1651 fn test_double_slashes() {
1652 assert!(federation_pattern_covered_by(
1654 "//sensors//temp",
1655 "/sensors/**"
1656 ));
1657 }
1658
1659 #[test]
1660 fn test_deep_nesting_under_globstar() {
1661 assert!(federation_pattern_covered_by("/a/b/c/d/e/f/g", "/**"));
1662 assert!(federation_pattern_covered_by("/a/b/c/d/e/f/g/**", "/**"));
1663 assert!(federation_pattern_covered_by("/a/b/c/d/e", "/a/**"));
1664 assert!(!federation_pattern_covered_by("/a/b/c/d/e", "/b/**"));
1665 }
1666
1667 #[test]
1668 fn test_single_wildcard_depth_mismatch() {
1669 assert!(federation_pattern_covered_by("/a/b", "/a/*"));
1671 assert!(!federation_pattern_covered_by("/a/b/c", "/a/*"));
1672 }
1673
1674 #[test]
1675 fn test_wildcard_request_vs_literal_declared() {
1676 assert!(!federation_pattern_covered_by("/a/*", "/a/b"));
1678 assert!(!federation_pattern_covered_by("/a/**", "/a/b"));
1679 assert!(!federation_pattern_covered_by("/a/**", "/a/b/c"));
1680 }
1681
1682 #[test]
1683 fn test_request_globstar_vs_declared_single_wildcard() {
1684 assert!(!federation_pattern_covered_by("/a/**", "/a/*"));
1686 }
1687
1688 #[test]
1689 fn test_mixed_wildcards_in_declared() {
1690 assert!(federation_pattern_covered_by("/a/x/c/d", "/a/*/c/**"));
1692 assert!(!federation_pattern_covered_by("/a/x/y/d", "/a/*/c/**"));
1694 }
1695
1696 #[test]
1697 fn test_request_pattern_with_wildcards_in_middle() {
1698 assert!(!federation_pattern_covered_by("/a/*/c", "/a/b/**"));
1701 }
1702
1703 #[test]
1704 fn test_identical_wildcard_patterns() {
1705 assert!(federation_pattern_covered_by("/**", "/**"));
1706 assert!(federation_pattern_covered_by("/a/**", "/a/**"));
1707 assert!(federation_pattern_covered_by("/a/*", "/a/*"));
1708 }
1709
1710 #[test]
1711 fn test_path_traversal_segments() {
1712 assert!(!federation_pattern_covered_by(
1714 "/../sensors/temp",
1715 "/sensors/**"
1716 ));
1717 assert!(federation_pattern_covered_by("/../sensors/temp", "/**"));
1718 }
1719
1720 #[test]
1721 fn test_single_segment_patterns() {
1722 assert!(federation_pattern_covered_by("/a", "/a"));
1723 assert!(!federation_pattern_covered_by("/a", "/b"));
1724 assert!(federation_pattern_covered_by("/a", "/*"));
1725 assert!(federation_pattern_covered_by("/a", "/**"));
1726 }
1727
1728 #[test]
1729 fn test_declared_shorter_than_request_no_wildcard() {
1730 assert!(!federation_pattern_covered_by("/a/b/c", "/a/b"));
1732 }
1733
1734 #[test]
1735 fn test_request_shorter_than_declared() {
1736 assert!(!federation_pattern_covered_by("/a", "/a/b"));
1738 }
1739}
1740
1741#[cfg(test)]
1742mod transform_tests {
1743 use super::*;
1744 use clasp_core::Value;
1745
1746 struct DoubleTransform;
1748
1749 impl SignalTransform for DoubleTransform {
1750 fn transform(&self, address: &str, value: &Value) -> Option<Value> {
1751 if clasp_core::address::glob_match("/sensors/**", address) {
1752 match value {
1753 Value::Float(f) => Some(Value::Float(f * 2.0)),
1754 Value::Int(i) => Some(Value::Int(i * 2)),
1755 _ => None,
1756 }
1757 } else {
1758 None
1759 }
1760 }
1761 }
1762
1763 struct PassthroughTransform;
1765
1766 impl SignalTransform for PassthroughTransform {
1767 fn transform(&self, _address: &str, _value: &Value) -> Option<Value> {
1768 None
1769 }
1770 }
1771
1772 struct ClampTransform;
1774
1775 impl SignalTransform for ClampTransform {
1776 fn transform(&self, _address: &str, value: &Value) -> Option<Value> {
1777 match value {
1778 Value::Float(f) => {
1779 let clamped = f.clamp(0.0, 1.0);
1780 if (clamped - f).abs() > f64::EPSILON {
1781 Some(Value::Float(clamped))
1782 } else {
1783 None
1784 }
1785 }
1786 _ => None,
1787 }
1788 }
1789 }
1790
1791 #[test]
1794 fn transform_applied_to_matching_address() {
1795 let t = DoubleTransform;
1796 let result = t.transform("/sensors/temp", &Value::Float(22.5));
1797 assert_eq!(result, Some(Value::Float(45.0)));
1798 }
1799
1800 #[test]
1801 fn transform_applied_to_int_value() {
1802 let t = DoubleTransform;
1803 let result = t.transform("/sensors/pressure", &Value::Int(50));
1804 assert_eq!(result, Some(Value::Int(100)));
1805 }
1806
1807 #[test]
1808 fn transform_skips_non_matching_address() {
1809 let t = DoubleTransform;
1810 let result = t.transform("/lights/brightness", &Value::Float(0.5));
1811 assert_eq!(result, None);
1812 }
1813
1814 #[test]
1815 fn transform_handles_nested_glob_pattern() {
1816 let t = DoubleTransform;
1817 let result = t.transform("/sensors/room1/temp", &Value::Int(20));
1819 assert_eq!(result, Some(Value::Int(40)));
1820 }
1821
1822 #[test]
1823 fn transform_returns_none_for_non_numeric_on_match() {
1824 let t = DoubleTransform;
1825 let result = t.transform("/sensors/name", &Value::String("probe-1".into()));
1827 assert_eq!(result, None);
1828 }
1829
1830 #[test]
1831 fn passthrough_transform_always_returns_none() {
1832 let t = PassthroughTransform;
1833 assert_eq!(t.transform("/anything", &Value::Float(1.0)), None);
1834 assert_eq!(t.transform("/sensors/temp", &Value::Int(42)), None);
1835 assert_eq!(
1836 t.transform("/a/b/c", &Value::String("hello".into())),
1837 None
1838 );
1839 }
1840
1841 #[test]
1842 fn clamp_transform_caps_high_value() {
1843 let t = ClampTransform;
1844 assert_eq!(
1845 t.transform("/vol", &Value::Float(1.5)),
1846 Some(Value::Float(1.0))
1847 );
1848 }
1849
1850 #[test]
1851 fn clamp_transform_floors_low_value() {
1852 let t = ClampTransform;
1853 assert_eq!(
1854 t.transform("/vol", &Value::Float(-0.3)),
1855 Some(Value::Float(0.0))
1856 );
1857 }
1858
1859 #[test]
1860 fn clamp_transform_passes_through_in_range() {
1861 let t = ClampTransform;
1862 assert_eq!(t.transform("/vol", &Value::Float(0.5)), None);
1864 }
1865
1866 #[test]
1867 fn clamp_transform_ignores_non_float() {
1868 let t = ClampTransform;
1869 assert_eq!(t.transform("/vol", &Value::Int(5)), None);
1870 }
1871
1872 struct ChainTransform {
1878 inner: Vec<Arc<dyn SignalTransform>>,
1879 }
1880
1881 impl SignalTransform for ChainTransform {
1882 fn transform(&self, address: &str, value: &Value) -> Option<Value> {
1883 for t in &self.inner {
1884 if let Some(v) = t.transform(address, value) {
1885 return Some(v);
1886 }
1887 }
1888 None
1889 }
1890 }
1891
1892 #[test]
1893 fn chain_first_match_wins() {
1894 let chain = ChainTransform {
1897 inner: vec![Arc::new(ClampTransform), Arc::new(DoubleTransform)],
1898 };
1899 let result = chain.transform("/sensors/level", &Value::Float(5.0));
1900 assert_eq!(result, Some(Value::Float(1.0)));
1901 }
1902
1903 #[test]
1904 fn chain_falls_through_to_second() {
1905 let chain = ChainTransform {
1909 inner: vec![Arc::new(DoubleTransform), Arc::new(ClampTransform)],
1910 };
1911 let result = chain.transform("/lights/dim", &Value::Float(2.0));
1912 assert_eq!(result, Some(Value::Float(1.0)));
1913 }
1914
1915 #[test]
1916 fn chain_all_passthrough() {
1917 let chain = ChainTransform {
1918 inner: vec![Arc::new(PassthroughTransform), Arc::new(PassthroughTransform)],
1919 };
1920 let result = chain.transform("/any", &Value::Float(42.0));
1921 assert_eq!(result, None);
1922 }
1923
1924 #[test]
1927 fn router_accepts_transform() {
1928 let config = RouterConfig::default();
1929 let router = Router::new(config).with_transforms(Arc::new(DoubleTransform));
1930 assert!(router.transforms.is_some());
1931 }
1932
1933 #[test]
1934 fn router_without_transform_has_none() {
1935 let config = RouterConfig::default();
1936 let router = Router::new(config);
1937 assert!(router.transforms.is_none());
1938 }
1939
1940 #[test]
1941 fn router_state_set_bypasses_transform() {
1942 let config = RouterConfig::default();
1946 let router = Router::new(config).with_transforms(Arc::new(DoubleTransform));
1947 let writer = "test-session".to_string();
1948
1949 router
1950 .state()
1951 .set("/sensors/temp", Value::Float(22.5), &writer, None, false, false, None)
1952 .unwrap();
1953
1954 let stored = router.state().get("/sensors/temp").unwrap();
1956 assert_eq!(stored, Value::Float(22.5));
1957 }
1958}