#[cfg(feature = "local")]
mod local {
use std::{
sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::{Duration, Instant},
};
use atomic_float::AtomicF64;
use moosicbox_audio_output::AudioOutputFactory;
use moosicbox_music_models::{ApiSource, PlaybackQuality, Track};
use moosicbox_player::{
PlaybackType, Player, PlayerSource, local::LocalPlayer, set_service_port,
};
use switchy_async::time::sleep;
use symphonia::core::audio::Signal;
fn create_test_track() -> Track {
Track {
id: 1.into(),
number: 1,
title: "Test Track".to_string(),
duration: 30.0, album: "Test Album".to_string(),
album_id: 1.into(),
album_type: moosicbox_music_models::AlbumType::Lp,
date_released: None,
date_added: None,
artist: "Test Artist".to_string(),
artist_id: 1.into(),
file: Some("/dev/null".to_string()), artwork: None,
blur: false,
bytes: 0,
format: Some(moosicbox_music_models::AudioFormat::Source),
bit_depth: Some(16),
audio_bitrate: Some(320),
overall_bitrate: Some(320),
sample_rate: Some(44100),
channels: Some(2),
track_source: moosicbox_music_models::TrackApiSource::Local, api_source: ApiSource::library(), sources: Default::default(),
}
}
/// Builds an `AudioOutputFactory` whose outputs are `MockAudioWrite` instances.
///
/// The factory advertises a 44.1 kHz stereo signal spec and creates a fresh
/// mock writer every time its creation closure is invoked.
fn create_test_audio_factory() -> moosicbox_audio_output::AudioOutputFactory {
    use symphonia::core::audio::{Layout, SignalSpec};

    let channels = Layout::Stereo.into_channels();
    let spec = SignalSpec {
        rate: 44100,
        channels,
    };

    // The closure stays inline so its return type is inferred from the
    // factory's expected signature.
    moosicbox_audio_output::AudioOutputFactory::new(
        "test-factory".to_string(),
        "Test Audio Factory".to_string(),
        spec,
        || Ok(Box::new(MockAudioWrite::new("Test AudioOutput".to_string()))),
    )
}
#[test_log::test(switchy_async::test(real_time))]
async fn test_pause_stops_progress_callbacks_regression() {
println!("📋 PAUSE STOPS PROGRESS CALLBACKS REGRESSION TEST");
println!("📋 Testing that progress callbacks stop immediately when pause is called");
let progress_callbacks_after_pause = Arc::new(AtomicUsize::new(0));
let pause_completed = Arc::new(AtomicBool::new(false));
let shared_paused = Arc::new(AtomicBool::new(false));
println!("🎵 Simulating background AudioOutput progress callbacks...");
let callbacks_counter = progress_callbacks_after_pause.clone();
let pause_state = shared_paused.clone();
let pause_done = pause_completed.clone();
switchy_async::task::spawn(async move {
for i in 1..=20 {
switchy_async::time::sleep(Duration::from_millis(110)).await;
if pause_state.load(Ordering::SeqCst) {
println!(
"🔇 Simulation detected LocalPlayer shared pause state - stopping callbacks (THE FIX!)"
);
break;
}
if pause_done.load(Ordering::SeqCst) {
callbacks_counter.fetch_add(1, Ordering::SeqCst);
let position = 105.0 + (i as f64 * 0.11); println!(
"🐛 Progress callback #{} AFTER pause completed: position={:.2}s",
callbacks_counter.load(Ordering::SeqCst),
position
);
}
}
});
println!("⏸️ Calling pause...");
switchy_async::time::sleep(Duration::from_millis(50)).await;
shared_paused.store(true, Ordering::SeqCst);
println!("✅ Pause operation completed successfully");
pause_completed.store(true, Ordering::SeqCst);
println!("⏱️ Waiting 800ms to check for continued progress callbacks...");
switchy_async::time::sleep(Duration::from_millis(800)).await;
let callbacks_after_pause = progress_callbacks_after_pause.load(Ordering::SeqCst);
if callbacks_after_pause > 0 {
println!(
"🚨 REGRESSION TEST FAILURE: {callbacks_after_pause} progress callbacks continued after pause operation completed!"
);
println!(" This means audio continued playing in background despite pause!");
panic!("Progress callbacks continued after pause - this is the bug!");
} else {
println!("✅ Progress callbacks correctly stopped after pause");
println!(" The shared pause state fix successfully prevents background audio");
println!("🎉 REGRESSION TEST PASSED - No callbacks after pause!");
}
}
// Attempts to reproduce the overlapping-audio race by issuing rapid
// concurrent seeks against a single `LocalPlayer`.
//
// Two tasks each perform five seeks (10s.. and 20s.. targets) with slightly
// different pauses between them. A temporary panic hook records whether any
// panic message contains "OVERLAPPING AUDIO DETECTED"; if one was seen and
// both tasks joined cleanly, the test fails. Join errors and the 10 second
// timeout are only logged.
//
// The panic hook is process-global, so the hook that was installed before
// this test is saved up front and put back afterwards instead of being
// dropped in favour of the default hook.
#[test_log::test(switchy_async::test(real_time))]
async fn test_seek_overlapping_audio_bug_reproduction() {
    set_service_port(8001);
    println!("🧪 SEEK OVERLAPPING AUDIO BUG REPRODUCTION TEST");
    println!(
        "🧪 This test attempts to reproduce the race condition by creating rapid concurrent seeks"
    );
    // `try_init` fails if a logger is already installed; that is fine here.
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Debug)
        .is_test(true)
        .try_init();

