use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use crate::client::Client;
use crate::metrics::SfuMetrics;
use crate::net::{IncomingDatagram, SfuProtocol};
use crate::propagate::Propagated;
mod drive;
mod lifecycle;
#[cfg(any(test, feature = "test-utils"))]
mod test_seams;
#[derive(Debug)]
pub struct Registry {
pub(super) clients: Vec<Client>,
pub(super) to_propagate: VecDeque<Propagated>,
pub(super) metrics: Arc<SfuMetrics>,
#[cfg(feature = "active-speaker")]
pub(super) detector: dominant_speaker::ActiveSpeakerDetector,
}
impl Registry {
pub fn new(metrics: Arc<SfuMetrics>) -> Self {
Self {
clients: Vec::new(),
to_propagate: VecDeque::new(),
metrics,
#[cfg(feature = "active-speaker")]
detector: dominant_speaker::ActiveSpeakerDetector::new(),
}
}
#[cfg(any(test, feature = "test-utils"))]
pub fn new_for_tests() -> Self {
Self::new(Arc::new(SfuMetrics::new_default()))
}
pub fn is_empty(&self) -> bool {
self.clients.is_empty()
}
pub fn len(&self) -> usize {
self.clients.len()
}
pub fn clients(&self) -> &[Client] {
&self.clients
}
pub fn insert(&mut self, mut client: Client) {
client.metrics = self.metrics.clone();
for entry in self.clients.iter().flat_map(|c| c.tracks_in.iter()) {
client.handle_track_open(std::sync::Arc::downgrade(&entry.id));
}
#[cfg(feature = "active-speaker")]
self.detector.add_peer(*client.id, Instant::now());
self.metrics.inc_client_connect();
self.metrics.inc_active_participants();
self.clients.push(client);
}
pub fn handle_incoming(
&mut self,
source: SocketAddr,
destination: SocketAddr,
payload: &[u8],
) -> bool {
let datagram = IncomingDatagram {
received_at: Instant::now(),
proto: SfuProtocol::Udp,
source,
destination,
contents: payload.to_vec(),
};
if let Some(client) = self.clients.iter_mut().find(|c| c.accepts(&datagram)) {
client.handle_input(datagram);
true
} else {
tracing::debug!(?source, "no client accepts udp datagram");
false
}
}
#[cfg(feature = "active-speaker")]
pub fn record_audio_level(&mut self, peer_id: u64, level_raw: u8, now: Instant) {
self.detector.record_level(peer_id, level_raw, now);
}
}