medea/signalling/peers/traffic_watcher.rs
1//! Provides [`PeerTrafficWatcher`] trait and its impl.
2//!
3//! [`PeerTrafficWatcher`] analyzes `Peer` traffic metrics and notifies
4//! [`PeerConnectionStateEventsHandler`] about traffic flowing changes.
5//!
//! Traffic metrics consumed by [`PeerTrafficWatcher`] can originate from
//! different sources:
//! 1. [`FlowMetricSource::Peer`] - stats received from the member that owns
//!    the target `Peer`.
//! 2. [`FlowMetricSource::PartnerPeer`] - stats received from the member that
//!    owns the `Peer` connected to the target `Peer`.
//! 3. [`FlowMetricSource::Coturn`] - stats reported by the Coturn TURN server.
//!    This source is only tracked if the target `Peer` traffic is relayed.
14//!
//! First, register a [`Room`] (`PeerTrafficWatcher.register_room()`) and a
//! `Peer` (`PeerTrafficWatcher.register_peer()`). Once the first source
//! reports that traffic is flowing (`PeerTrafficWatcher.traffic_flows()`),
//! [`PeerConnectionStateEventsHandler::peer_started`] is called.
//!
//! After that, [`PeerTrafficWatcher`] waits for `init_timeout` for the other
//! sources to report that traffic is flowing. If they do not, then
//! [`PeerConnectionStateEventsHandler::peer_stopped`] is called.
//!
//! If some source reports that it observes that traffic stopped flowing
//! (`PeerTrafficWatcher.traffic_stopped()`), then
//! [`PeerConnectionStateEventsHandler::peer_stopped`] is called.
27//!
28//! [`Room`]: crate::signalling::room::Room
29
30use std::{
31 collections::{HashMap, HashSet},
32 fmt::Debug,
33 sync::Arc,
34 time::{Duration, Instant},
35};
36
37use actix::{
38 Actor, Addr, AsyncContext, Handler, MailboxError, Message, SpawnHandle,
39};
40use async_trait::async_trait;
41use chrono::{DateTime, Utc};
42use medea_client_api_proto::{PeerId, RoomId};
43
44use crate::{conf, log::prelude::*, utils::instant_into_utc};
45
46/// Subscriber of `Peer` traffic flowing changes.
47#[cfg_attr(test, mockall::automock)]
48pub trait PeerConnectionStateEventsHandler: Send + Debug {
49 /// [`PeerTrafficWatcher`] believes that traffic was started.
50 fn peer_started(&self, peer_id: PeerId);
51
52 /// [`PeerTrafficWatcher`] believes that traffic was stopped.
53 fn peer_stopped(&self, peer_id: PeerId, at: DateTime<Utc>);
54}
55
56#[cfg(test)]
57impl_debug_by_struct_name!(MockPeerConnectionStateEventsHandler);
58
/// Starts a [`PeersTrafficWatcherImpl`] actor configured from the provided
/// `conf` and returns its address as a [`PeerTrafficWatcher`].
#[cfg(test)]
pub fn build_peers_traffic_watcher(
    conf: &conf::Media,
) -> Arc<dyn PeerTrafficWatcher> {
    let watcher_addr = PeersTrafficWatcherImpl::new(conf).start();

    Arc::new(watcher_addr)
}
66
67// TODO: Returns dummy implementation cause this component is currently
68// disabled.
69// Will be enabled in https://github.com/instrumentisto/medea/pull/91
70/// Builds [`PeerTrafficWatcher`].
71#[cfg(not(test))]
72#[must_use]
73pub fn build_peers_traffic_watcher(
74 _: &conf::Media,
75) -> Arc<dyn PeerTrafficWatcher> {
76 #[derive(Debug)]
77 struct DummyPeerTrafficWatcher;
78
79 #[async_trait(?Send)]
80 impl PeerTrafficWatcher for DummyPeerTrafficWatcher {
81 async fn register_room(
82 &self,
83 _: RoomId,
84 _: Box<dyn PeerConnectionStateEventsHandler>,
85 ) -> Result<(), MailboxError> {
86 Ok(())
87 }
88
89 fn unregister_room(&self, _: RoomId) {}
90
91 async fn register_peer(
92 &self,
93 _: RoomId,
94 _: PeerId,
95 _: bool,
96 ) -> Result<(), MailboxError> {
97 Ok(())
98 }
99
100 fn unregister_peers(&self, _: RoomId, _: Vec<PeerId>) {}
101
102 fn traffic_flows(&self, _: RoomId, _: PeerId, _: FlowMetricSource) {}
103
104 fn traffic_stopped(&self, _: RoomId, _: PeerId, _: Instant) {}
105 }
106 Arc::new(DummyPeerTrafficWatcher)
107}
108
109/// Consumes `Peer` traffic metrics for further processing.
110#[async_trait(?Send)]
111#[cfg_attr(test, mockall::automock)]
112pub trait PeerTrafficWatcher: Debug + Send + Sync {
113 /// Registers provided [`PeerConnectionStateEventsHandler`] as `Peer`s state
114 /// messages listener, preparing [`PeerTrafficWatcher`] for registering
115 /// `Peer`s from this [`PeerConnectionStateEventsHandler`].
116 async fn register_room(
117 &self,
118 room_id: RoomId,
119 handler: Box<dyn PeerConnectionStateEventsHandler>,
120 ) -> Result<(), MailboxError>;
121
122 /// Unregisters [`Room`] as `Peer`s state messages listener.
123 ///
124 /// All `Peer` subscriptions related to this [`Room`] will be removed.
125 ///
126 /// [`Room`]: crate::signalling::room::Room
127 fn unregister_room(&self, room_id: RoomId);
128
129 /// Registers `Peer`, so that [`PeerTrafficWatcher`] will be able to
130 /// process traffic flow events of this `Peer`.
131 async fn register_peer(
132 &self,
133 room_id: RoomId,
134 peer_id: PeerId,
135 should_watch_turn: bool,
136 ) -> Result<(), MailboxError>;
137
138 /// Unregisters `Peer`s, so that [`PeerTrafficWatcher`] will not be able
139 /// to process traffic flow events of this `Peer` anymore.
140 fn unregister_peers(&self, room_id: RoomId, peers_ids: Vec<PeerId>);
141
142 /// Notifies [`PeerTrafficWatcher`] that some `Peer` traffic flowing.
143 fn traffic_flows(
144 &self,
145 room_id: RoomId,
146 peer_id: PeerId,
147 source: FlowMetricSource,
148 );
149
150 /// Notifies [`PeerTrafficWatcher`] that some `Peer`s traffic flowing was
151 /// stopped.
152 fn traffic_stopped(&self, room_id: RoomId, peer_id: PeerId, at: Instant);
153}
154
155#[cfg(test)]
156impl_debug_by_struct_name!(MockPeerTrafficWatcher);
157
158/// Returns [`FlowMetricSource`]s, which will be used to emit `Peer` state
159/// events.
160///
161/// [`FlowMetricSource::Peer`] and [`FlowMetricSource::PartnerPeer`] are
162/// always returned, [`FlowMetricSource::Coturn`] is optional (should be used
163/// only if media is forcibly relayed).
164fn build_flow_sources(should_watch_turn: bool) -> HashSet<FlowMetricSource> {
165 let mut sources =
166 hashset![FlowMetricSource::Peer, FlowMetricSource::PartnerPeer];
167 if should_watch_turn {
168 sources.insert(FlowMetricSource::Coturn);
169 }
170
171 sources
172}
173
174#[async_trait(?Send)]
175impl PeerTrafficWatcher for Addr<PeersTrafficWatcherImpl> {
176 async fn register_room(
177 &self,
178 room_id: RoomId,
179 handler: Box<dyn PeerConnectionStateEventsHandler>,
180 ) -> Result<(), MailboxError> {
181 self.send(RegisterRoom { room_id, handler }).await
182 }
183
184 fn unregister_room(&self, room_id: RoomId) {
185 self.do_send(UnregisterRoom(room_id))
186 }
187
188 async fn register_peer(
189 &self,
190 room_id: RoomId,
191 peer_id: PeerId,
192 should_watch_turn: bool,
193 ) -> Result<(), MailboxError> {
194 self.send(RegisterPeer {
195 room_id,
196 peer_id,
197 flow_metrics_sources: build_flow_sources(should_watch_turn),
198 })
199 .await
200 }
201
202 fn unregister_peers(&self, room_id: RoomId, peers_ids: Vec<PeerId>) {
203 self.do_send(UnregisterPeers { room_id, peers_ids })
204 }
205
206 fn traffic_flows(
207 &self,
208 room_id: RoomId,
209 peer_id: PeerId,
210 source: FlowMetricSource,
211 ) {
212 debug!("TrafficFlows: in {}/{} from {:?}", room_id, peer_id, source);
213 self.do_send(TrafficFlows {
214 room_id,
215 peer_id,
216 source,
217 })
218 }
219
220 fn traffic_stopped(&self, room_id: RoomId, peer_id: PeerId, at: Instant) {
221 debug!("TrafficStopped: in {}/{}", room_id, peer_id);
222 self.do_send(TrafficStopped {
223 room_id,
224 peer_id,
225 at,
226 })
227 }
228}
229
230/// Service which analyzes `Peer` traffic metrics and notifies about traffic
231/// flowing changes [`PeerConnectionStateEventsHandler`]s.
232#[derive(Debug, Default)]
233pub struct PeersTrafficWatcherImpl {
234 /// All `Room`s which exists on the Medea server.
235 stats: HashMap<RoomId, RoomStats>,
236
237 /// Media source traffic report ttl. Media sources must continuously report
238 /// that traffic is flowing, if some media source wont send new reports for
239 /// this timeout, then it is considered that this source is not flowing any
240 /// more.
241 traffic_report_ttl: Duration,
242
243 /// Duration after which [`PeersTrafficWatcherImpl`] will check that all
244 /// tracked traffic sources have reported that traffic is flowing.
245 init_timeout: Duration,
246}
247
248impl PeersTrafficWatcherImpl {
249 /// Returns new [`PeersTrafficWatcherImpl`].
250 pub fn new(conf: &conf::Media) -> Self {
251 Self {
252 stats: HashMap::new(),
253 traffic_report_ttl: conf.max_lag,
254 init_timeout: conf.init_timeout,
255 }
256 }
257
258 /// Checks that all [`FlowMetricSource`] have reported that `Peer` traffic
259 /// is flowing for `Peer` in `PeerState::Starting` state.
260 ///
261 /// If this check fails, then
262 /// [`PeerConnectionStateEventsHandler::peer_stopped`] will be called with a
263 /// time, when first source reported that `Peer` traffic is flowing.
264 ///
265 /// If check succeeds then `Peer` is transitioned to `PeerState::Started`
266 /// state.
267 ///
268 /// Called for every `Peer` after `init_timeout` passed since first source
269 /// reported that `Peer` traffic is flowing.
270 fn check_is_started(&mut self, room_id: &RoomId, peer_id: PeerId) {
271 if let Some(room) = self.stats.get_mut(room_id) {
272 if let Some(peer) = room.peers.get_mut(&peer_id) {
273 if peer.state == PeerState::Starting {
274 if peer.is_flowing() {
275 peer.state = PeerState::Started;
276 } else {
277 peer.stop();
278 let at = peer.started_at.unwrap_or_else(Utc::now);
279 room.handler.peer_stopped(peer_id, at);
280 }
281 };
282 }
283 }
284 }
285}
286
287impl Actor for PeersTrafficWatcherImpl {
288 type Context = actix::Context<Self>;
289
290 /// Checks if [`PeerState::Started`] [`PeerStat`]s traffic is still
291 /// flowing.
292 fn started(&mut self, ctx: &mut Self::Context) {
293 ctx.run_interval(Duration::from_secs(1), |this, _| {
294 for room in this.stats.values_mut() {
295 for peer in room.peers.values_mut() {
296 if peer.state == PeerState::Started && !peer.is_flowing() {
297 peer.stop();
298 room.handler.peer_stopped(
299 peer.peer_id,
300 instant_into_utc(Instant::now()),
301 );
302 }
303 }
304 }
305 });
306 }
307}
308
309/// Some [`FlowMetricSource`] notifies that it observes that
310/// `Peer`s traffic is flowing.
311#[derive(Debug, Message)]
312#[rtype(result = "()")]
313struct TrafficFlows {
314 /// [`RoomId`] of [`Room`] where this `Peer` is stored.
315 ///
316 /// [`Room`]: crate::signalling::room::Room
317 room_id: RoomId,
318
319 /// [`PeerId`] of `Peer` which flows.
320 peer_id: PeerId,
321
322 /// Source of this metric.
323 source: FlowMetricSource,
324}
325
326impl Handler<TrafficFlows> for PeersTrafficWatcherImpl {
327 type Result = ();
328
329 /// Saves that provided [`FlowMetricSource`] reported that it observes
330 /// `Peer` traffic flowing.
331 ///
332 /// If [`PeerStat`] is in [`PeerState::Stopped`] state:
333 /// 1. This stat is changed to [`PeerState::Starting`] state in which
334 /// `Peer` init
335 /// 2. [`PeerConnectionStateEventsHandler::peer_started`] is called.
336 /// 3. [`PeersTrafficWatcherImpl::check_is_started`] is scheduled to run
337 /// for this [`PeerStat`] in [`PeersTrafficWatcherImpl::init_timeout`].
338 ///
339 /// If [`PeerStat`] is in [`PeerState::Starting`] state then provided
340 /// [`FlowMetricSource`] is saved to list of received
341 /// [`FlowMetricSource`]. This list will be checked in the
342 /// [`PeersTrafficWatcherImpl::check_is_started`] function.
343 ///
344 /// If [`PeerStat`] is in [`PeerState::Started`] then last update time of
345 /// the provided [`FlowMetricSource`] will be updated.
346 ///
347 /// If [`PeerStat`] is in [`PeerState::Stopped`] state then
348 /// [`FlowMetricSource`] will be save and it'll check
349 /// [`FlowMetricSource`]s will be received then [`PeerStat`] will be
350 /// transferred into [`PeerState::Started`] with [`FlowMetricSource`]s from
351 /// the [`PeerState::Stopped`] state with [`Instant::now`] time.
352 fn handle(
353 &mut self,
354 msg: TrafficFlows,
355 ctx: &mut Self::Context,
356 ) -> Self::Result {
357 if let Some(room) = self.stats.get_mut(&msg.room_id) {
358 if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
359 peer.received_sources.insert(msg.source, Instant::now());
360 match &mut peer.state {
361 PeerState::New => {
362 peer.state = PeerState::Starting;
363 peer.started_at = Some(Utc::now());
364
365 room.handler.peer_started(peer.peer_id);
366
367 let init_check_task_handle =
368 ctx.run_later(self.init_timeout, move |this, _| {
369 this.check_is_started(
370 &msg.room_id,
371 msg.peer_id,
372 );
373 });
374 peer.init_task_handler.replace(init_check_task_handle);
375 }
376 PeerState::Starting => {
377 if peer.state == PeerState::Starting
378 && peer.is_flowing()
379 {
380 peer.state = PeerState::Started;
381 peer.init_task_handler.take();
382 };
383 }
384 PeerState::Stopped => {
385 if peer.is_flowing() {
386 peer.state = PeerState::Started;
387 peer.started_at = Some(Utc::now());
388 room.handler.peer_started(peer.peer_id);
389 }
390 }
391 _ => (),
392 }
393 }
394 }
395 }
396}
397
398/// Some [`FlowMetricSource`] notifies that it observes that
399/// `Peer`s traffic stopped flowing.
400#[derive(Debug, Message)]
401#[rtype(result = "()")]
402struct TrafficStopped {
403 /// [`RoomId`] of [`Room`] where this `Peer` is stored.
404 ///
405 /// [`Room`]: crate::signalling::room::Room
406 room_id: RoomId,
407
408 /// [`PeerId`] of `Peer` which traffic was stopped.
409 peer_id: PeerId,
410
411 /// Time when proof of `Peer`s traffic stopping was gotten.
412 at: Instant,
413}
414
415impl Handler<TrafficStopped> for PeersTrafficWatcherImpl {
416 type Result = ();
417
418 /// Calls [`PeerConnectionStateEventsHandler::peer_stopped`] if [`PeerStat`]
419 /// isn't in [`PeerState::Stopped`] state.
420 ///
421 /// Transfers [`PeerStat`] of the stopped `Peer` into
422 /// [`PeerState::Stopped`].
423 fn handle(
424 &mut self,
425 msg: TrafficStopped,
426 _: &mut Self::Context,
427 ) -> Self::Result {
428 if let Some(room) = self.stats.get_mut(&msg.room_id) {
429 if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
430 if peer.state != PeerState::Stopped {
431 peer.stop();
432 room.handler
433 .peer_stopped(peer.peer_id, instant_into_utc(msg.at));
434 }
435 }
436 }
437 }
438}
439
/// Possible origins of a "traffic flows" signal.
///
/// Traffic is considered flowing only when every source tracked for a `Peer`
/// confirms that it does.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum FlowMetricSource {
    /// Metrics reported from the partner `Peer`.
    PartnerPeer,

    /// Metrics reported from the `Peer` itself.
    Peer,

    /// Metrics for this `Peer` reported from the Coturn TURN server.
    Coturn,
}
455
/// Current state of [`PeerStat`].
///
/// Main transitions:
/// ```text
/// +-------+    +----------+    +-----------+     +-----------+
/// |  New  +--->+ Starting +--->+  Started  +<--->+  Stopped  |
/// +-------+    +----------+    +-----------+     +-----------+
/// ```
///
/// Additionally, [`PeerState::New`] and [`PeerState::Starting`] may go
/// straight to [`PeerState::Stopped`]: on a traffic stopped notification, or
/// (for [`PeerState::Starting`]) when the init check finds that not all
/// tracked sources are flowing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PeerState {
    /// `Peer` was just added and have not received any traffic events.
    New,

    /// Some sources have reported that traffic is flowing, but not all of
    /// them.
    Starting,

    /// All of the sources have reported that traffic is flowing.
    Started,

    /// Traffic is considered stopped: it was reported as stopped, or not all
    /// of the tracked sources reported it flowing in time.
    Stopped,
}
477
478/// Current stats of `Peer`.
479///
480/// Also this structure may be considered as subscription to this `Peer` state
481/// updates.
482#[derive(Debug)]
483struct PeerStat {
484 /// [`PeerId`] of `Peer` which this [`PeerStat`] represents.
485 peer_id: PeerId,
486
487 /// Current state of this [`PeerStat`].
488 state: PeerState,
489
490 /// `SpawnHandle` to `Peer` init task (`check_is_started`)
491 init_task_handler: Option<SpawnHandle>,
492
493 /// List of [`FlowMetricSource`]s from which [`TrafficFlows`] should be
494 /// received for validation that traffic is really going.
495 tracked_sources: HashSet<FlowMetricSource>,
496
497 /// [`DateTime`] when this [`PeerStat`] is started.
498 ///
499 /// If `None` then [`PeerStat`] not started.
500 started_at: Option<DateTime<Utc>>,
501
502 /// All [`FlowMetricSource`]s received at this moment with time at which
503 /// they are received lastly.
504 received_sources: HashMap<FlowMetricSource, Instant>,
505
506 /// Media source traffic report ttl. Media sources must continuously report
507 /// that traffic is flowing, if some media source wont send new reports for
508 /// this timeout, then it is considered that this source is not flowing any
509 /// more.
510 traffic_flowing_timeout: Duration,
511}
512
513impl PeerStat {
514 /// Returns `true` if this [`PeerStat`] is considered valid.
515 ///
516 /// Checks that all required [`FlowMetricSource`]s reported that traffic is
517 /// flowing within `now() - traffic_flowing_timeout`.
518 fn is_flowing(&self) -> bool {
519 for tracked_source in &self.tracked_sources {
520 if let Some(at) = self.received_sources.get(tracked_source) {
521 if at.elapsed() > self.traffic_flowing_timeout {
522 return false;
523 }
524 } else {
525 return false;
526 }
527 }
528
529 true
530 }
531
532 /// Sets [`PeerStat`] state to the [`PeerState::Stopped`] and resets
533 /// [`PeerStat::received_sources`].
534 fn stop(&mut self) {
535 self.state = PeerState::Stopped;
536 self.received_sources.clear();
537 }
538}
539
540/// Stores [`PeerStat`]s of `Peer`s for which stats updates [`Room`]
541/// is watching.
542///
543/// [`Room`]: crate::signalling::room::Room
544#[derive(Debug)]
545struct RoomStats {
546 /// [`RoomId`] of all [`PeerStat`] which stored here.
547 room_id: RoomId,
548
549 /// Handler of the [`PeerStat`] events.
550 handler: Box<dyn PeerConnectionStateEventsHandler>,
551
552 /// [`PeerStat`] for which [`Room`] is watching.
553 ///
554 /// [`Room`]: crate::signalling::room::Room
555 peers: HashMap<PeerId, PeerStat>,
556}
557
558/// Registers new [`Room`] as [`PeerStat`]s listener.
559///
560/// This message will only add provided [`Room`] to the list. For real
561/// subscription to a [`PeerStat`] [`RegisterPeer`] message should be used.
562///
563/// [`RegisterRoom`] will be called in [`RoomService`] for every [`Room`] when
564/// it created.
565///
566/// [`Room`]: crate::signalling::room::Room
567/// [`RoomService`]: crate::signalling::room_service::RoomService
568#[derive(Debug, Message)]
569#[rtype(result = "()")]
570struct RegisterRoom {
571 /// [`RoomId`] of [`Room`] which requested to register in the
572 /// [`PeersTrafficWatcherImpl`].
573 ///
574 /// [`Room`]: crate::signalling::room::Room
575 room_id: RoomId,
576
577 /// Handler of the [`PeerStat`] events.
578 handler: Box<dyn PeerConnectionStateEventsHandler>,
579}
580
581impl Handler<RegisterRoom> for PeersTrafficWatcherImpl {
582 type Result = ();
583
584 fn handle(
585 &mut self,
586 msg: RegisterRoom,
587 _: &mut Self::Context,
588 ) -> Self::Result {
589 debug!(
590 "Room [id = {}] was registered in the PeersTrafficWatcher.",
591 msg.room_id
592 );
593 self.stats.insert(
594 msg.room_id.clone(),
595 RoomStats {
596 room_id: msg.room_id,
597 handler: msg.handler,
598 peers: HashMap::new(),
599 },
600 );
601 }
602}
603
604/// Unregisters [`Room`] with provided [`RoomId`] from the
605/// [`PeersTrafficWatcherImpl`].
606///
607/// This message will just remove the subscription without emitting
608/// any stop events.
609///
610/// [`Room`]: crate::signalling::room::Room
611#[derive(Debug, Message)]
612#[rtype(result = "()")]
613struct UnregisterRoom(pub RoomId);
614
615impl Handler<UnregisterRoom> for PeersTrafficWatcherImpl {
616 type Result = ();
617
618 fn handle(
619 &mut self,
620 msg: UnregisterRoom,
621 _: &mut Self::Context,
622 ) -> Self::Result {
623 if self.stats.remove(&msg.0).is_some() {
624 debug!(
625 "Room [id = {}] was unregistered in the PeersTrafficWatcher.",
626 msg.0
627 );
628 };
629 }
630}
631
632/// Subscribes [`Room`] with provided [`RoomId`] to [`PeerStat`] with provided
633/// [`PeerId`].
634///
635/// [`Room`]: crate::signalling::room::Room
636#[derive(Debug, Message)]
637#[rtype(result = "()")]
638struct RegisterPeer {
639 /// [`RoomId`] of [`Room`] which subscribes on [`PeerStat`]'s [`PeerState`]
640 /// changes.
641 ///
642 /// [`Room`]: crate::signalling::room::Room
643 room_id: RoomId,
644
645 /// [`PeerId`] of [`PeerStat`] for which subscription is requested.
646 peer_id: PeerId,
647
648 /// List of [`FlowMetricSource`]s from which [`TrafficFlows`] should be
649 /// received for validation that traffic is really going.
650 flow_metrics_sources: HashSet<FlowMetricSource>,
651}
652
653impl Handler<RegisterPeer> for PeersTrafficWatcherImpl {
654 type Result = ();
655
656 fn handle(
657 &mut self,
658 msg: RegisterPeer,
659 _: &mut Self::Context,
660 ) -> Self::Result {
661 if let Some(room) = self.stats.get_mut(&msg.room_id) {
662 if let Some(peer) = room.peers.get_mut(&msg.peer_id) {
663 peer.tracked_sources.extend(msg.flow_metrics_sources);
664 } else {
665 debug!(
666 "Peer [id = {}] from a Room [id = {}] was registered in \
667 the PeersTrafficWatcher with {:?} sources.",
668 msg.peer_id, msg.room_id, msg.flow_metrics_sources
669 );
670 room.peers.insert(
671 msg.peer_id,
672 PeerStat {
673 peer_id: msg.peer_id,
674 state: PeerState::New,
675 init_task_handler: None,
676 tracked_sources: msg.flow_metrics_sources,
677 started_at: None,
678 received_sources: HashMap::new(),
679 traffic_flowing_timeout: self.traffic_report_ttl,
680 },
681 );
682 }
683 }
684 }
685}
686
687/// Unregisters [`Room`] with provided [`RoomId`] from [`PeerStat`] with
688/// provided [`PeerId`] updates receiving.
689///
690/// [`Room`]: crate::signalling::room::Room
691#[derive(Debug, Message)]
692#[rtype(result = "()")]
693struct UnregisterPeers {
694 /// [`RoomId`] of [`Room`] which unregisters from [`PeerStat`]'s
695 /// [`PeerState`] changes.
696 ///
697 /// [`Room`]: crate::signalling::room::Room
698 room_id: RoomId,
699
700 /// [`PeerId`] of [`PeerStat`] from which unregistration is requested.
701 ///
702 /// [`Room`]: crate::signalling::room::Room
703 peers_ids: Vec<PeerId>,
704}
705
706impl Handler<UnregisterPeers> for PeersTrafficWatcherImpl {
707 type Result = ();
708
709 fn handle(
710 &mut self,
711 msg: UnregisterPeers,
712 _: &mut Self::Context,
713 ) -> Self::Result {
714 if let Some(room_stats) = self.stats.get_mut(&msg.room_id) {
715 let room_id = msg.room_id;
716 for peer_id in msg.peers_ids {
717 if room_stats.peers.remove(&peer_id).is_some() {
718 debug!(
719 "Peer [id = {}] from a Room [id = {}] was \
720 unregistered in the PeersTrafficWatcher.",
721 peer_id, room_id,
722 );
723 };
724 }
725 }
726 }
727}
728
#[cfg(test)]
mod tests {
    use futures::{channel::mpsc, stream::LocalBoxStream, StreamExt};