    let audio_factory = create_test_audio_factory();
    let player = LocalPlayer::new(PlayerSource::Local, None)
        .await
        .expect("Failed to create LocalPlayer")
        .with_output(audio_factory);

    let track = create_test_track();
    let playback = moosicbox_player::Playback::new(
        vec![track],
        Some(0),
        atomic_float::AtomicF64::new(1.0),
        PlaybackQuality {
            format: moosicbox_music_models::AudioFormat::Source,
        },
        1,
        "default".to_string(),
        None,
    );
    *player.playback.write().unwrap() = Some(playback);

    let playback_ref = player.playback.clone();
    let handler = moosicbox_player::PlaybackHandler::new(player.clone())
        .with_playback(playback_ref)
        .with_output(player.output.clone());
    *player.playback_handler.write().unwrap() = Some(handler.clone());

    // Mark the playback as playing before the seeks are issued.
    {
        let mut playback = player.playback.write().unwrap();
        if let Some(ref mut pb) = *playback {
            pb.playing = true;
        }
    }

    println!("🚀 Creating rapid concurrent seeks to trigger race condition...");
    let panic_occurred = Arc::new(AtomicBool::new(false));
    let panic_flag = panic_occurred.clone();

    // Keep the previously installed hook so it can be reinstated later.
    let previous_hook = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        let panic_message = panic_info.to_string();
        if panic_message.contains("OVERLAPPING AUDIO DETECTED") {
            eprintln!("✅ Successfully reproduced overlapping audio bug: {panic_message}");
            panic_flag.store(true, Ordering::SeqCst);
        } else {
            eprintln!("❌ Unexpected panic: {panic_message}");
        }
    }));

    let player_clone1 = player.clone();
    let player_clone2 = player.clone();
    let seek_task1 = switchy_async::task::spawn(async move {
        for i in 0..5 {
            let target = 10.0 + i as f64;
            println!(
                " 📍 Seek task 1, iteration {}: seeking to {}s",
                i, target
            );
            if let Err(e) = player_clone1.trigger_seek(target).await {
                println!(" ⚠️ Seek task 1 failed: {e}");
            }
            switchy_async::time::sleep(Duration::from_millis(5)).await;
        }
    });
    let seek_task2 = switchy_async::task::spawn(async move {
        for i in 0..5 {
            let target = 20.0 + i as f64;
            println!(
                " 📍 Seek task 2, iteration {}: seeking to {}s",
                i, target
            );
            if let Err(e) = player_clone2.trigger_seek(target).await {
                println!(" ⚠️ Seek task 2 failed: {e}");
            }
            switchy_async::time::sleep(Duration::from_millis(7)).await;
        }
    });

    let result = switchy_async::time::timeout(Duration::from_secs(10), async {
        tokio::try_join!(seek_task1, seek_task2)
    })
    .await;

    // Reinstate the hook that was active before this test; this also drops
    // the temporary detection hook.
    std::panic::set_hook(previous_hook);

    match result {
        Ok(Ok((_, _))) => {
            if panic_occurred.load(Ordering::SeqCst) {
                println!(
                    "✅ SUCCESS: Overlapping audio bug was detected and caught by our assertions!"
                );
                println!(" This proves the bug exists and our detection system works.");
                panic!(
                    "Overlapping audio bug reproduced successfully - this test should fail until the fix is applied"
                );
            } else {
                println!(
                    "🤔 Concurrent seeks completed without triggering overlapping audio detection."
                );
                println!(" This means either:");
                println!(" 1. The bug is already fixed by our synchronization delay");
                println!(
                    " 2. The race condition wasn't triggered this time (race conditions are timing-dependent)"
                );
                println!(" 3. Our 100ms synchronization delay is working as intended");
                println!("✅ No overlapping audio detected - the fix appears to be working!");
            }
        }
        Ok(Err(e)) => {
            println!("❌ Task join error: {e:?}");
        }
        Err(_) => {
            println!(
                "⏱️ Test timed out - this might indicate deadlock or very slow audio operations"
            );
        }
    }

    // Best-effort cleanup; a failure to stop is deliberately ignored.
    let _ = player.trigger_stop().await;
    println!("🎯 Test completed successfully");
    println!(" If overlapping audio was detected: Bug exists (test should fail)");
    println!(" If no overlapping audio detected: Fix is working (test should pass)");
}
// Checks that a single, ordinary `trigger_play` does not trip the
// overlapping-audio detection.
//
// A temporary panic hook records whether any panic message contains
// "OVERLAPPING AUDIO DETECTED" while playback runs (bounded by a 3 second
// timeout). The test asserts that no such panic was observed; playback
// errors and timeouts are only logged.
//
// The panic hook is process-global, so the hook that was installed before
// this test is saved up front and put back afterwards instead of being
// dropped in favour of the default hook.
#[test_log::test(switchy_async::test(real_time))]
async fn test_normal_single_playback_no_overlap() {
    set_service_port(8001);
    println!("🧪 NORMAL SINGLE PLAYBACK TEST");
    println!(
        "🧪 This test verifies that normal playback doesn't trigger overlapping audio detection"
    );

    let audio_factory = create_test_audio_factory();
    let player = LocalPlayer::new(PlayerSource::Local, None)
        .await
        .expect("Failed to create LocalPlayer")
        .with_output(audio_factory);

    let track = create_test_track();
    let playback = moosicbox_player::Playback::new(
        vec![track],
        Some(0),
        atomic_float::AtomicF64::new(1.0),
        PlaybackQuality {
            format: moosicbox_music_models::AudioFormat::Source,
        },
        1,
        "default".to_string(),
        None,
    );
    *player.playback.write().unwrap() = Some(playback);

    let playback_ref = player.playback.clone();
    let handler = moosicbox_player::PlaybackHandler::new(player.clone())
        .with_playback(playback_ref)
        .with_output(player.output.clone());
    *player.playback_handler.write().unwrap() = Some(handler.clone());

    println!("🚀 Starting normal single playback...");
    let panic_occurred = Arc::new(AtomicBool::new(false));
    let panic_flag = panic_occurred.clone();

    // Keep the previously installed hook so it can be reinstated later.
    let previous_hook = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        let panic_message = panic_info.to_string();
        if panic_message.contains("OVERLAPPING AUDIO DETECTED") {
            eprintln!(
                "❌ UNEXPECTED: Normal playback triggered overlapping audio detection: {panic_message}"
            );
            panic_flag.store(true, Ordering::SeqCst);
        } else {
            eprintln!("❌ Other unexpected panic: {panic_message}");
        }
    }));

    let result = switchy_async::time::timeout(Duration::from_secs(3), async {
        player.trigger_play(None).await
    })
    .await;

    // Reinstate the hook that was active before this test; this also drops
    // the temporary detection hook.
    std::panic::set_hook(previous_hook);

    assert!(
        !panic_occurred.load(Ordering::SeqCst),
        "Normal single playback should NOT trigger overlapping audio detection"
    );
    println!("✅ Normal playback completed successfully!");
    println!(" Result: {result:?}");

    // Errors and timeouts are reported but do not fail the test: only the
    // overlap detection above is asserted.
    match result {
        Ok(Ok(())) => {
            println!(
                "✅ SUCCESS: Normal single playback completed without any overlapping audio detection"
            );
        }
        Ok(Err(e)) => {
            println!(
                "⚠️ Normal playback failed with error (but no overlapping detected): {e:?}"
            );
        }
        Err(_timeout) => {
            println!("⚠️ Normal playback timed out (but no overlapping detected)");
        }
    }
    println!(
        "✅ SUCCESS: Normal single playback works correctly and doesn't trigger false positives"
    );
}
// Regression test for overlapping AudioOutputs during seek.
//
// Two `LocalPlayer`s share one `AudioOutputFactory` whose creation closure
// records when each output is created and how many are currently alive.
// Both players are started 10ms apart; the test panics if the factory saw
// two creations less than 4 seconds apart or more than one live output.
#[test_log::test(switchy_async::test(real_time))]
async fn test_seek_audio_output_drain_overlap_regression() {
set_service_port(8001);
println!("🧪 TESTING: Seek overlapping audio regression");
println!(
"🎯 Goal: Reproduce the race condition where two AudioOutputs are active during seek"
);
// Shared bookkeeping written by the factory closure and read at the end.
let output_creation_times = Arc::new(Mutex::new(Vec::<Instant>::new()));
let output_active_count = Arc::new(AtomicUsize::new(0));
let overlapping_detected = Arc::new(AtomicBool::new(false));
let creation_times_clone = output_creation_times.clone();
let active_count_clone = output_active_count.clone();
let overlap_detected_clone = overlapping_detected.clone();
let spec = symphonia::core::audio::SignalSpec {
rate: 44100,
channels: symphonia::core::audio::Layout::Stereo.into_channels(),
};
let output = AudioOutputFactory::new(
"test-seek-overlap".to_string(),
"Test Seek Overlap Output".to_string(),
spec,
// Invoked each time an output is requested from this factory.
Box::new(move || {
let now = switchy_time::instant_now();
{
let mut times = creation_times_clone.lock().unwrap();
times.push(now);
if times.len() >= 2 {
// Compare against the creation immediately before this one.
let previous_creation = times[times.len() - 2];
let time_since_previous = now.duration_since(previous_creation);
// Creations less than 4 seconds apart are treated as an overlap.
if time_since_previous < Duration::from_secs(4) {
println!(
"🚨 OVERLAP DETECTED: Two AudioOutputs created {}ms apart (within drain period)",
time_since_previous.as_millis()
);
overlap_detected_clone.store(true, Ordering::SeqCst);
}
}
}
// `fetch_add` returns the previous value, so add one for the new total.
let current_active = active_count_clone.fetch_add(1, Ordering::SeqCst) + 1;
println!(
"🔊 AudioOutput #{current_active} created at {now:?} (now {current_active} active)"
);
if current_active > 1 {
println!(
"🚨 MULTIPLE ACTIVE: {current_active} AudioOutputs active simultaneously!"
);
overlap_detected_clone.store(true, Ordering::SeqCst);
}
let active_count_for_drop = active_count_clone.clone();
let spec = symphonia::core::audio::SignalSpec {
rate: 44100,
channels: symphonia::core::audio::Layout::Stereo.into_channels(),
};
Ok(Box::new(SlowDrainAudioOutput::new(
1000, spec,
// Drop callback: decrement the live-output counter.
move || {
// `fetch_sub` returns the previous value, so subtract one for the new total.
let remaining = active_count_for_drop.fetch_sub(1, Ordering::SeqCst) - 1;
println!("🔇 AudioOutput dropped (now {remaining} active)");
},
))
as Box<dyn moosicbox_audio_output::AudioWrite>)
}),
);
// Both players are built from the same factory, so they share its counters.
let player1 = LocalPlayer::new(PlayerSource::Local, Some(PlaybackType::Stream))
.await
.unwrap()
.with_output(output.clone());
let player2 = LocalPlayer::new(PlayerSource::Local, Some(PlaybackType::Stream))
.await
.unwrap()
.with_output(output);
// Unlike `create_test_track`, this track has no file or format metadata.
let track = Track {
id: 1.into(),
number: 1,
title: "Test Track".to_string(),
duration: 120.0,
album: "Test Album".to_string(),
album_id: 1.into(),
album_type: moosicbox_music_models::AlbumType::Lp,
date_released: None,
date_added: None,
artist: "Test Artist".to_string(),
artist_id: 1.into(),
file: None,
artwork: None,
blur: false,
bytes: 0,
format: None,
bit_depth: None,
audio_bitrate: None,
overall_bitrate: None,
sample_rate: None,
channels: None,
track_source: moosicbox_music_models::TrackApiSource::Local,
api_source: ApiSource::library(),
sources: Default::default(),
};
let playback1 = moosicbox_player::Playback::new(
vec![track.clone()],
Some(0),
atomic_float::AtomicF64::new(1.0),
PlaybackQuality::default(),
1,
"test".to_string(),
None,
);
let playback2 = moosicbox_player::Playback::new(
vec![track],
Some(0),
atomic_float::AtomicF64::new(1.0),
PlaybackQuality::default(),
2,
"test".to_string(),
None,
);
// Install a copy of each playback, flagged as playing, on its player.
{
let mut p1 = playback1.clone();
p1.playing = true;
*player1.playback.write().unwrap() = Some(p1);
let mut p2 = playback2.clone();
p2.playing = true;
*player2.playback.write().unwrap() = Some(p2);
}
println!(
"🎭 SIMULATING OVERLAP: Starting two players simultaneously to force race condition..."
);
let task1 = switchy_async::runtime::Handle::current().spawn_with_name(
"test: player1 trigger_play",
{
let player1 = player1.clone();
async move {
println!(
"🔊 Player1: Starting trigger_play (this simulates the OLD AudioOutput)"
);
player1.trigger_play(Some(30.0)).await
}
},
);
// Stagger the second player by 10ms.
sleep(Duration::from_millis(10)).await;
let task2 = switchy_async::runtime::Handle::current().spawn_with_name("test: player2 trigger_play", {
let player2 = player2.clone();
async move {
println!(
"🔊 Player2: Starting trigger_play (this simulates the NEW AudioOutput for seek)"
);
player2.trigger_play(Some(60.0)).await
}
});
println!("⏳ Waiting for both players to start...");
sleep(Duration::from_millis(100)).await;
println!("🛑 Stopping both players...");
// Both play tasks are awaited before the stops are issued; all results
// (join, play and stop) are deliberately ignored.
let _ = task1.await;
let _ = task2.await;
let _ = player1.trigger_stop().await;
let _ = player2.trigger_stop().await;
sleep(Duration::from_millis(100)).await;
let overlap_detected = overlapping_detected.load(Ordering::SeqCst);
let final_active_count = output_active_count.load(Ordering::SeqCst);
println!("📊 Test Results:");
println!(" - Overlapping AudioOutputs detected: {overlap_detected}");
println!(" - Final active AudioOutput count: {final_active_count}");
// Dump the recorded creation timeline; the lock is released at the end of this block.
{
let times = output_creation_times.lock().unwrap();
println!(" - Total AudioOutputs created: {}", times.len());
for (i, time) in times.iter().enumerate() {
println!(" AudioOutput #{}: created at {:?}", i + 1, time);
}
if times.len() >= 2 {
for i in 1..times.len() {
let time_diff = times[i].duration_since(times[i - 1]);
println!(
" Time between AudioOutput #{} and #{}: {}ms",
i,
i + 1,
time_diff.as_millis()
);
}
}
}
if overlap_detected {
// NOTE(review): the YES/NO below reflects whether any output is still
// counted as active at the end, not whether two were alive at once.
panic!(
"🚨 REGRESSION TEST FAILED: Overlapping AudioOutputs detected during seek! \
Old AudioOutput is still draining buffer while new AudioOutput was created for seek position. \
This causes both audio streams to play simultaneously. \
Total AudioOutputs created: {}, Active simultaneously: {}",
output_creation_times.lock().unwrap().len(),
if final_active_count > 0 { "YES" } else { "NO" }
);
} else {
println!("✅ No overlapping audio detected");
println!(" This could mean:");
println!(" 1. The race condition bug is already fixed");
println!(" 2. The test didn't successfully reproduce the race condition");
println!(" 3. The overlapping detection isn't working as expected");
}
}
/// Test `AudioWrite` double that buffers samples in memory and, once it has
/// been flushed, delays its own drop to imitate an output draining slowly.
///
/// `F` is a one-shot callback invoked when the value is dropped.
struct SlowDrainAudioOutput<F: FnOnce() + Send + 'static> {
    /// Signal spec reported by `get_output_spec`.
    spec: symphonia::core::audio::SignalSpec,
    /// Samples accumulated by `write`, capped in `add_samples_to_ring_buffer`.
    audio_data: Arc<Mutex<Vec<f32>>>,
    /// Volume handle; replaced by `set_shared_volume`.
    volume: Arc<AtomicF64>,
    /// Consumed-sample counter; replaced by `set_consumed_samples`.
    samples_consumed: Arc<AtomicUsize>,
    /// Progress callback stored by `set_progress_callback`.
    progress_callback: Option<Box<dyn Fn(f64) + Send + Sync + 'static>>,
    /// Callback run exactly once from `Drop`.
    drop_callback: Option<F>,
    /// Set by `flush`; makes `Drop` sleep before running the callback.
    draining: Arc<AtomicBool>,
}

