#![allow(clippy::module_name_repetitions)]
use flume::Receiver;
use moosicbox_audio_decoder::{AudioDecodeError, AudioDecodeHandler};
use moosicbox_audio_output::encoder::AudioEncoder;
use moosicbox_resampler::Resampler;
use symphonia::core::{
audio::{AudioBuffer, Signal},
io::{MediaSource, MediaSourceStream, MediaSourceStreamOptions},
probe::Hint,
};
use thiserror::Error;
use super::symphonia_unsync::{PlaybackError, play_media_source};
/// Errors returned while turning a [`SignalChain`] or [`SignalChainStep`] into
/// a running processor.
#[derive(Debug, Error)]
pub enum SignalChainError {
/// Starting playback of a step's media source failed; wraps the underlying
/// [`PlaybackError`] unchanged.
#[error(transparent)]
Playback(#[from] PlaybackError),
/// `SignalChain::process` was called on a chain that contains no steps.
#[error("SignalChain is empty")]
Empty,
}
/// Boxed, sendable, one-shot factory that produces an [`AudioDecodeHandler`].
type CreateAudioDecodeStream = Box<dyn (FnOnce() -> AudioDecodeHandler) + Send + 'static>;
/// Boxed, sendable, one-shot factory that produces a boxed [`AudioEncoder`].
type CreateAudioEncoder = Box<dyn (FnOnce() -> Box<dyn AudioEncoder>) + Send + 'static>;
/// An ordered pipeline of [`SignalChainStep`]s applied to a media source.
///
/// Steps are appended with [`SignalChain::next_step`] or one of the `add_*`
/// methods. The `with_*` builder methods configure the most recently added
/// step and leave an empty chain untouched.
pub struct SignalChain {
    /// Steps in the order they are run by [`SignalChain::process`].
    steps: Vec<SignalChainStep>,
}

impl SignalChain {
    /// Creates a chain with no steps.
    #[must_use]
    pub const fn new() -> Self {
        Self { steps: Vec::new() }
    }

    /// Replaces the last step with `configure(last_step)`.
    ///
    /// An empty chain is returned unchanged and `configure` is never called.
    fn map_last_step(
        mut self,
        configure: impl FnOnce(SignalChainStep) -> SignalChainStep,
    ) -> Self {
        if let Some(step) = self.steps.pop() {
            self.steps.push(configure(step));
        }
        self
    }

    /// Sets the format hint on the last step; does nothing on an empty chain.
    #[must_use]
    pub fn with_hint(self, hint: Hint) -> Self {
        self.map_last_step(|step| step.with_hint(hint))
    }

    /// Sets the audio decode handler factory on the last step; does nothing on
    /// an empty chain (the factory is dropped without being called).
    #[must_use]
    pub fn with_audio_decode_handler<F: (FnOnce() -> AudioDecodeHandler) + Send + 'static>(
        self,
        handler: F,
    ) -> Self {
        self.map_last_step(|step| step.with_audio_decode_handler(handler))
    }

    /// Sets the encoder factory on the last step; does nothing on an empty
    /// chain (the factory is dropped without being called).
    #[must_use]
    pub fn with_encoder<F: (FnOnce() -> Box<dyn AudioEncoder>) + Send + 'static>(
        self,
        encoder: F,
    ) -> Self {
        self.map_last_step(|step| step.with_encoder(encoder))
    }

    /// Sets the verify flag on the last step; does nothing on an empty chain.
    #[must_use]
    pub fn with_verify(self, verify: bool) -> Self {
        self.map_last_step(|step| step.with_verify(verify))
    }

    /// Sets the seek position on the last step; does nothing on an empty chain.
    #[must_use]
    pub fn with_seek(self, seek: Option<f64>) -> Self {
        self.map_last_step(|step| step.with_seek(seek))
    }

    /// Appends a step with default settings.
    #[must_use]
    pub fn next_step(self) -> Self {
        self.add_step(SignalChainStep::new())
    }

    /// Appends an already configured step.
    #[must_use]
    pub fn add_step(mut self, step: SignalChainStep) -> Self {
        self.steps.push(step);
        self
    }

    /// Appends a default step that uses `encoder` as its encoder factory.
    #[must_use]
    pub fn add_encoder_step<F: (FnOnce() -> Box<dyn AudioEncoder>) + Send + 'static>(
        self,
        encoder: F,
    ) -> Self {
        self.add_step(SignalChainStep::new().with_encoder(encoder))
    }

    /// Appends a default step that uses `resampler`.
    #[must_use]
    pub fn add_resampler_step(self, resampler: Resampler<f32>) -> Self {
        self.add_step(SignalChainStep::new().with_resampler(resampler))
    }

    /// Runs every step in order, feeding each step's output into the next.
    ///
    /// The first step consumes `media_source`; each later step consumes the
    /// processor produced by the step before it. The final processor is
    /// returned as a boxed [`MediaSource`].
    ///
    /// # Errors
    ///
    /// * [`SignalChainError::Empty`] if the chain has no steps
    /// * [`SignalChainError::Playback`] if any step fails to start
    pub fn process(
        self,
        media_source: Box<dyn MediaSource>,
    ) -> Result<Box<dyn MediaSource>, SignalChainError> {
        log::trace!("process: starting SignalChain processor");

        // Consume the steps front to back without shifting the vector on
        // every iteration, as repeated `remove(0)` would.
        let mut steps = self.steps.into_iter();
        let Some(first) = steps.next() else {
            return Err(SignalChainError::Empty);
        };

        let mut processor = first.process(media_source)?;
        for step in steps {
            processor = step.process(Box::new(processor))?;
        }

        Ok(Box::new(processor))
    }
}

