clasp-router 4.0.0

CLASP message router and server
Documentation
//! SUBSCRIBE / UNSUBSCRIBE message handlers.
//!
//! Manages per-session subscriptions with glob-pattern matching, enforces
//! per-session subscription limits, and sends filtered snapshots on subscribe.

use clasp_core::{codec, ErrorMessage, Message, SecurityMode};
use tracing::{debug, warn};

use super::{send_chunked_snapshot, HandlerContext, MessageResult};
use crate::subscription::Subscription;

pub(crate) async fn handle_subscribe(
    sub: &clasp_core::SubscribeMessage,
    ctx: &HandlerContext<'_>,
) -> Option<MessageResult> {
    let session = ctx.session.as_ref()?;

    let current_subs = session.subscriptions().len();
    let max_subs = 1000;
    if current_subs >= max_subs {
        warn!(
            "Session {} subscription limit reached ({}/{})",
            session.id, current_subs, max_subs
        );
        #[cfg(feature = "metrics")]
        metrics::counter!("clasp_errors_total", "code" => "429").increment(1);
        let error = Message::Error(ErrorMessage {
            code: 429,
            message: format!("Subscription limit reached (max {})", max_subs),
            address: Some(sub.pattern.clone()),
            correlation_id: None,
        });
        let bytes = codec::encode(&error).ok()?;
        return Some(MessageResult::Send(bytes));
    }

    if ctx.security_mode == SecurityMode::Authenticated
        && !session.has_strict_read_scope(&sub.pattern)
    {
        warn!(
            "Session {} denied SUBSCRIBE to {} - insufficient scope",
            session.id, sub.pattern
        );
        let error = Message::Error(ErrorMessage {
            code: 301,
            message: "Insufficient scope for subscription".to_string(),
            address: Some(sub.pattern.clone()),
            correlation_id: None,
        });
        let bytes = codec::encode(&error).ok()?;
        return Some(MessageResult::Send(bytes));
    }

    match Subscription::new(
        sub.id,
        session.id.clone(),
        &sub.pattern,
        sub.types.clone(),
        sub.options.clone().unwrap_or_default(),
    ) {
        Ok(subscription) => {
            ctx.subscriptions.add(subscription);
            session.add_subscription(sub.id);
            #[cfg(feature = "metrics")]
            metrics::gauge!("clasp_subscriptions_active").increment(1.0);
            debug!("Session {} subscribed to {}", session.id, sub.pattern);

            let mut snapshot = ctx.state.snapshot(&sub.pattern);
            if let Some(ref filter) = ctx.snapshot_filter {
                snapshot.params = filter.filter_snapshot(snapshot.params, session, ctx.state);
            }
            if !snapshot.params.is_empty() {
                send_chunked_snapshot(ctx.sender, snapshot).await;
            }
        }
        Err(e) => {
            warn!("Invalid subscription pattern: {}", e);
            let error = Message::Error(ErrorMessage {
                code: 202,
                message: e.to_string(),
                address: Some(sub.pattern.clone()),
                correlation_id: None,
            });
            let bytes = codec::encode(&error).ok()?;
            return Some(MessageResult::Send(bytes));
        }
    }

    Some(MessageResult::None)
}

/// Handles an UNSUBSCRIBE message for the current session.
///
/// Removes the subscription identified by `unsub.id` from both the
/// router-wide subscription table and the session's own bookkeeping. The
/// removal calls are always issued, whether or not the id is currently known.
///
/// With the `metrics` feature enabled, the `clasp_subscriptions_active` gauge
/// is decremented only when the session actually held the subscription, so
/// that an UNSUBSCRIBE for an unknown or already-removed id cannot push the
/// gauge below the real number of live subscriptions.
///
/// # Returns
///
/// `None` if there is no session on the context, otherwise
/// `Some(MessageResult::None)` (no reply is sent).
pub(crate) async fn handle_unsubscribe(
    unsub: &clasp_core::UnsubscribeMessage,
    ctx: &HandlerContext<'_>,
) -> Option<MessageResult> {
    let session = ctx.session.as_ref()?;

    // Must be sampled before the removal below.
    // NOTE(review): assumes the collection returned by `subscriptions()`
    // holds subscription ids and supports `contains` — confirm against
    // the session type.
    #[cfg(feature = "metrics")]
    let was_subscribed = session.subscriptions().contains(&unsub.id);

    ctx.subscriptions.remove(&session.id, unsub.id);
    session.remove_subscription(unsub.id);

    #[cfg(feature = "metrics")]
    {
        if was_subscribed {
            metrics::gauge!("clasp_subscriptions_active").decrement(1.0);
        }
    }

    Some(MessageResult::None)
}