rustzmq2 0.1.0

A native async Rust implementation of ZeroMQ
Documentation
//! Per-peer subscription prefix table shared by PUB and XPUB.
//!
//! Implements the topic-prefix matching logic defined in
//! [RFC 29](https://rfc.zeromq.org/spec/29/) for SUBSCRIBE/UNSUBSCRIBE
//! frames. Both publisher backends (`PubSocketBackend`,
//! `XPubSocketBackend`) compose this type; SUB is shaped differently
//! (outbound replay list) and does not use it.
//!
//! Zero-cost: a plain struct, no trait, no generic, no `dyn`. All methods
//! are direct dispatch. The only lock is `parking_lot::Mutex`, the same
//! one the two backends used before extraction.

use crate::engine::registry::PeerKey;

use parking_lot::Mutex;

use std::collections::HashMap;
use std::hash::{BuildHasher, Hasher};

/// Identity hasher for `PeerKey`. A `PeerKey` is a `Slab` index — dense,
/// sequential and never attacker-chosen — so the raw integer is already
/// well distributed for the table's bucket math and running `SipHash`
/// over it would only add cost.
#[derive(Default, Clone, Copy)]
struct PeerKeyHasher(u64);

impl Hasher for PeerKeyHasher {
    /// Generic byte path: ORs at most the first eight bytes into the
    /// state, with byte `i` placed at bit offset `i * 8`. Any bytes past
    /// the eighth are ignored.
    #[inline]
    fn write(&mut self, bytes: &[u8]) {
        let used = bytes.len().min(8);
        let mut packed = [0u8; 8];
        packed[..used].copy_from_slice(&bytes[..used]);
        // Little-endian assembly puts `packed[i]` at bit offset `i * 8`.
        self.0 |= u64::from_le_bytes(packed);
    }

    /// Replaces the state with `n`, zero-extended to 64 bits.
    #[inline]
    fn write_u32(&mut self, n: u32) {
        self.0 = u64::from(n);
    }

    /// Replaces the state with `n`.
    #[inline]
    fn write_u64(&mut self, n: u64) {
        self.0 = n;
    }

    /// Returns the stored state unchanged.
    #[inline]
    fn finish(&self) -> u64 {
        let Self(state) = *self;
        state
    }
}

#[derive(Default, Clone, Copy)]
struct PeerKeyHasherBuilder;

impl BuildHasher for PeerKeyHasherBuilder {
    type Hasher = PeerKeyHasher;
    #[inline]
    fn build_hasher(&self) -> Self::Hasher {
        PeerKeyHasher::default()
    }
}

/// `HashMap` keyed by `PeerKey` that hashes through the pass-through
/// `PeerKeyHasherBuilder` instead of the standard library's default
/// `RandomState`.
type PeerKeyMap<V> = HashMap<PeerKey, V, PeerKeyHasherBuilder>;

/// Outcome of applying a SUBSCRIBE/UNSUBSCRIBE frame. XPUB uses this to
/// decide whether to surface the frame to the application (default:
/// absorb duplicates; `xpub_verbose = true` surfaces everything). PUB
/// ignores the outcome.
///
/// Produced by `TopicRouter::apply_sub_message`, which returns `None`
/// instead of a `SubChange` when the frame is empty or malformed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SubChange {
    /// First SUBSCRIBE for this prefix on this peer; the prefix was
    /// appended to the peer's filter list.
    NewTopic,
    /// SUBSCRIBE for a prefix the peer was already subscribed to; the
    /// filter list is left unchanged.
    DuplicateSub,
    /// UNSUBSCRIBE removed an existing prefix from the peer's filter list.
    Removed,
    /// UNSUBSCRIBE for a prefix the peer wasn't subscribed to; the filter
    /// list is left unchanged.
    DuplicateUnsub,
}

/// Subscription filters held for a single peer.
///
/// `accept_all` caches whether `prefixes` contains the empty prefix (the
/// subscribe-to-everything case) so that matching can skip the
/// per-prefix scan entirely.
#[derive(Default)]
struct PeerSubs {
    prefixes: Vec<Vec<u8>>,
    accept_all: bool,
}

impl PeerSubs {
    /// Recompute `accept_all` from the current contents of `prefixes`.
    fn refresh_accept_all(&mut self) {
        let has_empty_prefix = self.prefixes.iter().any(Vec::is_empty);
        self.accept_all = has_empty_prefix;
    }
}

/// Subscription table for a publisher-side socket: one `PeerSubs` filter
/// list per peer, all behind a single mutex. Frames are applied with
/// `apply_sub_message`; fanout code queries the table through
/// `with_match_guard`.
pub(crate) struct TopicRouter {
    /// Keyed on the dense `PeerKey` rather than `PeerIdentity` so every
    /// fanout iteration hashes a `Copy` u32 instead of a 32-byte `Bytes`.
    /// The custom `PeerKeyHasherBuilder` is a pass-through (slab keys
    /// have perfect distribution already) which avoids the `SipHash` cost
    /// the default `RandomState` was charging us per peer per send.
    subscriptions: Mutex<PeerKeyMap<PeerSubs>>,
}

