oxpulse-sfu-kit 0.11.1

Reusable multi-client SFU kit built on top of str0m. Simulcast, fanout, per-peer event routing.
Documentation
//! Multi-client registry — routes UDP datagrams to the owning client and fans
//! out propagated events. Single-task ownership model (no `Arc<RwLock>`).
//!
//! Ported from the str0m `chat.rs` example with multi-client fanout, simulcast
//! layer management, and optional dominant-speaker detection added.
//!
//! Submodules: `lifecycle` (reap/drain), `test_seams` (test-only).

use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;

use crate::client::Client;
use crate::metrics::SfuMetrics;
use crate::net::{IncomingDatagram, SfuProtocol};
use crate::propagate::Propagated;

mod drive;
mod lifecycle;
#[cfg(any(test, feature = "test-utils"))]
mod test_seams;

/// Single-owner registry of connected peers in a room.
///
/// Drive it by calling [`insert`][Registry::insert] when a peer completes
/// signaling, then in a loop: feed datagrams via
/// [`handle_incoming`][Registry::handle_incoming], call
/// [`poll_all`][Registry::poll_all] + [`fanout_pending`][Registry::fanout_pending],
/// flush transmits via [`drain_transmits`][Registry::drain_transmits].
///
/// For the simple case, use [`run_udp_loop`][crate::run_udp_loop] which does
/// all of this for you.
#[derive(Debug)]
pub struct Registry {
    pub(super) clients: Vec<Client>,
    pub(super) to_propagate: VecDeque<Propagated>,
    pub(super) metrics: Arc<SfuMetrics>,
    #[cfg(feature = "active-speaker")]
    pub(super) detector: dominant_speaker::ActiveSpeakerDetector,
    /// Fixed epoch for converting  to u64 ms for the speaker detector.
    /// u64 ms with start at registry creation time (monotonic, not wall-clock).
    #[cfg(feature = "active-speaker")]
    detector_epoch: std::time::Instant,
    /// Per-subscriber Kalman/loss bandwidth estimator.
    #[cfg(feature = "kalman-bwe")]
    pub(crate) bandwidth: crate::bwe::estimator::BandwidthEstimator,
}

impl Registry {
    /// Create a new registry wired to the given metrics instance.
    pub fn new(metrics: Arc<SfuMetrics>) -> Self {
        Self {
            clients: Vec::new(),
            to_propagate: VecDeque::new(),
            metrics,
            #[cfg(feature = "active-speaker")]
            detector: dominant_speaker::ActiveSpeakerDetector::new(),
            #[cfg(feature = "active-speaker")]
            detector_epoch: std::time::Instant::now(),
            #[cfg(feature = "kalman-bwe")]
            bandwidth: crate::bwe::estimator::BandwidthEstimator::new(),
        }
    }

