use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use lewton::inside_ogg::OggStreamReader;
use lewton::VorbisError;
use crate::buffer::{AudioBuffer, ChannelData};
use crate::control::Controller;
use crate::{BufferDepletedError, SampleRate, BUFFER_SIZE};
#[cfg(not(test))]
use crossbeam_channel::Sender;
use crossbeam_channel::{self, Receiver, TryRecvError};
#[cfg(not(test))]
use crate::io;
#[cfg(not(test))]
use cpal::{traits::StreamTrait, Sample, Stream};
pub trait MediaStream:
Iterator<Item = Result<AudioBuffer, Box<dyn Error + Send>>> + Send + 'static
{
}
impl<M: Iterator<Item = Result<AudioBuffer, Box<dyn Error + Send>>> + Send + 'static> MediaStream
for M
{
}
pub struct MediaElement {
input: Receiver<Option<Result<AudioBuffer, Box<dyn Error + Send>>>>,
buffer: Vec<AudioBuffer>,
buffer_complete: bool,
buffer_index: usize,
controller: Controller,
timestamp: f64,
seeking: Option<f64>,
}
impl MediaElement {
pub fn new<S: MediaStream>(input: S) -> Self {
let (sender, receiver) = crossbeam_channel::unbounded();
let fill_buffer = move || {
let _ = sender.send(None); input.map(Some).for_each(|i| {
let _ = sender.send(i);
});
let _ = sender.send(None); };
std::thread::spawn(fill_buffer);
let ping = receiver.recv().expect("buffer channel disconnected");
assert!(ping.is_none());
Self {
input: receiver,
buffer: vec![],
buffer_complete: false,
buffer_index: 0,
controller: Controller::new(),
timestamp: 0.,
seeking: None,
}
}
pub fn controller(&self) -> &Controller {
&self.controller
}
fn load_next(&mut self) -> Option<Result<AudioBuffer, Box<dyn Error + Send>>> {
if !self.buffer_complete {
let next = match self.input.try_recv() {
Err(_) => return Some(Err(Box::new(BufferDepletedError {}))),
Ok(v) => v,
};
match next {
Some(Err(e)) => {
self.buffer_complete = true;
return Some(Err(e));
}
Some(Ok(data)) => {
self.buffer.push(data.clone());
self.buffer_index += 1;
self.timestamp += data.duration();
return Some(Ok(data));
}
None => {
self.buffer_complete = true;
return None;
}
}
}
None
}
pub fn seek(&mut self, ts: f64) {
if ts == 0. {
self.timestamp = 0.;
self.buffer_index = 0;
return;
}
self.timestamp = 0.;
for (i, buf) in self.buffer.iter().enumerate() {
self.buffer_index = i;
self.timestamp += buf.duration();
if self.timestamp > ts {
return; }
}
loop {
match self.load_next() {
Some(Ok(buf)) => {
self.timestamp += buf.duration();
if self.timestamp > ts {
return; }
}
Some(Err(e)) if e.is::<BufferDepletedError>() => {
self.seeking = Some(ts);
return;
}
_ => {
self.buffer_index += 1;
return;
}
}
}
}
}
impl Iterator for MediaElement {
type Item = Result<AudioBuffer, Box<dyn Error + Send>>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(seek) = self.controller().should_seek() {
println!("seek requested {}", seek);
self.seek(seek);
} else if let Some(seek) = self.seeking.take() {
println!("leftover seek {}", seek);
self.seek(seek);
}
if self.seeking.is_some() {
println!("leftover seek, return silence");
return Some(Err(Box::new(BufferDepletedError {})));
}
if self.controller.loop_() && self.timestamp > self.controller.loop_end() {
self.seek(self.controller.loop_start());
}
if let Some(data) = self.buffer.get(self.buffer_index) {
self.buffer_index += 1;
self.timestamp += data.duration();
return Some(Ok(data.clone()));
}
match self.load_next() {
Some(Ok(data)) => {
return Some(Ok(data));
}
Some(Err(e)) if e.is::<BufferDepletedError>() => {
return Some(Err(e));
}
_ => (), };
if !self.controller.loop_() || self.buffer.is_empty() {
return None;
}
self.seek(self.controller.loop_start());
self.next()
}
}
pub struct Microphone {
receiver: Receiver<AudioBuffer>,
channels: usize,
sample_rate: SampleRate,
#[cfg(not(test))]
stream: Stream,
}
unsafe impl Send for Microphone {}
impl Microphone {
#[cfg(not(test))]
pub fn new() -> Self {
let buffer = 1; let (sender, receiver) = crossbeam_channel::bounded(buffer);
let io_builder = io::InputBuilder::new();
let config = io_builder.config();
log::debug!("Input {:?}", config);
let sample_rate = SampleRate(config.sample_rate.0);
let channels = config.channels as usize;
let render = MicrophoneRender {
channels,
sample_rate,
sender,
};
let stream = io_builder.build(render);
Self {
receiver,
channels,
sample_rate,
stream,
}
}
pub fn suspend(&self) {
#[cfg(not(test))] self.stream.pause().unwrap()
}
pub fn resume(&self) {
#[cfg(not(test))] self.stream.play().unwrap()
}
}
#[cfg(not(test))]
impl Default for Microphone {
fn default() -> Self {
Self::new()
}
}
impl Iterator for Microphone {
type Item = Result<AudioBuffer, Box<dyn Error + Send>>;
fn next(&mut self) -> Option<Self::Item> {
let next = match self.receiver.try_recv() {
Ok(buffer) => {
buffer
}
Err(TryRecvError::Empty) => {
log::debug!("input frame delayed");
AudioBuffer::new(self.channels, BUFFER_SIZE as usize, self.sample_rate)
}
Err(TryRecvError::Disconnected) => {
return None;
}
};
Some(Ok(next))
}
}
#[cfg(not(test))]
pub(crate) struct MicrophoneRender {
channels: usize,
sample_rate: SampleRate,
sender: Sender<AudioBuffer>,
}
#[cfg(not(test))]
impl MicrophoneRender {
pub fn render<S: Sample>(&self, data: &[S]) {
let mut channels = Vec::with_capacity(self.channels);
for i in 0..self.channels {
channels.push(ChannelData::from(
data.iter()
.skip(i)
.step_by(self.channels)
.map(|v| v.to_f32())
.collect(),
));
}
let buffer = AudioBuffer::from_channels(channels, self.sample_rate);
let result = self.sender.try_send(buffer); if result.is_err() {
log::debug!("input frame dropped");
}
}
}
pub struct OggVorbisDecoder {
stream: OggStreamReader<BufReader<File>>,
}
impl OggVorbisDecoder {
pub fn try_new(file: File) -> Result<Self, VorbisError> {
OggStreamReader::new(BufReader::new(file)).map(|stream| Self { stream })
}
}
impl Iterator for OggVorbisDecoder {
type Item = Result<AudioBuffer, Box<dyn Error + Send>>;
fn next(&mut self) -> Option<Self::Item> {
let packet: Vec<Vec<f32>> = match self.stream.read_dec_packet_generic() {
Err(e) => return Some(Err(Box::new(e))),
Ok(None) => return None,
Ok(Some(packet)) => packet,
};
let channel_data: Vec<_> = packet.into_iter().map(ChannelData::from).collect();
let sample_rate = SampleRate(self.stream.ident_hdr.audio_sample_rate);
let result = AudioBuffer::from_channels(channel_data, sample_rate);
Some(Ok(result))
}
}
pub struct WavDecoder {
stream: Box<dyn Iterator<Item = Result<f32, hound::Error>> + Send>,
channels: u32,
sample_rate: SampleRate,
}
impl WavDecoder {
pub fn try_new(file: File) -> Result<Self, hound::Error> {
hound::WavReader::new(BufReader::new(file)).map(|wav| {
let channels = wav.spec().channels as u32;
let sample_rate = SampleRate(wav.spec().sample_rate);
let stream: Box<dyn Iterator<Item = Result<_, _>> + Send> =
match wav.spec().sample_format {
hound::SampleFormat::Float => Box::new(wav.into_samples::<f32>()),
hound::SampleFormat::Int => {
let bits = wav.spec().bits_per_sample as f32;
Box::new(
wav.into_samples::<i32>()
.map(move |r| r.map(|i| i as f32 / bits)),
)
}
};
Self {
stream,
channels,
sample_rate,
}
})
}
}
impl Iterator for WavDecoder {
type Item = Result<AudioBuffer, Box<dyn Error + Send>>;
fn next(&mut self) -> Option<Self::Item> {
let mut data = vec![vec![]; self.channels as usize];
for (i, res) in self
.stream
.by_ref()
.take((self.channels * crate::BUFFER_SIZE) as usize)
.enumerate()
{
match res {
Err(e) => return Some(Err(Box::new(e))),
Ok(v) => data[i % self.channels as usize].push(v),
}
}
if data[0].is_empty() {
return None;
}
let channels = data.into_iter().map(ChannelData::from).collect();
let result = AudioBuffer::from_channels(channels, self.sample_rate);
Some(Ok(result))
}
}