    use tokio::time::timeout;

    use super::*;

    /// Helper for the all [`traffic_watcher`] unit tests.
    struct Helper {
        /// Stream which will receive all
        /// [`PeerConnectionStateEventsHandler::peer_stopped`] calls.
        peer_stopped_rx: LocalBoxStream<'static, (PeerId, DateTime<Utc>)>,

        /// Stream which will receive all
        /// [`PeerConnectionStateEventsHandler::peer_started`] calls.
        peer_started_rx: LocalBoxStream<'static, PeerId>,

        /// [`PeersTrafficWatcherImpl`] [`Actor`].
        traffic_watcher: Addr<PeersTrafficWatcherImpl>,
    }

    impl Helper {
        /// Returns new [`Helper`] with a started [`PeersTrafficWatcherImpl`]
        /// in which only the test room ([`Helper::room_id`]) is registered.
        ///
        /// All handler calls are forwarded into the [`Helper`]'s streams.
        pub async fn new(cfg: &conf::Media) -> Self {
            let watcher = PeersTrafficWatcherImpl::new(cfg).start();
            let mut handler = MockPeerConnectionStateEventsHandler::new();
            let (peer_stopped_tx, peer_stopped_rx) = mpsc::unbounded();
            let (peer_started_tx, peer_started_rx) = mpsc::unbounded();
            handler.expect_peer_stopped().returning(move |peer_id, at| {
                peer_stopped_tx.unbounded_send((peer_id, at)).unwrap();
            });
            handler.expect_peer_started().returning(move |peer_id| {
                peer_started_tx.unbounded_send(peer_id).unwrap();
            });
            watcher
                .register_room(Self::room_id(), Box::new(handler))
                .await
                .unwrap();

            Self {
                traffic_watcher: watcher,
                peer_started_rx: Box::pin(peer_started_rx),
                peer_stopped_rx: Box::pin(peer_stopped_rx),
            }
        }

