use clasp_core::{
codec, CpskValidator, ErrorMessage, Message, SecurityMode, SignalType, TokenValidator,
};
#[cfg(feature = "rules")]
use clasp_core::{PublishMessage, SetMessage};
#[cfg(feature = "journal")]
use clasp_journal::Journal;
#[cfg(feature = "rules")]
use clasp_rules::RulesEngine;
use clasp_transport::{TransportEvent, TransportReceiver, TransportSender, TransportServer};
use dashmap::DashMap;
use parking_lot::RwLock;
use std::net::SocketAddr;
use std::sync::Arc;
use tracing::{debug, error, info, warn, Instrument};
#[cfg(feature = "websocket")]
use clasp_transport::WebSocketServer;
#[cfg(feature = "quic")]
use clasp_transport::{QuicConfig, QuicTransport};
use crate::{
error::{Result, RouterError},
gesture::GestureRegistry,
handlers,
p2p::P2PCapabilities,
session::{Session, SessionId},
state::{RouterState, RouterStateConfig},
subscription::SubscriptionManager,
};
use std::time::Duration;
/// Application-supplied hook for vetting writes.
///
/// The `Send + Sync` bound lets the router keep the validator behind an `Arc`
/// and hand clones of it to the per-connection tasks it spawns.
pub trait WriteValidator: Send + Sync {
/// Checks a proposed write of `value` to `address` made by `session`,
/// with read access to the current router `state`.
///
/// Returns `Ok(())` or an `Err` carrying a message string.
/// NOTE(review): presumably `Err` rejects the write and the string is the
/// reason reported back; the call site lives in the handlers module — confirm there.
fn validate_write(
&self,
address: &str,
value: &clasp_core::Value,
session: &Session,
state: &RouterState,
) -> std::result::Result<(), String>;
}
/// Application-supplied hook that post-processes snapshot parameters.
///
/// The `Send + Sync` bound lets the router keep the filter behind an `Arc`
/// and hand clones of it to the per-connection tasks it spawns.
pub trait SnapshotFilter: Send + Sync {
/// Takes ownership of `params` and returns the list that should be used
/// for `session`, with read access to the current router `state`.
///
/// NOTE(review): presumably used to hide parameters a session may not see;
/// the call site lives in the handlers module — confirm there.
fn filter_snapshot(
&self,
params: Vec<clasp_core::ParamValue>,
session: &Session,
state: &RouterState,
) -> Vec<clasp_core::ParamValue>;
}
/// Maximum time a new connection may take to deliver its first message
/// (which must be a `Hello`) before the connection task gives up on it.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10);
/// One transport endpoint for `Router::serve_multi` to listen on.
///
/// Each variant only exists when the matching cargo feature is enabled.
#[derive(Debug, Clone)]
pub enum TransportConfig {
/// WebSocket listener bound to `addr` (passed to `Router::serve_websocket`).
#[cfg(feature = "websocket")]
WebSocket {
addr: String,
},
/// QUIC listener bound to `addr`; `cert` and `key` are forwarded to
/// `Router::serve_quic` as `cert_der` / `key_der`.
/// NOTE(review): the parameter names suggest DER encoding — confirm.
#[cfg(feature = "quic")]
Quic {
addr: SocketAddr,
cert: Vec<u8>,
key: Vec<u8>,
},
}
/// Set of protocol servers for `Router::serve_all` to start.
///
/// Every field is optional and feature-gated; a field left as `None` means
/// that protocol is not started. `serve_all` fails if nothing is configured.
#[derive(Debug, Clone, Default)]
pub struct MultiProtocolConfig {
/// Address for the WebSocket server.
#[cfg(feature = "websocket")]
pub websocket_addr: Option<String>,
/// Address and credentials for the QUIC server.
#[cfg(feature = "quic")]
pub quic: Option<QuicServerConfig>,
/// Settings handed to the MQTT server adapter.
#[cfg(feature = "mqtt-server")]
pub mqtt: Option<crate::adapters::MqttServerConfig>,
/// Settings handed to the OSC server adapter.
#[cfg(feature = "osc-server")]
pub osc: Option<crate::adapters::OscServerConfig>,
}
/// QUIC listener settings used by `Router::serve_all`.
#[cfg(feature = "quic")]
#[derive(Debug, Clone)]
pub struct QuicServerConfig {
/// Socket address to listen on.
pub addr: SocketAddr,
/// Certificate bytes, forwarded to `Router::serve_quic` as `cert_der`.
/// NOTE(review): presumably DER-encoded — confirm.
pub cert: Vec<u8>,
/// Private-key bytes, forwarded to `Router::serve_quic` as `key_der`.
/// NOTE(review): presumably DER-encoded — confirm.
pub key: Vec<u8>,
}
/// Tunable settings for a [`Router`].
#[derive(Debug, Clone)]
pub struct RouterConfig {
/// Router name. NOTE(review): presumably advertised to clients; not read in this file.
pub name: String,
/// Feature labels. NOTE(review): presumably advertised to clients; not read in this file.
pub features: Vec<String>,
/// Connections accepted by `serve_on` are rejected once this many sessions exist.
pub max_sessions: usize,
/// Idle time, in seconds, after which the cleanup task removes a session.
/// `0` disables the session cleanup task.
pub session_timeout: u64,
/// Security mode passed to the message handlers.
pub security_mode: SecurityMode,
/// Per-session subscription cap. NOTE(review): enforced outside this file — confirm.
pub max_subscriptions_per_session: usize,
/// When `true`, `Router::new` creates a gesture registry.
pub gesture_coalescing: bool,
/// Gesture coalescing / flush interval in milliseconds. `0` keeps the
/// flush task from starting.
pub gesture_coalesce_interval_ms: u64,
/// Per-session message rate limit, checked for every incoming frame
/// when `rate_limiting_enabled` is set.
pub max_messages_per_second: u32,
/// Turns the per-session rate-limit check on or off.
pub rate_limiting_enabled: bool,
/// Configuration handed to `RouterState::with_config`.
pub state_config: RouterStateConfig,
}
impl Default for RouterConfig {
    /// Builds the stock configuration: an open, rate-limited router named
    /// "Clasp Router" with gesture coalescing on a 16 ms interval.
    fn default() -> Self {
        let features = ["param", "event", "stream", "timeline", "gesture"]
            .iter()
            .map(|label| (*label).to_owned())
            .collect();

        Self {
            name: String::from("Clasp Router"),
            features,
            max_sessions: 100,
            session_timeout: 300,
            security_mode: SecurityMode::Open,
            max_subscriptions_per_session: 1000,
            gesture_coalescing: true,
            gesture_coalesce_interval_ms: 16,
            max_messages_per_second: 1000,
            rate_limiting_enabled: true,
            state_config: RouterStateConfig::default(),
        }
    }
}
/// Fluent builder for [`RouterConfig`].
///
/// Starts from `RouterConfig::default()` (via the derived `Default`) and
/// overrides individual fields through its setter methods.
#[derive(Debug, Clone, Default)]
pub struct RouterConfigBuilder {
// The configuration being assembled; returned by `build`.
config: RouterConfig,
}
impl RouterConfigBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn name(mut self, name: impl Into<String>) -> Self {
self.config.name = name.into();
self
}
pub fn max_sessions(mut self, max: usize) -> Self {
self.config.max_sessions = max;
self
}
pub fn session_timeout(mut self, secs: u64) -> Self {
self.config.session_timeout = secs;
self
}
pub fn security_mode(mut self, mode: SecurityMode) -> Self {
self.config.security_mode = mode;
self
}
pub fn gesture_coalescing(mut self, enabled: bool) -> Self {
self.config.gesture_coalescing = enabled;
self
}
pub fn gesture_coalesce_interval_ms(mut self, ms: u64) -> Self {
self.config.gesture_coalesce_interval_ms = ms;
self
}
pub fn build(self) -> RouterConfig {
self.config
}
}
/// Message router: accepts transport connections, runs the handshake, and
/// dispatches decoded messages to the handlers.
///
/// The session map, subscription manager, state store, and `running` flag
/// are each held behind an `Arc` so that spawned tasks and internal clones
/// of the router share them.
pub struct Router {
// Settings this router was created with.
config: RouterConfig,
// Live sessions keyed by session id.
sessions: Arc<DashMap<SessionId, Arc<Session>>>,
// Subscription registry used to look up the subscribers of an address.
subscriptions: Arc<SubscriptionManager>,
// Shared router state store.
state: Arc<RouterState>,
// Set to true by the serve methods and to false by `stop`; polled by the
// accept loops, connection tasks, and background tasks.
running: Arc<RwLock<bool>>,
// Optional token validator handed to the message handlers.
token_validator: Option<Arc<dyn TokenValidator>>,
// P2P capability registry; a session is unregistered from it on disconnect.
p2p_capabilities: Arc<P2PCapabilities>,
// Present only when gesture coalescing is enabled in the config.
gesture_registry: Option<Arc<GestureRegistry>>,
// Optional application write-validation hook.
write_validator: Option<Arc<dyn WriteValidator>>,
// Optional application snapshot-filter hook.
snapshot_filter: Option<Arc<dyn SnapshotFilter>>,
// Optional rules engine, guarded by a mutex.
#[cfg(feature = "rules")]
rules_engine: Option<Arc<parking_lot::Mutex<RulesEngine>>>,
}
impl Router {
/// Creates a stopped router from `config`, with empty session and
/// subscription stores and a state store built from `config.state_config`.
///
/// A gesture registry is created only when `config.gesture_coalescing` is
/// set; no validators, filters, or rules engine are installed.
pub fn new(config: RouterConfig) -> Self {
    let coalesce_window = Duration::from_millis(config.gesture_coalesce_interval_ms);
    let gesture_registry = config
        .gesture_coalescing
        .then(|| Arc::new(GestureRegistry::new(coalesce_window)));
    let initial_state = RouterState::with_config(config.state_config.clone());

    Self {
        config,
        sessions: Arc::new(DashMap::new()),
        subscriptions: Arc::new(SubscriptionManager::new()),
        state: Arc::new(initial_state),
        running: Arc::new(RwLock::new(false)),
        token_validator: None,
        p2p_capabilities: Arc::new(P2PCapabilities::new()),
        gesture_registry,
        write_validator: None,
        snapshot_filter: None,
        #[cfg(feature = "rules")]
        rules_engine: None,
    }
}
/// Builder-style variant of [`Router::set_validator`]: installs `validator`
/// as the token validator and returns the router.
pub fn with_validator<V: TokenValidator + 'static>(mut self, validator: V) -> Self {
    self.set_validator(validator);
    self
}
/// Installs `validator` as the token validator, replacing any previous one.
pub fn set_validator<V: TokenValidator + 'static>(&mut self, validator: V) {
    let shared: Arc<dyn TokenValidator> = Arc::new(validator);
    self.token_validator = Some(shared);
}
/// Installs `validator` as the write validator, replacing any previous one.
pub fn set_write_validator<V: WriteValidator + 'static>(&mut self, validator: V) {
    let shared: Arc<dyn WriteValidator> = Arc::new(validator);
    self.write_validator = Some(shared);
}
/// Installs an already shared write validator, replacing any previous one.
pub fn set_write_validator_arc(&mut self, validator: Arc<dyn WriteValidator>) {
    self.write_validator = Option::from(validator);
}
/// Installs `filter` as the snapshot filter, replacing any previous one.
pub fn set_snapshot_filter<F: SnapshotFilter + 'static>(&mut self, filter: F) {
    let shared: Arc<dyn SnapshotFilter> = Arc::new(filter);
    self.snapshot_filter = Some(shared);
}
/// Installs an already shared snapshot filter, replacing any previous one.
pub fn set_snapshot_filter_arc(&mut self, filter: Arc<dyn SnapshotFilter>) {
    self.snapshot_filter = Option::from(filter);
}
/// Attaches `journal` to the router's state store.
///
/// This builds a brand-new `RouterState` from the configured
/// `state_config`, sets the journal on it, and swaps it in; whatever the
/// previous state object held is not carried over.
#[cfg(feature = "journal")]
pub fn with_journal(mut self, journal: Arc<dyn Journal>) -> Self {
    let state_config = self.config.state_config.clone();
    let mut journaled_state = RouterState::with_config(state_config);
    journaled_state.set_journal(journal);
    self.state = Arc::new(journaled_state);
    self
}
/// Installs `engine` as the rules engine, wrapped in a shared mutex, and
/// returns the router.
#[cfg(feature = "rules")]
pub fn with_rules(mut self, engine: RulesEngine) -> Self {
    let guarded = parking_lot::Mutex::new(engine);
    self.rules_engine = Some(Arc::new(guarded));
    self
}
/// Returns the shared rules engine, if one has been installed.
#[cfg(feature = "rules")]
pub fn rules_engine(&self) -> Option<&Arc<parking_lot::Mutex<RulesEngine>>> {
    Option::as_ref(&self.rules_engine)
}
/// Returns the installed token validator as a [`CpskValidator`], or `None`
/// when no validator is installed or it is of a different concrete type.
pub fn cpsk_validator(&self) -> Option<&CpskValidator> {
    let validator = self.token_validator.as_ref()?;
    validator.as_any().downcast_ref::<CpskValidator>()
}
pub fn security_mode(&self) -> SecurityMode {
self.config.security_mode
}
/// Runs the accept loop on an already bound transport `server`.
///
/// Marks the router as running, starts the background maintenance tasks
/// (session cleanup when `session_timeout > 0`, gesture flushing when a
/// registry exists, and state cleanup), then accepts connections until
/// the `running` flag is cleared. The flag is only re-read between
/// accepts. Connections arriving while `max_sessions` sessions exist are
/// dropped; accept errors are logged and the loop carries on.
pub async fn serve_on<S>(&self, mut server: S) -> Result<()>
where
    S: TransportServer + 'static,
    S::Sender: 'static,
    S::Receiver: 'static,
{
    info!("Router accepting connections");
    *self.running.write() = true;

    if self.config.session_timeout > 0 {
        self.start_session_cleanup_task();
    }
    if let Some(registry) = self.gesture_registry.as_ref() {
        self.start_gesture_flush_task(Arc::clone(registry));
    }
    self.start_state_cleanup_task();

    while *self.running.read() {
        let (sender, receiver, addr) = match server.accept().await {
            Ok(accepted) => accepted,
            Err(e) => {
                debug!("Accept error: {}", e);
                continue;
            }
        };

        // Enforce the session cap before any work is done for the newcomer.
        let current_sessions = self.sessions.len();
        if current_sessions >= self.config.max_sessions {
            warn!(
                "Rejecting connection from {}: max sessions reached ({}/{})",
                addr, current_sessions, self.config.max_sessions
            );
            continue;
        }

        info!("New connection from {}", addr);
        #[cfg(feature = "metrics")]
        metrics::gauge!("clasp_sessions_active").increment(1.0);
        self.handle_connection(Arc::new(sender), receiver, addr);
    }

    Ok(())
}
/// Spawns the background task that periodically flushes coalesced gesture
/// messages out of `registry` to their subscribers.
///
/// Does nothing when the configured coalesce interval is `0`. The task
/// ticks at that interval and exits on the first tick after the `running`
/// flag has been cleared.
fn start_gesture_flush_task(&self, registry: Arc<GestureRegistry>) {
    let interval_ms = self.config.gesture_coalesce_interval_ms;
    if interval_ms == 0 {
        return;
    }

    let sessions = Arc::clone(&self.sessions);
    let subscriptions = Arc::clone(&self.subscriptions);
    let running = Arc::clone(&self.running);
    let flush_interval = Duration::from_millis(interval_ms);

    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(flush_interval);
        loop {
            ticker.tick().await;
            let keep_going = *running.read();
            if !keep_going {
                break;
            }

            for pending in registry.flush_stale() {
                let outbound = Message::Publish(pending.clone());
                let recipients =
                    subscriptions.find_subscribers(&pending.address, Some(SignalType::Gesture));
                // A message that cannot be encoded is skipped without notice.
                let bytes = match codec::encode(&outbound) {
                    Ok(encoded) => encoded,
                    Err(_) => continue,
                };
                for recipient_id in recipients {
                    if let Some(entry) = sessions.get(&recipient_id) {
                        crate::handlers::try_send_with_drop_tracking_sync(
                            entry.value(),
                            bytes.clone(),
                            &recipient_id,
                        );
                    }
                }
            }

            // NOTE(review): presumably evicts gestures idle for 300 s — confirm in GestureRegistry.
            registry.cleanup_stale(Duration::from_secs(300));
        }
        debug!("Gesture flush task stopped");
    });
}
/// Spawns the background task that evicts sessions idle for longer than
/// `config.session_timeout` seconds.
///
/// The task wakes every quarter of the timeout (at least every 10 s) and
/// exits on the first wake-up after the `running` flag has been cleared.
/// For each evicted session it performs the same bookkeeping as a normal
/// disconnect: the session leaves the session map, its subscriptions are
/// removed, and its P2P capabilities are unregistered.
fn start_session_cleanup_task(&self) {
    let sessions = Arc::clone(&self.sessions);
    let subscriptions = Arc::clone(&self.subscriptions);
    let p2p_capabilities = Arc::clone(&self.p2p_capabilities);
    let running = Arc::clone(&self.running);
    let timeout_secs = self.config.session_timeout;

    tokio::spawn(async move {
        let check_interval = std::time::Duration::from_secs(timeout_secs / 4)
            .max(std::time::Duration::from_secs(10));
        let timeout = std::time::Duration::from_secs(timeout_secs);

        loop {
            tokio::time::sleep(check_interval).await;
            if !*running.read() {
                break;
            }

            // Collect candidates first so no map guard is held while removing.
            let candidates: Vec<SessionId> = sessions
                .iter()
                .filter(|entry| entry.value().idle_duration() > timeout)
                .map(|entry| entry.key().clone())
                .collect();

            for session_id in candidates {
                // Re-check idleness atomically with the removal: the session may
                // have become active again since the scan above.
                let evicted = sessions
                    .remove_if(&session_id, |_, session| session.idle_duration() > timeout);
                if let Some((id, session)) = evicted {
                    info!(
                        "Session {} timed out after {:?} idle",
                        id,
                        session.idle_duration()
                    );
                    subscriptions.remove_session(&id);
                    // Mirror the disconnect path so no capability entry outlives
                    // the session it belonged to.
                    p2p_capabilities.unregister(&id);
                }
            }
        }
        debug!("Session cleanup task stopped");
    });
}
fn start_state_cleanup_task(&self) {
let state = Arc::clone(&self.state);
let running = Arc::clone(&self.running);
#[cfg(feature = "metrics")]
let sessions = Arc::clone(&self.sessions);
#[cfg(feature = "metrics")]
let subscriptions = Arc::clone(&self.subscriptions);
tokio::spawn(async move {
let cleanup_interval = std::time::Duration::from_secs(60);
loop {
tokio::time::sleep(cleanup_interval).await;
if !*running.read() {
break;
}
let (params_removed, signals_removed) = state.cleanup_stale();
if params_removed > 0 || signals_removed > 0 {
debug!(
"State cleanup: removed {} stale params, {} stale signals",
params_removed, signals_removed
);
}
#[cfg(feature = "metrics")]
{
metrics::gauge!("clasp_state_params_active").set(state.len() as f64);
metrics::gauge!("clasp_sessions_active").set(sessions.len() as f64);
metrics::gauge!("clasp_subscriptions_active").set(subscriptions.len() as f64);
}
}
debug!("State cleanup task stopped");
});
}
/// Binds a WebSocket server to `addr` and serves it with
/// [`Router::serve_on`]. Fails if the bind fails.
#[cfg(feature = "websocket")]
pub async fn serve_websocket(&self, addr: &str) -> Result<()> {
    let listener = WebSocketServer::bind(addr).await?;
    info!("WebSocket server listening on {}", addr);
    self.serve_on(listener).await
}
/// Convenience alias for [`Router::serve_websocket`].
#[cfg(feature = "websocket")]
pub async fn serve(&self, addr: &str) -> Result<()> {
    let outcome = self.serve_websocket(addr).await;
    outcome
}
/// Creates a QUIC server on `addr` from the given certificate and key
/// bytes and runs its accept loop.
///
/// A failure to create the server is returned as `RouterError::Transport`.
#[cfg(feature = "quic")]
pub async fn serve_quic(
    &self,
    addr: SocketAddr,
    cert_der: Vec<u8>,
    key_der: Vec<u8>,
) -> Result<()> {
    let transport = match QuicTransport::new_server(addr, cert_der, key_der) {
        Ok(transport) => transport,
        Err(e) => return Err(RouterError::Transport(e)),
    };
    info!("QUIC server listening on {}", addr);
    self.serve_quic_transport(transport).await
}
/// Runs the QUIC accept loop on `server` until the `running` flag is cleared.
///
/// For every accepted connection the first bidirectional stream is taken
/// and handed to `handle_connection`. Like `serve_on`, connections that
/// arrive while `max_sessions` sessions already exist are rejected, so
/// the session cap holds regardless of transport.
///
/// This loop does not start the session/gesture/state maintenance tasks
/// that `serve_on` starts.
/// NOTE(review): `accept_bi` is awaited inside the accept loop, so a peer
/// that connects without opening a stream delays later accepts.
#[cfg(feature = "quic")]
async fn serve_quic_transport(&self, server: QuicTransport) -> Result<()> {
    *self.running.write() = true;
    while *self.running.read() {
        match server.accept().await {
            Ok(connection) => {
                let addr = connection.remote_address();

                // Enforce the same session cap as the generic accept loop;
                // dropping `connection` here abandons the rejected peer.
                let current_sessions = self.sessions.len();
                if current_sessions >= self.config.max_sessions {
                    warn!(
                        "Rejecting connection from {}: max sessions reached ({}/{})",
                        addr, current_sessions, self.config.max_sessions
                    );
                    continue;
                }

                info!("QUIC connection from {}", addr);
                match connection.accept_bi().await {
                    Ok((sender, receiver)) => {
                        self.handle_connection(Arc::new(sender), receiver, addr);
                    }
                    Err(e) => {
                        error!("QUIC stream accept error: {}", e);
                    }
                }
            }
            Err(e) => {
                error!("QUIC accept error: {}", e);
            }
        }
    }
    Ok(())
}
/// Serves every transport in `transports` concurrently, one spawned task
/// per transport, each running on an internal clone of this router.
///
/// Returns a `Config` error when the list is empty, when a task fails to
/// join, or when a transport was not compiled in; otherwise returns the
/// first error reported by a transport, or `Ok(())` once all have ended.
pub async fn serve_multi(&self, transports: Vec<TransportConfig>) -> Result<()> {
    use futures::future::try_join_all;

    if transports.is_empty() {
        return Err(RouterError::Config("No transports configured".into()));
    }

    let tasks: Vec<_> = transports
        .into_iter()
        .map(|config| {
            let router = self.clone_internal();
            tokio::spawn(async move {
                match config {
                    #[cfg(feature = "websocket")]
                    TransportConfig::WebSocket { addr } => router.serve_websocket(&addr).await,
                    #[cfg(feature = "quic")]
                    TransportConfig::Quic { addr, cert, key } => {
                        router.serve_quic(addr, cert, key).await
                    }
                    #[allow(unreachable_patterns)]
                    _ => Err(RouterError::Config(
                        "Transport not enabled at compile time".into(),
                    )),
                }
            })
        })
        .collect();

    let outcomes = match try_join_all(tasks).await {
        Ok(outcomes) => outcomes,
        Err(e) => {
            return Err(RouterError::Config(format!(
                "Transport task failed: {}",
                e
            )))
        }
    };

    // Surface the first transport-level failure, if any.
    outcomes.into_iter().try_for_each(|outcome| outcome)
}
/// Starts every protocol server enabled in `config` and waits until all
/// of them have finished.
///
/// WebSocket and QUIC run on internal clones of this router; MQTT and OSC
/// run through their adapters, which share the session map, subscription
/// manager, and state store. Returns a `Config` error when nothing is
/// configured. A server that ends, fails, or panics is only logged; the
/// remaining ones keep running and the final result is `Ok(())`.
pub async fn serve_all(&self, config: MultiProtocolConfig) -> Result<()> {
    use futures::future::select_all;

    let mut handles: Vec<tokio::task::JoinHandle<Result<()>>> = Vec::new();
    let mut protocol_names: Vec<&str> = Vec::new();

    #[cfg(feature = "websocket")]
    if let Some(ref addr) = config.websocket_addr {
        info!("Starting WebSocket server on {}", addr);
        protocol_names.push("WebSocket");
        let ws_router = self.clone_internal();
        let ws_addr = addr.clone();
        handles.push(tokio::spawn(async move {
            ws_router.serve_websocket(&ws_addr).await
        }));
    }

    #[cfg(feature = "quic")]
    if let Some(ref quic_config) = config.quic {
        info!("Starting QUIC server on {}", quic_config.addr);
        protocol_names.push("QUIC");
        let quic_router = self.clone_internal();
        let quic_addr = quic_config.addr;
        let quic_cert = quic_config.cert.clone();
        let quic_key = quic_config.key.clone();
        handles.push(tokio::spawn(async move {
            quic_router.serve_quic(quic_addr, quic_cert, quic_key).await
        }));
    }

    #[cfg(feature = "mqtt-server")]
    if let Some(mqtt_config) = config.mqtt {
        info!("Starting MQTT server on {}", mqtt_config.bind_addr);
        protocol_names.push("MQTT");
        let mqtt_adapter = crate::adapters::MqttServerAdapter::new(
            mqtt_config,
            Arc::clone(&self.sessions),
            Arc::clone(&self.subscriptions),
            Arc::clone(&self.state),
        );
        handles.push(tokio::spawn(async move { mqtt_adapter.serve().await }));
    }

    #[cfg(feature = "osc-server")]
    if let Some(osc_config) = config.osc {
        info!("Starting OSC server on {}", osc_config.bind_addr);
        protocol_names.push("OSC");
        let osc_adapter = crate::adapters::OscServerAdapter::new(
            osc_config,
            Arc::clone(&self.sessions),
            Arc::clone(&self.subscriptions),
            Arc::clone(&self.state),
        );
        handles.push(tokio::spawn(async move { osc_adapter.serve().await }));
    }

    if handles.is_empty() {
        return Err(RouterError::Config("No protocols configured".into()));
    }

    info!(
        "Multi-protocol server running with {} protocols: {}",
        handles.len(),
        protocol_names.join(", ")
    );

    *self.running.write() = true;
    if self.config.session_timeout > 0 {
        self.start_session_cleanup_task();
    }
    if let Some(registry) = self.gesture_registry.as_ref() {
        self.start_gesture_flush_task(Arc::clone(registry));
    }
    self.start_state_cleanup_task();

    // Drain the servers one by one as they finish; `select_all` must never
    // be called with an empty list, hence the loop condition.
    while !handles.is_empty() {
        let (outcome, _finished_index, still_running) = select_all(handles).await;
        handles = still_running;
        match outcome {
            Ok(Ok(())) => {
                debug!("Protocol server completed normally");
            }
            Ok(Err(e)) => {
                error!("Protocol server error: {}", e);
            }
            Err(e) => {
                error!("Protocol server task panicked: {}", e);
            }
        }
    }

    Ok(())
}
#[allow(clippy::type_complexity)]
pub fn shared_state(
&self,
) -> (
Arc<DashMap<SessionId, Arc<Session>>>,
Arc<SubscriptionManager>,
Arc<RouterState>,
) {
(
Arc::clone(&self.sessions),
Arc::clone(&self.subscriptions),
Arc::clone(&self.state),
)
}
/// Makes a second `Router` value for use inside a spawned task.
///
/// The config is copied; every `Arc`-held component (sessions,
/// subscriptions, state, `running` flag, validators, registries) is shared
/// with `self` rather than duplicated.
fn clone_internal(&self) -> Self {
    Self {
        config: self.config.clone(),
        sessions: Arc::clone(&self.sessions),
        subscriptions: Arc::clone(&self.subscriptions),
        state: Arc::clone(&self.state),
        running: Arc::clone(&self.running),
        token_validator: self.token_validator.as_ref().map(Arc::clone),
        p2p_capabilities: Arc::clone(&self.p2p_capabilities),
        gesture_registry: self.gesture_registry.as_ref().map(Arc::clone),
        write_validator: self.write_validator.as_ref().map(Arc::clone),
        snapshot_filter: self.snapshot_filter.as_ref().map(Arc::clone),
        #[cfg(feature = "rules")]
        rules_engine: self.rules_engine.as_ref().map(Arc::clone),
    }
}
/// Returns the gesture registry's active count, or `0` when gesture
/// coalescing is disabled and no registry exists.
pub fn active_gesture_count(&self) -> usize {
    match self.gesture_registry.as_ref() {
        Some(registry) => registry.active_count(),
        None => 0,
    }
}
/// Spawns the task that owns a single client connection.
///
/// The task first waits (up to `HANDSHAKE_TIMEOUT`) for a `Hello` message
/// and passes it to the handlers; unless that yields a new session the
/// task ends. It then dispatches every decoded frame through
/// `handlers::handle_message` until the peer disconnects, a transport or
/// send error occurs, a handler requests a disconnect, or the `running`
/// flag is found cleared between messages. On exit, an established
/// session is removed from the session map, the subscription manager, and
/// the P2P capability registry.
fn handle_connection(
&self,
sender: Arc<dyn TransportSender>,
mut receiver: impl TransportReceiver + 'static,
addr: SocketAddr,
) {
// Clone every shared handle up front: the spawned task must own its data.
let sessions = Arc::clone(&self.sessions);
let subscriptions = Arc::clone(&self.subscriptions);
let state = Arc::clone(&self.state);
let config = self.config.clone();
let running = Arc::clone(&self.running);
let token_validator = self.token_validator.clone();
let security_mode = self.config.security_mode;
let p2p_capabilities = Arc::clone(&self.p2p_capabilities);
let gesture_registry = self.gesture_registry.clone();
let write_validator = self.write_validator.clone();
let snapshot_filter = self.snapshot_filter.clone();
#[cfg(feature = "rules")]
let rules_engine = self.rules_engine.clone();
// `session_id` starts empty and is recorded once the handshake succeeds.
let conn_span =
tracing::info_span!("connection", session_id = tracing::field::Empty, remote = %addr);
tokio::spawn(
async move {
let mut session: Option<Arc<Session>> = None;
let mut handshake_complete = false;
// Phase 1: wait for the first data frame. It must decode to `Hello`;
// anything else, a decode error, a disconnect, or a transport error
// aborts the handshake. Other event kinds are ignored.
let handshake_result = tokio::time::timeout(HANDSHAKE_TIMEOUT, async {
loop {
match receiver.recv().await {
Some(TransportEvent::Data(data)) => {
match codec::decode(&data) {
Ok((msg, _)) => {
if matches!(msg, Message::Hello(_)) {
return Some(data);
} else {
warn!(
"Received non-Hello message before handshake from {}",
addr
);
return None;
}
}
Err(e) => {
warn!("Decode error during handshake from {}: {}", addr, e);
return None;
}
}
}
Some(TransportEvent::Disconnected { .. }) | None => {
return None;
}
Some(TransportEvent::Error(e)) => {
error!("Transport error during handshake from {}: {}", addr, e);
return None;
}
_ => {}
}
}
})
.await;
let hello_data = match handshake_result {
Ok(Some(data)) => data,
Ok(None) => {
debug!("Handshake failed for {}", addr);
return;
}
Err(_) => {
warn!(
"Handshake timeout for {} after {:?}",
addr, HANDSHAKE_TIMEOUT
);
return;
}
};
// Phase 2: decode the Hello bytes again (this time keeping the frame)
// and run them through the regular handler.
if let Ok((msg, frame)) = codec::decode(&hello_data) {
let ctx = handlers::HandlerContext {
session: &session,
sender: &sender,
sessions: &sessions,
subscriptions: &subscriptions,
state: &state,
config: &config,
security_mode,
token_validator: &token_validator,
p2p_capabilities: &p2p_capabilities,
gesture_registry: &gesture_registry,
write_validator: &write_validator,
snapshot_filter: &snapshot_filter,
#[cfg(feature = "rules")]
rules_engine: &rules_engine,
};
if let Some(response) = handlers::handle_message(&msg, &frame, &ctx).await {
match response {
handlers::MessageResult::NewSession(s) => {
tracing::Span::current()
.record("session_id", tracing::field::display(&s.id));
session = Some(s);
handshake_complete = true;
}
// A plain reply during the handshake is sent best-effort; the
// handshake still counts as incomplete afterwards.
handlers::MessageResult::Send(bytes) => {
let _ = sender.send(bytes).await;
}
handlers::MessageResult::Disconnect => {
info!(
"Disconnecting client {} due to auth failure during handshake",
addr
);
return;
}
_ => {}
}
}
}
if !handshake_complete {
debug!("Handshake incomplete for {}", addr);
return;
}
// Phase 3: main receive loop. The `running` flag is only re-read
// between messages.
while *running.read() {
match receiver.recv().await {
Some(TransportEvent::Data(data)) => {
// Rate limiting happens before decoding: an over-limit frame is
// answered with a 429 error and otherwise discarded.
if config.rate_limiting_enabled {
if let Some(ref s) = session {
if !s.check_rate_limit(config.max_messages_per_second) {
warn!(
"Rate limit exceeded for session {} ({} msgs/sec > {})",
s.id,
s.messages_per_second(),
config.max_messages_per_second
);
let error = Message::Error(ErrorMessage {
code: 429, message: format!(
"Rate limit exceeded: {} messages/second",
config.max_messages_per_second
),
address: None,
correlation_id: None,
});
if let Ok(bytes) = codec::encode(&error) {
let _ = sender.send(bytes).await;
}
continue;
}
}
}
match codec::decode(&data) {
Ok((msg, frame)) => {
let ctx = handlers::HandlerContext {
session: &session,
sender: &sender,
sessions: &sessions,
subscriptions: &subscriptions,
state: &state,
config: &config,
security_mode,
token_validator: &token_validator,
p2p_capabilities: &p2p_capabilities,
gesture_registry: &gesture_registry,
write_validator: &write_validator,
snapshot_filter: &snapshot_filter,
#[cfg(feature = "rules")]
rules_engine: &rules_engine,
};
if let Some(response) =
handlers::handle_message(&msg, &frame, &ctx).await
{
match response {
handlers::MessageResult::NewSession(s) => {
session = Some(s);
}
// Unlike during the handshake, a failed send ends the connection.
handlers::MessageResult::Send(bytes) => {
if let Err(e) = sender.send(bytes).await {
error!("Send error: {}", e);
break;
}
}
handlers::MessageResult::Broadcast(bytes, exclude) => {
handlers::broadcast_to_subscribers(
&bytes, &sessions, &exclude,
);
}
handlers::MessageResult::Disconnect => {
info!(
"Disconnecting client {} due to auth failure",
addr
);
break;
}
handlers::MessageResult::None => {}
}
}
}
// An undecodable frame is logged and skipped; the connection stays open.
Err(e) => {
warn!("Decode error from {}: {}", addr, e);
}
}
}
Some(TransportEvent::Disconnected { reason }) => {
info!("Client {} disconnected: {:?}", addr, reason);
break;
}
Some(TransportEvent::Error(e)) => {
error!("Transport error from {}: {}", addr, e);
break;
}
None => {
break;
}
_ => {}
}
}
// Teardown runs only when a session was established.
// NOTE(review): the metrics gauge is decremented only here, so a
// connection that never completes its handshake is not decremented
// by this task.
if let Some(s) = session {
info!("Removing session {}", s.id);
sessions.remove(&s.id);
subscriptions.remove_session(&s.id);
p2p_capabilities.unregister(&s.id);
#[cfg(feature = "metrics")]
metrics::gauge!("clasp_sessions_active").decrement(1.0);
}
}
.instrument(conn_span),
);
}
/// Clears the `running` flag. Accept loops, connection tasks, and
/// background tasks stop the next time they check it.
pub fn stop(&self) {
    let mut running = self.running.write();
    *running = false;
}
/// Returns the number of entries currently in the session map.
pub fn session_count(&self) -> usize {
    let live_sessions = &self.sessions;
    live_sessions.len()
}
/// Borrows the router's state store.
pub fn state(&self) -> &RouterState {
    self.state.as_ref()
}
/// Returns the subscription manager's current length.
pub fn subscription_count(&self) -> usize {
    let manager = &self.subscriptions;
    manager.len()
}
}
impl Default for Router {
fn default() -> Self {
Self::new(RouterConfig::default())
}
}
/// Applies the pending actions produced by the rules engine.
///
/// * `Set` writes `value` to `address` in `state` and, on success, sends
///   the resulting `Set` message (with its new revision) to the `Param`
///   subscribers of the address.
/// * `Publish` sends a `Publish` message to the subscribers of the
///   address for the given signal type; `state` is not touched.
/// * `SetFromTrigger` reads the current value at `address`, applies
///   `transform` to it, and then behaves like `Set`. When the address has
///   no current value the action is skipped.
/// * `Delay` is a no-op here.
///
/// Failed writes are logged and do not stop the remaining actions.
#[cfg(feature = "rules")]
pub fn execute_rule_actions(
    actions: Vec<clasp_rules::PendingAction>,
    state: &Arc<RouterState>,
    sessions: &Arc<DashMap<SessionId, Arc<Session>>>,
    subscriptions: &Arc<SubscriptionManager>,
) {
    for action in actions {
        match action.action {
            clasp_rules::RuleAction::Set { address, value } => {
                match state.set(&address, value.clone(), &action.origin, None, false, false) {
                    Ok(revision) => {
                        let set_msg = Message::Set(SetMessage {
                            address: address.clone(),
                            value,
                            revision: Some(revision),
                            lock: false,
                            unlock: false,
                        });
                        fan_out_rule_message(
                            &set_msg,
                            &address,
                            SignalType::Param,
                            sessions,
                            subscriptions,
                        );
                        debug!("Rule {} applied SET to {}", action.rule_id, address);
                    }
                    Err(e) => {
                        warn!("Rule {} SET to {} failed: {:?}", action.rule_id, address, e);
                    }
                }
            }
            clasp_rules::RuleAction::Publish {
                address,
                signal,
                value,
            } => {
                let pub_msg = Message::Publish(PublishMessage {
                    address: address.clone(),
                    signal: Some(signal),
                    value,
                    payload: None,
                    samples: None,
                    rate: None,
                    id: None,
                    phase: None,
                    timestamp: None,
                    timeline: None,
                });
                fan_out_rule_message(&pub_msg, &address, signal, sessions, subscriptions);
                debug!("Rule {} applied PUBLISH to {}", action.rule_id, address);
            }
            clasp_rules::RuleAction::SetFromTrigger { address, transform } => {
                if let Some(current) = state.get(&address) {
                    let transformed = transform.apply(&current);
                    match state.set(
                        &address,
                        transformed.clone(),
                        &action.origin,
                        None,
                        false,
                        false,
                    ) {
                        Ok(revision) => {
                            let set_msg = Message::Set(SetMessage {
                                address: address.clone(),
                                value: transformed,
                                revision: Some(revision),
                                lock: false,
                                unlock: false,
                            });
                            fan_out_rule_message(
                                &set_msg,
                                &address,
                                SignalType::Param,
                                sessions,
                                subscriptions,
                            );
                            debug!(
                                "Rule {} applied SetFromTrigger to {}",
                                action.rule_id, address
                            );
                        }
                        Err(e) => {
                            warn!(
                                "Rule {} SetFromTrigger to {} failed: {:?}",
                                action.rule_id, address, e
                            );
                        }
                    }
                } else {
                    // Nothing to transform; make the skip visible when debugging rules.
                    debug!(
                        "Rule {} SetFromTrigger skipped: no current value at {}",
                        action.rule_id, address
                    );
                }
            }
            // NOTE(review): presumably delays are scheduled by the rules
            // engine itself; nothing is done with them here — confirm.
            clasp_rules::RuleAction::Delay { .. } => {}
        }
    }
}