impl TopicRouter {
    /// Create a router with no registered peers.
    pub(crate) fn new() -> Self {
        Self {
            subscriptions: Mutex::new(PeerKeyMap::default()),
        }
    }

    /// Pre-seed an empty filter list for a newly-connected peer. Matches
    /// the existing PUB/XPUB behaviour — `apply_sub_message` also
    /// `or_default`s for race safety, but seeding lets the fanout see a
    /// consistent (empty) entry the moment the peer is registered.
    /// An entry that already exists is left untouched.
    pub(crate) fn register_peer(&self, key: PeerKey) {
        self.subscriptions.lock().entry(key).or_default();
    }

    /// Drop a peer's filter list on disconnect. A no-op for unknown keys.
    pub(crate) fn forget_peer(&self, key: PeerKey) {
        self.subscriptions.lock().remove(&key);
    }

    /// Drop every peer's filter list (socket shutdown).
    pub(crate) fn clear(&self) {
        self.subscriptions.lock().clear();
    }

    /// Apply a raw SUBSCRIBE/UNSUBSCRIBE frame to `key`'s filter list.
    ///
    /// `frame[0]` is 1 for SUBSCRIBE or 0 for UNSUBSCRIBE; `frame[1..]`
    /// is the topic prefix (possibly empty, meaning "everything").
    ///
    /// Returns [`None`] for an empty frame or an unrecognised first byte
    /// (logged as a warning). Such frames are rejected *before* the
    /// subscription lock is taken, so they neither create an entry for
    /// `key` nor stall a concurrent fanout while the warning is logged.
    /// Otherwise returns the [`SubChange`] describing what happened.
    pub(crate) fn apply_sub_message(&self, key: PeerKey, frame: &[u8]) -> Option<SubChange> {
        let (first, prefix) = frame.split_first()?;
        let subscribe = match *first {
            1 => true,
            0 => false,
            _ => {
                // Wrapped in `Some` so the rendered text stays identical
                // to what this warning has always printed.
                log::warn!("sub message unexpected first byte: {:?}", Some(first));
                return None;
            }
        };

        let mut subs = self.subscriptions.lock();
        // `or_default`: the reader task can deliver a subscription
        // frame before `peer_connected` finishes registering the peer.
        let entry = subs.entry(key).or_default();
        let existing = entry.prefixes.iter().position(|p| p.as_slice() == prefix);

        let outcome = match (subscribe, existing) {
            (true, Some(_)) => SubChange::DuplicateSub,
            (true, None) => {
                entry.prefixes.push(prefix.to_vec());
                SubChange::NewTopic
            }
            (false, Some(idx)) => {
                entry.prefixes.remove(idx);
                SubChange::Removed
            }
            (false, None) => SubChange::DuplicateUnsub,
        };

        // The cached flag can only go stale when the list actually changed.
        if matches!(outcome, SubChange::NewTopic | SubChange::Removed) {
            entry.refresh_accept_all();
        }
        Some(outcome)
    }

    /// Hold the subscription-map lock and run `f` against it. Callers
    /// that need to do a fanout inspect the map (via [`MatchGuard::matches`])
    /// while holding the guard; the guard is dropped at the end of `f`.
    ///
    /// This mirrors the existing PUB/XPUB fanout pattern: the lock is
    /// held for the duration of the `try_send` loop so the subscription
    /// state cannot drift mid-fanout, but is always dropped before any
    /// `.await` and never held across task boundaries.
    ///
    /// Returns whatever `f` returns.
    pub(crate) fn with_match_guard<R>(&self, f: impl FnOnce(&MatchGuard<'_>) -> R) -> R {
        let subs = self.subscriptions.lock();
        f(&MatchGuard { subs: &subs })
    }
}

/// Borrowed view over the subscription map held under the `TopicRouter`
/// mutex guard. Callers ask `matches(key, topic, invert)` per peer to
/// decide whether to forward.
pub(crate) struct MatchGuard<'a> {
    subs: &'a PeerKeyMap<PeerSubs>,
}

impl MatchGuard<'_> {
    /// Returns `true` if `topic` should be delivered to `key`.
    ///
    /// * A peer with no entry in the map never matches, regardless of
    ///   `invert`.
    /// * Otherwise the peer "prefix-matches" when it subscribed to the
    ///   empty prefix or `topic` starts with any of its filter prefixes.
    /// * `invert` (libzmq's `ZMQ_INVERT_MATCHING`) flips that result: the
    ///   peer matches when the topic does *not* start with any of its
    ///   prefixes. In particular, a registered peer with an empty filter
    ///   list matches nothing normally and everything when inverted.
    pub(crate) fn matches(&self, key: PeerKey, topic: &[u8], invert: bool) -> bool {
        let Some(entry) = self.subs.get(&key) else {
            return false;
        };
        let matched = entry.accept_all
            || entry
                .prefixes
                .iter()
                .any(|prefix| topic.starts_with(prefix));
        matched != invert
    }
}

impl Default for TopicRouter {
    fn default() -> Self {
        Self::new()
    }
}