use std::collections::{HashSet, VecDeque};
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Weak};
use std::time::Instant;
use str0m::media::{KeyframeRequestKind, MediaData, MediaKind, Mid, Rid};
use str0m::{Event, IceConnectionState, Output, Rtc};
use crate::ids::SfuRid;
use crate::metrics::SfuMetrics;
use crate::net::{IncomingDatagram, OutgoingDatagram};
use crate::propagate::{ClientId, Propagated};
pub mod accessors;
pub mod construct;
pub mod fanout;
pub mod keyframe;
pub mod layer;
pub mod stats;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_seed;
pub mod tracks;
pub use tracks::TrackIn;
use tracks::{TrackInEntry, TrackOut, TrackOutState};
/// Per-client state of the SFU: a str0m [`Rtc`] session together with the
/// client's incoming/outgoing tracks and the transmits queued for the network.
#[derive(Debug)]
pub struct Client {
/// Identifier of this client; attached to log lines and to propagated events.
pub id: ClientId,
/// Origin descriptor of this client (see `crate::origin::ClientOrigin`).
pub(crate) origin: crate::origin::ClientOrigin,
/// str0m session that is fed datagrams and timeouts and polled for output.
pub(crate) rtc: Rtc,
/// Incoming tracks; one entry is appended for every `Event::MediaAdded`.
pub(crate) tracks_in: Vec<TrackInEntry>,
/// Outgoing tracks; `handle_track_open` appends entries in state
/// `TrackOutState::ToOpen`.
pub(crate) tracks_out: Vec<TrackOut>,
// NOTE(review): presumably the RID currently selected for forwarding to this
// client; it is not touched in this part of the file — confirm in `layer`.
pub(crate) chosen_rid: Option<Rid>,
// NOTE(review): presumably the layer this client wants to receive — confirm
// against the `layer` module.
pub(crate) desired_layer: SfuRid,
/// RIDs observed so far on incoming media data (inserted by `track_in_media`).
pub(crate) active_rids: HashSet<SfuRid>,
/// Transmits produced by `rtc`, queued until `drain_pending_out` hands them off.
pub(crate) pending_out: VecDeque<str0m::net::Transmit>,
/// Shared (`Arc`) handle to the SFU metrics.
pub(crate) metrics: Arc<SfuMetrics>,
// NOTE(review): atomic counter, presumably of media payloads delivered to this
// client — confirm where it is incremented.
pub(crate) delivered_media: AtomicU64,
// Only compiled for tests / the `test-utils` feature.
// NOTE(review): presumably counts delivered active-speaker notifications — confirm.
#[cfg(any(test, feature = "test-utils"))]
pub(crate) delivered_active_speaker: AtomicU64,
// Only compiled with the `pacer` feature.
#[cfg(feature = "pacer")]
pub(crate) pacer: crate::bwe::SubscriberPacer,
// Only compiled with the `av1-dd` feature.
// NOTE(review): presumably the highest temporal layer forwarded — confirm.
#[cfg(feature = "av1-dd")]
pub(crate) max_temporal_layer: u8,
// Only compiled with the `vfm` feature.
// NOTE(review): presumably the highest temporal layer forwarded when using
// VFM metadata — confirm.
#[cfg(feature = "vfm")]
pub(crate) max_vfm_temporal_layer: u8,
}
impl Client {
/// Feeds one received network datagram into this client's `Rtc`.
///
/// Nothing happens once the `Rtc` is no longer alive. A payload that cannot
/// be converted into a str0m receive buffer is dropped with a debug log. If
/// `Rtc::handle_input` returns an error, it is logged and the `Rtc` is
/// disconnected.
pub fn handle_input(&mut self, datagram: IncomingDatagram) {
    if !self.rtc.is_alive() {
        return;
    }

    // Guard clause: bail out early on payloads str0m cannot parse.
    let Ok(contents) = (&datagram.contents[..]).try_into() else {
        tracing::debug!(client = *self.id, "ignoring empty or invalid datagram");
        return;
    };

    let receive = str0m::net::Receive {
        proto: datagram.proto.to_str0m(),
        source: datagram.source,
        destination: datagram.destination,
        contents,
    };
    let outcome = self
        .rtc
        .handle_input(str0m::Input::Receive(datagram.received_at, receive));

    if let Err(e) = outcome {
        tracing::warn!(client = *self.id, error = ?e, "client disconnected on handle_input");
        self.rtc.disconnect();
    }
}
/// Advances this client's `Rtc` to the instant `at`.
///
/// Skipped when the `Rtc` is no longer alive. An error from the `Rtc` is
/// logged and the `Rtc` is disconnected.
pub(crate) fn handle_timeout(&mut self, at: Instant) {
    if self.rtc.is_alive() {
        let outcome = self.rtc.handle_input(str0m::Input::Timeout(at));
        if let Err(e) = outcome {
            tracing::warn!(client = *self.id, error = ?e, "client disconnected on timeout");
            self.rtc.disconnect();
        }
    }
}
/// Polls the `Rtc` once and translates its output into a [`Propagated`].
///
/// Returns `Propagated::Noop` when the `Rtc` is not alive, or when polling
/// fails; in the latter case the error is logged and the `Rtc` is
/// disconnected.
pub fn poll_output(&mut self) -> Propagated {
    if !self.rtc.is_alive() {
        return Propagated::Noop;
    }

    let output = match self.rtc.poll_output() {
        Ok(output) => output,
        Err(e) => {
            tracing::warn!(client = *self.id, error = ?e, "poll_output failed");
            self.rtc.disconnect();
            return Propagated::Noop;
        }
    };
    self.handle_output(output)
}
/// Dispatches a single `Rtc` output.
///
/// Events go to `handle_event`, timeouts are propagated unchanged, and
/// transmits are appended to `pending_out` (yielding `Propagated::Noop`).
fn handle_output(&mut self, output: Output) -> Propagated {
    let transmit = match output {
        Output::Event(event) => return self.handle_event(event),
        Output::Timeout(deadline) => return Propagated::Timeout(deadline),
        Output::Transmit(transmit) => transmit,
    };

    // Only transmits reach this point: queue them for `drain_pending_out`.
    self.pending_out.push_back(transmit);
    Propagated::Noop
}
/// Maps an `Rtc` event onto the matching handler.
///
/// An ICE state change to `Disconnected` disconnects the `Rtc`. Media,
/// keyframe-request, bandwidth-estimate and peer-stats events are forwarded
/// to their handlers. Every other event results in `Propagated::Noop`.
fn handle_event(&mut self, event: Event) -> Propagated {
    match event {
        Event::MediaData(media) => self.track_in_media(media),
        Event::MediaAdded(added) => self.track_in_added(added.mid, added.kind),
        Event::KeyframeRequest(request) => self.incoming_keyframe_req(request),
        Event::PeerStats(peer_stats) => stats::propagate_peer_stats(self.id, peer_stats),
        Event::EgressBitrateEstimate(estimate) => stats::propagate_bwe(self.id, estimate),
        Event::IceConnectionStateChange(state)
            if matches!(state, IceConnectionState::Disconnected) =>
        {
            self.rtc.disconnect();
            Propagated::Noop
        }
        _ => Propagated::Noop,
    }
}
/// Registers a newly added incoming track and announces it.
///
/// The track is stored in `tracks_in` behind an `Arc`. The returned
/// `Propagated::TrackOpen` carries only a `Weak` reference to it.
fn track_in_added(&mut self, mid: Mid, kind: MediaKind) -> Propagated {
    let track = Arc::new(TrackIn {
        origin: self.id,
        mid,
        kind,
        relay_source: self.is_relay(),
    });
    // Take the weak handle before the strong one moves into the entry.
    let handle = Arc::downgrade(&track);

    self.tracks_in.push(TrackInEntry {
        id: track,
        last_keyframe_request: None,
    });
    Propagated::TrackOpen(self.id, handle)
}
/// Handles media data received from this client.
///
/// Non-contiguous data triggers a throttled FIR keyframe request for the
/// same mid/rid. Any RID present is recorded in `active_rids`. The data is
/// then converted to the SFU payload type and propagated.
fn track_in_media(&mut self, data: MediaData) -> Propagated {
    // Copy the small identifiers out before `data` is consumed below.
    let (mid, rid) = (data.mid, data.rid);

    if !data.contiguous {
        self.request_keyframe_throttled(mid, rid, KeyframeRequestKind::Fir);
    }
    if let Some(seen) = rid {
        self.active_rids.insert(SfuRid::from_str0m(seen));
    }

    let payload = crate::media::SfuMediaPayload::from_str0m(data);
    Propagated::MediaData(self.id, payload)
}
/// Records an outgoing track for `track_in`, initially in state
/// `TrackOutState::ToOpen`.
pub fn handle_track_open(&mut self, track_in: Weak<TrackIn>) {
    let outgoing = TrackOut {
        track_in,
        state: TrackOutState::ToOpen,
    };
    self.tracks_out.push(outgoing);
}
/// Removes every queued transmit and yields each one, in FIFO order, as an
/// [`OutgoingDatagram`].
///
/// The queue is drained in place, so its allocation is kept for the next
/// batch instead of being freed and reallocated on every call. All queued
/// transmits are removed even if the returned iterator is dropped before
/// being fully consumed.
pub fn drain_pending_out(&mut self) -> impl Iterator<Item = OutgoingDatagram> + '_ {
    self.pending_out
        .drain(..)
        .map(OutgoingDatagram::from_transmit)
}
}