use libwebrtc::prelude::*;
use livekit_api::signal_client::{SignalError, SignalOptions};
use livekit_datatrack::backend as dt;
use livekit_protocol as proto;
use livekit_runtime::{interval, Interval, JoinHandle};
use parking_lot::{RwLock, RwLockReadGuard};
use std::{borrow::Cow, fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::sync::{
mpsc, oneshot, Mutex as AsyncMutex, Notify, RwLock as AsyncRwLock,
RwLockReadGuard as AsyncRwLockReadGuard,
};
pub use self::rtc_session::{SessionStats, INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD};
use crate::prelude::ParticipantIdentity;
use crate::{
id::ParticipantSid,
options::TrackPublishOptions,
prelude::LocalTrack,
room::DisconnectReason,
rtc_engine::{
lk_runtime::LkRuntime,
rtc_session::{RtcSession, SessionEvent, SessionEvents},
},
DataPacketKind,
};
use crate::{ChatMessage, E2eeManager, TranscriptionSegment};
mod dc_sender;
pub mod lk_runtime;
mod peer_transport;
mod rtc_events;
mod rtc_session;
/// Sending half of the channel used to deliver [`EngineEvent`]s to the engine's owner.
pub(crate) type EngineEmitter = mpsc::UnboundedSender<EngineEvent>;
/// Receiving half of the engine event channel, handed out by `RtcEngine::connect`.
pub(crate) type EngineEvents = mpsc::UnboundedReceiver<EngineEvent>;
/// Result type used by every fallible engine operation.
pub(crate) type EngineResult<T> = std::result::Result<T, EngineError>;
/// Number of attempts a single reconnection makes before the engine gives up and closes.
pub const RECONNECT_ATTEMPTS: u32 = 10;
/// Period of the interval that paces consecutive reconnection attempts.
pub const RECONNECT_INTERVAL: Duration = Duration::from_millis(5_000);
/// Scenario accepted by `RtcEngine::simulate_scenario`, which forwards it unchanged to the
/// current session's `simulate_scenario`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SimulateScenario {
    SignalReconnect,
    Speaker,
    NodeFailure,
    ServerLeave,
    Migration,
    ForceTcp,
    ForceTls,
}
#[derive(Error, Debug)]
pub enum EngineError {
#[error("signal failure: {0}")]
Signal(#[from] SignalError),
#[error("internal webrtc failure")]
Rtc(#[from] RtcError),
#[error("connection error: {0}")]
Connection(Cow<'static, str>), #[error("internal error: {0}")]
Internal(Cow<'static, str>), }
/// Options controlling how the engine connects.
///
/// The whole struct is passed to `RtcSession::connect` on every connection attempt,
/// including full reconnects.
#[derive(Clone, Debug, Default)]
pub struct EngineOptions {
    /// WebRTC configuration used for the session's peer connection(s).
    pub rtc_config: RtcConfiguration,
    /// Options for the signal connection.
    pub signal_options: SignalOptions,
    /// Extra attempts made by `connect` after the first one fails (`0` = single attempt).
    pub join_retries: u32,
    /// NOTE(review): consumed by `RtcSession`; presumably selects one peer connection for both
    /// directions instead of separate publisher/subscriber connections — confirm there.
    pub single_peer_connection: bool,
}
#[derive(Debug)]
pub enum EngineEvent {
ParticipantUpdate {
updates: Vec<proto::ParticipantInfo>,
},
MediaTrack {
track: MediaStreamTrack,
stream: MediaStream,
transceiver: RtpTransceiver,
},
Data {
participant_sid: Option<ParticipantSid>,
participant_identity: Option<ParticipantIdentity>,
payload: Vec<u8>,
topic: Option<String>,
kind: DataPacketKind,
encryption_type: proto::encryption::Type,
},
ChatMessage {
participant_identity: ParticipantIdentity,
message: ChatMessage,
},
Transcription {
participant_identity: ParticipantIdentity,
track_sid: String,
segments: Vec<TranscriptionSegment>,
},
SipDTMF {
participant_identity: Option<ParticipantIdentity>,
code: u32,
digit: Option<String>,
},
RpcRequest {
caller_identity: Option<ParticipantIdentity>,
request_id: String,
method: String,
payload: String,
response_timeout: Duration,
version: u32,
},
RpcResponse {
request_id: String,
payload: Option<String>,
error: Option<proto::RpcError>,
},
RpcAck {
request_id: String,
},
SpeakersChanged {
speakers: Vec<proto::SpeakerInfo>,
},
ConnectionQuality {
updates: Vec<proto::ConnectionQualityInfo>,
},
RoomUpdate {
room: proto::Room,
},
RoomMoved {
moved: proto::RoomMovedResponse,
},
Resuming(oneshot::Sender<()>),
Resumed(oneshot::Sender<()>),
SignalResumed {
reconnect_response: proto::ReconnectResponse,
tx: oneshot::Sender<()>,
},
Restarting(oneshot::Sender<()>),
Restarted(oneshot::Sender<()>),
SignalRestarted {
join_response: proto::JoinResponse,
tx: oneshot::Sender<()>,
},
Disconnected {
reason: DisconnectReason,
},
LocalTrackSubscribed {
track_sid: String,
},
DataStreamHeader {
header: proto::data_stream::Header,
participant_identity: String,
encryption_type: proto::encryption::Type,
},
DataStreamChunk {
chunk: proto::data_stream::Chunk,
participant_identity: String,
encryption_type: proto::encryption::Type,
},
DataStreamTrailer {
trailer: proto::data_stream::Trailer,
participant_identity: String,
},
DataChannelBufferedAmountLowThresholdChanged {
kind: DataPacketKind,
threshold: u64,
},
RefreshToken {
url: String,
token: String,
},
TrackMuted {
sid: String,
muted: bool,
},
LocalDataTrackInput(dt::local::InputEvent),
RemoteDataTrackInput(dt::remote::InputEvent),
}
/// Mutable state describing the session the engine is currently driving.
#[derive(Debug)]
struct EngineHandle {
    /// Session currently in use; replaced by a successful full restart.
    session: Arc<RtcSession>,
    /// Set by `close`; requests going through `wait_reconnection` then fail.
    closed: bool,
    /// True while a reconnection task is running.
    reconnecting: bool,
    /// Cleared when the server sends a `Disconnect` leave action; reconnection requests are
    /// ignored afterwards.
    can_reconnect: bool,
    /// Whether the running reconnection must restart the session rather than resume it.
    full_reconnect: bool,
    /// Join handle of the running `engine_task` and the sender that tells it to stop.
    engine_task: Option<(JoinHandle<()>, oneshot::Sender<()>)>,
}
/// Shared engine state, owned through an `Arc` by [`RtcEngine`] and by the tasks it spawns.
struct EngineInner {
    // Never read in this module; presumably held so the shared `LkRuntime` stays alive for as
    // long as the engine does — TODO confirm. Hence the `dead_code` allowance.
    #[allow(dead_code)]
    lk_runtime: Arc<LkRuntime>,
    /// Sender half of the event channel returned to the caller of `connect`.
    engine_tx: EngineEmitter,
    /// Options used for the initial connection and reused for full restarts.
    options: EngineOptions,
    /// Awaited by the reconnection task; a notification cancels the in-flight reconnection.
    close_notifier: Arc<Notify>,
    /// Current session and its bookkeeping flags.
    running_handle: RwLock<EngineHandle>,
    /// Write-locked for the whole duration of a reconnection; request paths take a read guard
    /// (via `wait_reconnection`) and therefore wait for the reconnection to finish.
    reconnecting_lock: AsyncRwLock<()>,
    /// Paces reconnection attempts; reset when a caller asks to retry immediately.
    reconnecting_interval: AsyncMutex<Interval>,
}
/// Public handle to the RTC engine; all state lives in the shared [`EngineInner`].
pub struct RtcEngine {
    inner: Arc<EngineInner>,
}

impl Debug for RtcEngine {
    /// Prints only the type name; no inner state is included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("RtcEngine")
    }
}
impl RtcEngine {
pub async fn connect(
url: &str,
token: &str,
options: EngineOptions,
e2ee_manager: Option<E2eeManager>,
) -> EngineResult<(Self, proto::JoinResponse, EngineEvents)> {
let (inner, join_response, engine_events) =
EngineInner::connect(url, token, options, e2ee_manager).await?;
Ok((Self { inner }, join_response, engine_events))
}
pub async fn close(&self, reason: DisconnectReason) {
self.inner.close(reason).await
}
pub async fn publish_data(
&self,
data: proto::DataPacket,
kind: DataPacketKind,
is_raw_packet: bool,
) -> EngineResult<()> {
let (session, _r_lock) = {
let (handle, _r_lock) = self.inner.wait_reconnection().await?;
(handle.session.clone(), _r_lock)
};
session.publish_data(data, kind, is_raw_packet).await
}
pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
let (session, _r_lock) = {
let (handle, _r_lock) = self.inner.wait_reconnection().await?;
(handle.session.clone(), _r_lock)
};
session.simulate_scenario(scenario).await
}
pub async fn handle_local_data_track_output(
&self,
event: dt::local::OutputEvent,
) -> EngineResult<()> {
let (session, _r_lock) = {
let (handle, _r_lock) = self.inner.wait_reconnection().await?;
(handle.session.clone(), _r_lock)
};
session.handle_local_data_track_output(event).await;
Ok(())
}
pub async fn handle_remote_data_track_output(
&self,
event: dt::remote::OutputEvent,
) -> EngineResult<()> {
let (session, _r_lock) = {
let (handle, _r_lock) = self.inner.wait_reconnection().await?;
(handle.session.clone(), _r_lock)
};
session.handle_remote_data_track_output(event).await;
Ok(())
}
pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult<proto::TrackInfo> {
let (session, _r_lock) = {
let (handle, _r_lock) = self.inner.wait_reconnection().await?;
(handle.session.clone(), _r_lock)
};
session.add_track(req).await
}
pub fn remove_track(&self, sender: RtpSender) -> EngineResult<()> {
let session = self.inner.running_handle.read().session.clone();
session.remove_track(sender) }
pub async fn mute_track(&self, req: proto::MuteTrackRequest) -> EngineResult<()> {
let (session, _r_lock) = {
let (handle, _r_lock) = self.inner.wait_reconnection().await?;
(handle.session.clone(), _r_lock)
};
session.mute_track(req).await
}
pub async fn create_sender(
&self,
track: LocalTrack,
options: TrackPublishOptions,
encodings: Vec<RtpEncodingParameters>,
) -> EngineResult<RtpTransceiver> {
let (session, _r_lock) = {
let (handle, _r_lock) = self.inner.wait_reconnection().await?;
(handle.session.clone(), _r_lock)
};
session.create_sender(track, options, encodings).await
}
pub fn publisher_negotiation_needed(&self) {
let inner = self.inner.clone();
livekit_runtime::spawn(async move {
if let Ok((handle, _)) = inner.wait_reconnection().await {
handle.session.publisher_negotiation_needed()
}
});
}
pub async fn send_request(&self, msg: proto::signal_request::Message) {
let session = self.inner.running_handle.read().session.clone();
session.signal_client().send(msg).await }
pub async fn get_response(&self, request_id: u32) -> proto::RequestResponse {
let session = self.inner.running_handle.read().session.clone();
session.get_response(request_id).await
}
pub async fn get_stats(&self) -> EngineResult<SessionStats> {
let session = self.inner.running_handle.read().session.clone();
session.get_stats().await
}
pub fn session(&self) -> Arc<RtcSession> {
self.inner.running_handle.read().session.clone()
}
}
impl EngineInner {
async fn connect(
url: &str,
token: &str,
options: EngineOptions,
e2ee_manager: Option<E2eeManager>,
) -> EngineResult<(Arc<Self>, proto::JoinResponse, EngineEvents)> {
let lk_runtime = LkRuntime::instance();
let max_retries = options.join_retries;
let try_connect = {
move || {
let options = options.clone();
let lk_runtime = lk_runtime.clone();
let e2ee_manager = e2ee_manager.clone();
async move {
let (session, join_response, session_events) =
RtcSession::connect(url, token, options.clone(), e2ee_manager).await?;
session.wait_pc_connection().await?;
let (engine_tx, engine_rx) = mpsc::unbounded_channel();
let inner = Arc::new(Self {
lk_runtime,
engine_tx,
close_notifier: Arc::new(Notify::new()),
running_handle: RwLock::new(EngineHandle {
session: Arc::new(session),
closed: false,
reconnecting: false,
can_reconnect: true,
full_reconnect: false,
engine_task: None,
}),
options,
reconnecting_lock: AsyncRwLock::default(),
reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
});
let (close_tx, close_rx) = oneshot::channel();
let session_task = livekit_runtime::spawn(Self::engine_task(
inner.clone(),
session_events,
close_rx,
));
inner.running_handle.write().engine_task = Some((session_task, close_tx));
Ok((inner, join_response, engine_rx))
}
}
};
let mut last_error = None;
for i in 0..(max_retries + 1) {
match try_connect().await {
Ok(res) => return Ok(res),
Err(e) => {
let attempt_i = i + 1;
if i < max_retries {
log::warn!(
"failed to connect: {:?}, retrying... ({}/{})",
e,
attempt_i,
max_retries
);
}
last_error = Some(e)
}
}
}
Err(last_error.unwrap())
}
async fn engine_task(
self: Arc<Self>,
mut session_events: SessionEvents,
mut close_rx: oneshot::Receiver<()>,
) {
loop {
tokio::select! {
Some(event) = session_events.recv() => {
let debug = format!("{:?}", event);
let inner = self.clone();
let (tx, rx) = oneshot::channel();
let task = livekit_runtime::spawn(async move {
if let Err(err) = inner.on_session_event(event).await {
log::error!("failed to handle session event: {:?}", err);
}
let _ = tx.send(());
});
tokio::select! {
_ = rx => {},
_ = livekit_runtime::sleep(Duration::from_secs(10)) => {
log::error!("session_event is taking too much time: {}", debug);
}
}
task.await;
},
_ = &mut close_rx => {
break;
}
}
}
log::debug!("engine task closed");
}
async fn on_session_event(self: &Arc<Self>, event: SessionEvent) -> EngineResult<()> {
match event {
SessionEvent::Close { source, reason, action, retry_now } => {
match action {
proto::leave_request::Action::Resume
| proto::leave_request::Action::Reconnect => {
{
let running_handle = self.running_handle.read();
if !running_handle.can_reconnect {
return Ok(());
}
}
log::warn!(
"received session close: {:?} {:?} {:?}",
source,
reason,
action
);
self.reconnection_needed(
retry_now,
action == proto::leave_request::Action::Reconnect,
);
}
proto::leave_request::Action::Disconnect => {
let mut running_handle = self.running_handle.write();
running_handle.can_reconnect = false;
livekit_runtime::spawn({
let inner = self.clone();
async move {
inner.close(reason).await;
}
});
}
}
}
SessionEvent::Data {
participant_sid,
participant_identity,
payload,
topic,
kind,
encryption_type,
} => {
let _ = self.engine_tx.send(EngineEvent::Data {
participant_sid,
participant_identity,
payload,
topic,
kind,
encryption_type,
});
}
SessionEvent::ChatMessage { participant_identity, message } => {
let _ =
self.engine_tx.send(EngineEvent::ChatMessage { participant_identity, message });
}
SessionEvent::SipDTMF { participant_identity, code, digit } => {
let _ =
self.engine_tx.send(EngineEvent::SipDTMF { participant_identity, code, digit });
}
SessionEvent::Transcription { participant_identity, track_sid, segments } => {
let _ = self.engine_tx.send(EngineEvent::Transcription {
participant_identity,
track_sid,
segments,
});
}
SessionEvent::SipDTMF { participant_identity, code, digit } => {
let _ =
self.engine_tx.send(EngineEvent::SipDTMF { participant_identity, code, digit });
}
SessionEvent::RpcRequest {
caller_identity,
request_id,
method,
payload,
response_timeout,
version,
} => {
let _ = self.engine_tx.send(EngineEvent::RpcRequest {
caller_identity,
request_id,
method,
payload,
response_timeout,
version,
});
}
SessionEvent::RpcResponse { request_id, payload, error } => {
let _ =
self.engine_tx.send(EngineEvent::RpcResponse { request_id, payload, error });
}
SessionEvent::RpcAck { request_id } => {
let _ = self.engine_tx.send(EngineEvent::RpcAck { request_id });
}
SessionEvent::MediaTrack { track, stream, transceiver } => {
let _ = self.engine_tx.send(EngineEvent::MediaTrack { track, stream, transceiver });
}
SessionEvent::ParticipantUpdate { updates } => {
let _ = self.engine_tx.send(EngineEvent::ParticipantUpdate { updates });
}
SessionEvent::SpeakersChanged { speakers } => {
let _ = self.engine_tx.send(EngineEvent::SpeakersChanged { speakers });
}
SessionEvent::ConnectionQuality { updates } => {
let _ = self.engine_tx.send(EngineEvent::ConnectionQuality { updates });
}
SessionEvent::RoomUpdate { room } => {
let _ = self.engine_tx.send(EngineEvent::RoomUpdate { room });
}
SessionEvent::RoomMoved { moved } => {
let _ = self.engine_tx.send(EngineEvent::RoomMoved { moved });
}
SessionEvent::LocalTrackSubscribed { track_sid } => {
let _ = self.engine_tx.send(EngineEvent::LocalTrackSubscribed { track_sid });
}
SessionEvent::DataStreamHeader { header, participant_identity, encryption_type } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamHeader {
header,
participant_identity,
encryption_type,
});
}
SessionEvent::DataStreamChunk { chunk, participant_identity, encryption_type } => {
let _ = self.engine_tx.send(EngineEvent::DataStreamChunk {
chunk,
participant_identity,
encryption_type,
});
}
SessionEvent::DataStreamTrailer { trailer, participant_identity } => {
let _ = self
.engine_tx
.send(EngineEvent::DataStreamTrailer { trailer, participant_identity });
}
SessionEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold } => {
let _ = self.engine_tx.send(
EngineEvent::DataChannelBufferedAmountLowThresholdChanged { kind, threshold },
);
}
SessionEvent::RefreshToken { url, token } => {
let _ = self.engine_tx.send(EngineEvent::RefreshToken { url, token });
}
SessionEvent::TrackMuted { sid, muted } => {
let _ = self.engine_tx.send(EngineEvent::TrackMuted { sid, muted });
}
SessionEvent::LocalDataTrackInput(event) => {
let _ = self.engine_tx.send(EngineEvent::LocalDataTrackInput(event));
}
SessionEvent::RemoteDataTrackInput(event) => {
let _ = self.engine_tx.send(EngineEvent::RemoteDataTrackInput(event));
}
}
Ok(())
}
async fn close(&self, reason: DisconnectReason) {
let (session, engine_task) = {
let mut running_handle = self.running_handle.write();
running_handle.closed = true;
let session = running_handle.session.clone();
let engine_task = running_handle.engine_task.take();
(session, engine_task)
};
if let Some((engine_task, close_tx)) = engine_task {
session.close(reason).await;
let _ = close_tx.send(());
let _ = engine_task.await;
let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
}
}
async fn wait_reconnection(
&self,
) -> EngineResult<(RwLockReadGuard<EngineHandle>, AsyncRwLockReadGuard<()>)> {
let r_lock = self.reconnecting_lock.read().await;
let running_handle = self.running_handle.read();
if running_handle.closed {
return Err(EngineError::Connection("engine is closed".into()));
}
Ok((running_handle, r_lock))
}
fn reconnection_needed(self: &Arc<Self>, retry_now: bool, full_reconnect: bool) {
let mut running_handle = self.running_handle.write();
if !running_handle.can_reconnect {
return;
}
if running_handle.reconnecting {
if full_reconnect {
running_handle.full_reconnect = true;
}
if retry_now {
let inner = self.clone();
livekit_runtime::spawn(async move {
inner.reconnecting_interval.lock().await.reset();
});
}
return;
}
running_handle.reconnecting = true;
running_handle.full_reconnect = full_reconnect;
livekit_runtime::spawn({
let inner = self.clone();
async move {
let _r_lock = inner.reconnecting_lock.write().await;
let close_notifier = inner.close_notifier.clone();
let close_receiver = close_notifier.notified();
tokio::pin!(close_receiver);
tokio::select! {
_ = &mut close_receiver => {
log::debug!("reconnection cancelled");
return;
}
res = inner.reconnect_task() => {
if res.is_err() {
log::error!("failed to reconnect to the livekit room");
inner.close(DisconnectReason::UnknownReason).await;
} else {
log::info!("RtcEngine successfully recovered")
}
}
}
let mut running_handle = inner.running_handle.write();
running_handle.reconnecting = false;
}
});
}
async fn reconnect_task(self: &Arc<Self>) -> EngineResult<()> {
let (url, token, e2ee_manager) = {
let running_handle = self.running_handle.read();
let signal_client = running_handle.session.signal_client();
let e2ee_manager = running_handle.session.e2ee_manager();
(
signal_client.url(),
signal_client.token(), e2ee_manager.clone(),
)
};
for i in 0..RECONNECT_ATTEMPTS {
let (is_closed, full_reconnect) = {
let running_handle = self.running_handle.read();
(running_handle.closed, running_handle.full_reconnect)
};
if is_closed {
return Err(EngineError::Connection("attempt canncelled, engine is closed".into()));
}
if full_reconnect {
if i == 0 {
let (tx, rx) = oneshot::channel();
let _ = self.engine_tx.send(EngineEvent::Restarting(tx));
let _ = rx.await;
}
log::error!("restarting connection... attempt: {}", i);
if let Err(err) = self
.try_restart_connection(
&url,
&token,
self.options.clone(),
e2ee_manager.clone(),
)
.await
{
log::error!("restarting connection failed: {}", err);
} else {
let (tx, rx) = oneshot::channel();
let _ = self.engine_tx.send(EngineEvent::Restarted(tx));
let _ = rx.await;
return Ok(());
}
} else {
if i == 0 {
let (tx, rx) = oneshot::channel();
let _ = self.engine_tx.send(EngineEvent::Resuming(tx));
let _ = rx.await;
}
log::error!("resuming connection... attempt: {}", i);
if let Err(err) = self.try_resume_connection().await {
log::error!("resuming connection failed: {}", err);
let mut running_handle = self.running_handle.write();
running_handle.full_reconnect = true;
} else {
let (tx, rx) = oneshot::channel();
let _ = self.engine_tx.send(EngineEvent::Resumed(tx));
let _ = rx.await;
return Ok(());
}
}
self.reconnecting_interval.lock().await.tick().await;
}
Err(EngineError::Connection(
format!("failed to reconnect after {}", RECONNECT_ATTEMPTS).into(),
))
}
async fn try_restart_connection(
self: &Arc<Self>,
url: &str,
token: &str,
options: EngineOptions,
e2ee_manager: Option<E2eeManager>,
) -> EngineResult<()> {
let (session, engine_task) = {
let mut running_handle = self.running_handle.write();
let session = running_handle.session.clone();
let engine_task = running_handle.engine_task.take();
(session, engine_task)
};
if let Some((engine_task, close_tx)) = engine_task {
session.close(DisconnectReason::ClientInitiated).await;
let _ = close_tx.send(());
let _ = engine_task.await;
}
let (new_session, join_response, session_events) =
RtcSession::connect(url, token, options, e2ee_manager).await?;
let (tx, rx) = oneshot::channel();
let _ = self.engine_tx.send(EngineEvent::SignalRestarted { join_response, tx });
let _ = rx.await;
new_session.wait_pc_connection().await?;
let mut handle = self.running_handle.write();
handle.session = Arc::new(new_session);
let (close_tx, close_rx) = oneshot::channel();
let task = livekit_runtime::spawn(self.clone().engine_task(session_events, close_rx));
handle.engine_task = Some((task, close_tx));
Ok(())
}
async fn try_resume_connection(&self) -> EngineResult<()> {
let session = self.running_handle.read().session.clone();
let reconnect_response = session.restart().await?;
let (tx, rx) = oneshot::channel();
let _ = self.engine_tx.send(EngineEvent::SignalResumed { reconnect_response, tx });
let _ = rx.await;
session.restart_publisher().await?;
session.wait_pc_connection().await
}
}
impl From<livekit_datatrack::api::InternalError> for EngineError {
fn from(err: livekit_datatrack::api::InternalError) -> Self {
Self::Internal(err.to_string().into())
}
}