oxpulse-sfu-kit 0.11.5

Reusable multi-client SFU kit built on top of str0m. Simulcast, fanout, per-peer event routing.
Documentation
//! Public read/write accessors on [`Client`].
//!
//! Separated from the core poll/event loop in `mod.rs` to keep that file
//! focused on str0m I/O driving.

use std::sync::atomic::Ordering;

use super::Client;
use crate::dc::ChannelConfig;
use crate::ids::SfuRid;
use crate::net::IncomingDatagram;

impl Client {
    /// Pre-registered DataChannels to open during SDP negotiation.
    ///
    /// The application signalling layer (e.g. `partner-edge`) reads this slice
    /// and calls `Rtc::open_stream` for each entry. Populated via
    /// [`with_extra_dc`][Client::with_extra_dc], [`with_chat_dcs`][Client::with_chat_dcs],
    /// and [`with_voice_dc`][Client::with_voice_dc].
    #[must_use]
    pub fn extra_dcs(&self) -> &[ChannelConfig] {
        &self.extra_dcs
    }

    /// This subscriber's current desired simulcast layer.
    #[must_use]
    pub fn desired_layer(&self) -> SfuRid {
        self.desired_layer
    }

    /// Override this subscriber's desired simulcast layer.
    ///
    /// Takes effect on the next forwarded packet; no SDP renegotiation required.
    /// The cached chosen layer is cleared on every call, including when `rid`
    /// equals the layer that was already desired.
    pub fn set_desired_layer(&mut self, rid: SfuRid) {
        self.desired_layer = rid;
        // Invalidate the cached chosen layer so keyframe requests target the
        // correct RID on the next forwarded packet.
        self.chosen_rid = None;
    }

    /// Simulcast RIDs the peer has been observed publishing.
    ///
    /// Built up incrementally on each received `MediaData`. Empty until the
    /// first video packet arrives. Callers that use this as the "available
    /// layers" input should fall back to the full ladder (`[LOW, MEDIUM, HIGH]`)
    /// when empty — before the first packet the full ladder is the correct assumption.
    ///
    /// Returns a freshly collected copy of the observed RIDs on each call.
    #[must_use]
    pub fn active_rids(&self) -> Vec<SfuRid> {
        self.active_rids.iter().copied().collect()
    }

    /// Number of `MediaData` events forwarded to this client after layer filtering.
    ///
    /// Read with a relaxed atomic load.
    #[must_use]
    pub fn delivered_media_count(&self) -> u64 {
        self.delivered_media.load(Ordering::Relaxed)
    }

    /// Number of `ActiveSpeakerChanged` events delivered to this client.
    ///
    /// Available in test builds or with the `test-utils` feature; used to
    /// verify skip-self semantics.
    #[cfg(any(test, feature = "test-utils"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "test-utils")))]
    #[must_use]
    pub fn delivered_active_speaker_count(&self) -> u64 {
        self.delivered_active_speaker.load(Ordering::Relaxed)
    }

    /// Whether the underlying str0m `Rtc` is still alive.
    #[must_use]
    pub fn is_alive(&self) -> bool {
        self.rtc.is_alive()
    }

    /// Demux probe — returns `true` if this client owns the given datagram.
    ///
    /// Used by the registry to route incoming UDP to the correct peer.
    /// Returns `false` without consulting the `Rtc` when the datagram payload
    /// cannot be converted into str0m's receive representation.
    #[must_use]
    pub fn accepts(&self, datagram: &IncomingDatagram) -> bool {
        // A payload that fails conversion cannot belong to any peer.
        let Ok(contents) = (&datagram.contents[..]).try_into() else {
            return false;
        };
        let input = str0m::Input::Receive(
            datagram.received_at,
            str0m::net::Receive {
                proto: datagram.proto.to_str0m(),
                source: datagram.source,
                destination: datagram.destination,
                contents,
            },
        );
        self.rtc.accepts(&input)
    }

    /// Feed a new egress BWE reading to this subscriber's pacer.
    ///
    /// If the action is `PacerAction::ChangeLayer`, `desired_layer` is updated
    /// in-place before returning. For `GoAudioOnly` / `RestoreVideo`, the registry
    /// should emit `Propagated::AudioOnlyMode`.
    ///
    /// Returns the action produced by the pacer so the caller can react to it.
    ///
    /// Only available with the `pacer` feature.
    #[cfg(feature = "pacer")]
    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
    pub fn drive_pacer(&mut self, bps: u64) -> crate::bwe::PacerAction {
        let action = self.pacer.update(bps);
        if let crate::bwe::PacerAction::ChangeLayer(rid) = action {
            // Route through the setter so the cached chosen layer is invalidated too.
            self.set_desired_layer(rid);
        }
        action
    }

