use super::*;
#[derive(Debug, Clone)]
pub struct RelayTarget {
pub url: String,
pub active: bool,
pub bytes_forwarded: u64,
pub packets_dropped: u64,
}
impl RelayTarget {
#[must_use]
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
active: true,
bytes_forwarded: 0,
packets_dropped: 0,
}
}
}
pub struct RelayManager {
targets: Arc<RwLock<HashMap<String, Vec<RelayTarget>>>>,
channels: Arc<RwLock<HashMap<String, broadcast::Sender<MediaPacket>>>>,
}
impl RelayManager {
#[must_use]
pub fn new() -> Self {
Self {
targets: Arc::new(RwLock::new(HashMap::new())),
channels: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn add_target(&self, stream_key: impl Into<String>, url: impl Into<String>) {
let key = stream_key.into();
let mut targets = self.targets.write().await;
targets
.entry(key)
.or_insert_with(Vec::new)
.push(RelayTarget::new(url));
}
pub async fn register_stream(
&self,
stream_key: impl Into<String>,
tx: broadcast::Sender<MediaPacket>,
) -> broadcast::Receiver<MediaPacket> {
let key = stream_key.into();
let rx = tx.subscribe();
let mut channels = self.channels.write().await;
channels.insert(key, tx);
rx
}
pub async fn forward(&self, stream_key: &str, packet: &MediaPacket) {
let mut targets = self.targets.write().await;
if let Some(list) = targets.get_mut(stream_key) {
for target in list.iter_mut() {
if !target.active {
continue;
}
target.bytes_forwarded += packet.data.len() as u64;
}
}
}
pub async fn unregister_stream(&self, stream_key: &str) {
let mut channels = self.channels.write().await;
channels.remove(stream_key);
}
pub async fn stats(&self, stream_key: &str) -> Vec<RelayTarget> {
let targets = self.targets.read().await;
targets.get(stream_key).cloned().unwrap_or_default()
}
pub async fn mark_inactive(&self, stream_key: &str, url: &str) {
let mut targets = self.targets.write().await;
if let Some(list) = targets.get_mut(stream_key) {
for target in list.iter_mut() {
if target.url == url {
target.active = false;
}
}
}
}
}
impl Default for RelayManager {
fn default() -> Self {
Self::new()
}
}