use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use crate::client::Client;
use crate::metrics::SfuMetrics;
use crate::net::{IncomingDatagram, SfuProtocol};
use crate::propagate::Propagated;
mod drive;
mod lifecycle;
#[cfg(any(test, feature = "test-utils"))]
mod test_seams;
/// Holds the clients known to this SFU instance together with the shared
/// state the registry operates on for them (metrics, and optionally an
/// active-speaker detector and a bandwidth estimator).
#[derive(Debug)]
pub struct Registry {
/// Registered clients; `insert` appends new entries at the end.
pub(super) clients: Vec<Client>,
/// Queue of pending `Propagated` items. It starts out empty in `new`.
/// NOTE(review): presumably filled and drained by the sibling submodules —
/// no use is visible in this file, confirm there.
pub(super) to_propagate: VecDeque<Propagated>,
/// Shared metrics handle; `insert` hands a clone of it to every client.
pub(super) metrics: Arc<SfuMetrics>,
/// Active-speaker detector. Non-relay clients are added to it in `insert`
/// and audio levels are fed to it by `record_audio_level`.
#[cfg(feature = "active-speaker")]
pub(super) detector: dominant_speaker::ActiveSpeakerDetector,
/// Reference instant captured when the registry is constructed. Timestamps
/// handed to `detector` are milliseconds elapsed since this instant.
#[cfg(feature = "active-speaker")]
detector_epoch: std::time::Instant,
/// Bandwidth estimator that receives TWCC feedback through
/// `on_twcc_feedback`.
#[cfg(feature = "kalman-bwe")]
pub(crate) bandwidth: crate::bwe::estimator::BandwidthEstimator,
}
impl Registry {
    /// Builds an empty registry that reports into `metrics`.
    ///
    /// The client list and the propagation queue start out empty. With the
    /// `active-speaker` feature the detector's time base (`detector_epoch`)
    /// is the moment this constructor runs.
    pub fn new(metrics: Arc<SfuMetrics>) -> Self {
        Self {
            clients: Vec::new(),
            to_propagate: VecDeque::new(),
            metrics,
            #[cfg(feature = "active-speaker")]
            detector: dominant_speaker::ActiveSpeakerDetector::new(),
            #[cfg(feature = "active-speaker")]
            detector_epoch: Instant::now(),
            #[cfg(feature = "kalman-bwe")]
            bandwidth: crate::bwe::estimator::BandwidthEstimator::new(),
        }
    }

