use std::{
collections::{HashMap, HashSet},
fmt::Debug,
sync::Arc,
time::{Duration, Instant},
};
use actix::{
Actor, Addr, AsyncContext, Handler, MailboxError, Message, SpawnHandle,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use medea_client_api_proto::{PeerId, RoomId};
use crate::{conf, log::prelude::*, utils::instant_into_utc};
#[cfg_attr(test, mockall::automock)]
pub trait PeerConnectionStateEventsHandler: Send + Debug {
fn peer_started(&self, peer_id: PeerId);
fn peer_stopped(&self, peer_id: PeerId, at: DateTime<Utc>);
}
#[cfg(test)]
impl_debug_by_struct_name!(MockPeerConnectionStateEventsHandler);
#[cfg(test)]
pub fn build_peers_traffic_watcher(
conf: &conf::Media,
) -> Arc<dyn PeerTrafficWatcher> {
Arc::new(PeersTrafficWatcherImpl::new(conf).start())
}
#[cfg(not(test))]
#[must_use]
pub fn build_peers_traffic_watcher(
_: &conf::Media,
) -> Arc<dyn PeerTrafficWatcher> {
#[derive(Debug)]
struct DummyPeerTrafficWatcher;
#[async_trait(?Send)]
impl PeerTrafficWatcher for DummyPeerTrafficWatcher {
async fn register_room(
&self,
_: RoomId,
_: Box<dyn PeerConnectionStateEventsHandler>,
) -> Result<(), MailboxError> {
Ok(())
}
fn unregister_room(&self, _: RoomId) {}
async fn register_peer(
&self,
_: RoomId,
_: PeerId,
_: bool,
) -> Result<(), MailboxError> {
Ok(())
}
fn unregister_peers(&self, _: RoomId, _: Vec<PeerId>) {}
fn traffic_flows(&self, _: RoomId, _: PeerId, _: FlowMetricSource) {}
fn traffic_stopped(&self, _: RoomId, _: PeerId, _: Instant) {}
}
Arc::new(DummyPeerTrafficWatcher)
}
#[async_trait(?Send)]
#[cfg_attr(test, mockall::automock)]
pub trait PeerTrafficWatcher: Debug + Send + Sync {
async fn register_room(
&self,
room_id: RoomId,
handler: Box<dyn PeerConnectionStateEventsHandler>,
) -> Result<(), MailboxError>;
fn unregister_room(&self, room_id: RoomId);
async fn register_peer(
&self,
room_id: RoomId,
peer_id: PeerId,
should_watch_turn: bool,
) -> Result<(), MailboxError>;
fn unregister_peers(&self, room_id: RoomId, peers_ids: Vec<PeerId>);
fn traffic_flows(
&self,
room_id: RoomId,
peer_id: PeerId,
source: FlowMetricSource,
);
fn traffic_stopped(&self, room_id: RoomId, peer_id: PeerId, at: Instant);
}
#[cfg(test)]
impl_debug_by_struct_name!(MockPeerTrafficWatcher);
fn build_flow_sources(should_watch_turn: bool) -> HashSet<FlowMetricSource> {
let mut sources =
hashset![FlowMetricSource::Peer, FlowMetricSource::PartnerPeer];
if should_watch_turn {
sources.insert(FlowMetricSource::Coturn);
}
sources
}
#[async_trait(?Send)]
impl PeerTrafficWatcher for Addr<PeersTrafficWatcherImpl> {
async fn register_room(
&self,
room_id: RoomId,
handler: Box<dyn PeerConnectionStateEventsHandler>,
) -> Result<(), MailboxError> {
self.send(RegisterRoom { room_id, handler }).await
}
fn unregister_room(&self, room_id: RoomId) {
self.do_send(UnregisterRoom(room_id))
}
async fn register_peer(
&self,
room_id: RoomId,
peer_id: PeerId,
should_watch_turn: bool,
) -> Result<(), MailboxError> {
self.send(RegisterPeer {
room_id,
peer_id,
flow_metrics_sources: build_flow_sources(should_watch_turn),
})
.await
}
fn unregister_peers(&self, room_id: RoomId, peers_ids: Vec<PeerId>) {
self.do_send(UnregisterPeers { room_id, peers_ids })
}
fn traffic_flows(
&self,
room_id: RoomId,
peer_id: PeerId,
source: FlowMetricSource,
) {
debug!("TrafficFlows: in {}/{} from {:?}", room_id, peer_id, source);
self.do_send(TrafficFlows {
room_id,
peer_id,
source,
})
}
fn traffic_stopped(&self, room_id: RoomId, peer_id: PeerId, at: Instant) {
debug!("TrafficStopped: in {}/{}", room_id, peer_id);
self.do_send(TrafficStopped {
room_id,
peer_id,
at,
})
}
}
#[derive(Debug, Default)]
pub struct PeersTrafficWatcherImpl {
stats: HashMap<RoomId, RoomStats>,
traffic_report_ttl: Duration,
init_timeout: Duration,
}
impl PeersTrafficWatcherImpl {
pub fn new(conf: &conf::Media) -> Self {
Self {
stats: HashMap::new(),
traffic_report_ttl: conf.max_lag,
init_timeout: conf.init_timeout,
}
}
fn check_is_started(&mut self, room_id: &RoomId, peer_id: PeerId) {
if let Some(room) = self.stats.get_mut(room_id) {
if let Some(peer) = room.peers.get_mut(&peer_id) {
if peer.state == PeerState::Starting {
if peer.is_flowing() {
peer.state = PeerState::Started;
} else {
peer.stop();
let at = peer.started_at.unwrap_or_else(Utc::now);
room.handler.peer_stopped(peer_id, at);
}
};
}
}
}
}
impl Actor for PeersTrafficWatcherImpl {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.run_interval(Duration::from_secs(1), |this, _| {
for room in this.stats.values_mut() {
for peer in room.peers.values_mut() {
if peer.state == PeerState::Started && !peer.is_flowing() {
peer.stop();
room.handler.peer_stopped(
peer.peer_id,
instant_into_utc(Instant::now()),
);
}
}
}
});
}
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct TrafficFlows {
room_id: RoomId,
peer_id: PeerId,
source: FlowMetricSource,
}
impl Handler<TrafficFlows> for PeersTrafficWatcherImpl {
type Result = ();
fn handle(
&mut self,
msg: TrafficFlows,
ctx: &mut Self::Context,
) -> Self::Result {
if let Some(room) = self.stats.get_mut(&msg.room_id) {
if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
peer.received_sources.insert(msg.source, Instant::now());
match &mut peer.state {
PeerState::New => {
peer.state = PeerState::Starting;
peer.started_at = Some(Utc::now());
room.handler.peer_started(peer.peer_id);
let init_check_task_handle =
ctx.run_later(self.init_timeout, move |this, _| {
this.check_is_started(
&msg.room_id,
msg.peer_id,
);
});
peer.init_task_handler.replace(init_check_task_handle);
}
PeerState::Starting => {
if peer.state == PeerState::Starting
&& peer.is_flowing()
{
peer.state = PeerState::Started;
peer.init_task_handler.take();
};
}
PeerState::Stopped => {
if peer.is_flowing() {
peer.state = PeerState::Started;
peer.started_at = Some(Utc::now());
room.handler.peer_started(peer.peer_id);
}
}
_ => (),
}
}
}
}
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct TrafficStopped {
room_id: RoomId,
peer_id: PeerId,
at: Instant,
}
impl Handler<TrafficStopped> for PeersTrafficWatcherImpl {
type Result = ();
fn handle(
&mut self,
msg: TrafficStopped,
_: &mut Self::Context,
) -> Self::Result {
if let Some(room) = self.stats.get_mut(&msg.room_id) {
if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
if peer.state != PeerState::Stopped {
peer.stop();
room.handler
.peer_stopped(peer.peer_id, instant_into_utc(msg.at));
}
}
}
}
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum FlowMetricSource {
PartnerPeer,
Peer,
Coturn,
}
#[derive(Clone, Copy, Debug, PartialEq)]
enum PeerState {
New,
Starting,
Started,
Stopped,
}
#[derive(Debug)]
struct PeerStat {
peer_id: PeerId,
state: PeerState,
init_task_handler: Option<SpawnHandle>,
tracked_sources: HashSet<FlowMetricSource>,
started_at: Option<DateTime<Utc>>,
received_sources: HashMap<FlowMetricSource, Instant>,
traffic_flowing_timeout: Duration,
}
impl PeerStat {
fn is_flowing(&self) -> bool {
for tracked_source in &self.tracked_sources {
if let Some(at) = self.received_sources.get(tracked_source) {
if at.elapsed() > self.traffic_flowing_timeout {
return false;
}
} else {
return false;
}
}
true
}
fn stop(&mut self) {
self.state = PeerState::Stopped;
self.received_sources.clear();
}
}
#[derive(Debug)]
struct RoomStats {
room_id: RoomId,
handler: Box<dyn PeerConnectionStateEventsHandler>,
peers: HashMap<PeerId, PeerStat>,
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct RegisterRoom {
room_id: RoomId,
handler: Box<dyn PeerConnectionStateEventsHandler>,
}
impl Handler<RegisterRoom> for PeersTrafficWatcherImpl {
type Result = ();
fn handle(
&mut self,
msg: RegisterRoom,
_: &mut Self::Context,
) -> Self::Result {
debug!(
"Room [id = {}] was registered in the PeersTrafficWatcher.",
msg.room_id
);
self.stats.insert(
msg.room_id.clone(),
RoomStats {
room_id: msg.room_id,
handler: msg.handler,
peers: HashMap::new(),
},
);
}
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct UnregisterRoom(pub RoomId);
impl Handler<UnregisterRoom> for PeersTrafficWatcherImpl {
type Result = ();
fn handle(
&mut self,
msg: UnregisterRoom,
_: &mut Self::Context,
) -> Self::Result {
if self.stats.remove(&msg.0).is_some() {
debug!(
"Room [id = {}] was unregistered in the PeersTrafficWatcher.",
msg.0
);
};
}
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct RegisterPeer {
room_id: RoomId,
peer_id: PeerId,
flow_metrics_sources: HashSet<FlowMetricSource>,
}
impl Handler<RegisterPeer> for PeersTrafficWatcherImpl {
type Result = ();
fn handle(
&mut self,
msg: RegisterPeer,
_: &mut Self::Context,
) -> Self::Result {
if let Some(room) = self.stats.get_mut(&msg.room_id) {
if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
peer.tracked_sources.extend(msg.flow_metrics_sources);
} else {
debug!(
"Peer [id = {}] from a Room [id = {}] was registered in \
the PeersTrafficWatcher with {:?} sources.",
msg.peer_id, msg.room_id, msg.flow_metrics_sources
);
room.peers.insert(
msg.peer_id,
PeerStat {
peer_id: msg.peer_id,
state: PeerState::New,
init_task_handler: None,
tracked_sources: msg.flow_metrics_sources,
started_at: None,
received_sources: HashMap::new(),
traffic_flowing_timeout: self.traffic_report_ttl,
},
);
}
}
}
}
#[derive(Debug, Message)]
#[rtype(result = "()")]
struct UnregisterPeers {
room_id: RoomId,
peers_ids: Vec<PeerId>,
}
impl Handler<UnregisterPeers> for PeersTrafficWatcherImpl {
type Result = ();
fn handle(
&mut self,
msg: UnregisterPeers,
_: &mut Self::Context,
) -> Self::Result {
if let Some(room_stats) = self.stats.get_mut(&msg.room_id) {
let room_id = msg.room_id;
for peer_id in msg.peers_ids {
if room_stats.peers.remove(&peer_id).is_some() {
debug!(
"Peer [id = {}] from a Room [id = {}] was \
unregistered in the PeersTrafficWatcher.",
peer_id, room_id,
);
};
}
}
}
}
#[cfg(test)]
mod tests {
use futures::{channel::mpsc, stream::LocalBoxStream, StreamExt};
use tokio::time::timeout;
use super::*;
struct Helper {
peer_stopped_rx: LocalBoxStream<'static, (PeerId, DateTime<Utc>)>,
peer_started_rx: LocalBoxStream<'static, PeerId>,
traffic_watcher: Addr<PeersTrafficWatcherImpl>,
}
impl Helper {
pub async fn new(cfg: &conf::Media) -> Self {
let watcher = PeersTrafficWatcherImpl::new(cfg).start();
let mut handler = MockPeerConnectionStateEventsHandler::new();
let (peer_stopped_tx, peer_stopped_rx) = mpsc::unbounded();
let (peer_started_tx, peer_started_rx) = mpsc::unbounded();
handler.expect_peer_stopped().returning(move |peer_id, at| {
peer_stopped_tx.unbounded_send((peer_id, at)).unwrap();
});
handler.expect_peer_started().returning(move |peer_id| {
peer_started_tx.unbounded_send(peer_id).unwrap();
});
watcher
.register_room(Self::room_id(), Box::new(handler))
.await
.unwrap();
Self {
traffic_watcher: watcher,
peer_started_rx: Box::pin(peer_started_rx),
peer_stopped_rx: Box::pin(peer_stopped_rx),
}
}
fn room_id() -> RoomId {
RoomId::from("test-room")
}
pub fn watcher(&self) -> Addr<PeersTrafficWatcherImpl> {
self.traffic_watcher.clone()
}
pub async fn next_peer_stopped(&mut self) -> (PeerId, DateTime<Utc>) {
self.peer_stopped_rx.next().await.unwrap()
}
pub async fn next_peer_started(&mut self) -> PeerId {
self.peer_started_rx.next().await.unwrap()
}
}
#[actix_rt::test]
async fn correct_stopped_at_when_init_timeout_stop() {
let mut helper = Helper::new(&conf::Media {
init_timeout: Duration::from_millis(100),
max_lag: Duration::from_secs(999),
})
.await;
helper
.watcher()
.register_peer(Helper::room_id(), PeerId(1), false)
.await
.unwrap();
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::Peer,
);
assert_eq!(helper.next_peer_started().await, PeerId(1));
let start_time = Utc::now();
let (_, at) = helper.next_peer_stopped().await;
assert_eq!(at.timestamp() / 10, start_time.timestamp() / 10);
}
async fn stop_on_max_lag_helper() -> Helper {
let mut helper = Helper::new(&conf::Media {
init_timeout: Duration::from_secs(999),
max_lag: Duration::from_millis(50),
})
.await;
helper
.watcher()
.register_peer(Helper::room_id(), PeerId(1), false)
.await
.unwrap();
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::Peer,
);
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::PartnerPeer,
);
timeout(Duration::from_millis(30), helper.next_peer_started())
.await
.unwrap();
timeout(Duration::from_millis(1100), helper.next_peer_stopped())
.await
.unwrap();
helper
}
#[actix_rt::test]
async fn stop_on_max_lag() {
stop_on_max_lag_helper().await;
}
#[actix_rt::test]
async fn start_after_stop_on_max_lag() {
let mut helper = stop_on_max_lag_helper().await;
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::Peer,
);
timeout(Duration::from_millis(30), helper.next_peer_started())
.await
.unwrap_err();
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::PartnerPeer,
);
timeout(Duration::from_millis(30), helper.next_peer_started())
.await
.unwrap();
}
async fn init_timeout_tests_helper(
should_watch_turn: bool,
traffic_flows_invocations: &[FlowMetricSource],
should_start: bool,
should_stop: bool,
) -> Helper {
let mut helper = Helper::new(&conf::Media {
init_timeout: Duration::from_millis(30),
max_lag: Duration::from_secs(999),
})
.await;
helper
.watcher()
.register_peer(Helper::room_id(), PeerId(1), should_watch_turn)
.await
.unwrap();
for source in traffic_flows_invocations {
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
*source,
);
}
let peer_started =
timeout(Duration::from_millis(50), helper.next_peer_started())
.await;
if should_start {
peer_started.unwrap();
} else {
peer_started.unwrap_err();
}
let peer_stopped =
timeout(Duration::from_millis(50), helper.next_peer_stopped())
.await;
if should_stop {
peer_stopped.unwrap();
} else {
peer_stopped.unwrap_err();
};
helper
}
#[actix_rt::test]
async fn init_timeout_tests() {
use FlowMetricSource::{Coturn, PartnerPeer, Peer};
init_timeout_tests_helper(false, &[], false, false).await;
init_timeout_tests_helper(false, &[Peer], true, true).await;
init_timeout_tests_helper(false, &[Peer, Peer], true, true).await;
init_timeout_tests_helper(false, &[Peer, Coturn], true, true).await;
init_timeout_tests_helper(true, &[Peer, PartnerPeer], true, true).await;
init_timeout_tests_helper(false, &[Peer, PartnerPeer], true, false)
.await;
init_timeout_tests_helper(
true,
&[Peer, PartnerPeer, Coturn],
true,
false,
)
.await;
}
#[actix_rt::test]
async fn start_after_init_timeout_stop() {
let mut helper = init_timeout_tests_helper(
false,
&[FlowMetricSource::Peer],
true,
true,
)
.await;
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::Peer,
);
timeout(Duration::from_millis(30), helper.next_peer_started())
.await
.unwrap_err();
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::PartnerPeer,
);
timeout(Duration::from_millis(30), helper.next_peer_started())
.await
.unwrap();
}
#[actix_rt::test]
async fn peer_stop_when_traffic_stop() {
{
let mut helper = init_timeout_tests_helper(
false,
&[FlowMetricSource::Peer, FlowMetricSource::PartnerPeer],
true,
false,
)
.await;
helper.watcher().traffic_stopped(
Helper::room_id(),
PeerId(1),
Instant::now(),
);
timeout(Duration::from_millis(10), helper.next_peer_stopped())
.await
.unwrap();
}
{
let mut helper = Helper::new(&conf::Media {
init_timeout: Duration::from_secs(999),
max_lag: Duration::from_secs(999),
})
.await;
helper
.watcher()
.register_peer(Helper::room_id(), PeerId(1), false)
.await
.unwrap();
helper.watcher().traffic_flows(
Helper::room_id(),
PeerId(1),
FlowMetricSource::Peer,
);
timeout(Duration::from_millis(10), helper.next_peer_started())
.await
.unwrap();
helper.watcher().traffic_stopped(
Helper::room_id(),
PeerId(1),
Instant::now(),
);
timeout(Duration::from_millis(10), helper.next_peer_stopped())
.await
.unwrap();
}
{
let mut helper = init_timeout_tests_helper(
false,
&[FlowMetricSource::Peer],
true,
true,
)
.await;
helper.watcher().traffic_stopped(
Helper::room_id(),
PeerId(1),
Instant::now(),
);
timeout(Duration::from_millis(10), helper.next_peer_stopped())
.await
.unwrap_err();
}
}
}