use crate::{Result, StreamKey};
use async_trait::async_trait;
/// Address of a cluster node, held as an owned string.
///
/// The wrapped `String` is public; equality and hashing are derived from it,
/// so a `NodeAddr` can be used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NodeAddr(pub String);

/// Builds a `NodeAddr` from any value convertible into a `String`
/// (for example `&str` or `String`).
impl<T> From<T> for NodeAddr
where
    T: Into<String>,
{
    fn from(value: T) -> Self {
        Self(value.into())
    }
}
/// Interface for finding a stream on another cluster node and relaying it.
///
/// Implementors must be `Send + Sync + 'static`. Every method is async (via
/// `async_trait`) and reports failure through the crate-level `Result`.
/// When the `cluster` feature is enabled, this file provides `InProcessRelay`
/// as an implementation.
#[async_trait]
pub trait ClusterRelay: Send + Sync + 'static {
/// Looks up a node for `key`. `Ok(None)` means no node was found.
async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>>;
/// Pulls the stream `key` from the node at `origin`.
///
/// NOTE(review): the trait does not say whether this completes before or
/// after the transfer itself finishes; `InProcessRelay` returns as soon as
/// its background task is spawned — confirm what callers may rely on.
async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()>;
/// Announces `key` on behalf of the implementing node.
async fn announce(&self, key: &StreamKey) -> Result<()>;
/// Withdraws `key` on behalf of the implementing node
/// (presumably undoing an earlier `announce` — confirm).
async fn withdraw(&self, key: &StreamKey) -> Result<()>;
}
#[cfg(feature = "cluster")]
pub use relay::{ClusterDirectory, InProcessRelay};
#[cfg(feature = "cluster")]
mod relay {
use super::{ClusterRelay, NodeAddr};
use crate::bus::{PlaybackRegistry, PublishRegistry};
use crate::{Result, StreamError, StreamKey};
use async_trait::async_trait;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
/// Table recording which nodes have announced each stream key.
///
/// The table sits behind a `std::sync::Mutex`, and its methods take `&self`;
/// the code in this module shares it between relays as `Arc<ClusterDirectory>`.
#[derive(Debug, Default)]
pub struct ClusterDirectory {
/// Stream key -> set of node addresses that announced it.
serving: Mutex<HashMap<StreamKey, HashSet<NodeAddr>>>,
}
impl ClusterDirectory {
    /// Creates an empty directory.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that `node` serves `key`.
    ///
    /// Announcing the same `(node, key)` pair more than once has no further
    /// effect because nodes are kept in a set.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn announce(&self, node: &NodeAddr, key: &StreamKey) {
        let mut serving = self.serving.lock().unwrap();
        serving.entry(key.clone()).or_default().insert(node.clone());
    }

    /// Removes `node` from the set of nodes serving `key`.
    ///
    /// When the last node for `key` is withdrawn, the key itself is removed
    /// from the table so that short-lived stream keys do not accumulate as
    /// empty entries. Withdrawing an unknown node or key is a no-op.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn withdraw(&self, node: &NodeAddr, key: &StreamKey) {
        let mut serving = self.serving.lock().unwrap();
        // Decide about pruning first, so the mutable borrow of the set has
        // ended before the map itself is modified.
        let now_empty = match serving.get_mut(key) {
            Some(nodes) => {
                nodes.remove(node);
                nodes.is_empty()
            }
            None => false,
        };
        if now_empty {
            serving.remove(key);
        }
    }

    /// Returns some node serving `key` other than `exclude`, if one exists.
    ///
    /// Nodes are stored in a `HashSet`, so when several candidates exist the
    /// one returned is whichever the set's iteration yields first.
    ///
    /// # Panics
    /// Panics if the internal mutex is poisoned.
    pub fn locate(&self, key: &StreamKey, exclude: &NodeAddr) -> Option<NodeAddr> {
        self.serving
            .lock()
            .unwrap()
            .get(key)
            .and_then(|nodes| nodes.iter().find(|candidate| *candidate != exclude).cloned())
    }
}
/// `ClusterRelay` implementation that reaches its peers through in-memory
/// registry handles and a shared `ClusterDirectory`.
pub struct InProcessRelay {
/// Address of this relay's own node: used for `announce`/`withdraw` and
/// excluded from `locate` results.
node: NodeAddr,
/// Local registry on which pulled streams are published.
local: Arc<dyn PublishRegistry>,
/// Directory that `locate`, `announce` and `withdraw` delegate to.
directory: Arc<ClusterDirectory>,
/// Playback registries of known peers, keyed by peer address; `pull`
/// fails for an origin that is not in this map.
peers: HashMap<NodeAddr, Arc<dyn PlaybackRegistry>>,
}
impl InProcessRelay {
    /// Creates a relay for the node `node` with no registered peers.
    ///
    /// `local` is the registry that pulled streams are published on, and
    /// `directory` is the table consulted by `locate`/`announce`/`withdraw`.
    pub fn new(
        node: impl Into<NodeAddr>,
        local: Arc<dyn PublishRegistry>,
        directory: Arc<ClusterDirectory>,
    ) -> Self {
        let node = node.into();
        let peers = HashMap::new();
        Self {
            node,
            local,
            directory,
            peers,
        }
    }