    /// Convenience constructor for tests: an empty registry backed by
    /// `SfuMetrics::new_default()`.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn new_for_tests() -> Self {
        let metrics = Arc::new(SfuMetrics::new_default());
        Self::new(metrics)
    }

    /// Returns `true` when no client is registered.
    pub fn is_empty(&self) -> bool {
        self.clients().is_empty()
    }

    /// Returns the number of registered clients.
    pub fn len(&self) -> usize {
        self.clients().len()
    }

    /// Read-only view of the registered clients.
    pub fn clients(&self) -> &[Client] {
        self.clients.as_slice()
    }

    /// Registers `client` with this registry.
    ///
    /// Before the client is stored it is:
    /// * given the registry's shared metrics handle (replacing whatever it
    ///   carried before);
    /// * with `metrics-prometheus`, given the per-peer drop counter looked up
    ///   by its id;
    /// * notified, via `handle_track_open`, of every incoming track of every
    ///   client that is already registered (as a weak handle to the track id);
    /// * with `active-speaker`, added to the detector unless it is a relay.
    ///
    /// Finally the client-connect and active-participants metrics are bumped
    /// and the client is appended to the list.
    pub fn insert(&mut self, mut client: Client) {
        client.metrics = Arc::clone(&self.metrics);

        #[cfg(feature = "metrics-prometheus")]
        {
            client.video_frames_dropped = self.metrics.peer_drop_counter(*client.id);
        }

        // Announce the tracks that already exist to the newcomer.
        for existing in &self.clients {
            for track in existing.tracks_in.iter() {
                client.handle_track_open(Arc::downgrade(&track.id));
            }
        }

        #[cfg(feature = "active-speaker")]
        {
            let is_relay = client.is_relay();
            if !is_relay {
                // The timestamp is taken first, then the peer is registered with it.
                let joined_at_ms = self.now_ms();
                self.detector.add_peer(*client.id, joined_at_ms);
            }
        }

        self.metrics.inc_client_connect();
        self.metrics.inc_active_participants();
        self.clients.push(client);
    }

    /// Routes one UDP datagram to the first client that accepts it.
    ///
    /// The payload is copied into an `IncomingDatagram` stamped with
    /// `Instant::now()` and tagged as `SfuProtocol::Udp`. Clients are probed
    /// in list order with `accepts`; the first match receives the datagram
    /// through `handle_input`.
    ///
    /// Returns `true` if a client took the datagram, `false` (after a debug
    /// log line) if none did.
    pub fn handle_incoming(
        &mut self,
        source: SocketAddr,
        destination: SocketAddr,
        payload: &[u8],
    ) -> bool {
        let datagram = IncomingDatagram {
            received_at: Instant::now(),
            proto: SfuProtocol::Udp,
            source,
            destination,
            contents: payload.to_vec(),
        };

        let recipient = self
            .clients
            .iter_mut()
            .find(|candidate| candidate.accepts(&datagram));

        match recipient {
            Some(client) => {
                client.handle_input(datagram);
                true
            }
            None => {
                tracing::debug!(?source, "no client accepts udp datagram");
                false
            }
        }
    }

    /// Feeds one audio-level sample for `peer_id` into the active-speaker
    /// detector.
    ///
    /// Samples are dropped when a registered client with that id is a relay.
    /// Samples for ids that match no registered client are still forwarded.
    /// `now` is converted to whole milliseconds since the registry's detector
    /// epoch; an instant earlier than the epoch saturates to zero.
    ///
    /// NOTE(review): the scale/meaning of `level_raw` is defined by the
    /// detector — confirm against its documentation.
    #[cfg(feature = "active-speaker")]
    #[cfg_attr(docsrs, doc(cfg(feature = "active-speaker")))]
    pub fn record_audio_level(&mut self, peer_id: u64, level_raw: u8, now: Instant) {
        let from_relay = self
            .clients
            .iter()
            .filter(|c| *c.id == peer_id)
            .any(|c| c.is_relay());

        if !from_relay {
            let since_epoch = now.saturating_duration_since(self.detector_epoch);
            let now_ms = since_epoch.as_millis() as u64;
            self.detector.record_level(peer_id, level_raw, now_ms);
        }
    }

    /// Milliseconds elapsed between the detector epoch and the current time.
    #[cfg(feature = "active-speaker")]
    fn now_ms(&self) -> u64 {
        let since_epoch = self.detector_epoch.elapsed();
        since_epoch.as_millis() as u64
    }

    /// Returns whatever the detector's `peer_scores()` reports.
    ///
    /// NOTE(review): each tuple presumably starts with the peer id followed
    /// by three score components — confirm their meaning against the
    /// detector's documentation.
    #[cfg(feature = "active-speaker")]
    #[cfg_attr(docsrs, doc(cfg(feature = "active-speaker")))]
    #[must_use]
    pub fn peer_audio_scores(&self) -> Vec<(u64, f64, f64, f64)> {
        self.detector.peer_scores()
    }
}
#[cfg(feature = "kalman-bwe")]
#[cfg_attr(docsrs, doc(cfg(feature = "kalman-bwe")))]
impl Registry {
    /// Hands a TWCC feedback report for `subscriber` to the registry's
    /// bandwidth estimator, together with the instant `now` supplied by the
    /// caller.
    ///
    /// This is a pure pass-through: all processing happens inside the
    /// estimator's own `on_twcc_feedback`.
    pub fn on_twcc_feedback(
        &mut self,
        subscriber: crate::propagate::ClientId,
        feedback: &crate::bwe::feedback::TwccFeedback,
        now: std::time::Instant,
    ) {
        let estimator = &mut self.bandwidth;
        estimator.on_twcc_feedback(subscriber, feedback, now);
    }
}