use crate::{
driver::Driver,
events::{Event, EventContext, EventData, EventHandler, TrackEvent},
input::Input,
tracks::{Track, TrackHandle, TrackResult},
};
use async_trait::async_trait;
use parking_lot::Mutex;
use std::{collections::VecDeque, ops::Deref, sync::Arc, time::Duration};
use tracing::{info, warn};
/// A cloneable handle to a shared, ordered queue of tracks.
///
/// The queue state is stored behind an `Arc<Mutex<..>>`, so every clone of a
/// `TrackQueue` refers to the same underlying list of queued tracks.
#[derive(Clone, Debug, Default)]
pub struct TrackQueue {
// Shared queue state. Clones of this `Arc` are also handed to the event
// handlers which are attached to each queued track.
inner: Arc<Mutex<TrackQueueCore>>,
}
/// An entry in a [`TrackQueue`]: a thin wrapper around the [`TrackHandle`]
/// of a track which has been placed in the queue.
///
/// The wrapped field is private, so values of this type can only be built
/// inside this module.
#[derive(Debug)]
pub struct Queued(TrackHandle);
impl Deref for Queued {
type Target = TrackHandle;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Queued {
#[must_use]
pub fn handle(&self) -> TrackHandle {
self.0.clone()
}
}
/// The state shared between every [`TrackQueue`] handle and the event
/// handlers attached to queued tracks.
#[derive(Debug, Default)]
struct TrackQueueCore {
// Queued tracks in order; the front entry is the one treated as "current"
// by the queue's playback controls.
tracks: VecDeque<Queued>,
}
/// Event handler which advances the queue once its head track has ended.
///
/// `TrackQueue::add_with_preload` attaches one of these to the
/// `TrackEvent::End` event of every track it queues.
struct QueueHandler {
// Handle to the shared state of the queue the track was added to.
remote_lock: Arc<Mutex<TrackQueueCore>>,
}
#[async_trait]
impl EventHandler for QueueHandler {
    /// Removes the finished head of the queue and starts the next playable
    /// entry.
    ///
    /// Nothing happens unless `ctx` is a track event whose first entry has the
    /// same UUID as the track at the front of the queue. Always returns `None`.
    async fn act(&self, ctx: &EventContext<'_>) -> Option<Event> {
        let mut queue = self.remote_lock.lock();

        // Only track events can describe a track which has ended.
        let ended = match ctx {
            EventContext::Track(states) => states,
            _ => return None,
        };

        // The queue is publicly mutable (see `TrackQueue::modify_queue`), so
        // the track which fired this event is not guaranteed to still be the
        // head: only advance when the two match.
        let head_id = queue.tracks.front()?.uuid();
        let ended_id = ended.first()?.1.uuid();
        if head_id != ended_id {
            return None;
        }

        let _finished = queue.tracks.pop_front();

        info!("Queued track ended: {:?}.", ctx);
        info!("{} tracks remain.", queue.tracks.len());

        // Discard entries which refuse to start until one plays or the queue
        // runs dry.
        while let Some(next) = queue.tracks.front() {
            if next.play().is_ok() {
                break;
            }

            warn!("Track in Queue couldn't be played...");
            queue.tracks.pop_front();
        }

        None
    }
}
/// Event handler which asks the queue entry at index 1 (the one directly
/// behind the head) to become playable.
///
/// `TrackQueue::add_with_preload` attaches one of these to a delayed event on
/// a track whenever a preload time is supplied.
struct SongPreloader {
// Handle to the shared state of the queue the track was added to.
remote_lock: Arc<Mutex<TrackQueueCore>>,
}
#[async_trait]
impl EventHandler for SongPreloader {
async fn act(&self, _ctx: &EventContext<'_>) -> Option<Event> {
let inner = self.remote_lock.lock();
if let Some(track) = inner.tracks.get(1) {
drop(track.0.make_playable());
}
None
}
}
impl TrackQueue {
    /// Creates a new, empty queue.
    #[must_use]
    pub fn new() -> Self {
        let core = TrackQueueCore {
            tracks: VecDeque::new(),
        };

        Self {
            inner: Arc::new(Mutex::new(core)),
        }
    }

    /// Converts `input` into a [`Track`] and queues it via [`Self::add`].
    ///
    /// Returns the handle of the newly queued track.
    pub async fn add_source(&self, input: Input, driver: &mut Driver) -> TrackHandle {
        let track: Track = input.into();
        self.add(track, driver).await
    }

    /// Queues `track` on `driver`, first looking up a preload time for it
    /// with [`Self::get_preload_time`].
    ///
    /// Returns the handle of the newly queued track.
    pub async fn add(&self, mut track: Track, driver: &mut Driver) -> TrackHandle {
        let preload_at = Self::get_preload_time(&mut track).await;
        self.add_with_preload(track, driver, preload_at)
    }