/// Encodes `msg` once and offers the bytes to every session subscribed to
/// `address` for `signal`.
///
/// Delivery is best-effort: a message that fails to encode is not sent,
/// and subscribers no longer present in `sessions` are skipped.
#[cfg(feature = "rules")]
fn fan_out_rule_message(
    msg: &Message,
    address: &str,
    signal: SignalType,
    sessions: &DashMap<SessionId, Arc<Session>>,
    subscriptions: &SubscriptionManager,
) {
    let subscribers = subscriptions.find_subscribers(address, Some(signal));
    if let Ok(bytes) = codec::encode(msg) {
        for sub_session_id in subscribers {
            if let Some(sub_session) = sessions.get(&sub_session_id) {
                crate::handlers::try_send_with_drop_tracking_sync(
                    sub_session.value(),
                    bytes.clone(),
                    &sub_session_id,
                );
            }
        }
    }
}
/// Reports whether every address matched by `request` is also matched by
/// `declared`, i.e. whether a requested pattern stays inside a declared
/// namespace.
///
/// Identical patterns are always covered. A wildcard-free `request` is
/// covered when `declared` glob-matches it. Otherwise both patterns are
/// split on `/` (empty segments are ignored, so doubled and trailing
/// slashes do not matter) and compared segment by segment:
///
/// * a declared `**` covers any run of zero or more request segments;
///   declared segments that follow it must still be covered by the rest
///   of the request;
/// * a declared `*` covers exactly one request segment, literal or `*`;
/// * a request `*` or `**` is never covered by a declared literal, and a
///   request `**` is never covered by a declared `*`.
#[cfg(feature = "federation")]
pub(crate) fn federation_pattern_covered_by(request: &str, declared: &str) -> bool {
    if request == declared {
        return true;
    }
    if !request.contains('*') && clasp_core::address::glob_match(declared, request) {
        return true;
    }

    let decl_parts: Vec<&str> = declared.split('/').filter(|s| !s.is_empty()).collect();
    let req_parts: Vec<&str> = request.split('/').filter(|s| !s.is_empty()).collect();
    federation_segments_covered(&decl_parts, &req_parts)
}