        /// Returns [`RoomId`] used for the [`traffic_watcher`] unit tests.
        fn room_id() -> RoomId {
            RoomId::from("test-room")
        }

        /// Returns [`Addr`] to the underlying [`PeersTrafficWatcherImpl`].
        pub fn watcher(&self) -> Addr<PeersTrafficWatcherImpl> {
            self.traffic_watcher.clone()
        }

        /// Waits for the [`PeerConnectionStateEventsHandler::peer_stopped`]
        /// call.
        pub async fn next_peer_stopped(&mut self) -> (PeerId, DateTime<Utc>) {
            self.peer_stopped_rx.next().await.unwrap()
        }

        /// Waits for the [`PeerConnectionStateEventsHandler::peer_started`]
        /// call.
        pub async fn next_peer_started(&mut self) -> PeerId {
            self.peer_started_rx.next().await.unwrap()
        }
    }

    /// Checks that [`PeerTrafficWatcher`] provides correct stop time into
    /// [`PeerConnectionStateEventsHandler::peer_stopped`] function.
    #[actix_rt::test]
    async fn correct_stopped_at_when_init_timeout_stop() {
        let mut helper = Helper::new(&conf::Media {
            init_timeout: Duration::from_millis(100),
            max_lag: Duration::from_secs(999),
        })
        .await;
        helper
            .watcher()
            .register_peer(Helper::room_id(), PeerId(1), false)
            .await
            .unwrap();
        helper.watcher().traffic_flows(
            Helper::room_id(),
            PeerId(1),
            FlowMetricSource::Peer,
        );
        assert_eq!(helper.next_peer_started().await, PeerId(1));
        let start_time = Utc::now();
        let (_, at) = helper.next_peer_stopped().await;
        // Compare the distance between the two moments rather than their
        // 10-second buckets: close moments on different sides of a bucket
        // boundary must not fail the test.
        let drift = start_time.signed_duration_since(at);
        assert!(
            drift.num_seconds().abs() < 10,
            "stop time {} is too far from start time {}",
            at,
            start_time,
        );
    }