    /// Computes the playback position at which the track after `track` should
    /// be preloaded: the track's reported duration minus five seconds,
    /// saturating at zero.
    ///
    /// Returns `None` when the input has no metadata source attached, when
    /// the metadata lookup fails, or when the metadata carries no duration.
    pub(crate) async fn get_preload_time(track: &mut Track) -> Option<Duration> {
        let metadata = match &mut track.input {
            Input::Live(_, None) => None,
            Input::Lazy(source) | Input::Live(_, Some(source)) =>
                source.aux_metadata().await.ok(),
        };

        let total = metadata?.duration?;
        Some(total.saturating_sub(Duration::from_secs(5)))
    }

    /// Queues `track` on `driver` using a caller-supplied preload time.
    ///
    /// An end-of-track handler is always attached to the track. When
    /// `preload_time` is `Some`, a second handler is attached to a delayed
    /// event carrying that duration. The track is handed to the driver in a
    /// paused state and its handle is appended to the queue; if it is then the
    /// only entry, playback is requested immediately.
    ///
    /// Returns the handle of the newly queued track.
    #[inline]
    pub fn add_with_preload(
        &self,
        mut track: Track,
        driver: &mut Driver,
        preload_time: Option<Duration>,
    ) -> TrackHandle {
        info!("Track added to queue.");

        let on_end = QueueHandler {
            remote_lock: Arc::clone(&self.inner),
        };
        track.events.add_event(
            EventData::new(Event::Track(TrackEvent::End), on_end),
            Duration::ZERO,
        );

        if let Some(delay) = preload_time {
            let preloader = SongPreloader {
                remote_lock: Arc::clone(&self.inner),
            };
            track.events.add_event(
                EventData::new(Event::Delayed(delay), preloader),
                Duration::ZERO,
            );
        }

        // The lock is held while the driver receives the track and the queue
        // is extended, and is released before playback is requested.
        let mut queue = self.inner.lock();
        let handle = driver.play(track.pause());
        queue.tracks.push_back(Queued(handle.clone()));
        let is_only_entry = queue.tracks.len() == 1;
        drop(queue);

        if is_only_entry {
            drop(handle.play());
        }

        handle
    }

    /// Returns a handle to the track at the front of the queue, or `None`
    /// when the queue is empty.
    #[must_use]
    pub fn current(&self) -> Option<TrackHandle> {
        let queue = self.inner.lock();
        queue.tracks.front().map(|head| head.handle())
    }

    /// Removes and returns the entry at `index`, or `None` when `index` is
    /// out of bounds.
    ///
    /// The removed track is not stopped by this call.
    #[must_use]
    pub fn dequeue(&self, index: usize) -> Option<Queued> {
        let mut queue = self.inner.lock();
        queue.tracks.remove(index)
    }

    /// Returns the number of tracks currently in the queue.
    #[must_use]
    pub fn len(&self) -> usize {
        self.inner.lock().tracks.len()
    }

    /// Returns `true` when the queue holds no tracks.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.lock().tracks.is_empty()
    }

    /// Runs `func` with mutable access to the underlying list of queued
    /// tracks and returns whatever `func` returns.
    ///
    /// The queue's lock is held for the whole duration of `func`.
    pub fn modify_queue<F, O>(&self, func: F) -> O
    where
        F: FnOnce(&mut VecDeque<Queued>) -> O,
    {
        let mut guard = self.inner.lock();
        let tracks = &mut guard.tracks;
        func(tracks)
    }

    /// Pauses the track at the front of the queue.
    ///
    /// Returns `Ok(())` without doing anything when the queue is empty.
    pub fn pause(&self) -> TrackResult<()> {
        let queue = self.inner.lock();
        match queue.tracks.front() {
            Some(head) => head.pause(),
            None => Ok(()),
        }
    }

    /// Requests playback of the track at the front of the queue.
    ///
    /// Returns `Ok(())` without doing anything when the queue is empty.
    pub fn resume(&self) -> TrackResult<()> {
        let queue = self.inner.lock();
        match queue.tracks.front() {
            Some(head) => head.play(),
            None => Ok(()),
        }
    }

    /// Empties the queue, asking every removed track to stop.
    ///
    /// Results of the individual stop requests are discarded.
    pub fn stop(&self) {
        let mut queue = self.inner.lock();
        queue
            .tracks
            .drain(..)
            .for_each(|entry| drop(entry.stop()));
    }