impl Default for SignalChain {
    /// Returns an empty chain, same as [`SignalChain::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Settings for one stage of a [`SignalChain`].
///
/// A step becomes a running [`SignalChainStepProcessor`] through
/// [`SignalChainStep::process`].
pub struct SignalChainStep {
    /// Format hint handed to playback; the default hint is used when unset.
    hint: Option<Hint>,
    /// Factory stored by `with_audio_decode_handler`. `process` does not read it.
    audio_output_handler: Option<CreateAudioDecodeStream>,
    /// Encoder factory, invoked once when the step is processed.
    encoder: Option<CreateAudioEncoder>,
    /// Resampler handed to the resulting processor.
    resampler: Option<Resampler<f32>>,
    /// Gapless flag forwarded to playback; set to `true` by `new`.
    enable_gapless: bool,
    /// Verify flag forwarded to playback; set to `true` by `new`.
    verify: bool,
    /// Seek position forwarded to playback.
    seek: Option<f64>,
}

impl SignalChainStep {
    /// Creates a step with nothing configured, gapless and verify enabled.
    #[must_use]
    pub fn new() -> Self {
        Self {
            enable_gapless: true,
            verify: true,
            hint: None,
            seek: None,
            audio_output_handler: None,
            encoder: None,
            resampler: None,
        }
    }

    /// Sets the format hint, replacing any previous one.
    #[must_use]
    pub fn with_hint(mut self, hint: Hint) -> Self {
        self.hint = Some(hint);
        self
    }

    /// Stores the audio decode handler factory, replacing any previous one.
    #[must_use]
    pub fn with_audio_decode_handler<F: (FnOnce() -> AudioDecodeHandler) + Send + 'static>(
        mut self,
        handler: F,
    ) -> Self {
        let factory: CreateAudioDecodeStream = Box::new(handler);
        self.audio_output_handler = Some(factory);
        self
    }

    /// Stores the encoder factory, replacing any previous one.
    #[must_use]
    pub fn with_encoder<F: (FnOnce() -> Box<dyn AudioEncoder>) + Send + 'static>(
        mut self,
        encoder: F,
    ) -> Self {
        let factory: CreateAudioEncoder = Box::new(encoder);
        self.encoder = Some(factory);
        self
    }

    /// Sets the resampler, replacing any previous one.
    #[must_use]
    pub fn with_resampler(mut self, resampler: Resampler<f32>) -> Self {
        self.resampler = Some(resampler);
        self
    }

    /// Sets the verify flag forwarded to playback.
    #[must_use]
    pub const fn with_verify(mut self, verify: bool) -> Self {
        self.verify = verify;
        self
    }

    /// Sets the seek position forwarded to playback.
    #[must_use]
    pub const fn with_seek(mut self, seek: Option<f64>) -> Self {
        self.seek = seek;
        self
    }

    /// Starts playback of `media_source` and wraps the resulting audio
    /// receiver in a [`SignalChainStepProcessor`].
    ///
    /// The encoder factory, when present, is called only after playback has
    /// started successfully.
    ///
    /// # Errors
    ///
    /// * [`SignalChainError::Playback`] if `play_media_source` fails
    pub fn process(
        self,
        media_source: Box<dyn MediaSource>,
    ) -> Result<SignalChainStepProcessor, SignalChainError> {
        let Self {
            hint,
            encoder,
            resampler,
            enable_gapless,
            verify,
            seek,
            ..
        } = self;

        let hint = hint.unwrap_or_default();
        let stream = MediaSourceStream::new(media_source, MediaSourceStreamOptions::default());
        let receiver = play_media_source(stream, &hint, enable_gapless, verify, None, seek)?;

        Ok(SignalChainStepProcessor {
            encoder: encoder.map(|make_encoder| make_encoder()),
            resampler,
            receiver,
            overflow: Vec::new(),
        })
    }
}