    /// Helper for `max_lag` tests.
    /// 1. Creates `PeersTrafficWatcherImpl` with `init_timeout = 999s`, and
    ///    `max_lag = 50ms`.
    /// 2. Registers `Peer` and reports traffic flowing from both `Peer` and
    ///    `PartnerPeer` sources.
    /// 3. Expects `peer_started` within `30ms`.
    /// 4. Expects `peer_stopped` within `1100ms`.
    async fn stop_on_max_lag_helper() -> Helper {
        let mut helper = Helper::new(&conf::Media {
            init_timeout: Duration::from_secs(999),
            max_lag: Duration::from_millis(50),
        })
        .await;
        helper
            .watcher()
            .register_peer(Helper::room_id(), PeerId(1), false)
            .await
            .unwrap();
        helper.watcher().traffic_flows(
            Helper::room_id(),
            PeerId(1),
            FlowMetricSource::Peer,
        );
        helper.watcher().traffic_flows(
            Helper::room_id(),
            PeerId(1),
            FlowMetricSource::PartnerPeer,
        );
        timeout(Duration::from_millis(30), helper.next_peer_started())
            .await
            .unwrap();
        timeout(Duration::from_millis(1100), helper.next_peer_stopped())
            .await
            .unwrap();
        helper
    }

    /// Checks that [`PeerConnectionStateEventsHandler::peer_stopped`] will be
    /// called if no [`TrafficFlows`] will be received within `max_lag`
    /// timeout.
    #[actix_rt::test]
    async fn stop_on_max_lag() {
        stop_on_max_lag_helper().await;
    }

