arcly-stream 0.2.0

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! Multi-node clustering contracts: origin/edge discovery and stream relay.
//!
//! These are **contracts only** — the engine defines the seams an edge tier
//! plugs into, but ships no concrete discovery or transport (those depend on the
//! deployment's service mesh / gossip / control plane). An edge node implements
//! [`ClusterRelay`] to locate a stream's origin, pull it locally, and announce
//! the streams it serves.

use crate::{Result, StreamKey};
use async_trait::async_trait;

/// Opaque cluster node address.
///
/// The engine never interprets the wrapped string; it is only compared,
/// hashed, and cloned. What it encodes (host:port, node id, URL, …) is up to
/// the deployment.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeAddr(pub String);

/// Build a [`NodeAddr`] from anything convertible into a `String`
/// (`&str`, `String`, …).
impl<T> From<T> for NodeAddr
where
    T: Into<String>,
{
    fn from(raw: T) -> Self {
        let addr: String = raw.into();
        Self(addr)
    }
}

/// Origin/edge relay contract. Implement to federate streams across nodes.
///
/// Intended flow: an edge that lacks a stream locally
/// [`locate`](Self::locate)s its origin and [`pull`](Self::pull)s it; an origin
/// [`announce`](Self::announce)s what it has and
/// [`withdraw`](Self::withdraw)s it when the stream ends.
///
/// Implementors must be `Send + Sync + 'static`. Discovery and transport are
/// left entirely to the implementation, and so are the failure conditions
/// behind each method's `Err` return.
#[async_trait]
pub trait ClusterRelay: Send + Sync + 'static {
    /// Find a node currently serving `key`, if any (e.g. via the control plane).
    ///
    /// `Ok(None)` means no serving node is known for `key`.
    async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>>;

    /// Begin relaying `key` from `origin` into the local engine, returning once
    /// the local mirror is publishing.
    ///
    /// `origin` is normally an address previously obtained from
    /// [`locate`](Self::locate).
    async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()>;

    /// Advertise that this node serves `key` so edges can discover it.
    async fn announce(&self, key: &StreamKey) -> Result<()>;

    /// Withdraw a previous [`announce`](Self::announce) when the stream ends.
    async fn withdraw(&self, key: &StreamKey) -> Result<()>;
}

#[cfg(feature = "cluster")]
pub use relay::{ClusterDirectory, InProcessRelay};

/// A working, in-process reference [`ClusterRelay`] (feature `cluster`).
///
/// It federates streams between nodes that live in the same process — the shape
/// integration tests and single-box multi-engine setups need — over an in-memory
/// [`ClusterDirectory`] control plane. A production edge swaps the directory for
/// its gossip/service-mesh and `pull` for a real transport (RTMP/SRT/QUIC), but
/// the mirror loop (subscribe origin → republish locally) is identical.
#[cfg(feature = "cluster")]
mod relay {
    use super::{ClusterRelay, NodeAddr};
    use crate::bus::{PlaybackRegistry, PublishRegistry};
    use crate::{Result, StreamError, StreamKey};
    use async_trait::async_trait;
    use std::collections::{HashMap, HashSet};
    use std::sync::{Arc, Mutex};

    /// In-memory cluster directory (control plane): which nodes serve which
    /// streams. Shared (`Arc`) by every node's relay in the process.
    ///
    /// Every method takes `&self`; the table lives behind a [`Mutex`] that is
    /// held only for the duration of a single map operation. A poisoned lock
    /// is recovered rather than propagated, so a panic in one caller does not
    /// take the directory down for every other relay sharing it.
    #[derive(Debug, Default)]
    pub struct ClusterDirectory {
        /// Stream key → nodes currently announcing it. A key is removed as soon
        /// as its last node withdraws, so the map only holds served streams.
        serving: Mutex<HashMap<StreamKey, HashSet<NodeAddr>>>,
    }

    impl ClusterDirectory {
        /// A fresh, empty directory.
        pub fn new() -> Self {
            Self::default()
        }

        /// Lock the serving table, taking the data back out of a poisoned lock.
        ///
        /// Each critical section is a single `HashMap`/`HashSet` call, so a
        /// panic while the lock was held cannot leave the table half-updated
        /// in a way later callers would need to distrust.
        fn table(&self) -> std::sync::MutexGuard<'_, HashMap<StreamKey, HashSet<NodeAddr>>> {
            self.serving
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
        }