impl Default for SignalChainStep {
    /// Returns the same step as [`SignalChainStep::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Running stage of a signal chain, exposed as a readable byte stream.
///
/// `read` pulls audio buffers from `receiver`, passes them through the
/// optional resampler and the encoder, and hands the encoded bytes to the
/// caller.
pub struct SignalChainStepProcessor {
/// Encoder that turns audio buffers into bytes; `None` when the step had no
/// encoder factory.
encoder: Option<Box<dyn AudioEncoder>>,
/// Optional resampler applied to each received buffer.
resampler: Option<Resampler<f32>>,
/// Channel delivering the audio buffers produced by `play_media_source`.
receiver: Receiver<AudioBuffer<f32>>,
/// Encoded bytes that did not fit into the caller's buffer on an earlier
/// `read`; they are served first on the next call.
overflow: Vec<u8>,
}
// Intentionally empty: all behavior lives in the trait impls below.
impl SignalChainStepProcessor {}
/// Seeking is not supported; the impl exists to satisfy the bounds of
/// [`MediaSource`].
impl std::io::Seek for SignalChainStepProcessor {
    /// Always fails with an error of kind [`std::io::ErrorKind::Other`].
    fn seek(&mut self, _pos: std::io::SeekFrom) -> std::io::Result<u64> {
        const UNSUPPORTED: &str = "SignalChainStepProcessor does not support seeking";

        Err(std::io::Error::other(UNSUPPORTED))
    }
}
impl std::io::Read for SignalChainStepProcessor {
    /// Fills `buf` with encoded audio bytes and returns how many were written.
    ///
    /// Bytes left over from an earlier call are served first. Otherwise audio
    /// buffers are pulled from the receiver (waiting up to one second for
    /// each), resampled when a resampler is configured, and encoded until the
    /// encoder yields a non-empty chunk. Whatever does not fit into `buf` is
    /// kept for the next call.
    ///
    /// # Errors
    ///
    /// * `TimedOut` if receiving an audio buffer fails, whether the wait
    ///   timed out or the sending side is gone
    /// * `InvalidData` if a buffer to resample reports zero channels
    /// * `Other` if resampling or encoding fails, or if the processor has no
    ///   encoder to turn audio into bytes
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if !self.overflow.is_empty() {
            log::debug!("buf len={} overflow len={}", buf.len(), self.overflow.len());
            let end = std::cmp::min(buf.len(), self.overflow.len());
            // Copy straight out of the pending bytes, then discard what was
            // served; no temporary vector is needed.
            buf[..end].copy_from_slice(&self.overflow[..end]);
            self.overflow.drain(..end);
            log::debug!("Returned buffer from overflow buf");
            return Ok(end);
        }

        let bytes = loop {
            log::debug!("Waiting for samples from receiver...");
            // Every receive failure is reported with kind `TimedOut`.
            let audio = self
                .receiver
                .recv_timeout(std::time::Duration::from_secs(1))
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::TimedOut, e))?;
            log::debug!("Received {} frames from receiver", audio.frames());