    /// Create a registry with a throwaway metrics instance.
    ///
    /// Intended only for tests that don't care about metrics values.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn new_for_tests() -> Self {
        Self::new(Arc::new(SfuMetrics::new_default()))
    }

    /// Whether the registry has no connected peers.
    pub fn is_empty(&self) -> bool {
        self.clients.is_empty()
    }

    /// Number of connected peers.
    pub fn len(&self) -> usize {
        self.clients.len()
    }

    /// Read-only view of all clients.
    ///
    /// Intended for metrics inspection and tests; not for hot-path use.
    pub fn clients(&self) -> &[Client] {
        &self.clients
    }

    /// Insert a freshly-built client into the room.
    ///
    /// Announces every existing client's tracks to the newcomer
    /// (cross-advertisement pattern from str0m `chat.rs`). The client's
    /// metrics handle is replaced with the registry's own so all counters
    /// flow to one Prometheus registry.
    pub fn insert(&mut self, mut client: Client) {
        client.metrics = self.metrics.clone();
        // F7-1: re-resolve the cached drop counter against the registry's
        // metrics arc (which just replaced the client's initial one). Without
        // this the cached handle points to the throwaway metrics from new_client()
        // and increments are invisible to the registry's scrape endpoint.
        #[cfg(feature = "metrics-prometheus")]
        {
            client.video_frames_dropped = self.metrics.peer_drop_counter(*client.id);
        }
        for entry in self.clients.iter().flat_map(|c| c.tracks_in.iter()) {
            client.handle_track_open(std::sync::Arc::downgrade(&entry.id));
        }
        #[cfg(feature = "active-speaker")]
        {
            // Relay clients relay another room's audio — their levels are not
            // meaningful for this room's dominant-speaker election.
            if !client.is_relay() {
                let now_ms = self.now_ms();
                self.detector.add_peer(*client.id, now_ms);
            }
        }
        self.metrics.inc_client_connect();
        self.metrics.inc_active_participants();
        self.clients.push(client);
    }

    /// Feed an incoming UDP datagram to whichever client claims it.
    ///
    /// Returns `true` if a client accepted the datagram, `false` when no
    /// client matched (common early in a connection — STUN arrives before
    /// the `Rtc` is registered).
    pub fn handle_incoming(
        &mut self,
        source: SocketAddr,
        destination: SocketAddr,
        payload: &[u8],
    ) -> bool {
        let datagram = IncomingDatagram {
            received_at: Instant::now(),
            proto: SfuProtocol::Udp,
            source,
            destination,
            contents: payload.to_vec(),
        };
        if let Some(client) = self.clients.iter_mut().find(|c| c.accepts(&datagram)) {
            client.handle_input(datagram);
            true
        } else {
            tracing::debug!(?source, "no client accepts udp datagram");
            false
        }
    }

    /// Feed an RFC 6464 audio-level observation into the dominant-speaker detector.
    ///
    /// `level_raw` is 0–127 dBov (0 = loud, 127 = silent). Call this for every
    /// audio RTP packet received from `peer_id` after parsing the audio-level
    /// RTP header extension. Only available with the `active-speaker` feature.
    ///
    /// Levels for relay clients (`Client::is_relay()`) are silently ignored —
    /// relay audio belongs to the upstream room's election, not this one.
    #[cfg(feature = "active-speaker")]
    #[cfg_attr(docsrs, doc(cfg(feature = "active-speaker")))]
    pub fn record_audio_level(&mut self, peer_id: u64, level_raw: u8, now: Instant) {
        // Relay clients are excluded from speaker election; ignore their audio levels.
        if self
            .clients
            .iter()
            .any(|c| *c.id == peer_id && c.is_relay())
        {
            return;
        }
        let now_ms = now
            .saturating_duration_since(self.detector_epoch)
            .as_millis() as u64;
        self.detector.record_level(peer_id, level_raw, now_ms);
    }

    /// Monotonic millisecond timestamp relative to the registry epoch.
    ///
    /// Used internally to convert  values to the u64 ms the
    /// dominant-speaker detector requires (v0.3 API).
    #[cfg(feature = "active-speaker")]
    fn now_ms(&self) -> u64 {
        self.detector_epoch.elapsed().as_millis() as u64
    }

    /// Return raw activity scores for all non-paused peers in the room.
    ///
    /// Each tuple is `(peer_id, immediate_score, medium_score, long_score)`.
    /// Scores are the raw log-domain values from the Volfin & Cohen algorithm.
    /// Useful for debugging speaker detection or building custom UIs.
    ///
    /// Only available with the `active-speaker` feature.
    #[cfg(feature = "active-speaker")]
    #[cfg_attr(docsrs, doc(cfg(feature = "active-speaker")))]
    #[must_use]
    pub fn peer_audio_scores(&self) -> Vec<(u64, f64, f64, f64)> {
        self.detector.peer_scores()
    }
}

#[cfg(feature = "kalman-bwe")]
#[cfg_attr(docsrs, doc(cfg(feature = "kalman-bwe")))]
impl Registry {
    /// Process a TWCC feedback batch for a subscriber.
    ///
    /// Call this when str0m emits TWCC feedback for the subscriber's egress path.
    /// `subscriber` is the peer whose outgoing stream the feedback describes.
    ///
    /// Only available with the `kalman-bwe` feature.
    pub fn on_twcc_feedback(
        &mut self,
        subscriber: crate::propagate::ClientId,
        feedback: &crate::bwe::feedback::TwccFeedback,
        now: std::time::Instant,
    ) {
        self.bandwidth.on_twcc_feedback(subscriber, feedback, now);
    }
}