use std::{
collections::VecDeque,
convert::TryInto,
sync::{
atomic::{AtomicBool, AtomicI64, Ordering},
Arc,
},
time::Duration,
};
use crate::{
dsp::Frame,
sound::streaming::{decoder::Decoder, StreamingSoundSettings},
LoopBehavior,
};
use ringbuf::{HeapConsumer, HeapProducer, HeapRb};
const BUFFER_SIZE: usize = 16_384;
const SEEK_DESTINATION_NONE: i64 = i64::MAX;
const DECODER_THREAD_SLEEP_DURATION: Duration = Duration::from_millis(1);
pub(crate) enum NextStep {
Continue,
Wait,
End,
}
pub(crate) struct DecodeSchedulerController {
frame_consumer: HeapConsumer<(i64, Frame)>,
seek_destination_sender: Arc<AtomicI64>,
stopped_signal_sender: Arc<AtomicBool>,
finished_signal_receiver: Arc<AtomicBool>,
}
impl DecodeSchedulerController {
pub fn frame_consumer_mut(&mut self) -> &mut HeapConsumer<(i64, Frame)> {
&mut self.frame_consumer
}
pub fn seek(&self, index: i64) {
self.seek_destination_sender.store(index, Ordering::SeqCst);
}
pub fn finished(&self) -> bool {
self.finished_signal_receiver.load(Ordering::SeqCst)
}
}
impl Drop for DecodeSchedulerController {
fn drop(&mut self) {
self.stopped_signal_sender.store(true, Ordering::SeqCst);
}
}
pub(crate) struct DecodeScheduler<Error: Send + 'static> {
decoder: Box<dyn Decoder<Error = Error>>,
sample_rate: u32,
loop_behavior: Option<LoopBehavior>,
frame_producer: HeapProducer<(i64, Frame)>,
seek_destination_receiver: Arc<AtomicI64>,
stopped_signal_receiver: Arc<AtomicBool>,
finished_signal_sender: Arc<AtomicBool>,
decoded_frames: VecDeque<Frame>,
current_frame: i64,
error_producer: HeapProducer<Error>,
}
impl<Error: Send + 'static> DecodeScheduler<Error> {
pub fn new(
decoder: Box<dyn Decoder<Error = Error>>,
settings: StreamingSoundSettings,
error_producer: HeapProducer<Error>,
) -> Result<(Self, DecodeSchedulerController), Error> {
let (mut frame_producer, frame_consumer) = HeapRb::new(BUFFER_SIZE).split();
frame_producer
.push((0, Frame::ZERO))
.expect("The frame producer shouldn't be full because we just created it");
let seek_destination_sender = Arc::new(AtomicI64::new(SEEK_DESTINATION_NONE));
let seek_destination_receiver = seek_destination_sender.clone();
let stopped_signal_sender = Arc::new(AtomicBool::new(false));
let stopped_signal_receiver = stopped_signal_sender.clone();
let finished_signal_sender = Arc::new(AtomicBool::new(false));
let finished_signal_receiver = finished_signal_sender.clone();
let sample_rate = decoder.sample_rate();
let mut scheduler = Self {
decoder,
sample_rate,
loop_behavior: settings.loop_behavior,
frame_producer,
seek_destination_receiver,
stopped_signal_receiver,
finished_signal_sender,
decoded_frames: VecDeque::new(),
current_frame: 0,
error_producer,
};
scheduler.seek(settings.start_position)?;
let controller = DecodeSchedulerController {
frame_consumer,
seek_destination_sender,
stopped_signal_sender,
finished_signal_receiver,
};
Ok((scheduler, controller))
}
pub fn current_frame(&self) -> i64 {
self.current_frame
}
pub fn start(mut self) {
std::thread::spawn(move || loop {
match self.run() {
Ok(result) => match result {
NextStep::Continue => {}
NextStep::Wait => std::thread::sleep(DECODER_THREAD_SLEEP_DURATION),
NextStep::End => break,
},
Err(error) => {
self.error_producer.push(error).ok();
}
}
});
}
pub fn run(&mut self) -> Result<NextStep, Error> {
if self.stopped_signal_receiver.load(Ordering::SeqCst) {
return Ok(NextStep::End);
}
if self.frame_producer.is_full() {
return Ok(NextStep::Wait);
}
let seek_destination = self.seek_destination_receiver.load(Ordering::SeqCst);
if seek_destination != SEEK_DESTINATION_NONE {
self.seek_destination_receiver
.store(SEEK_DESTINATION_NONE, Ordering::SeqCst);
self.seek_to_index(seek_destination)?;
}
if let Some(frame) = self.decoded_frames.pop_front() {
self.frame_producer
.push((self.current_frame, frame))
.expect("Frame producer should not be full because we just checked that");
self.current_frame += 1;
} else if self.current_frame < 0 {
self.frame_producer
.push((self.current_frame, Frame::ZERO))
.expect("Frame producer should not be full because we just checked that");
self.current_frame += 1;
} else {
let reached_end_of_file = self.decoder.decode(&mut self.decoded_frames)?;
if reached_end_of_file {
if let Some(LoopBehavior { start_position }) = self.loop_behavior {
self.seek(start_position)?;
} else {
self.finished_signal_sender.store(true, Ordering::SeqCst);
return Ok(NextStep::End);
}
}
}
Ok(NextStep::Continue)
}
fn seek(&mut self, position: f64) -> Result<(), Error> {
let index = (position * self.sample_rate as f64).round() as i64;
self.seek_to_index(index)?;
Ok(())
}
fn seek_to_index(&mut self, index: i64) -> Result<(), Error> {
if index < 0 {
self.current_frame = index;
self.decoder.seek(0)?;
} else {
let desired_index = index.try_into().expect("can't convert i64 to u64");
self.current_frame = self
.decoder
.seek(desired_index)?
.try_into()
.expect("sound is too long, cannot convert u64 to i64");
}
Ok(())
}
}