    /// Checks correct `Peer` start after it was stopped cause max lag timeout
    /// was exceeded.
    #[actix_rt::test]
    async fn start_after_stop_on_max_lag() {
        let mut helper = stop_on_max_lag_helper().await;
        helper.watcher().traffic_flows(
            Helper::room_id(),
            PeerId(1),
            FlowMetricSource::Peer,
        );
        timeout(Duration::from_millis(30), helper.next_peer_started())
            .await
            .unwrap_err();
        helper.watcher().traffic_flows(
            Helper::room_id(),
            PeerId(1),
            FlowMetricSource::PartnerPeer,
        );
        timeout(Duration::from_millis(30), helper.next_peer_started())
            .await
            .unwrap();
    }

    /// Helper for `init_timeout` tests.
    /// 1. Creates `PeersTrafficWatcherImpl` with `init_timeout = 30ms`, and
    ///    `max_lag = 999s`.
    /// 2. Registers `Peer` with provided `should_watch_turn`
    /// 3. Invokes `traffic_flows` for each provided
    ///    `traffic_flows_invocations`.
    /// 4. Expects `peer_started` within `50ms` if `should_start = true`.
    /// 5. Expects `peer_stopped` within `50ms` if `should_stop = true`.
    async fn init_timeout_tests_helper(
        should_watch_turn: bool,
        traffic_flows_invocations: &[FlowMetricSource],
        should_start: bool,
        should_stop: bool,
    ) -> Helper {
        let mut helper = Helper::new(&conf::Media {
            init_timeout: Duration::from_millis(30),
            max_lag: Duration::from_secs(999),
        })
        .await;
        helper
            .watcher()
            .register_peer(Helper::room_id(), PeerId(1), should_watch_turn)
            .await
            .unwrap();
        for source in traffic_flows_invocations {
            helper.watcher().traffic_flows(
                Helper::room_id(),
                PeerId(1),
                *source,
            );
        }

        let peer_started =
            timeout(Duration::from_millis(50), helper.next_peer_started())
                .await;
        if should_start {
            peer_started.unwrap();
        } else {
            peer_started.unwrap_err();
        }

        let peer_stopped =
            timeout(Duration::from_millis(50), helper.next_peer_stopped())
                .await;
        if should_stop {
            peer_stopped.unwrap();
        } else {
            peer_stopped.unwrap_err();
        };

        helper
    }

