medea 0.2.0

Medea media server
Documentation
//! Service which is responsible for processing [`Peer`]s [`RtcStat`] metrics.
//!
//! 1. You should register [`Peer`] via [`RtcStatsHandler::register_peer`].
//! 2. Use [`RtcStatsHandler::subscribe`] to subscribe to stats processing
//!    results.
//! 3. Provide [`Peer`]'s metrics to [`RtcStatsHandler::add_stats`].
//! 4. Call [`RtcStatsHandler::check`] with reasonable interval
//!    (~1-2 sec), to check for stale metrics.
//!
//! Stores [`RtcStatsHandler`]s implementors.
//!
//! [`Peer`]: crate::media::peer::Peer

mod connection_failure_detector;
mod flowing_detector;
mod quality_meter;

use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc, time::Duration};

use chrono::{DateTime, Utc};
use futures::{
    channel::mpsc,
    stream::{self, LocalBoxStream, StreamExt as _},
};
use medea_client_api_proto::{
    stats::RtcStat, ConnectionQualityScore, MemberId, PeerConnectionState,
    PeerId, RoomId,
};
use medea_macro::dispatchable;

use crate::{
    api::control::callback::{MediaDirection, MediaType},
    media::PeerStateMachine,
    signalling::peers::{
        metrics::{
            connection_failure_detector::ConnectionFailureDetector,
            flowing_detector::TrafficFlowDetector,
            quality_meter::QualityMeterStatsHandler,
        },
        PeerTrafficWatcher,
    },
};

/// WebRTC statistics analysis results.
#[dispatchable]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum PeersMetricsEvent {
    /// Some `MediaTrack`s with provided [`MediaType`] doesn't flows.
    NoTrafficFlow {
        peer_id: PeerId,
        was_flowing_at: DateTime<Utc>,
        media_type: MediaType,
        direction: MediaDirection,
    },

    /// Stopped `MediaTrack` with provided [`MediaType`] and [`MediaDirection`]
    /// was started after stopping.
    TrafficFlows {
        peer_id: PeerId,
        media_type: MediaType,
        direction: MediaDirection,
    },

    /// [`ConnectionQualityScore`] updated.
    QualityMeterUpdate {
        /// [`MemberId`] of the [`Peer`] which [`ConnectionQualityScore`]
        /// was updated.
        ///
        /// [`Peer`]: crate::media::peer::Peer
        member_id: MemberId,

        /// [`MemberId`] of the partner [`Peer`].
        ///
        /// [`Peer`]: crate::media::peer::Peer
        partner_member_id: MemberId,

        /// Actual [`ConnectionQualityScore`].
        quality_score: ConnectionQualityScore,
    },

    /// One or more of the ICE transports on the connection is in the `failed`
    /// state.
    PeerConnectionFailed {
        /// [`PeerId`] of `PeerConnection`.
        peer_id: PeerId,
    },
}

/// [`RtcStatsHandler`] performs [`RtcStat`]s analysis.
#[cfg_attr(test, mockall::automock)]
pub trait RtcStatsHandler: Debug {
    /// Acknowledge [`RtcStatsHandler`] that new `Peer` was created, so
    /// [`RtcStatsHandler`] should track its [`RtcStat`]s.
    fn register_peer(&mut self, peer_id: &PeerStateMachine);

    /// [`RtcStatsHandler`] should stop tracking provided [`Peer`]s.
    ///
    /// [`Peer`]: crate::media::peer::Peer
    fn unregister_peers(&mut self, peers_ids: &[PeerId]);

    /// [`RtcStatsHandler`] can update [`PeerStateMachine`]s internal
    /// representation.
    ///
    /// Must be called each time [`PeerStateMachine`] tracks set changes (some
    /// track was added or removed).
    fn update_peer(&mut self, peer: &PeerStateMachine);

    /// [`RtcStatsHandler`] can process all collected stats, re-calculate
    /// metrics and send [`PeersMetricsEvent`] (if it's needed).
    ///
    /// Will be called periodically by [`PeerMetricsService`].
    fn check(&mut self);

    /// [`PeerMetricsService`] provides new [`RtcStat`]s for the
    /// [`RtcStatsHandler`].
    fn add_stats(&mut self, peer_id: PeerId, stats: &[RtcStat]);

    /// [`PeerMetricsService`] provides [`PeerConnectionState`] update for the
    /// [`RtcStatsHandler`].
    fn update_peer_connection_state(
        &mut self,
        peer_id: PeerId,
        state: PeerConnectionState,
    );

    /// Returns [`Stream`] of [`PeersMetricsEvent`]s.
    ///
    /// Creating new subscription will invalidate previous, so there may be only
    /// one subscription. Events are not saved or buffered at sending side, so
    /// you won't receive any events happened before subscription was made.
    ///
    /// [`Stream`]: futures::stream::Stream
    fn subscribe(&mut self) -> LocalBoxStream<'static, PeersMetricsEvent>;
}