    /// Registers `playback` as the registry for the peer at `addr` and
    /// returns the relay, so calls can be chained.
    ///
    /// Registering the same address again replaces the earlier registry.
    pub fn with_peer(
        mut self,
        addr: impl Into<NodeAddr>,
        playback: Arc<dyn PlaybackRegistry>,
    ) -> Self {
        let peer_addr: NodeAddr = addr.into();
        self.peers.insert(peer_addr, playback);
        self
    }
}
// `ClusterRelay` for `InProcessRelay`: directory lookups plus a spawned Tokio
// task that copies frames from a peer's stream into the local registry.
#[async_trait]
impl ClusterRelay for InProcessRelay {
/// Returns a node that has announced `key`, never this relay's own node.
async fn locate(&self, key: &StreamKey) -> Result<Option<NodeAddr>> {
Ok(self.directory.locate(key, &self.node))
}
/// Starts mirroring `key` from the peer `origin` onto the local registry.
///
/// Looks up the peer's playback registry, fetches the source stream, opens
/// a local publish for `key`, and then spawns a task that forwards the
/// source's replay buffer followed by live frames. Returns `Ok(())` as soon
/// as the task has been spawned; it does not wait for any frame to be copied.
///
/// # Errors
/// Returns a protocol error if `origin` was never registered via
/// `with_peer`, and propagates errors from `get_stream` and `start_publish`.
async fn pull(&self, key: &StreamKey, origin: &NodeAddr) -> Result<()> {
let peer = self.peers.get(origin).ok_or_else(|| {
StreamError::protocol(format!("cluster: unknown origin node {}", origin.0))
})?;
// The source stream is resolved before the local publish is opened, so a
// missing source fails without creating a local stream.
let src = peer.get_stream(key)?;
let dst = self.local.start_publish(key).await?;
// Owned copies for the spawned task, which cannot borrow from `self`/`key`.
let local = Arc::clone(&self.local);
let key = key.clone();
// The returned JoinHandle is dropped, so the task is detached.
tokio::spawn(async move {
// Forward whatever the source currently has buffered; publish errors in
// this phase are ignored.
for frame in src.replay_buffer() {
let _ = dst.publish_frame((*frame).clone());
}
// NOTE(review): the live subscription is created after the replay buffer
// was read; a frame published in between may not reach the mirror —
// confirm against the semantics of replay_buffer/subscribe_resilient.
let mut sub = src.subscribe_resilient();
// Forward live frames until the subscription yields `None` or a local
// publish fails.
while let Some(frame) = sub.recv().await {
if dst.publish_frame((*frame).clone()).is_err() {
break;
}
}
// Close the local publish once forwarding stops; its result is ignored.
let _ = local.end_publish(&key).await;
});
Ok(())
}
/// Records in the shared directory that this node serves `key`. Always `Ok`.
async fn announce(&self, key: &StreamKey) -> Result<()> {
self.directory.announce(&self.node, key);
Ok(())
}
/// Removes this node from the directory entry for `key`. Always `Ok`.
async fn withdraw(&self, key: &StreamKey) -> Result<()> {
self.directory.withdraw(&self.node, key);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{AppSpec, CodecId, Engine, MediaFrame};
/// Test helper: builds an H.264 video frame with a one-byte payload, passing
/// `pts` for both leading timestamp arguments of `MediaFrame::new_video`.
fn frame(pts: i64) -> MediaFrame {
    let payload = bytes::Bytes::from_static(b"x");
    MediaFrame::new_video(pts, pts, payload, CodecId::H264, true)
}
// End-to-end check: an edge relay finds the origin through the shared
// directory, pulls the stream, and a subscriber on the edge's local mirror
// receives a video frame.
#[tokio::test]
async fn edge_locates_and_mirrors_origin_stream() {
let directory = Arc::new(ClusterDirectory::new());
let key = StreamKey::new("live", "cam");
// Origin side: start publishing, announce the stream, and emit one frame
// before the edge exists.
let origin = Engine::builder()
.application(AppSpec::new("live").gop_cache(8))
.build();
let origin_relay = InProcessRelay::new("origin", origin.clone(), directory.clone());
let src_handle = origin.start_publish(&key).await.unwrap();
origin_relay.announce(&key).await.unwrap();
src_handle.publish_frame(frame(0)).unwrap();
// Edge side: a separate engine whose relay knows the origin engine as the
// peer named "origin".
let edge = Engine::builder()
.application(AppSpec::new("live").gop_cache(8))
.build();
let edge_relay = InProcessRelay::new("edge", edge.clone(), directory.clone())
.with_peer("origin", origin.clone());
assert!(edge.get_stream(&key).is_err(), "not local yet");
let found = edge_relay.locate(&key).await.unwrap();
assert_eq!(found, Some(NodeAddr::from("origin")));
edge_relay.pull(&key, &found.unwrap()).await.unwrap();
// `pull` opens the local publish before it returns, so the mirror stream
// must already be visible on the edge here.
let mirror = edge.get_stream(&key).expect("local mirror exists");
let mut sub = mirror.subscribe_resilient();
// Published after the pull; the timeout below bounds the wait for the
// spawned mirroring task to deliver a frame.
src_handle.publish_frame(frame(1)).unwrap();
let got = tokio::time::timeout(std::time::Duration::from_secs(5), sub.recv())
.await
.expect("a frame was mirrored")
.expect("frame");
assert!(got.is_video());
}
// A relay never locates itself, and once it withdraws, no other node can
// locate the stream through the directory either.
#[tokio::test]
async fn locate_excludes_self_and_withdraw_clears() {
    let key = StreamKey::new("live", "s");
    let directory = Arc::new(ClusterDirectory::new());
    let engine = Engine::builder().application(AppSpec::new("live")).build();
    let relay = InProcessRelay::new("only", engine, directory.clone());

    // The only announcer is the relay itself, so its own lookup finds nothing.
    relay.announce(&key).await.unwrap();
    let seen_by_self = relay.locate(&key).await.unwrap();
    assert_eq!(seen_by_self, None);

    // After withdrawal, a lookup made on behalf of a different node is empty too.
    relay.withdraw(&key).await.unwrap();
    let stranger = NodeAddr::from("other");
    assert!(directory.locate(&key, &stranger).is_none());
}
}
}