        /// Record that `node` serves `key`. Announcing the same pair twice is a
        /// no-op.
        pub fn announce(&self, node: &NodeAddr, key: &StreamKey) {
            self.table()
                .entry(key.clone())
                .or_default()
                .insert(node.clone());
        }

        /// Drop `node` from `key`'s server set.
        ///
        /// When the last server of `key` withdraws, the key itself is removed;
        /// otherwise every stream ever announced would leave an empty set
        /// behind and the directory would grow without bound as streams churn.
        /// Withdrawing an unknown node or key is a no-op.
        pub fn withdraw(&self, node: &NodeAddr, key: &StreamKey) {
            let mut serving = self.table();
            if let Some(nodes) = serving.get_mut(key) {
                nodes.remove(node);
                if nodes.is_empty() {
                    serving.remove(key);
                }
            }
        }

        /// A node serving `key` other than `exclude` (the local node), if any.
        ///
        /// When several nodes qualify, which one is returned is unspecified
        /// (it follows the hash set's iteration order).
        pub fn locate(&self, key: &StreamKey, exclude: &NodeAddr) -> Option<NodeAddr> {
            self.table()
                .get(key)
                .and_then(|nodes| nodes.iter().find(|n| *n != exclude).cloned())
        }
    }

    /// [`ClusterRelay`] implementation for a single node whose peers live in
    /// the same process: it mirrors a peer's stream into the local engine.
    pub struct InProcessRelay {
        /// This node's own address (used to exclude itself from lookups).
        node: NodeAddr,
        /// Publish side of the local engine, where mirrored frames are written.
        local: Arc<dyn PublishRegistry>,
        /// Shared directory used for announce / withdraw / locate.
        directory: Arc<ClusterDirectory>,
        /// Playback registries of known peers, keyed by their address.
        peers: HashMap<NodeAddr, Arc<dyn PlaybackRegistry>>,
    }

    impl InProcessRelay {
        /// Create the relay for `node`. Mirrored streams are published into
        /// `local`; discovery goes through `directory`. The relay starts with
        /// no peers — add them with [`with_peer`](Self::with_peer).
        pub fn new(
            node: impl Into<NodeAddr>,
            local: Arc<dyn PublishRegistry>,
            directory: Arc<ClusterDirectory>,
        ) -> Self {
            let node = node.into();
            let peers = HashMap::new();
            Self {
                node,
                local,
                directory,
                peers,
            }
        }

        /// Builder-style registration of a peer: associates `addr` with that
        /// peer's playback registry so [`pull`](ClusterRelay::pull) can
        /// subscribe to its streams. Registering the same address again
        /// replaces the earlier registry.
        pub fn with_peer(
            mut self,
            addr: impl Into<NodeAddr>,
            playback: Arc<dyn PlaybackRegistry>,
        ) -> Self {
            let peer_addr: NodeAddr = addr.into();
            self.peers.insert(peer_addr, playback);
            self
        }
    }

    #[async_trait]
    impl ClusterRelay for InProcessRelay {
        /// Ask the shared directory for a node serving `key`, never returning
        /// this node itself.
        async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>> {
            let found = self.directory.locate(key, &self.node);
            Ok(found)
        }

        /// Mirror `key` from the registered peer `origin` into the local engine.
        ///
        /// Fails with a protocol error when `origin` was never registered via
        /// `with_peer`, and propagates errors from resolving the origin stream
        /// or claiming the local publish slot. On success the forwarding runs
        /// in a detached task and this call returns immediately.
        async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()> {
            let peer = self.peers.get(origin).ok_or_else(|| {
                StreamError::protocol(format!("cluster: unknown origin node {}", origin.0))
            })?;

            // Origin handle first, then the local slot: if the origin stream
            // cannot be resolved, nothing is claimed locally.
            let upstream = peer.get_stream(key)?;
            let mirror = self.local.start_publish(key).await?;

            let registry = Arc::clone(&self.local);
            let owned_key = key.clone();
            let mirror_task = async move {
                // Replay the instant-start buffer; failures here are ignored.
                // NOTE(review): frames published between this replay and the
                // subscription below may be missed — confirm against the bus
                // semantics of `replay_buffer` / `subscribe_resilient`.
                for cached in upstream.replay_buffer() {
                    let _ = mirror.publish_frame((*cached).clone());
                }

                // Forward live frames until the origin closes or the local
                // publish side rejects a frame.
                let mut live = upstream.subscribe_resilient();
                loop {
                    match live.recv().await {
                        Some(frame) => {
                            if mirror.publish_frame((*frame).clone()).is_err() {
                                break;
                            }
                        }
                        None => break,
                    }
                }

                // Best-effort release of the local publish slot.
                let _ = registry.end_publish(&owned_key).await;
            };
            tokio::spawn(mirror_task);
            Ok(())
        }

        /// Record in the shared directory that this node serves `key`.
        async fn announce(&self, key: &StreamKey) -> Result<()> {
            let me = &self.node;
            self.directory.announce(me, key);
            Ok(())
        }

        /// Remove this node from `key`'s servers in the shared directory.
        async fn withdraw(&self, key: &StreamKey) -> Result<()> {
            let me = &self.node;
            self.directory.withdraw(me, key);
            Ok(())
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;
        use crate::{AppSpec, CodecId, Engine, MediaFrame};

        /// Test fixture: a one-byte H.264 video frame that uses `pts` for both
        /// timestamp arguments. The trailing `true` is presumably the keyframe
        /// flag — confirm against `MediaFrame::new_video`.
        fn frame(pts: i64) -> MediaFrame {
            MediaFrame::new_video(
                pts,
                pts,
                bytes::Bytes::from_static(b"x"),
                CodecId::H264,
                true,
            )
        }

        /// End-to-end: an edge discovers the origin through the shared
        /// directory, pulls the stream, and a subscriber on the edge receives
        /// a frame published on the origin.
        #[tokio::test]
        async fn edge_locates_and_mirrors_origin_stream() {
            let directory = Arc::new(ClusterDirectory::new());
            let key = StreamKey::new("live", "cam");

            // Origin node publishes the stream and announces it.
            let origin = Engine::builder()
                .application(AppSpec::new("live").gop_cache(8))
                .build();
            let origin_relay = InProcessRelay::new("origin", origin.clone(), directory.clone());
            let src_handle = origin.start_publish(&key).await.unwrap();
            origin_relay.announce(&key).await.unwrap();
            // Published before the edge pulls, so it is only reachable through
            // the replay path of `pull`.
            src_handle.publish_frame(frame(0)).unwrap();

            // The edge does not have the stream locally; it knows the origin as
            // a peer and will locate + pull it.
            let edge = Engine::builder()
                .application(AppSpec::new("live").gop_cache(8))
                .build();
            let edge_relay = InProcessRelay::new("edge", edge.clone(), directory.clone())
                .with_peer("origin", origin.clone());

            assert!(edge.get_stream(&key).is_err(), "not local yet");
            let found = edge_relay.locate(&key).await.unwrap();
            assert_eq!(found, Some(NodeAddr::from("origin")));
            edge_relay.pull(&key, &found.unwrap()).await.unwrap();

            // The local mirror is now publishing and receives forwarded frames.
            let mirror = edge.get_stream(&key).expect("local mirror exists");
            // Subscribe before publishing the next frame so the subscriber is
            // in place when it is forwarded.
            let mut sub = mirror.subscribe_resilient();
            src_handle.publish_frame(frame(1)).unwrap();
            // Bounded wait: a broken mirror fails the test instead of hanging.
            let got = tokio::time::timeout(std::time::Duration::from_secs(5), sub.recv())
                .await
                .expect("a frame was mirrored")
                .expect("frame");
            assert!(got.is_video());
        }

        /// A relay never locates itself, and after `withdraw` no other node
        /// can locate the stream either.
        #[tokio::test]
        async fn locate_excludes_self_and_withdraw_clears() {
            let directory = Arc::new(ClusterDirectory::new());
            let key = StreamKey::new("live", "s");
            let engine = Engine::builder().application(AppSpec::new("live")).build();
            let relay = InProcessRelay::new("only", engine, directory.clone());

            relay.announce(&key).await.unwrap();
            // The only server is ourselves → locate returns None.
            assert_eq!(relay.locate(&key).await.unwrap(), None);
            relay.withdraw(&key).await.unwrap();
            // Query as a different node so the self-exclusion cannot hide a
            // leftover entry.
            assert!(directory.locate(&key, &NodeAddr::from("other")).is_none());
        }
    }
}