#[cfg(test)]
impl_debug_by_struct_name!(MockRtcStatsHandler);

/// Service which is responsible for processing [`Peer`]s [`RtcStat`] metrics.
///
/// [`Peer`]: crate::media::peer::Peer
#[derive(Debug)]
pub struct PeerMetricsService {
    /// Sender of the [`PeersMetricsEvent`]s.
    event_tx: EventSender,

    /// All [`RtcStatsHandler`]s registered in this [`PeerMetricsService`].
    handlers: Vec<Box<dyn RtcStatsHandler>>,
}

impl PeerMetricsService {
    /// Creates new [`PeerMetricsService`], registers all needed
    /// [`RtcStatsHandler`]s.
    pub fn new(
        room_id: RoomId,
        peers_traffic_watcher: Arc<dyn PeerTrafficWatcher>,
        stats_ttl: Duration,
    ) -> Self {
        let event_tx = EventSender::new();
        let handlers: Vec<Box<dyn RtcStatsHandler>> = vec![
            Box::new(TrafficFlowDetector::new(
                room_id,
                peers_traffic_watcher,
                stats_ttl,
            )),
            Box::new(QualityMeterStatsHandler::new()),
            Box::new(ConnectionFailureDetector::new()),
        ];

        Self { event_tx, handlers }
    }
}

impl RtcStatsHandler for PeerMetricsService {
    /// Calls [`RtcStatsHandler::register_peer`] on all registered
    /// [`RtcStatsHandler`]s.
    fn register_peer(&mut self, peer: &PeerStateMachine) {
        for handler in &mut self.handlers {
            handler.register_peer(peer);
        }
    }

    /// Calls [`RtcStatsHandler::unregister_peers`] on the all registered
    /// [`RtcStatsHandler`]s.
    fn unregister_peers(&mut self, peers_ids: &[PeerId]) {
        for handler in &mut self.handlers {
            handler.unregister_peers(peers_ids);
        }
    }

    /// Calls [`RtcStatsHandler::update_peer`] on the all registered
    /// [`RtcStatsHandler`]s.
    fn update_peer(&mut self, peer: &PeerStateMachine) {
        for handler in &mut self.handlers {
            handler.update_peer(peer);
        }
    }

    /// Calls [`RtcStatsHandler::check`] on the all registered
    /// [`RtcStatsHandler`]s.
    fn check(&mut self) {
        for handler in &mut self.handlers {
            handler.check();
        }
    }

    /// Calls [`RtcStatsHandler::add_stats`] on the all registered
    /// [`RtcStatsHandler`]s.
    fn add_stats(&mut self, peer_id: PeerId, stats: &[RtcStat]) {
        for handler in &mut self.handlers {
            handler.add_stats(peer_id, stats);
        }
    }

    /// Calls [`RtcStatsHandler::update_peer_connection_state`] on the
    /// registered [`RtcStatsHandler`]s,
    fn update_peer_connection_state(
        &mut self,
        peer_id: PeerId,
        state: PeerConnectionState,
    ) {
        for handler in &mut self.handlers {
            handler.update_peer_connection_state(peer_id, state);
        }
    }

    /// Calls [`RtcStatsHandler::subscribe`] on the all registered
    /// [`RtcStatsHandler`]s returning merged stream.
    ///
    /// Creating new subscription will invalidate previous, so there may be only
    /// one subscription. Events are not saved or buffered at sending side, so
    /// you won't receive any events happened before subscription was made.
    fn subscribe(&mut self) -> LocalBoxStream<'static, PeersMetricsEvent> {
        stream::select_all(
            self.handlers.iter_mut().map(|handler| handler.subscribe()),
        )
        .boxed_local()
    }
}

/// [`PeersMetricsEvent`]s sender.
#[derive(Debug, Clone)]
struct EventSender(
    Rc<RefCell<Option<mpsc::UnboundedSender<PeersMetricsEvent>>>>,
);

impl EventSender {
    /// Returns new [`EventSender`].
    fn new() -> Self {
        Self(Rc::default())
    }

    /// Tries to send provided [`PeersMetricsEvent`] to the subscriber.
    ///
    /// If no one subscribed, then no-op.
    fn send_event(&self, event: PeersMetricsEvent) {
        if let Some(tx) = self.0.borrow().as_ref() {
            let _ = tx.unbounded_send(event);
        }
    }

    /// Returns [`Stream`] of [`PeersMetricsEvent`]s.
    ///
    /// Creating new subscription will invalidate previous, so there may be only
    /// one subscription. Events are not saved or buffered at sending side, so
    /// you won't receive any events happened before subscription was made.
    ///
    /// [`Stream`]: futures::stream::Stream
    fn subscribe(&self) -> LocalBoxStream<'static, PeersMetricsEvent> {
        let (tx, rx) = mpsc::unbounded();
        self.0.borrow_mut().replace(tx);
        Box::pin(rx)
    }
}