impl<F> SlowDrainAudioOutput<F>
where
    F: FnOnce() + Send + 'static,
{
    /// Creates an output with an empty sample buffer.
    ///
    /// `ring_buffer_size` is used only as the buffer's initial capacity,
    /// `spec` is what `get_output_spec` reports, and `drop_callback` runs
    /// when the output is dropped.
    fn new(
        ring_buffer_size: usize,
        spec: symphonia::core::audio::SignalSpec,
        drop_callback: F,
    ) -> Self {
        let audio_data = Arc::new(Mutex::new(Vec::with_capacity(ring_buffer_size)));
        let volume = Arc::new(AtomicF64::new(1.0));
        let samples_consumed = Arc::new(AtomicUsize::new(0));
        let draining = Arc::new(AtomicBool::new(false));

        Self {
            spec,
            audio_data,
            volume,
            samples_consumed,
            progress_callback: None,
            drop_callback: Some(drop_callback),
            draining,
        }
    }

    /// Appends `samples` to the buffer; once the buffer exceeds the cap, an
    /// equal number of the oldest samples is discarded from the front.
    fn add_samples_to_ring_buffer(&mut self, samples: &[f32]) {
        // Buffer length above which the oldest samples are dropped.
        const MAX_BUFFERED_SAMPLES: usize = 44100 * 2 * 10;

        let mut buffered = self.audio_data.lock().unwrap();
        buffered.extend_from_slice(samples);
        if buffered.len() > MAX_BUFFERED_SAMPLES {
            buffered.drain(..samples.len());
        }
    }
}
impl<F> Drop for SlowDrainAudioOutput<F>
where
    F: FnOnce() + Send + 'static,
{
    /// Blocks the current thread for 200ms if the output was flushed, then
    /// runs the drop callback if it has not been taken yet.
    fn drop(&mut self) {
        let was_flushed = self.draining.load(Ordering::SeqCst);
        if was_flushed {
            println!("🔄 SlowDrainAudioOutput: Simulating slow drain period...");
            // Deliberately a blocking sleep: the delay is part of the simulation.
            std::thread::sleep(Duration::from_millis(200));
        }

        let Some(on_drop) = self.drop_callback.take() else {
            return;
        };
        on_drop();
    }
}
impl<F> moosicbox_audio_output::AudioWrite for SlowDrainAudioOutput<F>
where
    F: FnOnce() + Send + 'static,
{
    /// Buffers channel 0 of `decoded` and returns the number of samples
    /// taken from that channel.
    ///
    /// The channel slice is borrowed straight from the decoded buffer, so no
    /// temporary `Vec` is allocated per write.
    fn write(
        &mut self,
        decoded: symphonia::core::audio::AudioBuffer<f32>,
    ) -> Result<usize, moosicbox_audio_output::AudioOutputError> {
        let samples = decoded.chan(0);
        self.add_samples_to_ring_buffer(samples);
        Ok(samples.len())
    }

    /// Marks the output as draining, which makes `Drop` sleep before it runs
    /// the drop callback. Always succeeds.
    fn flush(&mut self) -> Result<(), moosicbox_audio_output::AudioOutputError> {
        self.draining.store(true, Ordering::SeqCst);
        println!("🔄 SlowDrainAudioOutput: Entering drain mode...");
        Ok(())
    }

    /// Replaces the stored consumed-sample counter with the shared one.
    fn set_consumed_samples(&mut self, consumed_samples: Arc<AtomicUsize>) {
        self.samples_consumed = consumed_samples;
    }

    /// Replaces the stored volume handle with the shared one.
    fn set_shared_volume(&mut self, shared_volume: Arc<AtomicF64>) {
        self.volume = shared_volume;
    }

    /// Stores (or clears, when `None`) the progress callback.
    fn set_progress_callback(
        &mut self,
        callback: Option<Box<dyn Fn(f64) + Send + Sync + 'static>>,
    ) {
        self.progress_callback = callback;
    }

    /// Returns the spec this output was constructed with.
    fn get_output_spec(&self) -> Option<symphonia::core::audio::SignalSpec> {
        Some(self.spec)
    }

    /// Not supported by this test double.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn handle(&self) -> moosicbox_audio_output::AudioHandle {
        unimplemented!("SlowDrainAudioOutput does not support command handling")
    }
}
/// Minimal `AudioWrite` test double that accepts and discards audio.
struct MockAudioWrite {
    /// Label supplied at construction; stored but never read.
    _context: String,
}

impl MockAudioWrite {
    /// Creates the mock and logs the instant at which it was created.
    fn new(context: String) -> Self {
        let created_at = switchy_time::instant_now();
        println!(
            "🔧 Creating MockAudioWrite for detection (AudioOutput creation at {:?})",
            created_at
        );
        Self { _context: context }
    }
}
impl moosicbox_audio_output::AudioWrite for MockAudioWrite {
    /// Discards the audio and returns the buffer's frame count.
    fn write(
        &mut self,
        decoded: symphonia::core::audio::AudioBuffer<f32>,
    ) -> Result<usize, moosicbox_audio_output::AudioOutputError> {
        let frame_count = decoded.frames();
        Ok(frame_count)
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> Result<(), moosicbox_audio_output::AudioOutputError> {
        Ok(())
    }

    /// Not supported by this mock.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!`.
    fn handle(&self) -> moosicbox_audio_output::AudioHandle {
        unimplemented!("MockAudioWrite does not support command handling")
    }
}
}