1use crate::{Result, StreamKey};
10use async_trait::async_trait;
11
/// Identifier of a cluster node, stored as a plain string.
///
/// The wrapped string is public and is compared and hashed verbatim.
/// NOTE(review): no particular address syntax is enforced here — confirm what callers expect.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeAddr(pub String);

/// Builds a [`NodeAddr`] from anything convertible into a `String` (`&str`, `String`, ...).
impl<T> From<T> for NodeAddr
where
    T: Into<String>,
{
    fn from(raw: T) -> Self {
        Self(raw.into())
    }
}
21
/// Hooks a node uses to find, mirror, and advertise streams across a cluster.
///
/// Implementors must be `Send + Sync + 'static`. Every method is async and reports failure
/// through the crate's `Result` alias.
#[async_trait]
pub trait ClusterRelay: Send + Sync + 'static {
    /// Looks up a node associated with `key`.
    ///
    /// Yields `Ok(Some(addr))` with that node's address, or `Ok(None)` when no node is reported.
    /// NOTE(review): the in-process implementation in this file never reports the relay's own
    /// node — confirm whether every implementation is expected to honour that.
    async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>>;

    /// Asks the relay to pull the stream `key` from the node `origin`.
    ///
    /// NOTE(review): the in-process implementation in this file mirrors the origin's frames into
    /// a local publish from a background task and returns as soon as that task is spawned; the
    /// general contract is presumably the same — confirm.
    async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()>;

    /// Advertises `key` on behalf of this node.
    ///
    /// The in-process implementation in this file records this node in the shared directory.
    async fn announce(&self, key: &StreamKey) -> Result<()>;

    /// Retracts this node's advertisement for `key`.
    ///
    /// The in-process implementation in this file removes this node from the shared directory.
    async fn withdraw(&self, key: &StreamKey) -> Result<()>;
}
40
41#[cfg(feature = "cluster")]
42pub use relay::{ClusterDirectory, InProcessRelay};
43
44#[cfg(feature = "cluster")]
52mod relay {
53 use super::{ClusterRelay, NodeAddr};
54 use crate::bus::{PlaybackRegistry, PublishRegistry};
55 use crate::{Result, StreamError, StreamKey};
56 use async_trait::async_trait;
57 use std::collections::{HashMap, HashSet};
58 use std::sync::{Arc, Mutex};
59
60 #[derive(Debug, Default)]
63 pub struct ClusterDirectory {
64 serving: Mutex<HashMap<StreamKey, HashSet<NodeAddr>>>,
65 }
66
67 impl ClusterDirectory {
68 pub fn new() -> Self {
70 Self::default()
71 }
72
73 pub fn announce(&self, node: &NodeAddr, key: &StreamKey) {
75 self.serving
76 .lock()
77 .unwrap()
78 .entry(key.clone())
79 .or_default()
80 .insert(node.clone());
81 }
82
83 pub fn withdraw(&self, node: &NodeAddr, key: &StreamKey) {
85 if let Some(set) = self.serving.lock().unwrap().get_mut(key) {
86 set.remove(node);
87 }
88 }
89
90 pub fn locate(&self, key: &StreamKey, exclude: &NodeAddr) -> Option<NodeAddr> {
92 self.serving
93 .lock()
94 .unwrap()
95 .get(key)
96 .and_then(|set| set.iter().find(|n| *n != exclude).cloned())
97 }
98 }
99
100 pub struct InProcessRelay {
103 node: NodeAddr,
104 local: Arc<dyn PublishRegistry>,
105 directory: Arc<ClusterDirectory>,
106 peers: HashMap<NodeAddr, Arc<dyn PlaybackRegistry>>,
107 }
108
109 impl InProcessRelay {
110 pub fn new(
112 node: impl Into<NodeAddr>,
113 local: Arc<dyn PublishRegistry>,
114 directory: Arc<ClusterDirectory>,
115 ) -> Self {
116 Self {
117 node: node.into(),
118 local,
119 directory,
120 peers: HashMap::new(),
121 }
122 }
123
124 pub fn with_peer(
127 mut self,
128 addr: impl Into<NodeAddr>,
129 playback: Arc<dyn PlaybackRegistry>,
130 ) -> Self {
131 self.peers.insert(addr.into(), playback);
132 self
133 }
134 }
135
136 #[async_trait]
137 impl ClusterRelay for InProcessRelay {
138 async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>> {
139 Ok(self.directory.locate(key, &self.node))
140 }
141
142 async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()> {
143 let peer = self.peers.get(origin).ok_or_else(|| {
144 StreamError::protocol(format!("cluster: unknown origin node {}", origin.0))
145 })?;
146 let src = peer.get_stream(key)?;
148 let dst = self.local.start_publish(key).await?;
149
150 let local = Arc::clone(&self.local);
153 let key = key.clone();
154 tokio::spawn(async move {
155 for frame in src.replay_buffer() {
156 let _ = dst.publish_frame((*frame).clone());
157 }
158 let mut sub = src.subscribe_resilient();
159 while let Some(frame) = sub.recv().await {
160 if dst.publish_frame((*frame).clone()).is_err() {
161 break;
162 }
163 }
164 let _ = local.end_publish(&key).await;
165 });
166 Ok(())
167 }
168
169 async fn announce(&self, key: &StreamKey) -> Result<()> {
170 self.directory.announce(&self.node, key);
171 Ok(())
172 }
173
174 async fn withdraw(&self, key: &StreamKey) -> Result<()> {
175 self.directory.withdraw(&self.node, key);
176 Ok(())
177 }
178 }
179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AppSpec, CodecId, Engine, MediaFrame};

    /// Builds a one-byte H.264 video frame that passes `pts` for both timestamp arguments and
    /// `true` for the trailing flag (presumably the keyframe marker — confirm).
    fn frame(pts: i64) -> MediaFrame {
        let payload = bytes::Bytes::from_static(b"x");
        MediaFrame::new_video(pts, pts, payload, CodecId::H264, true)
    }

    /// An edge node finds the origin through the directory, pulls the stream, and then
    /// receives a frame on its local mirror.
    #[tokio::test]
    async fn edge_locates_and_mirrors_origin_stream() {
        let directory = Arc::new(ClusterDirectory::new());
        let key = StreamKey::new("live", "cam");

        // Origin side: publish locally, announce, and push a first frame.
        let origin = Engine::builder()
            .application(AppSpec::new("live").gop_cache(8))
            .build();
        let origin_relay = InProcessRelay::new("origin", origin.clone(), directory.clone());
        let publisher = origin.start_publish(&key).await.unwrap();
        origin_relay.announce(&key).await.unwrap();
        publisher.publish_frame(frame(0)).unwrap();

        // Edge side: knows the origin as a peer but has no local copy of the stream yet.
        let edge = Engine::builder()
            .application(AppSpec::new("live").gop_cache(8))
            .build();
        let edge_relay = InProcessRelay::new("edge", edge.clone(), directory.clone())
            .with_peer("origin", origin.clone());
        assert!(edge.get_stream(&key).is_err(), "not local yet");

        let located = edge_relay.locate(&key).await.unwrap();
        assert_eq!(located, Some(NodeAddr::from("origin")));
        let origin_addr = located.unwrap();
        edge_relay.pull(&key, &origin_addr).await.unwrap();

        // Subscribe to the mirror before publishing the next frame on the origin.
        let mirror = edge.get_stream(&key).expect("local mirror exists");
        let mut mirrored = mirror.subscribe_resilient();
        publisher.publish_frame(frame(1)).unwrap();

        let deadline = std::time::Duration::from_secs(5);
        let received = tokio::time::timeout(deadline, mirrored.recv())
            .await
            .expect("a frame was mirrored")
            .expect("frame");
        assert!(received.is_video());
    }

    /// A node never locates itself, and after withdrawing nobody else can locate it either.
    #[tokio::test]
    async fn locate_excludes_self_and_withdraw_clears() {
        let directory = Arc::new(ClusterDirectory::new());
        let key = StreamKey::new("live", "s");
        let engine = Engine::builder().application(AppSpec::new("live")).build();
        let relay = InProcessRelay::new("only", engine, directory.clone());

        relay.announce(&key).await.unwrap();
        let seen_by_self = relay.locate(&key).await.unwrap();
        assert_eq!(seen_by_self, None);

        relay.withdraw(&key).await.unwrap();
        let outsider = NodeAddr::from("other");
        assert!(directory.locate(&key, &outsider).is_none());
    }
}
246}