use super::util::copy_seek_to;
use crate::{
driver::tasks::message::MixerInputResultMessage,
input::{AudioStream, AudioStreamError, Compose, Input, LiveInput, Parsed},
Config,
};
use flume::Sender;
use rusty_pool::ThreadPool;
use std::{result::Result as StdResult, sync::Arc, time::Duration};
use symphonia_core::{
formats::{SeekMode, SeekTo},
io::MediaSource,
};
use tokio::runtime::Handle;
#[derive(Clone)]
pub struct BlockyTaskPool {
pool: ThreadPool,
handle: Handle,
}
impl BlockyTaskPool {
pub fn new(handle: Handle) -> Self {
Self {
pool: ThreadPool::new(0, 64, Duration::from_secs(5)),
handle,
}
}
pub fn create(
&self,
callback: Sender<MixerInputResultMessage>,
input: Input,
seek_time: Option<SeekTo>,
config: Arc<Config>,
) {
match input {
Input::Lazy(mut lazy) => {
let far_pool = self.clone();
if lazy.should_create_async() {
self.handle.spawn(async move {
let out = lazy.create_async().await;
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
});
} else {
self.pool.execute(move || {
let out = lazy.create();
far_pool.send_to_parse(out, lazy, callback, seek_time, config);
});
}
},
Input::Live(live, maybe_create) =>
self.parse(config, callback, live, maybe_create, seek_time),
}
}
pub fn send_to_parse(
&self,
create_res: StdResult<AudioStream<Box<dyn MediaSource>>, AudioStreamError>,
rec: Box<dyn Compose>,
callback: Sender<MixerInputResultMessage>,
seek_time: Option<SeekTo>,
config: Arc<Config>,
) {
match create_res {
Ok(o) => {
self.parse(config, callback, LiveInput::Raw(o), Some(rec), seek_time);
},
Err(e) => {
drop(callback.send(MixerInputResultMessage::CreateErr(e.into())));
},
}
}
pub fn parse(
&self,
config: Arc<Config>,
callback: Sender<MixerInputResultMessage>,
input: LiveInput,
rec: Option<Box<dyn Compose>>,
seek_time: Option<SeekTo>,
) {
let pool_clone = self.clone();
self.pool.execute(move || {
match input.promote(config.codec_registry, config.format_registry) {
Ok(LiveInput::Parsed(parsed)) => match seek_time {
Some(seek_time) if !super::util::seek_to_is_zero(&seek_time) => {
pool_clone.seek(callback, parsed, rec, seek_time, false, config);
},
_ => {
drop(callback.send(MixerInputResultMessage::Built(parsed, rec)));
},
},
Ok(_) => unreachable!(),
Err(e) => {
drop(callback.send(MixerInputResultMessage::ParseErr(e.into())));
},
}
});
}
pub fn seek(
&self,
callback: Sender<MixerInputResultMessage>,
mut input: Parsed,
rec: Option<Box<dyn Compose>>,
seek_time: SeekTo,
backseek_needed: bool,
config: Arc<Config>,
) {
let pool_clone = self.clone();
self.pool.execute(move || match rec {
Some(rec) if (!input.supports_backseek) && backseek_needed => {
pool_clone.create(callback, Input::Lazy(rec), Some(seek_time), config);
},
_ => {
let seek_result = input
.format
.seek(SeekMode::Accurate, copy_seek_to(&seek_time));
input.decoder.reset();
drop(callback.send(MixerInputResultMessage::Seek(
input,
rec,
seek_result.map_err(Arc::new),
)));
},
});
}
}