medea/signalling/peers/
traffic_watcher.rs

1//! Provides [`PeerTrafficWatcher`] trait and its impl.
2//!
3//! [`PeerTrafficWatcher`] analyzes `Peer` traffic metrics and notifies
4//! [`PeerConnectionStateEventsHandler`] about traffic flowing changes.
5//!
//! Traffic metrics consumed by [`PeerTrafficWatcher`] can originate from
//! different sources:
//! 1. [`FlowMetricSource::Peer`] - stats received from the member that owns the
//!    target `Peer`.
//! 2. [`FlowMetricSource::PartnerPeer`] - stats received from the member that
//!    owns the `Peer` connected to the target `Peer`.
//! 3. [`FlowMetricSource::Coturn`] - stats reported by the Coturn TURN server.
//!    This source is only tracked if the target `Peer` traffic is being
//!    relayed.
14//!
//! First, register a [`Room`] (`PeerTrafficWatcher.register_room()`) and a
//! `Peer` (`PeerTrafficWatcher.register_peer()`). When the first source reports
//! that traffic is flowing (`PeerTrafficWatcher.traffic_flows()`),
//! [`PeerConnectionStateEventsHandler::peer_started`] is called.
//!
//! After that, [`PeerTrafficWatcher`] waits up to `init_timeout` for the other
//! sources to report that traffic is flowing; if they do not,
//! [`PeerConnectionStateEventsHandler::peer_stopped`] is called.
23//!
24//! If some source will report that it observes traffic stopped flowing
25//! (`PeerTrafficWatcher.traffic_stopped()`), then
26//! [`PeerConnectionStateEventsHandler::peer_stopped`] will be called.
27//!
28//! [`Room`]: crate::signalling::room::Room
29
30use std::{
31    collections::{HashMap, HashSet},
32    fmt::Debug,
33    sync::Arc,
34    time::{Duration, Instant},
35};
36
37use actix::{
38    Actor, Addr, AsyncContext, Handler, MailboxError, Message, SpawnHandle,
39};
40use async_trait::async_trait;
41use chrono::{DateTime, Utc};
42use medea_client_api_proto::{PeerId, RoomId};
43
44use crate::{conf, log::prelude::*, utils::instant_into_utc};
45
46/// Subscriber of `Peer` traffic flowing changes.
47#[cfg_attr(test, mockall::automock)]
48pub trait PeerConnectionStateEventsHandler: Send + Debug {
49    /// [`PeerTrafficWatcher`] believes that traffic was started.
50    fn peer_started(&self, peer_id: PeerId);
51
52    /// [`PeerTrafficWatcher`] believes that traffic was stopped.
53    fn peer_stopped(&self, peer_id: PeerId, at: DateTime<Utc>);
54}
55
56#[cfg(test)]
57impl_debug_by_struct_name!(MockPeerConnectionStateEventsHandler);
58
/// Builds a [`PeerTrafficWatcher`] backed by a freshly started
/// [`PeersTrafficWatcherImpl`] actor configured from the provided `conf`.
#[cfg(test)]
pub fn build_peers_traffic_watcher(
    conf: &conf::Media,
) -> Arc<dyn PeerTrafficWatcher> {
    let watcher_addr = PeersTrafficWatcherImpl::new(conf).start();
    Arc::new(watcher_addr)
}
66
67// TODO: Returns dummy implementation cause this component is currently
68//       disabled.
69//       Will be enabled in https://github.com/instrumentisto/medea/pull/91
70/// Builds [`PeerTrafficWatcher`].
71#[cfg(not(test))]
72#[must_use]
73pub fn build_peers_traffic_watcher(
74    _: &conf::Media,
75) -> Arc<dyn PeerTrafficWatcher> {
76    #[derive(Debug)]
77    struct DummyPeerTrafficWatcher;
78
79    #[async_trait(?Send)]
80    impl PeerTrafficWatcher for DummyPeerTrafficWatcher {
81        async fn register_room(
82            &self,
83            _: RoomId,
84            _: Box<dyn PeerConnectionStateEventsHandler>,
85        ) -> Result<(), MailboxError> {
86            Ok(())
87        }
88
89        fn unregister_room(&self, _: RoomId) {}
90
91        async fn register_peer(
92            &self,
93            _: RoomId,
94            _: PeerId,
95            _: bool,
96        ) -> Result<(), MailboxError> {
97            Ok(())
98        }
99
100        fn unregister_peers(&self, _: RoomId, _: Vec<PeerId>) {}
101
102        fn traffic_flows(&self, _: RoomId, _: PeerId, _: FlowMetricSource) {}
103
104        fn traffic_stopped(&self, _: RoomId, _: PeerId, _: Instant) {}
105    }
106    Arc::new(DummyPeerTrafficWatcher)
107}
108
109/// Consumes `Peer` traffic metrics for further processing.
110#[async_trait(?Send)]
111#[cfg_attr(test, mockall::automock)]
112pub trait PeerTrafficWatcher: Debug + Send + Sync {
113    /// Registers provided [`PeerConnectionStateEventsHandler`] as `Peer`s state
114    /// messages listener, preparing [`PeerTrafficWatcher`] for registering
115    /// `Peer`s from this [`PeerConnectionStateEventsHandler`].
116    async fn register_room(
117        &self,
118        room_id: RoomId,
119        handler: Box<dyn PeerConnectionStateEventsHandler>,
120    ) -> Result<(), MailboxError>;
121
122    /// Unregisters [`Room`] as `Peer`s state messages listener.
123    ///
124    /// All `Peer` subscriptions related to this [`Room`] will be removed.
125    ///
126    /// [`Room`]: crate::signalling::room::Room
127    fn unregister_room(&self, room_id: RoomId);
128
129    /// Registers `Peer`, so that [`PeerTrafficWatcher`] will be able to
130    /// process traffic flow events of this `Peer`.
131    async fn register_peer(
132        &self,
133        room_id: RoomId,
134        peer_id: PeerId,
135        should_watch_turn: bool,
136    ) -> Result<(), MailboxError>;
137
138    /// Unregisters `Peer`s, so that [`PeerTrafficWatcher`] will not be able
139    /// to process traffic flow events of this `Peer` anymore.
140    fn unregister_peers(&self, room_id: RoomId, peers_ids: Vec<PeerId>);
141
142    /// Notifies [`PeerTrafficWatcher`] that some `Peer` traffic flowing.
143    fn traffic_flows(
144        &self,
145        room_id: RoomId,
146        peer_id: PeerId,
147        source: FlowMetricSource,
148    );
149
150    /// Notifies [`PeerTrafficWatcher`] that some `Peer`s traffic flowing was
151    /// stopped.
152    fn traffic_stopped(&self, room_id: RoomId, peer_id: PeerId, at: Instant);
153}
154
155#[cfg(test)]
156impl_debug_by_struct_name!(MockPeerTrafficWatcher);
157
158/// Returns [`FlowMetricSource`]s, which will be used to emit `Peer` state
159/// events.
160///
161/// [`FlowMetricSource::Peer`] and [`FlowMetricSource::PartnerPeer`] are
162/// always returned, [`FlowMetricSource::Coturn`] is optional (should be used
163/// only if media is forcibly relayed).
164fn build_flow_sources(should_watch_turn: bool) -> HashSet<FlowMetricSource> {
165    let mut sources =
166        hashset![FlowMetricSource::Peer, FlowMetricSource::PartnerPeer];
167    if should_watch_turn {
168        sources.insert(FlowMetricSource::Coturn);
169    }
170
171    sources
172}
173
174#[async_trait(?Send)]
175impl PeerTrafficWatcher for Addr<PeersTrafficWatcherImpl> {
176    async fn register_room(
177        &self,
178        room_id: RoomId,
179        handler: Box<dyn PeerConnectionStateEventsHandler>,
180    ) -> Result<(), MailboxError> {
181        self.send(RegisterRoom { room_id, handler }).await
182    }
183
184    fn unregister_room(&self, room_id: RoomId) {
185        self.do_send(UnregisterRoom(room_id))
186    }
187
188    async fn register_peer(
189        &self,
190        room_id: RoomId,
191        peer_id: PeerId,
192        should_watch_turn: bool,
193    ) -> Result<(), MailboxError> {
194        self.send(RegisterPeer {
195            room_id,
196            peer_id,
197            flow_metrics_sources: build_flow_sources(should_watch_turn),
198        })
199        .await
200    }
201
202    fn unregister_peers(&self, room_id: RoomId, peers_ids: Vec<PeerId>) {
203        self.do_send(UnregisterPeers { room_id, peers_ids })
204    }
205
206    fn traffic_flows(
207        &self,
208        room_id: RoomId,
209        peer_id: PeerId,
210        source: FlowMetricSource,
211    ) {
212        debug!("TrafficFlows: in {}/{} from {:?}", room_id, peer_id, source);
213        self.do_send(TrafficFlows {
214            room_id,
215            peer_id,
216            source,
217        })
218    }
219
220    fn traffic_stopped(&self, room_id: RoomId, peer_id: PeerId, at: Instant) {
221        debug!("TrafficStopped: in {}/{}", room_id, peer_id);
222        self.do_send(TrafficStopped {
223            room_id,
224            peer_id,
225            at,
226        })
227    }
228}
229
230/// Service which analyzes `Peer` traffic metrics and notifies about traffic
231/// flowing changes [`PeerConnectionStateEventsHandler`]s.
232#[derive(Debug, Default)]
233pub struct PeersTrafficWatcherImpl {
234    /// All `Room`s which exists on the Medea server.
235    stats: HashMap<RoomId, RoomStats>,
236
237    /// Media source traffic report ttl. Media sources must continuously report
238    /// that traffic is flowing, if some media source wont send new reports for
239    /// this timeout, then it is considered that this source is not flowing any
240    /// more.
241    traffic_report_ttl: Duration,
242
243    /// Duration after which [`PeersTrafficWatcherImpl`] will check that all
244    /// tracked traffic sources have reported that traffic is flowing.
245    init_timeout: Duration,
246}
247
248impl PeersTrafficWatcherImpl {
249    /// Returns new [`PeersTrafficWatcherImpl`].
250    pub fn new(conf: &conf::Media) -> Self {
251        Self {
252            stats: HashMap::new(),
253            traffic_report_ttl: conf.max_lag,
254            init_timeout: conf.init_timeout,
255        }
256    }
257
258    /// Checks that all [`FlowMetricSource`] have reported that `Peer` traffic
259    /// is flowing for `Peer` in `PeerState::Starting` state.
260    ///
261    /// If this check fails, then
262    /// [`PeerConnectionStateEventsHandler::peer_stopped`] will be called with a
263    /// time, when first source reported that `Peer` traffic is flowing.
264    ///
265    /// If check succeeds then `Peer` is transitioned to `PeerState::Started`
266    /// state.
267    ///
268    /// Called for every `Peer` after `init_timeout` passed since first source
269    /// reported that `Peer` traffic is flowing.
270    fn check_is_started(&mut self, room_id: &RoomId, peer_id: PeerId) {
271        if let Some(room) = self.stats.get_mut(room_id) {
272            if let Some(peer) = room.peers.get_mut(&peer_id) {
273                if peer.state == PeerState::Starting {
274                    if peer.is_flowing() {
275                        peer.state = PeerState::Started;
276                    } else {
277                        peer.stop();
278                        let at = peer.started_at.unwrap_or_else(Utc::now);
279                        room.handler.peer_stopped(peer_id, at);
280                    }
281                };
282            }
283        }
284    }
285}
286
287impl Actor for PeersTrafficWatcherImpl {
288    type Context = actix::Context<Self>;
289
290    /// Checks if [`PeerState::Started`] [`PeerStat`]s traffic is still
291    /// flowing.
292    fn started(&mut self, ctx: &mut Self::Context) {
293        ctx.run_interval(Duration::from_secs(1), |this, _| {
294            for room in this.stats.values_mut() {
295                for peer in room.peers.values_mut() {
296                    if peer.state == PeerState::Started && !peer.is_flowing() {
297                        peer.stop();
298                        room.handler.peer_stopped(
299                            peer.peer_id,
300                            instant_into_utc(Instant::now()),
301                        );
302                    }
303                }
304            }
305        });
306    }
307}
308
309/// Some [`FlowMetricSource`] notifies that it observes that
310/// `Peer`s traffic is flowing.
311#[derive(Debug, Message)]
312#[rtype(result = "()")]
313struct TrafficFlows {
314    /// [`RoomId`] of [`Room`] where this `Peer` is stored.
315    ///
316    /// [`Room`]: crate::signalling::room::Room
317    room_id: RoomId,
318
319    /// [`PeerId`] of `Peer` which flows.
320    peer_id: PeerId,
321
322    /// Source of this metric.
323    source: FlowMetricSource,
324}
325
326impl Handler<TrafficFlows> for PeersTrafficWatcherImpl {
327    type Result = ();
328
329    /// Saves that provided [`FlowMetricSource`] reported that it observes
330    /// `Peer` traffic flowing.
331    ///
332    /// If [`PeerStat`] is in [`PeerState::Stopped`] state:
333    /// 1. This stat is changed to [`PeerState::Starting`] state in which
334    /// `Peer` init
335    /// 2. [`PeerConnectionStateEventsHandler::peer_started`] is called.
336    /// 3. [`PeersTrafficWatcherImpl::check_is_started`] is scheduled to run
337    ///    for this [`PeerStat`] in [`PeersTrafficWatcherImpl::init_timeout`].
338    ///
339    /// If [`PeerStat`] is in [`PeerState::Starting`] state then provided
340    /// [`FlowMetricSource`] is saved to list of received
341    /// [`FlowMetricSource`]. This list will be checked in the
342    /// [`PeersTrafficWatcherImpl::check_is_started`] function.
343    ///
344    /// If [`PeerStat`] is in [`PeerState::Started`] then last update time of
345    /// the provided [`FlowMetricSource`] will be updated.
346    ///
347    /// If [`PeerStat`] is in [`PeerState::Stopped`] state then
348    /// [`FlowMetricSource`] will be save and it'll check
349    /// [`FlowMetricSource`]s will be received then [`PeerStat`] will be
350    /// transferred into [`PeerState::Started`] with [`FlowMetricSource`]s from
351    /// the [`PeerState::Stopped`] state with [`Instant::now`] time.
352    fn handle(
353        &mut self,
354        msg: TrafficFlows,
355        ctx: &mut Self::Context,
356    ) -> Self::Result {
357        if let Some(room) = self.stats.get_mut(&msg.room_id) {
358            if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
359                peer.received_sources.insert(msg.source, Instant::now());
360                match &mut peer.state {
361                    PeerState::New => {
362                        peer.state = PeerState::Starting;
363                        peer.started_at = Some(Utc::now());
364
365                        room.handler.peer_started(peer.peer_id);
366
367                        let init_check_task_handle =
368                            ctx.run_later(self.init_timeout, move |this, _| {
369                                this.check_is_started(
370                                    &msg.room_id,
371                                    msg.peer_id,
372                                );
373                            });
374                        peer.init_task_handler.replace(init_check_task_handle);
375                    }
376                    PeerState::Starting => {
377                        if peer.state == PeerState::Starting
378                            && peer.is_flowing()
379                        {
380                            peer.state = PeerState::Started;
381                            peer.init_task_handler.take();
382                        };
383                    }
384                    PeerState::Stopped => {
385                        if peer.is_flowing() {
386                            peer.state = PeerState::Started;
387                            peer.started_at = Some(Utc::now());
388                            room.handler.peer_started(peer.peer_id);
389                        }
390                    }
391                    _ => (),
392                }
393            }
394        }
395    }
396}
397
398/// Some [`FlowMetricSource`] notifies that it observes that
399/// `Peer`s traffic stopped flowing.
400#[derive(Debug, Message)]
401#[rtype(result = "()")]
402struct TrafficStopped {
403    /// [`RoomId`] of [`Room`] where this `Peer` is stored.
404    ///
405    /// [`Room`]: crate::signalling::room::Room
406    room_id: RoomId,
407
408    /// [`PeerId`] of `Peer` which traffic was stopped.
409    peer_id: PeerId,
410
411    /// Time when proof of `Peer`s traffic stopping was gotten.
412    at: Instant,
413}
414
415impl Handler<TrafficStopped> for PeersTrafficWatcherImpl {
416    type Result = ();
417
418    /// Calls [`PeerConnectionStateEventsHandler::peer_stopped`] if [`PeerStat`]
419    /// isn't in [`PeerState::Stopped`] state.
420    ///
421    /// Transfers [`PeerStat`] of the stopped `Peer` into
422    /// [`PeerState::Stopped`].
423    fn handle(
424        &mut self,
425        msg: TrafficStopped,
426        _: &mut Self::Context,
427    ) -> Self::Result {
428        if let Some(room) = self.stats.get_mut(&msg.room_id) {
429            if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
430                if peer.state != PeerState::Stopped {
431                    peer.stop();
432                    room.handler
433                        .peer_stopped(peer.peer_id, instant_into_utc(msg.at));
434                }
435            }
436        }
437    }
438}
439
/// Possible origins of a "traffic is flowing" signal.
///
/// Traffic is considered flowing only when every tracked source confirms it.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum FlowMetricSource {
    /// Metrics reported by the partner `Peer`.
    PartnerPeer,

    /// Metrics reported by the `Peer` itself.
    Peer,

    /// Metrics about this `Peer` reported by the Coturn TURN server.
    Coturn,
}
455
/// Current state of a [`PeerStat`].
///
/// Main transitions:
///
/// ```text
/// +-------+    +----------+    +-----------+     +-----------+
/// |  New  +--->+ Starting +--->+  Started  +<--->+  Stopped  |
/// +-------+    +----------+    +-----------+     +-----------+
/// ```
///
/// Additionally, `New` and `Starting` go directly to `Stopped` when a traffic
/// stop is reported, and `Starting` goes to `Stopped` when not all sources
/// have reported within the init timeout.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PeerState {
    /// `Peer` was just added and has not received any traffic events yet.
    New,

    /// Some sources have reported that traffic is flowing, but not all of
    /// them.
    Starting,

    /// All of the sources have reported that traffic is flowing.
    Started,

    /// Traffic is considered stopped: a stop was reported, reports of some
    /// source expired, or not all sources reported within the init timeout.
    Stopped,
}
477
478/// Current stats of `Peer`.
479///
480/// Also this structure may be considered as subscription to this `Peer` state
481/// updates.
482#[derive(Debug)]
483struct PeerStat {
484    /// [`PeerId`] of `Peer` which this [`PeerStat`] represents.
485    peer_id: PeerId,
486
487    /// Current state of this [`PeerStat`].
488    state: PeerState,
489
490    /// `SpawnHandle` to `Peer` init task (`check_is_started`)
491    init_task_handler: Option<SpawnHandle>,
492
493    /// List of [`FlowMetricSource`]s from which [`TrafficFlows`] should be
494    /// received for validation that traffic is really going.
495    tracked_sources: HashSet<FlowMetricSource>,
496
497    /// [`DateTime`] when this [`PeerStat`] is started.
498    ///
499    /// If `None` then [`PeerStat`] not started.
500    started_at: Option<DateTime<Utc>>,
501
502    /// All [`FlowMetricSource`]s received at this moment with time at which
503    /// they are received lastly.
504    received_sources: HashMap<FlowMetricSource, Instant>,
505
506    /// Media source traffic report ttl. Media sources must continuously report
507    /// that traffic is flowing, if some media source wont send new reports for
508    /// this timeout, then it is considered that this source is not flowing any
509    /// more.
510    traffic_flowing_timeout: Duration,
511}
512
513impl PeerStat {
514    /// Returns `true` if this [`PeerStat`] is considered valid.
515    ///
516    /// Checks that all required [`FlowMetricSource`]s reported that traffic is
517    /// flowing within `now() - traffic_flowing_timeout`.
518    fn is_flowing(&self) -> bool {
519        for tracked_source in &self.tracked_sources {
520            if let Some(at) = self.received_sources.get(tracked_source) {
521                if at.elapsed() > self.traffic_flowing_timeout {
522                    return false;
523                }
524            } else {
525                return false;
526            }
527        }
528
529        true
530    }
531
532    /// Sets [`PeerStat`] state to the [`PeerState::Stopped`] and resets
533    /// [`PeerStat::received_sources`].
534    fn stop(&mut self) {
535        self.state = PeerState::Stopped;
536        self.received_sources.clear();
537    }
538}
539
540/// Stores [`PeerStat`]s of `Peer`s for which stats updates [`Room`]
541/// is watching.
542///
543/// [`Room`]: crate::signalling::room::Room
544#[derive(Debug)]
545struct RoomStats {
546    /// [`RoomId`] of all [`PeerStat`] which stored here.
547    room_id: RoomId,
548
549    /// Handler of the [`PeerStat`] events.
550    handler: Box<dyn PeerConnectionStateEventsHandler>,
551
552    /// [`PeerStat`] for which [`Room`] is watching.
553    ///
554    /// [`Room`]: crate::signalling::room::Room
555    peers: HashMap<PeerId, PeerStat>,
556}
557
558/// Registers new [`Room`] as [`PeerStat`]s listener.
559///
560/// This message will only add provided [`Room`] to the list. For real
561/// subscription to a [`PeerStat`] [`RegisterPeer`] message should be used.
562///
563/// [`RegisterRoom`] will be called in [`RoomService`] for every [`Room`] when
564/// it created.
565///
566/// [`Room`]: crate::signalling::room::Room
567/// [`RoomService`]: crate::signalling::room_service::RoomService
568#[derive(Debug, Message)]
569#[rtype(result = "()")]
570struct RegisterRoom {
571    /// [`RoomId`] of [`Room`] which requested to register in the
572    /// [`PeersTrafficWatcherImpl`].
573    ///
574    /// [`Room`]: crate::signalling::room::Room
575    room_id: RoomId,
576
577    /// Handler of the [`PeerStat`] events.
578    handler: Box<dyn PeerConnectionStateEventsHandler>,
579}
580
581impl Handler<RegisterRoom> for PeersTrafficWatcherImpl {
582    type Result = ();
583
584    fn handle(
585        &mut self,
586        msg: RegisterRoom,
587        _: &mut Self::Context,
588    ) -> Self::Result {
589        debug!(
590            "Room [id = {}] was registered in the PeersTrafficWatcher.",
591            msg.room_id
592        );
593        self.stats.insert(
594            msg.room_id.clone(),
595            RoomStats {
596                room_id: msg.room_id,
597                handler: msg.handler,
598                peers: HashMap::new(),
599            },
600        );
601    }
602}
603
604/// Unregisters [`Room`] with provided [`RoomId`] from the
605/// [`PeersTrafficWatcherImpl`].
606///
607/// This message will just remove the subscription without emitting
608/// any stop events.
609///
610/// [`Room`]: crate::signalling::room::Room
611#[derive(Debug, Message)]
612#[rtype(result = "()")]
613struct UnregisterRoom(pub RoomId);
614
615impl Handler<UnregisterRoom> for PeersTrafficWatcherImpl {
616    type Result = ();
617
618    fn handle(
619        &mut self,
620        msg: UnregisterRoom,
621        _: &mut Self::Context,
622    ) -> Self::Result {
623        if self.stats.remove(&msg.0).is_some() {
624            debug!(
625                "Room [id = {}] was unregistered in the PeersTrafficWatcher.",
626                msg.0
627            );
628        };
629    }
630}
631
632/// Subscribes [`Room`] with provided [`RoomId`] to [`PeerStat`] with provided
633/// [`PeerId`].
634///
635/// [`Room`]: crate::signalling::room::Room
636#[derive(Debug, Message)]
637#[rtype(result = "()")]
638struct RegisterPeer {
639    /// [`RoomId`] of [`Room`] which subscribes on [`PeerStat`]'s [`PeerState`]
640    /// changes.
641    ///
642    /// [`Room`]: crate::signalling::room::Room
643    room_id: RoomId,
644
645    /// [`PeerId`] of [`PeerStat`] for which subscription is requested.
646    peer_id: PeerId,
647
648    /// List of [`FlowMetricSource`]s from which [`TrafficFlows`] should be
649    /// received for validation that traffic is really going.
650    flow_metrics_sources: HashSet<FlowMetricSource>,
651}
652
653impl Handler<RegisterPeer> for PeersTrafficWatcherImpl {
654    type Result = ();
655
656    fn handle(
657        &mut self,
658        msg: RegisterPeer,
659        _: &mut Self::Context,
660    ) -> Self::Result {
661        if let Some(room) = self.stats.get_mut(&msg.room_id) {
662            if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
663                peer.tracked_sources.extend(msg.flow_metrics_sources);
664            } else {
665                debug!(
666                    "Peer [id = {}] from a Room [id = {}] was registered in \
667                     the PeersTrafficWatcher with {:?} sources.",
668                    msg.peer_id, msg.room_id, msg.flow_metrics_sources
669                );
670                room.peers.insert(
671                    msg.peer_id,
672                    PeerStat {
673                        peer_id: msg.peer_id,
674                        state: PeerState::New,
675                        init_task_handler: None,
676                        tracked_sources: msg.flow_metrics_sources,
677                        started_at: None,
678                        received_sources: HashMap::new(),
679                        traffic_flowing_timeout: self.traffic_report_ttl,
680                    },
681                );
682            }
683        }
684    }
685}
686
687/// Unregisters [`Room`] with provided [`RoomId`] from [`PeerStat`] with
688/// provided [`PeerId`] updates receiving.
689///
690/// [`Room`]: crate::signalling::room::Room
691#[derive(Debug, Message)]
692#[rtype(result = "()")]
693struct UnregisterPeers {
694    /// [`RoomId`] of [`Room`] which unregisters from [`PeerStat`]'s
695    /// [`PeerState`] changes.
696    ///
697    /// [`Room`]: crate::signalling::room::Room
698    room_id: RoomId,
699
700    /// [`PeerId`] of [`PeerStat`] from which unregistration is requested.
701    ///
702    /// [`Room`]: crate::signalling::room::Room
703    peers_ids: Vec<PeerId>,
704}
705
706impl Handler<UnregisterPeers> for PeersTrafficWatcherImpl {
707    type Result = ();
708
709    fn handle(
710        &mut self,
711        msg: UnregisterPeers,
712        _: &mut Self::Context,
713    ) -> Self::Result {
714        if let Some(room_stats) = self.stats.get_mut(&msg.room_id) {
715            let room_id = msg.room_id;
716            for peer_id in msg.peers_ids {
717                if room_stats.peers.remove(&peer_id).is_some() {
718                    debug!(
719                        "Peer [id = {}] from a Room [id = {}] was \
720                         unregistered in the PeersTrafficWatcher.",
721                        peer_id, room_id,
722                    );
723                };
724            }
725        }
726    }
727}
728
729#[cfg(test)]
730mod tests {
731    use futures::{channel::mpsc, stream::LocalBoxStream, StreamExt};
732
733    use tokio::time::timeout;
734
735    use super::*;
736
737    /// Helper for the all [`traffic_watcher`] unit tests.
738    struct Helper {
739        /// Stream which will receive all
740        /// [`PeerConnectionStateEventsHandler::peer_stopped`] calls.
741        peer_stopped_rx: LocalBoxStream<'static, (PeerId, DateTime<Utc>)>,
742
743        /// Stream which will receive all
744        /// [`PeerConnectionStateEventsHandler::peer_started`] calls.
745        peer_started_rx: LocalBoxStream<'static, PeerId>,
746
747        /// [`PeerTrafficWatcherImpl`] [`Actor`].
748        traffic_watcher: Addr<PeersTrafficWatcherImpl>,
749    }
750
751    impl Helper {
752        /// Returns new [`Helper`] with empty [`PeersTrafficWatcher`].
753        pub async fn new(cfg: &conf::Media) -> Self {
754            let watcher = PeersTrafficWatcherImpl::new(cfg).start();
755            let mut handler = MockPeerConnectionStateEventsHandler::new();
756            let (peer_stopped_tx, peer_stopped_rx) = mpsc::unbounded();
757            let (peer_started_tx, peer_started_rx) = mpsc::unbounded();
758            handler.expect_peer_stopped().returning(move |peer_id, at| {
759                peer_stopped_tx.unbounded_send((peer_id, at)).unwrap();
760            });
761            handler.expect_peer_started().returning(move |peer_id| {
762                peer_started_tx.unbounded_send(peer_id).unwrap();
763            });
764            watcher
765                .register_room(Self::room_id(), Box::new(handler))
766                .await
767                .unwrap();
768
769            Self {
770                traffic_watcher: watcher,
771                peer_started_rx: Box::pin(peer_started_rx),
772                peer_stopped_rx: Box::pin(peer_stopped_rx),
773            }
774        }
775
776        /// Returns [`RoomId`] used for the [`traffic_watcher`] unit tests.
777        fn room_id() -> RoomId {
778            RoomId::from("test-room")
779        }
780
781        /// Returns [`Addr`] to the underlying [`PeersTrafficWatcherImpl`].
782        pub fn watcher(&self) -> Addr<PeersTrafficWatcherImpl> {
783            self.traffic_watcher.clone()
784        }
785
786        /// Waits for the [`PeerConnectionStateEventsHandler::peer_stopped`]
787        /// call.
788        pub async fn next_peer_stopped(&mut self) -> (PeerId, DateTime<Utc>) {
789            self.peer_stopped_rx.next().await.unwrap()
790        }
791
792        /// Waits for the [`PeerConnectionStateEventsHandler::peer_started`]
793        /// call.
794        pub async fn next_peer_started(&mut self) -> PeerId {
795            self.peer_started_rx.next().await.unwrap()
796        }
797    }
798
799    /// Checks that [`PeerTrafficWatcher`] provides correct stop time into
800    /// [`PeerConnectionStateEventsHandler::peer_stopped`] function.
801    #[actix_rt::test]
802    async fn correct_stopped_at_when_init_timeout_stop() {
803        let mut helper = Helper::new(&conf::Media {
804            init_timeout: Duration::from_millis(100),
805            max_lag: Duration::from_secs(999),
806        })
807        .await;
808        helper
809            .watcher()
810            .register_peer(Helper::room_id(), PeerId(1), false)
811            .await
812            .unwrap();
813        helper.watcher().traffic_flows(
814            Helper::room_id(),
815            PeerId(1),
816            FlowMetricSource::Peer,
817        );
818        assert_eq!(helper.next_peer_started().await, PeerId(1));
819        let start_time = Utc::now();
820        let (_, at) = helper.next_peer_stopped().await;
821        assert_eq!(at.timestamp() / 10, start_time.timestamp() / 10);
822    }
823
824    /// Checks that [`PeerConnectionStateEventsHandler::peer_stopped`] will be
825    /// called if no [`TrafficFlows`] will be received within `max_lag`
826    /// timeout.
827    async fn stop_on_max_lag_helper() -> Helper {
828        let mut helper = Helper::new(&conf::Media {
829            init_timeout: Duration::from_secs(999),
830            max_lag: Duration::from_millis(50),
831        })
832        .await;
833        helper
834            .watcher()
835            .register_peer(Helper::room_id(), PeerId(1), false)
836            .await
837            .unwrap();
838        helper.watcher().traffic_flows(
839            Helper::room_id(),
840            PeerId(1),
841            FlowMetricSource::Peer,
842        );
843        helper.watcher().traffic_flows(
844            Helper::room_id(),
845            PeerId(1),
846            FlowMetricSource::PartnerPeer,
847        );
848        timeout(Duration::from_millis(30), helper.next_peer_started())
849            .await
850            .unwrap();
851        timeout(Duration::from_millis(1100), helper.next_peer_stopped())
852            .await
853            .unwrap();
854        helper
855    }
856
857    #[actix_rt::test]
858    async fn stop_on_max_lag() {
859        stop_on_max_lag_helper().await;
860    }
861
862    /// Checks correct `Peer` start after it was stopped cause max lag timeout
863    /// was exceeded.
864    #[actix_rt::test]
865    async fn start_after_stop_on_max_lag() {
866        let mut helper = stop_on_max_lag_helper().await;
867        helper.watcher().traffic_flows(
868            Helper::room_id(),
869            PeerId(1),
870            FlowMetricSource::Peer,
871        );
872        timeout(Duration::from_millis(30), helper.next_peer_started())
873            .await
874            .unwrap_err();
875        helper.watcher().traffic_flows(
876            Helper::room_id(),
877            PeerId(1),
878            FlowMetricSource::PartnerPeer,
879        );
880        timeout(Duration::from_millis(30), helper.next_peer_started())
881            .await
882            .unwrap();
883    }
884
885    /// Helper for `init_timeout` tests.
886    /// 1. Creates `PeersTrafficWatcherImpl` with `init_timeout = 30ms`, and
887    /// `max_lag = 999s`.
888    /// 2. Registers `Peer` with provided `should_watch_turn`
889    /// 3. Invokes `traffic_flows` for each provided
890    /// `traffic_flows_invocations`.
891    /// 4. Expects `peer_started` within `50ms` if `should_start = true`.
892    /// 5. Expects `peer_stopped` within `50ms` if `should_stop = true`.
893    async fn init_timeout_tests_helper(
894        should_watch_turn: bool,
895        traffic_flows_invocations: &[FlowMetricSource],
896        should_start: bool,
897        should_stop: bool,
898    ) -> Helper {
899        let mut helper = Helper::new(&conf::Media {
900            init_timeout: Duration::from_millis(30),
901            max_lag: Duration::from_secs(999),
902        })
903        .await;
904        helper
905            .watcher()
906            .register_peer(Helper::room_id(), PeerId(1), should_watch_turn)
907            .await
908            .unwrap();
909        for source in traffic_flows_invocations {
910            helper.watcher().traffic_flows(
911                Helper::room_id(),
912                PeerId(1),
913                *source,
914            );
915        }
916
917        let peer_started =
918            timeout(Duration::from_millis(50), helper.next_peer_started())
919                .await;
920        if should_start {
921            peer_started.unwrap();
922        } else {
923            peer_started.unwrap_err();
924        }
925
926        let peer_stopped =
927            timeout(Duration::from_millis(50), helper.next_peer_stopped())
928                .await;
929        if should_stop {
930            peer_stopped.unwrap();
931        } else {
932            peer_stopped.unwrap_err();
933        };
934
935        helper
936    }
937
938    /// Pass different combinations of `traffic_flows` to concrete peer and see
939    /// if `init_timeout` triggers.
940    #[actix_rt::test]
941    async fn init_timeout_tests() {
942        use FlowMetricSource::{Coturn, PartnerPeer, Peer};
943
944        init_timeout_tests_helper(false, &[], false, false).await;
945        init_timeout_tests_helper(false, &[Peer], true, true).await;
946        init_timeout_tests_helper(false, &[Peer, Peer], true, true).await;
947        init_timeout_tests_helper(false, &[Peer, Coturn], true, true).await;
948        init_timeout_tests_helper(true, &[Peer, PartnerPeer], true, true).await;
949
950        init_timeout_tests_helper(false, &[Peer, PartnerPeer], true, false)
951            .await;
952        init_timeout_tests_helper(
953            true,
954            &[Peer, PartnerPeer, Coturn],
955            true,
956            false,
957        )
958        .await;
959    }
960
961    /// Checks correct `Peer` start after it was stopped cause init timeout
962    /// was exceeded.
963    #[actix_rt::test]
964    async fn start_after_init_timeout_stop() {
965        let mut helper = init_timeout_tests_helper(
966            false,
967            &[FlowMetricSource::Peer],
968            true,
969            true,
970        )
971        .await;
972        helper.watcher().traffic_flows(
973            Helper::room_id(),
974            PeerId(1),
975            FlowMetricSource::Peer,
976        );
977        timeout(Duration::from_millis(30), helper.next_peer_started())
978            .await
979            .unwrap_err();
980        helper.watcher().traffic_flows(
981            Helper::room_id(),
982            PeerId(1),
983            FlowMetricSource::PartnerPeer,
984        );
985        timeout(Duration::from_millis(30), helper.next_peer_started())
986            .await
987            .unwrap();
988    }
989
990    #[actix_rt::test]
991    async fn peer_stop_when_traffic_stop() {
992        {
993            // `traffic_stopped` on started `Peer`
994            let mut helper = init_timeout_tests_helper(
995                false,
996                &[FlowMetricSource::Peer, FlowMetricSource::PartnerPeer],
997                true,
998                false,
999            )
1000            .await;
1001            helper.watcher().traffic_stopped(
1002                Helper::room_id(),
1003                PeerId(1),
1004                Instant::now(),
1005            );
1006            timeout(Duration::from_millis(10), helper.next_peer_stopped())
1007                .await
1008                .unwrap();
1009        }
1010        {
1011            // `traffic_stopped` on starting `Peer`
1012            let mut helper = Helper::new(&conf::Media {
1013                init_timeout: Duration::from_secs(999),
1014                max_lag: Duration::from_secs(999),
1015            })
1016            .await;
1017            helper
1018                .watcher()
1019                .register_peer(Helper::room_id(), PeerId(1), false)
1020                .await
1021                .unwrap();
1022            helper.watcher().traffic_flows(
1023                Helper::room_id(),
1024                PeerId(1),
1025                FlowMetricSource::Peer,
1026            );
1027            timeout(Duration::from_millis(10), helper.next_peer_started())
1028                .await
1029                .unwrap();
1030            helper.watcher().traffic_stopped(
1031                Helper::room_id(),
1032                PeerId(1),
1033                Instant::now(),
1034            );
1035            timeout(Duration::from_millis(10), helper.next_peer_stopped())
1036                .await
1037                .unwrap();
1038        }
1039        {
1040            // `traffic_stopped` on stopped `Peer`
1041            let mut helper = init_timeout_tests_helper(
1042                false,
1043                &[FlowMetricSource::Peer],
1044                true,
1045                true,
1046            )
1047            .await;
1048            helper.watcher().traffic_stopped(
1049                Helper::room_id(),
1050                PeerId(1),
1051                Instant::now(),
1052            );
1053            timeout(Duration::from_millis(10), helper.next_peer_stopped())
1054                .await
1055                .unwrap_err();
1056        }
1057    }
1058}