    /// Set the maximum AV1 temporal layer to forward to this subscriber.
    ///
    /// Packets with `temporal_id > max` are dropped at fanout.
    /// Default is `u8::MAX` (all layers forwarded).
    ///
    /// Only available with the `av1-dd` feature.
    #[cfg(feature = "av1-dd")]
    #[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
    pub fn set_max_temporal_layer(&mut self, max: u8) {
        self.max_temporal_layer = max;
    }

    /// Current AV1 temporal layer cap.
    ///
    /// Only available with the `av1-dd` feature.
    #[cfg(feature = "av1-dd")]
    #[cfg_attr(docsrs, doc(cfg(feature = "av1-dd")))]
    #[must_use]
    pub fn max_temporal_layer(&self) -> u8 {
        self.max_temporal_layer
    }

    /// Set the maximum RFC 9626 temporal layer to forward to this subscriber.
    ///
    /// Packets with `temporal_id > max` are dropped at fanout.
    /// Default is `u8::MAX` (all layers forwarded).
    ///
    /// Only available with the `vfm` feature.
    #[cfg(feature = "vfm")]
    #[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
    pub fn set_max_vfm_temporal_layer(&mut self, max: u8) {
        self.max_vfm_temporal_layer = max;
    }

    /// Current RFC 9626 temporal layer cap.
    ///
    /// Only available with the `vfm` feature.
    #[cfg(feature = "vfm")]
    #[cfg_attr(docsrs, doc(cfg(feature = "vfm")))]
    #[must_use]
    pub fn max_vfm_temporal_layer(&self) -> u8 {
        self.max_vfm_temporal_layer
    }

    /// This client's origin (local peer or upstream SFU relay).
    #[must_use]
    pub fn origin(&self) -> &crate::origin::ClientOrigin {
        &self.origin
    }

    /// Override the client origin.
    ///
    /// Must be called **before** [`Registry::insert`][crate::Registry::insert].
    /// See [`ClientOrigin`][crate::origin::ClientOrigin] for the call-order contract.
    pub fn set_origin(&mut self, origin: crate::origin::ClientOrigin) {
        self.origin = origin;
    }

    /// Returns `true` if this client is a relay connection from another SFU edge.
    #[must_use]
    pub fn is_relay(&self) -> bool {
        matches!(self.origin, crate::origin::ClientOrigin::RelayFromSfu(_))
    }

    /// Set the suspended flag. Called by the registry pacer path when
    /// `PacerAction::SuspendVideo` / `RestoreAudio` fires.
    ///
    /// Only available with the `pacer` feature.
    // NOTE(review): the variant names quoted above differ from the
    // `GoAudioOnly` / `RestoreVideo` pair quoted in `drive_pacer`'s docs —
    // confirm both against the `PacerAction` definition.
    #[cfg(feature = "pacer")]
    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
    pub fn set_suspended(&mut self, suspended: bool) {
        self.suspended = suspended;
    }

    /// Read the suspended flag.
    ///
    /// Only available with the `pacer` feature.
    #[cfg(feature = "pacer")]
    #[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
    #[must_use]
    pub fn is_suspended(&self) -> bool {
        self.suspended
    }
}

#[cfg(all(test, feature = "pacer"))]
mod tests {
    use crate::client::test_seed::new_client;
    use crate::propagate::ClientId;

    /// A freshly seeded client must not report itself as suspended.
    #[test]
    fn suspended_default_is_false() {
        let fresh = new_client(ClientId(0));
        let suspended = fresh.is_suspended();
        assert!(
            !suspended,
            "Client::new should initialise suspended to false"
        );
    }

    /// Each value written via `set_suspended` is read back by `is_suspended`.
    #[test]
    fn set_suspended_round_trips() {
        let mut peer = new_client(ClientId(1));
        assert!(!peer.is_suspended());

        // Flip the flag on, then back off, checking the read-back each time.
        for flag in [true, false] {
            peer.set_suspended(flag);
            assert_eq!(peer.is_suspended(), flag);
        }
    }
}