/// Segment-level coverage check behind `federation_pattern_covered_by`.
///
/// Fills a table where `covered[d][r]` answers "does `declared[d..]` cover
/// `request[r..]`?", working backwards from the ends of both patterns, so
/// the cost stays proportional to the product of the two segment counts
/// even when `declared` contains several `**` segments.
#[cfg(feature = "federation")]
fn federation_segments_covered(declared: &[&str], request: &[&str]) -> bool {
    let (decl_len, req_len) = (declared.len(), request.len());
    let mut covered = vec![vec![false; req_len + 1]; decl_len + 1];
    // Both patterns exhausted: covered. Declared exhausted with request
    // segments left over stays `false`.
    covered[decl_len][req_len] = true;

    for d in (0..decl_len).rev() {
        for r in (0..=req_len).rev() {
            let dp = declared[d];
            covered[d][r] = if dp == "**" {
                // Either `**` stands for nothing, or it swallows one more
                // request segment (of any kind) and stays in play.
                covered[d + 1][r] || (r < req_len && covered[d][r + 1])
            } else if r == req_len {
                // A literal or `*` needs a request segment to cover.
                false
            } else {
                let rp = request[r];
                // A request `**` is wider than any single declared segment;
                // a request `*` is only covered by a declared `*`.
                rp != "**" && (dp == "*" || dp == rp) && covered[d + 1][r + 1]
            };
        }
    }

    covered[0][0]
}
#[cfg(all(test, feature = "federation"))]
mod federation_tests {
use super::*;
#[test]
fn test_exact_match() {
assert!(federation_pattern_covered_by(
"/sensors/temp",
"/sensors/temp"
));
}
#[test]
fn test_concrete_within_globstar() {
assert!(federation_pattern_covered_by(
"/sensors/temp/1",
"/sensors/**"
));
assert!(federation_pattern_covered_by(
"/sensors/temp",
"/sensors/**"
));
}
#[test]
fn test_sub_pattern_within_globstar() {
assert!(federation_pattern_covered_by(
"/sensors/temp/**",
"/sensors/**"
));
assert!(federation_pattern_covered_by(
"/sensors/temp/*",
"/sensors/**"
));
}
#[test]
fn test_globstar_root_covers_all() {
assert!(federation_pattern_covered_by("/sensors/**", "/**"));
assert!(federation_pattern_covered_by("/anything/deep/path", "/**"));
}
#[test]
fn test_disjoint_namespaces_rejected() {
assert!(!federation_pattern_covered_by("/audio/**", "/sensors/**"));
assert!(!federation_pattern_covered_by(
"/audio/mixer",
"/sensors/**"
));
}
#[test]
fn test_wider_pattern_rejected() {
assert!(!federation_pattern_covered_by("/**", "/sensors/**"));
}
#[test]
fn test_wildcard_in_request_wider_than_literal() {
assert!(!federation_pattern_covered_by(
"/sensors/*",
"/sensors/temp"
));
}
#[test]
fn test_declared_single_wildcard() {
assert!(federation_pattern_covered_by("/sensors/temp", "/sensors/*"));
}
#[test]
fn test_federation_peer_detection() {
let fed_session = Session::stub_federation("hub-peer");
assert!(fed_session.is_federation_peer());
let normal_session = Session::stub(None);
assert!(!normal_session.is_federation_peer());
}
#[test]
fn test_federation_namespaces_lifecycle() {
let session = Session::stub_federation("peer");
assert!(session.federation_namespaces().is_empty());
session
.set_federation_namespaces(vec!["/sensors/**".to_string(), "/lights/**".to_string()]);
let ns = session.federation_namespaces();
assert_eq!(ns.len(), 2);
assert!(ns.contains(&"/sensors/**".to_string()));
assert!(ns.contains(&"/lights/**".to_string()));
session.set_federation_namespaces(vec!["/audio/**".to_string()]);
let ns = session.federation_namespaces();
assert_eq!(ns.len(), 1);
assert_eq!(ns[0], "/audio/**");
}
#[test]
fn test_federation_router_id() {
let session = Session::stub_federation("peer");
assert!(session.federation_router_id().is_none());
session.set_federation_router_id("hub-alpha".to_string());
assert_eq!(session.federation_router_id().unwrap(), "hub-alpha");
}
#[test]
fn test_federation_subscription_id_range() {
let session = Session::stub_federation("peer");
session.add_subscription(1); session.add_subscription(50000); session.add_subscription(50001);
let subs = session.subscriptions();
assert_eq!(subs.len(), 3);
assert!(subs.contains(&1));
assert!(subs.contains(&50000));
assert!(subs.contains(&50001));
session.remove_subscription(50000);
let subs = session.subscriptions();
assert_eq!(subs.len(), 2);
assert!(subs.contains(&1));
assert!(!subs.contains(&50000));
}
#[test]
fn test_resource_limits_are_sane() {
const MAX_PATTERNS: usize = 1000;
const MAX_REVISIONS: usize = 10_000;
assert!(MAX_PATTERNS > 0 && MAX_PATTERNS <= 10_000);
assert!(MAX_REVISIONS > 0 && MAX_REVISIONS <= 100_000);
}
#[test]
fn test_empty_strings() {
assert!(federation_pattern_covered_by("", ""));
assert!(!federation_pattern_covered_by("/a", ""));
assert!(!federation_pattern_covered_by("", "/a"));
}
#[test]
fn test_root_slash_only() {
assert!(federation_pattern_covered_by("/", "/"));
assert!(federation_pattern_covered_by("/", "/**"));
}
#[test]
fn test_trailing_slash() {
assert!(federation_pattern_covered_by("/sensors/", "/sensors/**"));
assert!(federation_pattern_covered_by(
"/sensors/temp/",
"/sensors/**"
));
}
#[test]
fn test_double_slashes() {
assert!(federation_pattern_covered_by(
"//sensors//temp",
"/sensors/**"
));
}
#[test]
fn test_deep_nesting_under_globstar() {
assert!(federation_pattern_covered_by("/a/b/c/d/e/f/g", "/**"));
assert!(federation_pattern_covered_by("/a/b/c/d/e/f/g/**", "/**"));
assert!(federation_pattern_covered_by("/a/b/c/d/e", "/a/**"));
assert!(!federation_pattern_covered_by("/a/b/c/d/e", "/b/**"));
}
#[test]
fn test_single_wildcard_depth_mismatch() {
assert!(federation_pattern_covered_by("/a/b", "/a/*"));
assert!(!federation_pattern_covered_by("/a/b/c", "/a/*"));
}
#[test]
fn test_wildcard_request_vs_literal_declared() {
assert!(!federation_pattern_covered_by("/a/*", "/a/b"));
assert!(!federation_pattern_covered_by("/a/**", "/a/b"));
assert!(!federation_pattern_covered_by("/a/**", "/a/b/c"));
}
#[test]
fn test_request_globstar_vs_declared_single_wildcard() {
assert!(!federation_pattern_covered_by("/a/**", "/a/*"));
}
#[test]
fn test_mixed_wildcards_in_declared() {
assert!(federation_pattern_covered_by("/a/x/c/d", "/a/*/c/**"));
assert!(!federation_pattern_covered_by("/a/x/y/d", "/a/*/c/**"));
}
#[test]
fn test_request_pattern_with_wildcards_in_middle() {
assert!(!federation_pattern_covered_by("/a/*/c", "/a/b/**"));
}
#[test]
fn test_identical_wildcard_patterns() {
assert!(federation_pattern_covered_by("/**", "/**"));
assert!(federation_pattern_covered_by("/a/**", "/a/**"));
assert!(federation_pattern_covered_by("/a/*", "/a/*"));
}
#[test]
fn test_path_traversal_segments() {
assert!(!federation_pattern_covered_by(
"/../sensors/temp",
"/sensors/**"
));
assert!(federation_pattern_covered_by("/../sensors/temp", "/**"));
}
#[test]
fn test_single_segment_patterns() {
assert!(federation_pattern_covered_by("/a", "/a"));
assert!(!federation_pattern_covered_by("/a", "/b"));
assert!(federation_pattern_covered_by("/a", "/*"));
assert!(federation_pattern_covered_by("/a", "/**"));
}
#[test]
fn test_declared_shorter_than_request_no_wildcard() {
assert!(!federation_pattern_covered_by("/a/b/c", "/a/b"));
}
#[test]
fn test_request_shorter_than_declared() {
assert!(!federation_pattern_covered_by("/a", "/a/b"));
}
}