            let audio = if let Some(resampler) = &mut self.resampler {
                let channels = audio.spec().channels.count();
                if channels == 0 {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        "Cannot resample audio without channels",
                    ));
                }
                // Read before `resample`, whose returned samples may keep the
                // resampler borrowed.
                let spec = resampler.spec;

                log::debug!("Resampling frames...");
                let samples = resampler
                    .resample(&audio)
                    .ok_or_else(|| std::io::Error::other("Failed to resample"))?;

                let frames = samples.len() / channels;
                let mut resampled = AudioBuffer::new(frames as u64, spec);
                // A new buffer holds zero frames; mark the reserved frames as
                // rendered so the channel planes can be written.
                resampled.render_reserved(Some(frames));

                // NOTE(review): assumes the resampler returns interleaved
                // samples with the same channel count as its input - confirm
                // against moosicbox_resampler.
                let planes = channels.min(spec.channels.count());
                for channel in 0..planes {
                    let interleaved = samples.iter().skip(channel).step_by(channels);
                    for (dst, src) in resampled.chan_mut(channel).iter_mut().zip(interleaved) {
                        *dst = *src;
                    }
                }

                log::debug!("Resampled into {} frames", resampled.frames());
                resampled
            } else {
                audio
            };

            // Without an encoder there is no way to turn audio into bytes;
            // report that to the caller instead of panicking.
            let Some(encoder) = self.encoder.as_mut() else {
                return Err(std::io::Error::other(
                    "SignalChainStepProcessor has no encoder to produce bytes",
                ));
            };

            log::debug!("Encoding frames...");
            let bytes = encoder.encode(audio).map_err(std::io::Error::other)?;
            log::debug!("Encoded into {} bytes", bytes.len());

            // An encoder may buffer internally; keep feeding it until it
            // emits something.
            if !bytes.is_empty() {
                break bytes;
            }
        };

        let (bytes_now, overflow) = bytes.split_at(std::cmp::min(buf.len(), bytes.len()));
        log::debug!(
            "buf len={} bytes_now len={} overflow len={}",
            buf.len(),
            bytes_now.len(),
            overflow.len()
        );
        buf[..bytes_now.len()].copy_from_slice(bytes_now);
        self.overflow.extend_from_slice(overflow);

        Ok(bytes_now.len())
    }
}
/// Lets a processor be used wherever a symphonia [`MediaSource`] is expected,
/// which is how one chain step feeds the next.
impl MediaSource for SignalChainStepProcessor {
/// Always `false`; the `Seek` impl for this type always returns an error.
fn is_seekable(&self) -> bool {
false
}
/// Always `None`; this implementation does not report a length.
fn byte_len(&self) -> Option<u64> {
None
}
}
/// Error type that wraps either a playback failure or an audio decode failure.
///
/// NOTE(review): apart from the tests, nothing in this file constructs or
/// returns this type - confirm it is still used elsewhere.
#[derive(Debug, Error)]
pub enum SignalChainProcessorError {
/// Wraps a [`PlaybackError`] unchanged.
#[error(transparent)]
Playback(#[from] PlaybackError),
/// Wraps an [`AudioDecodeError`] unchanged.
#[error(transparent)]
AudioDecode(#[from] AudioDecodeError),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a processor without encoder or resampler around a fresh channel.
    ///
    /// The sender is handed back so each test decides whether the channel
    /// stays open for the duration of the test.
    fn bare_processor(
        overflow: Vec<u8>,
    ) -> (flume::Sender<AudioBuffer<f32>>, SignalChainStepProcessor) {
        let (tx, rx) = flume::unbounded();
        let processor = SignalChainStepProcessor {
            encoder: None,
            resampler: None,
            receiver: rx,
            overflow,
        };
        (tx, processor)
    }

    /// Asserts that `step` carries exactly the settings `SignalChainStep::new` sets.
    fn assert_default_step(step: &SignalChainStep) {
        assert!(step.hint.is_none());
        assert!(step.audio_output_handler.is_none());
        assert!(step.encoder.is_none());
        assert!(step.resampler.is_none());
        assert!(step.enable_gapless);
        assert!(step.verify);
        assert!(step.seek.is_none());
    }

    #[test_log::test]
    fn test_signal_chain_new_creates_empty_chain() {
        let chain = SignalChain::new();
        assert_eq!(chain.steps.len(), 0);
    }

    #[test_log::test]
    fn test_signal_chain_default_creates_empty_chain() {
        let chain = SignalChain::default();
        assert_eq!(chain.steps.len(), 0);
    }

    #[test_log::test]
    fn test_signal_chain_add_step_increases_count() {
        let chain = SignalChain::new();
        assert_eq!(chain.steps.len(), 0);

        let chain = chain.add_step(SignalChainStep::new());
        assert_eq!(chain.steps.len(), 1);

        let chain = chain.add_step(SignalChainStep::new());
        assert_eq!(chain.steps.len(), 2);
    }

    #[test_log::test]
    fn test_signal_chain_next_step_adds_empty_step() {
        let chain = SignalChain::new().next_step();
        assert_eq!(chain.steps.len(), 1);
        assert_default_step(&chain.steps[0]);

        let chain = chain.next_step();
        assert_eq!(chain.steps.len(), 2);
        assert_default_step(&chain.steps[1]);
    }

    #[test_log::test]
    fn test_signal_chain_with_hint_modifies_last_step() {
        let mut hint = Hint::new();
        hint.with_extension("mp3");

        let chain = SignalChain::new().next_step().with_hint(hint);
        assert_eq!(chain.steps.len(), 1);
        // The step count alone would also hold if `with_hint` did nothing.
        assert!(chain.steps[0].hint.is_some());
    }

    #[test_log::test]
    fn test_signal_chain_with_hint_on_empty_chain_does_nothing() {
        let mut hint = Hint::new();
        hint.with_extension("flac");

        let chain = SignalChain::new().with_hint(hint);
        assert_eq!(chain.steps.len(), 0);
    }

    #[test_log::test]
    fn test_signal_chain_with_verify_modifies_last_step() {
        let chain = SignalChain::new().next_step().with_verify(false);
        assert_eq!(chain.steps.len(), 1);
        assert!(!chain.steps[0].verify);
    }

    #[test_log::test]
    fn test_signal_chain_with_seek_modifies_last_step() {
        let chain = SignalChain::new().next_step().with_seek(Some(30.0));
        assert_eq!(chain.steps.len(), 1);
        assert_eq!(chain.steps[0].seek, Some(30.0));
    }

    #[test_log::test]
    fn test_signal_chain_step_new_has_defaults() {
        assert_default_step(&SignalChainStep::new());
    }

    #[test_log::test]
    fn test_signal_chain_step_default_has_defaults() {
        assert_default_step(&SignalChainStep::default());
    }

    #[test_log::test]
    fn test_signal_chain_step_with_verify_sets_value() {
        let step = SignalChainStep::new().with_verify(false);
        assert!(!step.verify);

        let step = SignalChainStep::new().with_verify(true);
        assert!(step.verify);
    }

    #[test_log::test]
    fn test_signal_chain_step_with_seek_sets_value() {
        let step = SignalChainStep::new().with_seek(Some(45.5));
        assert_eq!(step.seek, Some(45.5));

        let step = SignalChainStep::new().with_seek(None);
        assert_eq!(step.seek, None);
    }

    #[test_log::test]
    fn test_signal_chain_step_with_hint_sets_value() {
        let mut hint = Hint::new();
        hint.with_extension("opus");

        let step = SignalChainStep::new().with_hint(hint);
        assert!(step.hint.is_some());
    }

    #[test_log::test]
    fn test_signal_chain_builder_pattern_chaining() {
        let mut hint = Hint::new();
        hint.with_extension("aac");

        let chain = SignalChain::new()
            .next_step()
            .with_hint(hint)
            .with_verify(false)
            .with_seek(Some(10.0))
            .next_step()
            .with_verify(true);

        assert_eq!(chain.steps.len(), 2);

        // Settings applied before the second `next_step` belong to the first step.
        let first = &chain.steps[0];
        assert!(first.hint.is_some());
        assert!(!first.verify);
        assert_eq!(first.seek, Some(10.0));

        // The second step only received `with_verify(true)`.
        let second = &chain.steps[1];
        assert!(second.hint.is_none());
        assert!(second.verify);
        assert_eq!(second.seek, None);
    }

    #[test_log::test]
    fn test_signal_chain_step_builder_pattern_chaining() {
        let mut hint = Hint::new();
        hint.with_extension("flac");

        let step = SignalChainStep::new()
            .with_hint(hint)
            .with_verify(false)
            .with_seek(Some(20.0));

        assert!(step.hint.is_some());
        assert!(!step.verify);
        assert_eq!(step.seek, Some(20.0));
    }

    #[test_log::test]
    fn test_signal_chain_process_empty_chain_returns_error() {
        use std::io::Cursor;

        let chain = SignalChain::new();
        let media_source: Box<dyn MediaSource> = Box::new(Cursor::new(Vec::<u8>::new()));

        // `Box<dyn MediaSource>` is not `Debug`, so `unwrap_err` is unavailable.
        let Err(err) = chain.process(media_source) else {
            panic!("Expected processing an empty chain to fail");
        };
        assert!(matches!(err, SignalChainError::Empty));

        let err_str = err.to_string();
        assert!(
            err_str.to_lowercase().contains("empty"),
            "Expected 'empty' in error message, got: {err_str}"
        );
    }

    #[test_log::test]
    fn test_signal_chain_error_display() {
        let error = SignalChainError::Empty;
        let err_str = error.to_string().to_lowercase();
        assert!(
            err_str.contains("empty"),
            "Expected 'empty' in error message, got: {err_str}"
        );
    }

    #[test_log::test]
    fn test_signal_chain_processor_error_display() {
        use moosicbox_audio_decoder::AudioDecodeError;

        let audio_error = SignalChainProcessorError::AudioDecode(AudioDecodeError::OpenStream);
        assert!(!audio_error.to_string().is_empty());
    }

    #[test_log::test]
    fn test_signal_chain_with_encoder_on_empty_chain_does_nothing() {
        let chain = SignalChain::new().with_encoder(|| {
            panic!("Encoder factory should not be called on empty chain")
        });
        assert_eq!(chain.steps.len(), 0);
    }

    #[test_log::test]
    fn test_signal_chain_with_audio_decode_handler_on_empty_chain_does_nothing() {
        let chain = SignalChain::new().with_audio_decode_handler(|| {
            panic!("Handler factory should not be called on empty chain")
        });
        assert_eq!(chain.steps.len(), 0);
    }

    #[test_log::test]
    fn test_signal_chain_with_verify_on_empty_chain_does_nothing() {
        let chain = SignalChain::new().with_verify(false);
        assert_eq!(chain.steps.len(), 0);
    }

    #[test_log::test]
    fn test_signal_chain_with_seek_on_empty_chain_does_nothing() {
        let chain = SignalChain::new().with_seek(Some(10.0));
        assert_eq!(chain.steps.len(), 0);
    }

    #[test_log::test]
    fn test_signal_chain_add_encoder_step_creates_step_with_encoder() {
        use moosicbox_audio_output::AudioOutputError;

        struct MockEncoder;

        impl moosicbox_audio_output::encoder::AudioEncoder for MockEncoder {
            fn encode(
                &mut self,
                _decoded: symphonia::core::audio::AudioBuffer<f32>,
            ) -> Result<bytes::Bytes, AudioOutputError> {
                Ok(bytes::Bytes::new())
            }

            fn spec(&self) -> symphonia::core::audio::SignalSpec {
                symphonia::core::audio::SignalSpec {
                    rate: 44100,
                    channels: symphonia::core::audio::Layout::Stereo.into_channels(),
                }
            }
        }

        let chain = SignalChain::new().add_encoder_step(|| {
            Box::new(MockEncoder)
        });

        assert_eq!(chain.steps.len(), 1);
        // The factory must be stored on the new step, not merely counted.
        assert!(chain.steps[0].encoder.is_some());
    }

    #[test_log::test]
    fn test_signal_chain_step_processor_seek_returns_error() {
        use std::io::{Seek, SeekFrom};

        let (_tx, mut processor) = bare_processor(vec![]);

        let result = processor.seek(SeekFrom::Start(0));
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::Other);
        assert!(err.to_string().contains("does not support seeking"));
    }

    #[test_log::test]
    fn test_signal_chain_step_processor_is_not_seekable() {
        use symphonia::core::io::MediaSource;

        let (_tx, processor) = bare_processor(vec![]);
        assert!(!processor.is_seekable());
    }

    #[test_log::test]
    fn test_signal_chain_step_processor_byte_len_returns_none() {
        use symphonia::core::io::MediaSource;

        let (_tx, processor) = bare_processor(vec![]);
        assert!(processor.byte_len().is_none());
    }

    #[test_log::test]
    fn test_signal_chain_step_processor_read_from_overflow() {
        use std::io::Read;

        let (_tx, mut processor) = bare_processor(vec![1, 2, 3, 4, 5, 6, 7, 8]);

        let mut buf = [0u8; 4];
        let result = processor.read(&mut buf);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 4);
        assert_eq!(&buf, &[1, 2, 3, 4]);
        assert_eq!(processor.overflow, vec![5, 6, 7, 8]);
    }

    #[test_log::test]
    fn test_signal_chain_step_processor_read_entire_overflow() {
        use std::io::Read;

        let (_tx, mut processor) = bare_processor(vec![10, 20, 30]);

        let mut buf = [0u8; 10];
        let result = processor.read(&mut buf);

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 3);
        assert_eq!(&buf[..3], &[10, 20, 30]);
        assert!(processor.overflow.is_empty());
    }

    #[test_log::test]
    fn test_signal_chain_step_processor_read_timeout_when_no_data() {
        use std::io::Read;

        let (tx, mut processor) = bare_processor(vec![]);
        // Closing the channel makes the receive fail immediately instead of
        // waiting for the one second timeout.
        drop(tx);

        let mut buf = [0u8; 10];
        let result = processor.read(&mut buf);

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.kind() == std::io::ErrorKind::TimedOut || err.to_string().contains("Disconnected")
        );
    }
}