use std::{
collections::HashMap,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
use actix::{
fut::Either, Actor, ActorFuture, AsyncContext, StreamHandler, WrapFuture,
};
use futures::{channel::mpsc, StreamExt as _};
use redis::{ConnectionInfo, RedisError};
use crate::{
log::prelude::*,
signalling::peers::{FlowMetricSource, PeerTrafficWatcher},
};
use super::{
allocation_event::{CoturnAllocationEvent, CoturnEvent},
CoturnUsername,
};
const ALLOCATIONS_CHANNEL_PATTERN: &str = "turn/realm/*/user/*/allocation/*";
pub type ActFuture<O = ()> =
Pin<Box<dyn ActorFuture<Actor = CoturnMetricsService, Output = O>>>;
#[derive(Debug)]
pub struct CoturnMetricsService {
peer_traffic_watcher: Arc<dyn PeerTrafficWatcher>,
client: redis::Client,
allocations_count: HashMap<CoturnUsername, u64>,
}
impl CoturnMetricsService {
pub fn new(
cf: &crate::conf::turn::Turn,
peer_traffic_watcher: Arc<dyn PeerTrafficWatcher>,
) -> Result<Self, RedisError> {
let client = redis::Client::open(ConnectionInfo::from(&cf.db.redis))?;
Ok(Self {
client,
allocations_count: HashMap::new(),
peer_traffic_watcher,
})
}
fn connect_and_subscribe(&mut self) -> ActFuture<Result<(), RedisError>> {
let (msg_tx, msg_stream) = mpsc::unbounded();
let client = self.client.clone();
Box::pin(
async move {
let conn = client.get_async_connection().await?;
let mut pubsub = conn.into_pubsub();
pubsub.psubscribe(ALLOCATIONS_CHANNEL_PATTERN).await?;
Ok(pubsub)
}
.into_actor(self)
.map(|res: Result<_, RedisError>, this, ctx| {
let mut pubsub = res?;
ctx.spawn(
async move {
let mut msg_stream = pubsub.on_message();
while let Some(msg) = msg_stream.next().await {
if msg_tx.unbounded_send(msg).is_err() {
break;
}
}
}
.into_actor(this),
);
ctx.add_stream(msg_stream);
Ok(())
}),
)
}
fn connect_until_success(&mut self) -> ActFuture {
Box::pin(self.connect_and_subscribe().then(|res, this, _| {
if let Err(err) = res {
warn!(
"Error while creating Redis PubSub connection for the \
CoturnMetricsService: {:?}",
err
);
Either::Left(
tokio::time::delay_for(Duration::from_secs(1))
.into_actor(this)
.then(|_, this, _| this.connect_until_success()),
)
} else {
Either::Right(async {}.into_actor(this))
}
}))
}
}
impl Actor for CoturnMetricsService {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.wait(self.connect_until_success());
}
}
impl StreamHandler<redis::Msg> for CoturnMetricsService {
fn handle(&mut self, msg: redis::Msg, _: &mut Self::Context) {
let event = match CoturnEvent::parse(&msg) {
Ok(ev) => ev,
Err(e) => {
error!("Error parsing CoturnEvent: {}", e);
return;
}
};
let username = CoturnUsername {
room_id: event.room_id.clone(),
peer_id: event.peer_id,
};
let allocations_count =
self.allocations_count.entry(username).or_insert(0);
match event.event {
CoturnAllocationEvent::Traffic { traffic } => {
*allocations_count += 1;
let is_traffic_really_going =
traffic.sent_packets + traffic.received_packets > 10;
if is_traffic_really_going {
self.peer_traffic_watcher.traffic_flows(
event.room_id,
event.peer_id,
FlowMetricSource::Coturn,
)
}
}
CoturnAllocationEvent::Deleted => {
*allocations_count -= 1;
if *allocations_count == 0 {
self.peer_traffic_watcher.traffic_stopped(
event.room_id,
event.peer_id,
Instant::now(),
);
}
}
_ => (),
}
}
fn finished(&mut self, ctx: &mut Self::Context) {
ctx.wait(self.connect_until_success());
}
}