use std::error::Error;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use creek::{ReadDiskStream, SeekMode, SymphoniaDecoder};
use crossbeam_channel::{Receiver, Sender};
use crate::{AtomicF64, AudioBuffer, RENDER_QUANTUM_SIZE};
pub(crate) struct RTSStream {
stream: ReadDiskStream<SymphoniaDecoder>,
number_of_channels: usize,
current_time: Arc<AtomicF64>,
receiver: Receiver<MediaElementAction>,
loop_: Arc<AtomicBool>,
paused: Arc<AtomicBool>,
playback_rate: Arc<AtomicF64>,
}
pub(crate) enum MediaElementAction {
Seek(f64),
SetLoop(bool),
Play,
Pause,
SetPlaybackRate(f64),
}
pub struct MediaElement {
stream: Option<RTSStream>,
current_time: Arc<AtomicF64>,
sender: Sender<MediaElementAction>,
loop_: Arc<AtomicBool>,
paused: Arc<AtomicBool>,
playback_rate: Arc<AtomicF64>,
}
impl MediaElement {
pub fn new<P: Into<PathBuf>>(file: P) -> Result<Self, Box<dyn Error>> {
let mut read_disk_stream = ReadDiskStream::<SymphoniaDecoder>::new(
file, 0, Default::default(), )?;
let number_of_channels = read_disk_stream.info().num_channels as usize;
let _ = read_disk_stream.cache(0, 0);
read_disk_stream.seek(0, SeekMode::default())?;
read_disk_stream.block_until_ready()?;
let (sender, receiver) = crossbeam_channel::bounded(32);
let current_time = Arc::new(AtomicF64::new(0.));
let loop_ = Arc::new(AtomicBool::new(false));
let paused = Arc::new(AtomicBool::new(true));
let playback_rate = Arc::new(AtomicF64::new(1.));
let rts_stream = RTSStream {
stream: read_disk_stream,
number_of_channels,
current_time: Arc::clone(¤t_time),
receiver,
loop_: Arc::clone(&loop_),
paused: Arc::clone(&paused),
playback_rate: Arc::clone(&playback_rate),
};
Ok(Self {
stream: Some(rts_stream),
current_time,
sender,
loop_,
paused,
playback_rate,
})
}
pub(crate) fn take_stream(&mut self) -> Option<RTSStream> {
self.stream.take()
}
pub fn current_time(&self) -> f64 {
self.current_time.load(Ordering::SeqCst)
}
pub fn set_current_time(&self, value: f64) {
let _ = self.sender.send(MediaElementAction::Seek(value));
}
pub fn loop_(&self) -> bool {
self.loop_.load(Ordering::SeqCst)
}
pub fn set_loop(&self, value: bool) {
let _ = self.sender.send(MediaElementAction::SetLoop(value));
}
pub fn play(&self) {
let _ = self.sender.send(MediaElementAction::Play);
}
pub fn pause(&self) {
let _ = self.sender.send(MediaElementAction::Pause);
}
pub fn paused(&self) -> bool {
self.paused.load(Ordering::SeqCst)
}
pub fn playback_rate(&self) -> f64 {
self.playback_rate.load(Ordering::SeqCst)
}
pub fn set_playback_rate(&self, value: f64) {
let _ = self.sender.send(MediaElementAction::SetPlaybackRate(value));
}
}
impl Iterator for RTSStream {
type Item = Result<AudioBuffer, Box<dyn Error + Send + Sync>>;
fn next(&mut self) -> Option<Self::Item> {
let sample_rate = self.stream.info().sample_rate.unwrap() as f32;
if let Ok(msg) = self.receiver.try_recv() {
use MediaElementAction::*;
match msg {
Seek(value) => {
self.current_time.store(value, Ordering::SeqCst);
let frame = (value * sample_rate as f64) as usize;
self.stream.seek(frame, SeekMode::default()).unwrap();
}
SetLoop(value) => {
self.loop_.store(value, Ordering::SeqCst);
}
Play => self.paused.store(false, Ordering::SeqCst),
Pause => self.paused.store(true, Ordering::SeqCst),
SetPlaybackRate(value) => self.playback_rate.store(value, Ordering::SeqCst),
};
}
if self.paused.load(Ordering::SeqCst) {
let silence = AudioBuffer::from(
vec![vec![0.; RENDER_QUANTUM_SIZE]; self.number_of_channels],
sample_rate,
);
return Some(Ok(silence));
}
let playback_rate = self.playback_rate.load(Ordering::SeqCst).abs();
let _reverse = playback_rate < 0.; let samples = (RENDER_QUANTUM_SIZE as f64 * playback_rate) as usize;
let next = match self.stream.read(samples) {
Ok(data) => {
let channels: Vec<_> = (0..data.num_channels())
.map(|i| data.read_channel(i).to_vec())
.collect();
let buf = AudioBuffer::from(channels, sample_rate * playback_rate as f32);
if self.loop_.load(Ordering::SeqCst) && data.reached_end_of_file() {
self.stream.seek(0, SeekMode::default()).unwrap();
self.current_time.store(0., Ordering::SeqCst);
} else {
let current_time = self.current_time.load(Ordering::SeqCst);
self.current_time.store(
current_time + (RENDER_QUANTUM_SIZE as f64 / sample_rate as f64),
Ordering::SeqCst,
);
}
Ok(buf)
}
Err(e) => Err(Box::new(e) as _),
};
Some(next)
}
}