Skip to main content

arcly_stream/
cluster.rs

//! Multi-node clustering contracts: origin/edge discovery and stream relay.
//!
//! The core of this module is a **contract**: the engine defines the seam an
//! edge tier plugs into, but ships no production discovery or transport (those
//! depend on the deployment's service mesh / gossip / control plane). An edge
//! node implements [`ClusterRelay`] to locate a stream's origin, pull it
//! locally, and announce the streams it serves. With the `cluster` feature, an
//! in-process reference implementation (`InProcessRelay` over a
//! `ClusterDirectory`) is also provided for tests and single-process setups.
8
9use crate::{Result, StreamKey};
10use async_trait::async_trait;
11
/// Opaque identifier of one node in the cluster.
///
/// The engine attaches no meaning to the wrapped string. It is only compared,
/// hashed, cloned and echoed in error messages, so the deployment picks the
/// addressing scheme (e.g. `host:port` or a node id).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct NodeAddr(pub String);
15
16impl<T: Into<String>> From<T> for NodeAddr {
17    fn from(s: T) -> Self {
18        NodeAddr(s.into())
19    }
20}
21
22/// Origin/edge relay contract. Implement to federate streams across nodes:
23/// an edge that lacks a stream locally [`locate`](Self::locate)s its origin and
24/// [`pull`](Self::pull)s it; an origin [`announce`](Self::announce)s what it has.
25#[async_trait]
26pub trait ClusterRelay: Send + Sync + 'static {
27    /// Find a node currently serving `key`, if any (e.g. via the control plane).
28    async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>>;
29
30    /// Begin relaying `key` from `origin` into the local engine, returning once
31    /// the local mirror is publishing.
32    async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()>;
33
34    /// Advertise that this node serves `key` so edges can discover it.
35    async fn announce(&self, key: &StreamKey) -> Result<()>;
36
37    /// Withdraw a previous [`announce`](Self::announce) when the stream ends.
38    async fn withdraw(&self, key: &StreamKey) -> Result<()>;
39}
40
41#[cfg(feature = "cluster")]
42pub use relay::{ClusterDirectory, InProcessRelay};
43
/// A working, in-process reference [`ClusterRelay`] (feature `cluster`).
///
/// It federates streams between nodes that live in the same process — the shape
/// that integration tests and single-box multi-engine setups need — over an
/// in-memory [`ClusterDirectory`] control plane. A production edge swaps the
/// directory for its gossip/service mesh and `pull` for a real transport
/// (RTMP/SRT/QUIC), but the mirror loop is the same: replay the origin's
/// instant-start buffer, then subscribe to the origin and republish its live
/// frames locally.
51#[cfg(feature = "cluster")]
52mod relay {
53    use super::{ClusterRelay, NodeAddr};
54    use crate::bus::{PlaybackRegistry, PublishRegistry};
55    use crate::{Result, StreamError, StreamKey};
56    use async_trait::async_trait;
57    use std::collections::{HashMap, HashSet};
58    use std::sync::{Arc, Mutex};
59
60    /// In-memory cluster directory (control plane): which nodes serve which
61    /// streams. Shared (`Arc`) by every node's relay in the process.
62    #[derive(Debug, Default)]
63    pub struct ClusterDirectory {
64        serving: Mutex<HashMap<StreamKey, HashSet<NodeAddr>>>,
65    }
66
67    impl ClusterDirectory {
68        /// A fresh, empty directory.
69        pub fn new() -> Self {
70            Self::default()
71        }
72
73        /// Record that `node` serves `key`.
74        pub fn announce(&self, node: &NodeAddr, key: &StreamKey) {
75            self.serving
76                .lock()
77                .unwrap()
78                .entry(key.clone())
79                .or_default()
80                .insert(node.clone());
81        }
82
83        /// Drop `node` from `key`'s server set.
84        pub fn withdraw(&self, node: &NodeAddr, key: &StreamKey) {
85            if let Some(set) = self.serving.lock().unwrap().get_mut(key) {
86                set.remove(node);
87            }
88        }
89
90        /// A node serving `key` other than `exclude` (the local node), if any.
91        pub fn locate(&self, key: &StreamKey, exclude: &NodeAddr) -> Option<NodeAddr> {
92            self.serving
93                .lock()
94                .unwrap()
95                .get(key)
96                .and_then(|set| set.iter().find(|n| *n != exclude).cloned())
97        }
98    }
99
100    /// An in-process [`ClusterRelay`] for one node: mirrors a peer's stream into
101    /// the local engine.
102    pub struct InProcessRelay {
103        node: NodeAddr,
104        local: Arc<dyn PublishRegistry>,
105        directory: Arc<ClusterDirectory>,
106        peers: HashMap<NodeAddr, Arc<dyn PlaybackRegistry>>,
107    }
108
109    impl InProcessRelay {
110        /// A relay for `node`, mirroring into `local`, discovering via `directory`.
111        pub fn new(
112            node: impl Into<NodeAddr>,
113            local: Arc<dyn PublishRegistry>,
114            directory: Arc<ClusterDirectory>,
115        ) -> Self {
116            Self {
117                node: node.into(),
118                local,
119                directory,
120                peers: HashMap::new(),
121            }
122        }
123
124        /// Register a peer node's playback registry so [`pull`](ClusterRelay::pull)
125        /// can subscribe to its streams.
126        pub fn with_peer(
127            mut self,
128            addr: impl Into<NodeAddr>,
129            playback: Arc<dyn PlaybackRegistry>,
130        ) -> Self {
131            self.peers.insert(addr.into(), playback);
132            self
133        }
134    }
135
136    #[async_trait]
137    impl ClusterRelay for InProcessRelay {
138        async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>> {
139            Ok(self.directory.locate(key, &self.node))
140        }
141
142        async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()> {
143            let peer = self.peers.get(origin).ok_or_else(|| {
144                StreamError::protocol(format!("cluster: unknown origin node {}", origin.0))
145            })?;
146            // Resolve the origin's live handle and claim the local mirror.
147            let src = peer.get_stream(key)?;
148            let dst = self.local.start_publish(key).await?;
149
150            // Mirror: replay the instant-start buffer, then forward live frames
151            // until the origin closes, then release the local publish slot.
152            let local = Arc::clone(&self.local);
153            let key = key.clone();
154            tokio::spawn(async move {
155                for frame in src.replay_buffer() {
156                    let _ = dst.publish_frame((*frame).clone());
157                }
158                let mut sub = src.subscribe_resilient();
159                while let Some(frame) = sub.recv().await {
160                    if dst.publish_frame((*frame).clone()).is_err() {
161                        break;
162                    }
163                }
164                let _ = local.end_publish(&key).await;
165            });
166            Ok(())
167        }
168
169        async fn announce(&self, key: &StreamKey) -> Result<()> {
170            self.directory.announce(&self.node, key);
171            Ok(())
172        }
173
174        async fn withdraw(&self, key: &StreamKey) -> Result<()> {
175            self.directory.withdraw(&self.node, key);
176            Ok(())
177        }
178    }
179
180    #[cfg(test)]
181    mod tests {
182        use super::*;
183        use crate::{AppSpec, CodecId, Engine, MediaFrame};
184
185        fn frame(pts: i64) -> MediaFrame {
186            MediaFrame::new_video(
187                pts,
188                pts,
189                bytes::Bytes::from_static(b"x"),
190                CodecId::H264,
191                true,
192            )
193        }
194
195        #[tokio::test]
196        async fn edge_locates_and_mirrors_origin_stream() {
197            let directory = Arc::new(ClusterDirectory::new());
198            let key = StreamKey::new("live", "cam");
199
200            // Origin node publishes the stream and announces it.
201            let origin = Engine::builder()
202                .application(AppSpec::new("live").gop_cache(8))
203                .build();
204            let origin_relay = InProcessRelay::new("origin", origin.clone(), directory.clone());
205            let src_handle = origin.start_publish(&key).await.unwrap();
206            origin_relay.announce(&key).await.unwrap();
207            src_handle.publish_frame(frame(0)).unwrap();
208
209            // Edge node has the stream locally? No — it locates + pulls the origin.
210            let edge = Engine::builder()
211                .application(AppSpec::new("live").gop_cache(8))
212                .build();
213            let edge_relay = InProcessRelay::new("edge", edge.clone(), directory.clone())
214                .with_peer("origin", origin.clone());
215
216            assert!(edge.get_stream(&key).is_err(), "not local yet");
217            let found = edge_relay.locate(&key).await.unwrap();
218            assert_eq!(found, Some(NodeAddr::from("origin")));
219            edge_relay.pull(&key, &found.unwrap()).await.unwrap();
220
221            // The local mirror is now publishing and receives forwarded frames.
222            let mirror = edge.get_stream(&key).expect("local mirror exists");
223            let mut sub = mirror.subscribe_resilient();
224            src_handle.publish_frame(frame(1)).unwrap();
225            let got = tokio::time::timeout(std::time::Duration::from_secs(5), sub.recv())
226                .await
227                .expect("a frame was mirrored")
228                .expect("frame");
229            assert!(got.is_video());
230        }
231
232        #[tokio::test]
233        async fn locate_excludes_self_and_withdraw_clears() {
234            let directory = Arc::new(ClusterDirectory::new());
235            let key = StreamKey::new("live", "s");
236            let engine = Engine::builder().application(AppSpec::new("live")).build();
237            let relay = InProcessRelay::new("only", engine, directory.clone());
238
239            relay.announce(&key).await.unwrap();
240            // The only server is ourselves → locate returns None.
241            assert_eq!(relay.locate(&key).await.unwrap(), None);
242            relay.withdraw(&key).await.unwrap();
243            assert!(directory.locate(&key, &NodeAddr::from("other")).is_none());
244        }
245    }
246}