#[cfg(feature = "internals")]
pub mod bench_internals;
pub(crate) mod connection;
mod crypto;
#[cfg(feature = "receive")]
mod decode_mode;
mod mix_mode;
pub mod retry;
mod scheduler;
pub(crate) mod tasks;
#[cfg(test)]
pub(crate) mod test_config;
#[cfg(any(test, feature = "internals"))]
mod test_impls;
use connection::error::{Error, Result};
pub use crypto::CryptoMode;
pub(crate) use crypto::CryptoState;
#[cfg(feature = "receive")]
pub use decode_mode::*;
pub use mix_mode::MixMode;
pub use scheduler::{
get_default_scheduler,
Config as SchedulerConfig,
Error as SchedulerError,
LiveStatBlock,
Mode as SchedulerMode,
Scheduler,
};
#[cfg(test)]
pub use test_config::*;
#[cfg(any(test, feature = "internals"))]
pub use test_impls::*;
#[cfg(feature = "builtin-queue")]
use crate::tracks::TrackQueue;
use crate::{
events::EventData,
input::Input,
tracks::{Track, TrackHandle},
Config,
ConnectionInfo,
Event,
EventHandler,
};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use flume::{r#async::RecvFut, SendError, Sender};
pub use opus2::{self as opus, Bitrate};
#[cfg(feature = "builtin-queue")]
use std::time::Duration;
#[allow(unused_imports)]
pub use tasks::disposal::DisposalThread;
use tasks::message::CoreMessage;
use tracing::instrument;
/// Control handle for the background tasks launched through `tasks::start`.
///
/// Every operation on the driver is forwarded to those tasks as a
/// `CoreMessage` over a flume channel. If a send fails, the tasks are
/// started again from the stored state before the message is retried.
#[derive(Clone, Debug)]
pub struct Driver {
/// Configuration last supplied via `new` or `set_config`; a clone of it is
/// handed to the background tasks whenever they are (re)started.
config: Config,
/// Mute flag last requested through `mute`; re-sent after a restart.
self_mute: bool,
/// Sending half of the channel into the background tasks.
sender: Sender<CoreMessage>,
/// Built-in track queue. It is `Some` except while `enqueue_with_preload`
/// has temporarily taken it out to pass `self` to the queue.
#[cfg(feature = "builtin-queue")]
queue: Option<TrackQueue>,
}
impl Driver {
    /// Builds a driver from `config` and immediately starts its background tasks.
    ///
    /// The driver starts out unmuted. With the `builtin-queue` feature enabled,
    /// a default `TrackQueue` is attached as well.
    #[inline]
    #[must_use]
    pub fn new(config: Config) -> Self {
        Self {
            // The tasks receive their own copy; the driver keeps the original so
            // it can start the tasks again later.
            sender: Self::start_inner(config.clone()),
            config,
            self_mute: false,
            #[cfg(feature = "builtin-queue")]
            queue: Some(TrackQueue::default()),
        }
    }

    /// Opens a fresh unbounded channel, passes the receiver (plus a clone of the
    /// sender) to `tasks::start`, and returns the sender.
    fn start_inner(config: Config) -> Sender<CoreMessage> {
        let (core_tx, core_rx) = flume::unbounded();
        tasks::start(config, core_rx, core_tx.clone());
        core_tx
    }

    /// Starts a new set of background tasks from the stored configuration and
    /// swaps in the new channel.
    fn restart_inner(&mut self) {
        let muted = self.self_mute;
        self.sender = Self::start_inner(self.config.clone());
        // Re-send the stored mute flag to the newly started tasks.
        self.mute(muted);
    }

    /// Requests a connection described by `info`.
    ///
    /// The request is sent right away; the returned [`Connect`] future resolves
    /// with the outcome reported back over a one-slot channel.
    #[instrument(skip(self))]
    pub fn connect(&mut self, info: ConnectionInfo) -> Connect {
        let (result_tx, result_rx) = flume::bounded(1);
        self.raw_connect(info, result_tx);
        let inner = result_rx.into_recv_async();
        Connect { inner }
    }

    /// Sends a connection request whose result will be delivered on `tx`.
    #[instrument(skip(self))]
    pub(crate) fn raw_connect(&mut self, info: ConnectionInfo, tx: Sender<Result<()>>) {
        let request = CoreMessage::ConnectWithResult(info, tx);
        self.send(request);
    }

    /// Sends `CoreMessage::Disconnect` to the background tasks.
    #[instrument(skip(self))]
    pub fn leave(&mut self) {
        let request = CoreMessage::Disconnect;
        self.send(request);
    }

    /// Records the requested mute state and forwards it to the background tasks.
    #[instrument(skip(self))]
    pub fn mute(&mut self, mute: bool) {
        // Store first: if the send below has to restart the tasks, the restart
        // re-applies whatever is stored here.
        self.self_mute = mute;
        let request = CoreMessage::Mute(mute);
        self.send(request);
    }

    /// Returns the mute state last requested through [`Driver::mute`].
    #[instrument(skip(self))]
    pub fn is_mute(&self) -> bool {
        self.self_mute
    }

    /// Converts `input` into a [`Track`] and plays it via [`Driver::play`].
    #[instrument(skip(self, input))]
    pub fn play_input(&mut self, input: Input) -> TrackHandle {
        let track: Track = input.into();
        self.play(track)
    }

    /// Converts `input` into a [`Track`] and plays it via [`Driver::play_only`].
    #[instrument(skip(self, input))]
    pub fn play_only_input(&mut self, input: Input) -> TrackHandle {
        let track: Track = input.into();
        self.play_only(track)
    }

    /// Hands `track` to the background tasks with `CoreMessage::AddTrack` and
    /// returns the handle for controlling it.
    #[instrument(skip(self, track))]
    pub fn play(&mut self, track: Track) -> TrackHandle {
        let (track_handle, context) = track.into_context();
        let boxed = Box::new(context);
        self.send(CoreMessage::AddTrack(boxed));
        track_handle
    }

    /// Hands `track` to the background tasks with `CoreMessage::SetTrack` and
    /// returns the handle for controlling it.
    ///
    /// NOTE(review): `SetTrack` presumably replaces whatever is currently
    /// playing, unlike `AddTrack` -- confirm against the handler of that message.
    #[instrument(skip(self, track))]
    pub fn play_only(&mut self, track: Track) -> TrackHandle {
        let (track_handle, context) = track.into_context();
        let boxed = Box::new(context);
        self.send(CoreMessage::SetTrack(Some(boxed)));
        track_handle
    }

    /// Forwards a new Opus `bitrate` to the background tasks.
    #[instrument(skip(self))]
    pub fn set_bitrate(&mut self, bitrate: Bitrate) {
        let request = CoreMessage::SetBitrate(bitrate);
        self.send(request);
    }

    /// Sends `CoreMessage::SetTrack(None)`, i.e. the message used by
    /// [`Driver::play_only`] but carrying no track.
    #[instrument(skip(self))]
    pub fn stop(&mut self) {
        let request = CoreMessage::SetTrack(None);
        self.send(request);
    }

    /// Stores `config` on the driver and forwards it to the background tasks.
    #[instrument(skip(self))]
    pub fn set_config(&mut self, config: Config) {
        // Store first: a restart triggered by the send below starts the tasks
        // from the stored configuration.
        self.config = config.clone();
        let request = CoreMessage::SetConfig(config);
        self.send(request);
    }

    /// Returns the configuration currently stored on the driver.
    #[instrument(skip(self))]
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Registers `action` as a global handler for `event` by sending the pair to
    /// the background tasks.
    #[instrument(skip(self, action))]
    pub fn add_global_event<F: EventHandler + 'static>(&mut self, event: Event, action: F) {
        let registration = EventData::new(event, action);
        self.send(CoreMessage::AddEvent(registration));
    }

    /// Sends `CoreMessage::RemoveGlobalEvents` to the background tasks.
    #[instrument(skip(self))]
    pub fn remove_all_global_events(&mut self) {
        let request = CoreMessage::RemoveGlobalEvents;
        self.send(request);
    }

    /// Delivers `status` to the background tasks.
    ///
    /// When the channel rejects the message, the tasks are started again and
    /// the same message is sent once more.
    ///
    /// # Panics
    ///
    /// Panics if the second send attempt fails as well.
    fn send(&mut self, status: CoreMessage) {
        let first_attempt = self.sender.send(status);
        // The error hands the undelivered message back, so it can be retried.
        if let Err(SendError(undelivered)) = first_attempt {
            self.restart_inner();
            self.sender.send(undelivered).unwrap();
        }
    }
}
#[cfg(feature = "builtin-queue")]
impl Driver {
    /// Returns a reference to the driver's built-in track queue.
    ///
    /// # Panics
    ///
    /// Panics if the queue slot is empty, which can only be the case when a
    /// previous queue operation panicked before putting the queue back.
    #[must_use]
    pub fn queue(&self) -> &TrackQueue {
        let slot = self.queue.as_ref();
        slot.expect("Queue: The only case this can fail is if a previous queue operation panicked.")
    }

