use std::{fmt, slice, sync::Arc};
use async_trait::async_trait;
use derive_more::{Display, From};
use failure::Fail;
use futures::{
channel::mpsc,
future::{self, AbortHandle},
StreamExt as _,
};
use medea_client_api_proto::{PeerId, RoomId};
use redis::ConnectionInfo;
use crate::{
conf,
log::prelude as log,
turn::{
cli::{CoturnCliError, CoturnTelnetClient},
ice_user::{IcePassword, IceUsername},
repo::{TurnDatabase, TurnDatabaseErr},
},
utils::MpscOneshotSender,
};
use super::IceUser;
#[derive(Display, Debug, Fail, From)]
pub enum TurnServiceErr {
#[display(fmt = "Error accessing TurnAuthRepo: {}", _0)]
TurnAuthRepoErr(TurnDatabaseErr),
#[display(fmt = "Error operating CoturnTelnetClient: {}", _0)]
CoturnCliErr(CoturnCliError),
#[display(fmt = "Timeout exceeded while trying to insert/delete IceUser")]
#[from(ignore)]
TimedOut,
}
#[derive(Debug)]
pub enum UnreachablePolicy {
ReturnErr,
ReturnStatic,
}
#[async_trait]
pub trait TurnAuthService: fmt::Debug + Send + Sync {
async fn create(
&self,
room_id: RoomId,
peer_id: PeerId,
policy: UnreachablePolicy,
) -> Result<IceUser, TurnServiceErr>;
}
#[derive(Debug)]
struct Service {
turn_db: TurnDatabase,
coturn_cli: CoturnTelnetClient,
db_pass: String,
turn_address: String,
turn_username: String,
turn_password: String,
drop_tx: MpscOneshotSender<IceUsername>,
users_cleanup_task: AbortHandle,
}
impl Service {
fn static_user(&self) -> IceUser {
IceUser::new_static(
self.turn_address.clone(),
self.turn_username.clone(),
self.turn_password.clone(),
)
}
}
#[async_trait]
impl TurnAuthService for Service {
async fn create(
&self,
room_id: RoomId,
peer_id: PeerId,
policy: UnreachablePolicy,
) -> Result<IceUser, TurnServiceErr> {
let ice_user = IceUser::new_non_static(
self.turn_address.clone(),
&room_id,
peer_id,
IcePassword::generate(),
self.drop_tx.clone(),
);
match self.turn_db.insert(&ice_user).await {
Ok(_) => Ok(ice_user),
Err(err) => match policy {
UnreachablePolicy::ReturnErr => Err(err.into()),
UnreachablePolicy::ReturnStatic => Ok(self.static_user()),
},
}
}
}
impl Drop for Service {
#[inline]
fn drop(&mut self) {
self.users_cleanup_task.abort();
}
}
pub fn new_turn_auth_service<'a>(
cf: &conf::Turn,
) -> Result<Arc<dyn TurnAuthService + 'a>, TurnServiceErr> {
let turn_db = TurnDatabase::new(
cf.db.redis.connect_timeout,
ConnectionInfo::from(&cf.db.redis),
)?;
let coturn_cli = CoturnTelnetClient::new(
(cf.cli.host.clone(), cf.cli.port),
cf.cli.pass.to_string(),
cf.cli.pool.into(),
);
let (tx, mut rx) = mpsc::unbounded();
let users_cleanup_task = {
let db = turn_db.clone();
let cli = coturn_cli.clone();
let (fut, handle) = future::abortable(async move {
while let Some(user) = rx.next().await {
let users = slice::from_ref(&user);
if let Err(e) = db.remove(users).await {
log::warn!(
"Failed to remove IceUser(name: {}) from Redis: {}",
user,
e,
);
}
if let Err(e) = cli.delete_sessions(users).await {
log::warn!(
"Failed to remove IceUser(name: {}) from Coturn: {}",
user,
e,
);
}
}
});
tokio::spawn(fut);
handle
};
let turn_service = Service {
turn_db,
coturn_cli,
db_pass: cf.db.redis.pass.to_string(),
turn_address: cf.addr(),
turn_username: cf.user.to_string(),
turn_password: cf.pass.to_string(),
drop_tx: MpscOneshotSender::from(tx),
users_cleanup_task,
};
Ok(Arc::new(turn_service))
}
#[cfg(test)]
pub mod test {
use std::sync::Arc;
use crate::turn::IceUser;
use super::*;
#[derive(Clone, Copy, Debug)]
struct TurnAuthServiceMock;
#[async_trait]
impl TurnAuthService for TurnAuthServiceMock {
async fn create(
&self,
_: RoomId,
_: PeerId,
_: UnreachablePolicy,
) -> Result<IceUser, TurnServiceErr> {
Ok(IceUser::new_static(
"5.5.5.5:1234".parse().unwrap(),
"username".into(),
"password".into(),
))
}
}
pub fn new_turn_auth_service_mock() -> Arc<dyn TurnAuthService> {
Arc::new(TurnAuthServiceMock)
}
}