mod connection_failure_detector;
mod flowing_detector;
mod quality_meter;
use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc, time::Duration};
use chrono::{DateTime, Utc};
use futures::{
channel::mpsc,
stream::{self, LocalBoxStream, StreamExt as _},
};
use medea_client_api_proto::{
stats::RtcStat, ConnectionQualityScore, MemberId, PeerConnectionState,
PeerId, RoomId,
};
use medea_macro::dispatchable;
use crate::{
api::control::callback::{MediaDirection, MediaType},
media::PeerStateMachine,
signalling::peers::{
metrics::{
connection_failure_detector::ConnectionFailureDetector,
flowing_detector::TrafficFlowDetector,
quality_meter::QualityMeterStatsHandler,
},
PeerTrafficWatcher,
},
};
#[dispatchable]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PeersMetricsEvent {
NoTrafficFlow {
peer_id: PeerId,
was_flowing_at: DateTime<Utc>,
media_type: MediaType,
direction: MediaDirection,
},
TrafficFlows {
peer_id: PeerId,
media_type: MediaType,
direction: MediaDirection,
},
QualityMeterUpdate {
member_id: MemberId,
partner_member_id: MemberId,
quality_score: ConnectionQualityScore,
},
PeerConnectionFailed {
peer_id: PeerId,
},
}
#[cfg_attr(test, mockall::automock)]
pub trait RtcStatsHandler: Debug {
fn register_peer(&mut self, peer_id: &PeerStateMachine);
fn unregister_peers(&mut self, peers_ids: &[PeerId]);
fn update_peer(&mut self, peer: &PeerStateMachine);
fn check(&mut self);
fn add_stats(&mut self, peer_id: PeerId, stats: &[RtcStat]);
fn update_peer_connection_state(
&mut self,
peer_id: PeerId,
state: PeerConnectionState,
);
fn subscribe(&mut self) -> LocalBoxStream<'static, PeersMetricsEvent>;
}
#[cfg(test)]
impl_debug_by_struct_name!(MockRtcStatsHandler);
#[derive(Debug)]
pub struct PeerMetricsService {
event_tx: EventSender,
handlers: Vec<Box<dyn RtcStatsHandler>>,
}
impl PeerMetricsService {
pub fn new(
room_id: RoomId,
peers_traffic_watcher: Arc<dyn PeerTrafficWatcher>,
stats_ttl: Duration,
) -> Self {
let event_tx = EventSender::new();
let handlers: Vec<Box<dyn RtcStatsHandler>> = vec![
Box::new(TrafficFlowDetector::new(
room_id,
peers_traffic_watcher,
stats_ttl,
)),
Box::new(QualityMeterStatsHandler::new()),
Box::new(ConnectionFailureDetector::new()),
];
Self { event_tx, handlers }
}
}
impl RtcStatsHandler for PeerMetricsService {
fn register_peer(&mut self, peer: &PeerStateMachine) {
for handler in &mut self.handlers {
handler.register_peer(peer);
}
}
fn unregister_peers(&mut self, peers_ids: &[PeerId]) {
for handler in &mut self.handlers {
handler.unregister_peers(peers_ids);
}
}
fn update_peer(&mut self, peer: &PeerStateMachine) {
for handler in &mut self.handlers {
handler.update_peer(peer);
}
}
fn check(&mut self) {
for handler in &mut self.handlers {
handler.check();
}
}
fn add_stats(&mut self, peer_id: PeerId, stats: &[RtcStat]) {
for handler in &mut self.handlers {
handler.add_stats(peer_id, stats);
}
}
fn update_peer_connection_state(
&mut self,
peer_id: PeerId,
state: PeerConnectionState,
) {
for handler in &mut self.handlers {
handler.update_peer_connection_state(peer_id, state);
}
}
fn subscribe(&mut self) -> LocalBoxStream<'static, PeersMetricsEvent> {
stream::select_all(
self.handlers.iter_mut().map(|handler| handler.subscribe()),
)
.boxed_local()
}
}
#[derive(Debug, Clone)]
struct EventSender(
Rc<RefCell<Option<mpsc::UnboundedSender<PeersMetricsEvent>>>>,
);
impl EventSender {
fn new() -> Self {
Self(Rc::default())
}
fn send_event(&self, event: PeersMetricsEvent) {
if let Some(tx) = self.0.borrow().as_ref() {
let _ = tx.unbounded_send(event);
}
}
fn subscribe(&self) -> LocalBoxStream<'static, PeersMetricsEvent> {
let (tx, rx) = mpsc::unbounded();
self.0.borrow_mut().replace(tx);
Box::pin(rx)
}
}