    /// Converts `input` into a [`Track`] and adds it via [`Driver::enqueue`].
    pub async fn enqueue_input(&mut self, input: Input) -> TrackHandle {
        let track: Track = input.into();
        self.enqueue(track).await
    }

    /// Awaits `TrackQueue::get_preload_time` for `track`, then adds the track
    /// through [`Driver::enqueue_with_preload`] using that value.
    pub async fn enqueue(&mut self, mut track: Track) -> TrackHandle {
        let preload = TrackQueue::get_preload_time(&mut track).await;
        self.enqueue_with_preload(track, preload)
    }

    /// Adds `track` to the built-in queue with an explicit `preload_time`.
    ///
    /// # Panics
    ///
    /// Panics if the queue slot is empty, which can only be the case when a
    /// previous queue operation panicked before putting the queue back.
    pub fn enqueue_with_preload(
        &mut self,
        track: Track,
        preload_time: Option<Duration>,
    ) -> TrackHandle {
        // The queue is moved out of `self` for the duration of the call so that
        // `self` can be passed to it mutably, and is restored right afterwards.
        let track_queue = self.queue.take().expect(
            "Enqueue: The only case this can fail is if a previous queue operation panicked.",
        );
        let track_handle = track_queue.add_with_preload(track, self, preload_time);
        self.queue = Some(track_queue);
        track_handle
    }
}
impl Default for Driver {
    /// Creates a driver from the default [`Config`], exactly as
    /// `Driver::new(Config::default())` would.
    fn default() -> Self {
        let config = Config::default();
        Self::new(config)
    }
}
impl Drop for Driver {
    /// Sends `CoreMessage::Poison` to the background tasks when the driver goes
    /// away. A failed send is deliberately ignored.
    fn drop(&mut self) {
        let poison_outcome = self.sender.send(CoreMessage::Poison);
        // Nothing useful can be done with a send failure here; discard it explicitly.
        drop(poison_outcome);
    }
}
/// Future returned by `Driver::connect`, resolving to the outcome of the
/// connection attempt.
///
/// It resolves to `Error::AttemptDiscarded` when receiving from the result
/// channel fails instead of yielding a result.
pub struct Connect {
/// Async receiver for the one-slot channel whose sender was sent to the
/// background tasks inside `CoreMessage::ConnectWithResult`.
inner: RecvFut<'static, Result<()>>,
}
impl Future for Connect {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.inner).poll(cx) {
Poll::Ready(r) => Poll::Ready(r.map_err(|_| Error::AttemptDiscarded).and_then(|x| x)),
Poll::Pending => Poll::Pending,
}
}
}