    /// Pass different combinations of `traffic_flows` to concrete peer and see
    /// if `init_timeout` triggers.
    #[actix_rt::test]
    async fn init_timeout_tests() {
        use FlowMetricSource::{Coturn, PartnerPeer, Peer};

        init_timeout_tests_helper(false, &[], false, false).await;
        init_timeout_tests_helper(false, &[Peer], true, true).await;
        init_timeout_tests_helper(false, &[Peer, Peer], true, true).await;
        init_timeout_tests_helper(false, &[Peer, Coturn], true, true).await;
        init_timeout_tests_helper(true, &[Peer, PartnerPeer], true, true).await;

        init_timeout_tests_helper(false, &[Peer, PartnerPeer], true, false)
            .await;
        init_timeout_tests_helper(
            true,
            &[Peer, PartnerPeer, Coturn],
            true,
            false,
        )
        .await;
    }

    /// Checks correct `Peer` start after it was stopped cause init timeout
    /// was exceeded.
    #[actix_rt::test]
    async fn start_after_init_timeout_stop() {
        let mut helper = init_timeout_tests_helper(
            false,
            &[FlowMetricSource::Peer],
            true,
            true,
        )
        .await;
        helper.watcher().traffic_flows(
            Helper::room_id(),
            PeerId(1),
            FlowMetricSource::Peer,
        );
        timeout(Duration::from_millis(30), helper.next_peer_started())
            .await
            .unwrap_err();
        helper.watcher().traffic_flows(
            Helper::room_id(),
            PeerId(1),
            FlowMetricSource::PartnerPeer,
        );
        timeout(Duration::from_millis(30), helper.next_peer_started())
            .await
            .unwrap();
    }

