#![allow(missing_docs)]
pub(crate) mod disposal;
pub mod error;
mod events;
pub mod message;
pub mod mixer;
#[cfg(feature = "receive")]
pub(crate) mod udp_rx;
pub(crate) mod ws;
use super::connection::{error::Error as ConnectionError, Connection};
use crate::{
events::{
context_data::{DisconnectKind, DisconnectReason},
internal_data::{InternalConnect, InternalDisconnect},
CoreContext,
},
Config,
ConnectionInfo,
FloatDuration,
};
use flume::{Receiver, Sender};
use message::*;
use tokio::{spawn, time::sleep as tsleep};
use tracing::{debug, instrument, trace};
pub(crate) fn start(config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMessage>) {
spawn(async move {
trace!("Driver started.");
runner(config, rx, tx).await;
trace!("Driver finished.");
});
}
fn start_internals(core: Sender<CoreMessage>, config: &Config) -> Interconnect {
let (evt_tx, evt_rx) = flume::unbounded();
let (mix_tx, mix_rx) = flume::unbounded();
spawn(async move {
trace!("Event processor started.");
events::runner(evt_rx).await;
trace!("Event processor finished.");
});
let ic = Interconnect {
core,
events: evt_tx,
mixer: mix_tx,
};
config.get_scheduler().new_mixer(config, ic.clone(), mix_rx);
ic
}
#[instrument(skip(rx, tx))]
async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMessage>) {
let mut next_config: Option<Config> = None;
let mut connection: Option<Connection> = None;
let mut interconnect = start_internals(tx, &config);
let mut retrying = None;
let mut attempt_idx = 0;
while let Ok(msg) = rx.recv_async().await {
match msg {
CoreMessage::ConnectWithResult(info, tx) => {
config = if let Some(new_config) = next_config.take() {
drop(
interconnect
.mixer
.send(MixerMessage::SetConfig(new_config.clone())),
);
new_config
} else {
config
};
if connection.as_ref().is_none_or(|conn| conn.info != info) {
connection = ConnectionRetryData::connect(tx, info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
} else {
drop(tx.send(Ok(())));
}
},
CoreMessage::RetryConnect(retry_idx) => {
debug!("Retrying idx: {} (vs. {})", retry_idx, attempt_idx);
if retry_idx == attempt_idx {
if let Some(progress) = retrying.take() {
connection = progress
.attempt(&mut retrying, &interconnect, &config)
.await;
}
}
},
CoreMessage::Disconnect => {
let last_conn = connection.take();
drop(interconnect.mixer.send(MixerMessage::DropConn));
drop(interconnect.mixer.send(MixerMessage::RebuildEncoder));
if let Some(conn) = last_conn {
drop(interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Runtime,
reason: Some(DisconnectReason::Requested),
info: conn.info.clone(),
}),
)));
}
},
CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason) => {
let conn = if ws_idx == attempt_idx {
drop(interconnect.mixer.send(MixerMessage::DropConn));
drop(interconnect.mixer.send(MixerMessage::RebuildEncoder));
connection.take()
} else {
reason = None;
None
};
if conn.is_some() {
drop(interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Runtime,
reason,
info: ws_info,
}),
)));
}
},
CoreMessage::SetTrack(s) => {
drop(interconnect.mixer.send(MixerMessage::SetTrack(s)));
},
CoreMessage::AddTrack(s) => {
drop(interconnect.mixer.send(MixerMessage::AddTrack(s)));
},
CoreMessage::SetBitrate(b) => {
drop(interconnect.mixer.send(MixerMessage::SetBitrate(b)));
},
CoreMessage::SetConfig(mut new_config) => {
next_config = Some(new_config.clone());
new_config.make_safe(&config, connection.is_some());
drop(interconnect.mixer.send(MixerMessage::SetConfig(new_config)));
},
CoreMessage::AddEvent(evt) => {
drop(interconnect.events.send(EventMessage::AddGlobalEvent(evt)));
},
CoreMessage::RemoveGlobalEvents => {
drop(interconnect.events.send(EventMessage::RemoveGlobalEvents));
},
CoreMessage::Mute(m) => {
drop(interconnect.mixer.send(MixerMessage::SetMute(m)));
},
CoreMessage::Reconnect => {
if let Some(mut conn) = connection.take() {
let info = conn.info.clone();
let full_connect = match conn.reconnect(&config).await {
Ok(()) => {
connection = Some(conn);
false
},
Err(ConnectionError::InterconnectFailure(_)) => {
interconnect.restart_volatile_internals();
match conn.reconnect(&config).await {
Ok(()) => {
connection = Some(conn);
false
},
_ => true,
}
},
_ => true,
};
if full_connect {
connection = ConnectionRetryData::reconnect(info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
} else if let Some(ref connection) = &connection {
drop(interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnect(InternalConnect {
info: connection.info.clone(),
ssrc: connection.ssrc,
}),
)));
}
}
},
CoreMessage::FullReconnect =>
if let Some(conn) = connection.take() {
let info = conn.info.clone();
connection = ConnectionRetryData::reconnect(info, &mut attempt_idx)
.attempt(&mut retrying, &interconnect, &config)
.await;
},
CoreMessage::RebuildInterconnect => {
interconnect.restart_volatile_internals();
},
CoreMessage::Poison => break,
}
}
trace!("Main thread exited");
interconnect.poison_all();
}
struct ConnectionRetryData {
flavour: ConnectionFlavour,
attempts: u8,
last_wait: Option<FloatDuration>,
info: ConnectionInfo,
idx: usize,
}
impl ConnectionRetryData {
fn connect(
tx: Sender<Result<(), ConnectionError>>,
info: ConnectionInfo,
idx_src: &mut usize,
) -> Self {
Self::base(ConnectionFlavour::Connect(tx), info, idx_src)
}
fn reconnect(info: ConnectionInfo, idx_src: &mut usize) -> Self {
Self::base(ConnectionFlavour::Reconnect, info, idx_src)
}
fn base(flavour: ConnectionFlavour, info: ConnectionInfo, idx_src: &mut usize) -> Self {
*idx_src = idx_src.wrapping_add(1);
Self {
flavour,
attempts: 0,
last_wait: None,
info,
idx: *idx_src,
}
}
async fn attempt(
mut self,
attempt_slot: &mut Option<Self>,
interconnect: &Interconnect,
config: &Config,
) -> Option<Connection> {
match Connection::new(self.info.clone(), interconnect, config, self.idx).await {
Ok(connection) => {
match self.flavour {
ConnectionFlavour::Connect(tx) => {
drop(tx.send(Ok(())));
drop(interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverConnect(InternalConnect {
info: connection.info.clone(),
ssrc: connection.ssrc,
}),
)));
},
ConnectionFlavour::Reconnect => {
drop(interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverReconnect(InternalConnect {
info: connection.info.clone(),
ssrc: connection.ssrc,
}),
)));
},
}
Some(connection)
},
Err(why) => {
debug!("Failed to connect for {:?}: {}", self.info.guild_id, why);
if let Some(t) = config.driver_retry.retry_in(self.last_wait, self.attempts) {
let remote_ic = interconnect.clone();
let idx = self.idx;
spawn(async move {
tsleep(t.into()).await;
drop(remote_ic.core.send(CoreMessage::RetryConnect(idx)));
});
self.attempts += 1;
self.last_wait = Some(t);
debug!(
"Retrying connection for {:?} in {}s ({}/{:?})",
self.info.guild_id,
t.as_secs_f32(),
self.attempts,
config.driver_retry.retry_limit
);
*attempt_slot = Some(self);
} else {
let reason = Some(DisconnectReason::from(&why));
match self.flavour {
ConnectionFlavour::Connect(tx) => {
drop(tx.send(Err(why)));
drop(interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Connect,
reason,
info: self.info,
}),
)));
},
ConnectionFlavour::Reconnect => {
drop(interconnect.events.send(EventMessage::FireCoreEvent(
CoreContext::DriverDisconnect(InternalDisconnect {
kind: DisconnectKind::Reconnect,
reason,
info: self.info,
}),
)));
},
}
}
None
},
}
}
}
enum ConnectionFlavour {
Connect(Sender<Result<(), ConnectionError>>),
Reconnect,
}