rapina 0.11.0

A fast, type-safe web framework for Rust inspired by FastAPI
Documentation
//! Channel handlers and presence tracking for the relay system.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use dashmap::DashMap;
use serde::Serialize;

use crate::auth::CurrentUser;
use crate::error::Error;
use crate::state::AppState;

/// Events dispatched to channel handlers registered via `#[relay("pattern")]`.
#[derive(Debug, Clone)]
pub enum RelayEvent {
    /// A client subscribed to a topic.
    Join { topic: String, conn_id: u64 },
    /// A client sent a message to a topic.
    Message {
        topic: String,
        event: String,
        payload: serde_json::Value,
        conn_id: u64,
    },
    /// A client unsubscribed or disconnected.
    Leave { topic: String, conn_id: u64 },
}

impl RelayEvent {
    /// Returns the topic associated with this event.
    pub fn topic(&self) -> &str {
        match self {
            Self::Join { topic, .. } => topic,
            Self::Message { topic, .. } => topic,
            Self::Leave { topic, .. } => topic,
        }
    }

    /// Returns the connection ID associated with this event.
    pub fn conn_id(&self) -> u64 {
        match self {
            Self::Join { conn_id, .. } => *conn_id,
            Self::Message { conn_id, .. } => *conn_id,
            Self::Leave { conn_id, .. } => *conn_id,
        }
    }
}

/// Function signature for channel handler wrappers generated by `#[relay]`.
#[doc(hidden)]
pub type ChannelHandlerFn = fn(
    RelayEvent,
    Arc<AppState>,
    Option<CurrentUser>,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;

/// Describes a channel handler registered via `#[relay("pattern")]`.
///
/// Collected via `inventory` at startup and matched against topics
/// during WebSocket subscription. Handlers are sorted by specificity
/// in [`prepare()`](crate::app::Rapina::prepare): exact matches first,
/// then prefix matches by length descending.
pub struct ChannelDescriptor {
    /// The original pattern string (e.g. `"room:*"` or `"chat:lobby"`).
    pub pattern: &'static str,
    /// `true` if the pattern ends with `*` (prefix match).
    pub is_prefix: bool,
    /// The pattern without the trailing `*`. For exact-match patterns,
    /// this equals `pattern`.
    pub match_prefix: &'static str,
    /// The name of the handler function.
    pub handler_name: &'static str,
    /// The generated wrapper function.
    #[doc(hidden)]
    pub handle: ChannelHandlerFn,
}

inventory::collect!(ChannelDescriptor);

impl ChannelDescriptor {
    /// Returns `true` if `topic` matches this channel's pattern.
    pub fn matches(&self, topic: &str) -> bool {
        if self.is_prefix {
            topic.starts_with(self.match_prefix)
        } else {
            topic == self.pattern
        }
    }
}

/// A single presence entry for a connected client.
#[derive(Debug, Clone, Serialize)]
pub struct PresenceEntry {
    /// The unique connection ID assigned on WebSocket connect.
    pub conn_id: u64,
    /// Arbitrary metadata set by the channel handler (e.g. user info).
    pub meta: serde_json::Value,
}

/// Thread-safe presence tracker for connected clients per topic.
///
/// Channel handlers call [`Relay::track`](super::Relay::track) in the `Join`
/// event to register presence. The hub automatically calls
/// [`untrack`](Self::untrack) on unsubscribe and disconnect.
pub struct PresenceMap {
    inner: DashMap<String, DashMap<u64, PresenceEntry>>,
}

impl PresenceMap {
    pub fn new() -> Self {
        Self {
            inner: DashMap::new(),
        }
    }

    /// Track a client's presence in a topic.
    pub fn track(&self, topic: &str, conn_id: u64, meta: serde_json::Value) {
        self.inner
            .entry(topic.to_owned())
            .or_default()
            .insert(conn_id, PresenceEntry { conn_id, meta });
    }

    /// Remove a client's presence from a topic.
    ///
    /// Atomically removes the topic key when the inner map empties,
    /// using the same `remove_if` pattern as the backend cleanup.
    pub fn untrack(&self, topic: &str, conn_id: u64) {
        if let Some(conns) = self.inner.get(topic) {
            conns.remove(&conn_id);
        }
        self.inner.remove_if(topic, |_, conns| conns.is_empty());
    }

    /// List all presence entries for a topic.
    pub fn list(&self, topic: &str) -> Vec<PresenceEntry> {
        self.inner
            .get(topic)
            .map(|conns| conns.iter().map(|e| e.value().clone()).collect())
            .unwrap_or_default()
    }

    /// Count the number of connected clients in a topic.
    pub fn count(&self, topic: &str) -> usize {
        self.inner.get(topic).map(|conns| conns.len()).unwrap_or(0)
    }
}

impl Default for PresenceMap {
    fn default() -> Self {
        Self::new()
    }
}