    /// Asks the track at the front of the queue to stop.
    ///
    /// The entry itself is not removed here. Returns `Ok(())` without doing
    /// anything when the queue is empty.
    pub fn skip(&self) -> TrackResult<()> {
        self.inner.lock().stop_current()
    }

    /// Returns handles to every queued track, in queue order.
    #[must_use]
    pub fn current_queue(&self) -> Vec<TrackHandle> {
        let queue = self.inner.lock();
        queue.tracks.iter().map(|entry| entry.handle()).collect()
    }
}
impl TrackQueueCore {
    /// Asks the track at the front of the queue to stop, leaving the entry in
    /// place.
    ///
    /// Returns `Ok(())` without doing anything when the queue is empty.
    fn stop_current(&self) -> TrackResult<()> {
        match self.tracks.front() {
            Some(head) => head.stop(),
            None => Ok(()),
        }
    }
}
// Tests for queue progression. Only compiled for test builds which also
// enable the `builtin-queue` feature.
//
// NOTE(review): `Config::test_cfg`, `ready_track`, `tick`, `wait` and `skip`
// on the returned test handle are helpers defined elsewhere; the comments
// below describe only what is visible in this module — confirm their exact
// semantics against the test harness.
#[cfg(all(test, feature = "builtin-queue"))]
mod tests {
use crate::{
driver::Driver,
input::{File, HttpRequest},
tracks::PlayMode,
Config,
};
use reqwest::Client;
use std::time::Duration;
/// Enqueues the same WAV file twice, readies both tracks, steps the test
/// driver, and then checks that querying the first handle fails while the
/// second handle reports `PlayMode::Play`.
#[tokio::test]
#[ntest::timeout(20_000)]
async fn next_track_plays_on_end() {
let (t_handle, config) = Config::test_cfg(true);
let mut driver = Driver::new(config.clone());
let file1 = File::new("resources/ting.wav");
let file2 = file1.clone();
let h1 = driver.enqueue_input(file1.into()).await;
let h2 = driver.enqueue_input(file2.into()).await;
t_handle
.ready_track(&h1, Some(Duration::from_millis(1)))
.await;
t_handle
.ready_track(&h2, Some(Duration::from_millis(1)))
.await;
t_handle.tick(1);
t_handle.wait(1);
// Both info requests are issued before the final ticks and only awaited
// afterwards.
let h1a = h1.get_info();
let h2a = h2.get_info();
t_handle.tick(2);
// Presumably the first track has ended and been dropped by now, so its
// handle can no longer be queried — confirm.
assert!(h1a.await.is_err());
assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
}
/// Enqueues the same WAV file twice, calls `skip()` on the driver's queue
/// once the first track is ready, and then checks that querying the first
/// handle fails while the second handle reports `PlayMode::Play`.
#[tokio::test]
#[ntest::timeout(15_000)]
async fn next_track_plays_on_skip() {
let (t_handle, config) = Config::test_cfg(true);
let mut driver = Driver::new(config.clone());
let file1 = File::new("resources/ting.wav");
let file2 = file1.clone();
let h1 = driver.enqueue_input(file1.into()).await;
let h2 = driver.enqueue_input(file2.into()).await;
t_handle
.ready_track(&h1, Some(Duration::from_millis(1)))
.await;
// The skip request itself must be accepted.
assert!(driver.queue().skip().is_ok());
t_handle
.ready_track(&h2, Some(Duration::from_millis(1)))
.await;
t_handle.skip(1).await;
let h1a = h1.get_info();
let h2a = h2.get_info();
t_handle.tick(2);
assert!(h1a.await.is_err());
assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
}
/// Enqueues an HTTP request input followed by a WAV file, readies only the
/// second track, and then checks that querying the first handle fails while
/// the second handle reports `PlayMode::Play`.
#[tokio::test]
#[ntest::timeout(15_000)]
async fn next_track_plays_on_err() {
let (t_handle, config) = Config::test_cfg(true);
let mut driver = Driver::new(config.clone());
// NOTE(review): presumably this URL does not yield playable audio, so
// the first track is expected to fail — confirm.
let file1 = HttpRequest::new(
Client::new(),
"http://github.com/serenity-rs/songbird/".into(),
);
let file2 = File::new("resources/ting.wav");
let h1 = driver.enqueue_input(file1.into()).await;
let h2 = driver.enqueue_input(file2.into()).await;
t_handle
.ready_track(&h2, Some(Duration::from_millis(1)))
.await;
t_handle.tick(1);
t_handle.wait(1);
let h1a = h1.get_info();
let h2a = h2.get_info();
t_handle.tick(2);
assert!(h1a.await.is_err());
assert_eq!(h2a.await.unwrap().playing, PlayMode::Play);
}
}