    /// Checks how `traffic_stopped` is handled for `Peer`s in started,
    /// starting and already stopped states.
    #[actix_rt::test]
    async fn peer_stop_when_traffic_stop() {
        {
            // `traffic_stopped` on started `Peer`
            let mut helper = init_timeout_tests_helper(
                false,
                &[FlowMetricSource::Peer, FlowMetricSource::PartnerPeer],
                true,
                false,
            )
            .await;
            helper.watcher().traffic_stopped(
                Helper::room_id(),
                PeerId(1),
                Instant::now(),
            );
            timeout(Duration::from_millis(10), helper.next_peer_stopped())
                .await
                .unwrap();
        }
        {
            // `traffic_stopped` on starting `Peer`
            let mut helper = Helper::new(&conf::Media {
                init_timeout: Duration::from_secs(999),
                max_lag: Duration::from_secs(999),
            })
            .await;
            helper
                .watcher()
                .register_peer(Helper::room_id(), PeerId(1), false)
                .await
                .unwrap();
            helper.watcher().traffic_flows(
                Helper::room_id(),
                PeerId(1),
                FlowMetricSource::Peer,
            );
            timeout(Duration::from_millis(10), helper.next_peer_started())
                .await
                .unwrap();
            helper.watcher().traffic_stopped(
                Helper::room_id(),
                PeerId(1),
                Instant::now(),
            );
            timeout(Duration::from_millis(10), helper.next_peer_stopped())
                .await
                .unwrap();
        }
        {
            // `traffic_stopped` on stopped `Peer`
            let mut helper = init_timeout_tests_helper(
                false,
                &[FlowMetricSource::Peer],
                true,
                true,
            )
            .await;
            helper.watcher().traffic_stopped(
                Helper::room_id(),
                PeerId(1),
                Instant::now(),
            );
            timeout(Duration::from_millis(10), helper.next_peer_stopped())
                .await
                .unwrap_err();
        }
    }
}
1058}