Skip to main content

ff_preview/playback/
player_runner.rs

1//! Exclusive owner of the decode pipeline for ff-preview.
2//!
3//! Move [`PlayerRunner`] to a background thread and call [`PlayerRunner::run`].
4
5use std::collections::VecDeque;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Mutex, mpsc};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant};
11
12use ff_decode::{AudioDecoder, HardwareAccel, SeekMode};
13use ff_format::SampleFormat;
14
15use super::decode_buffer::{DecodeBuffer, FrameResult};
16use super::master_clock::MasterClock;
17use super::player::{DECODED_SAMPLE_RATE, PlayerCommand};
18use super::sink::FrameSink;
19use crate::cache::FrameCache;
20use crate::error::PreviewError;
21use crate::event::PlayerEvent;
22
23// -- Constants -----------------------------------------------------------
24
/// Upper bound on the number of `f32` samples held in the shared audio queue.
/// The audio decode thread waits (instead of pushing) while the queue is at
/// this length.
const AUDIO_MAX_BUF: usize = 96_000;
/// Number of consecutive presented video frames with an unchanged, non-zero
/// audio sample counter after which `run` re-arms the clock's wall-clock
/// fallback.
const AUDIO_STALL_FRAMES: u32 = 5;
27
28// ── PlayerRunner ─────────────────────────────────────────────────────────────
29
/// Exclusive owner of the decode pipeline. Move to a background thread and
/// call [`run`](Self::run).
///
/// Configure with [`set_sink`](Self::set_sink),
/// [`use_proxy_if_available`](Self::use_proxy_if_available), and
/// [`set_hardware_accel`](Self::set_hardware_accel) **before** calling `run`.
pub struct PlayerRunner {
    /// Path of the original media file. Its file stem is used to look up
    /// proxy files.
    pub(crate) path: PathBuf,
    /// Receiving end of the command channel; drained at the top of every
    /// iteration of the `run` loop.
    pub(crate) cmd_rx: mpsc::Receiver<PlayerCommand>,
    /// Outgoing event channel. Events are sent with `try_send` and the result
    /// is ignored, so an event is dropped rather than blocking the loop.
    pub(crate) event_tx: mpsc::SyncSender<PlayerEvent>,
    /// Video decode buffer. `None` makes `run` take the audio-only path.
    pub(crate) decode_buf: Option<DecodeBuffer>,
    /// Frame rate used to derive the frame period (`run` clamps it to >= 1.0).
    pub(crate) fps: f64,
    /// Destination for presented RGBA frames. While unset, frames are neither
    /// converted nor pushed.
    pub(crate) sink: Option<Box<dyn FrameSink>>,
    /// Master clock that video presentation is paced against.
    pub(crate) clock: MasterClock,
    /// Sample queue filled by the audio decode thread with decoded `f32`
    /// samples. The consumer is outside this file.
    pub(crate) audio_buf: Option<Arc<Mutex<VecDeque<f32>>>>,
    /// Cancellation flag observed by the current audio decode thread.
    pub(crate) audio_cancel: Option<Arc<AtomicBool>>,
    /// Handle of the current audio decode thread; joined when the runner is
    /// dropped.
    pub(crate) audio_handle: Option<JoinHandle<()>>,
    /// Converter that writes a decoded frame into `rgba_buf`.
    pub(crate) sws: super::playback_inner::SwsRgbaConverter,
    /// Scratch buffer receiving the most recent RGBA conversion.
    pub(crate) rgba_buf: Vec<u8>,
    /// Path currently being decoded (the original file or an activated proxy).
    pub(crate) active_path: PathBuf,
    /// PTS of the most recently presented frame, in microseconds.
    pub(crate) current_pts: Arc<AtomicU64>,
    /// Set by `Pause` (and automatically when reverse playback reaches the
    /// start); cleared by `Play`.
    pub(crate) paused: Arc<AtomicBool>,
    /// Set by `Stop`; cleared by `Play`. Ends the `run` loop when observed.
    pub(crate) stopped: Arc<AtomicBool>,
    /// A/V offset in milliseconds, clamped to ±5000 when set via
    /// `SetAvOffset`. Non-negative values are subtracted from the video PTS
    /// before it is compared with the clock; negative values are added.
    pub(crate) av_offset_ms: i64,
    /// Playback rate. Negative values select the reverse playback path.
    pub(crate) rate: f64,
    /// Source duration in milliseconds; `u64::MAX` means "unknown".
    pub(crate) duration_millis: u64,
    /// Optional RGBA frame cache consulted before decoding.
    pub(crate) frame_cache: Option<FrameCache>,
    /// Hardware acceleration mode used when (re)building the decode buffer.
    pub(crate) hw_accel: HardwareAccel,
}
59
60impl PlayerRunner {
61    /// Register the frame sink. Call before [`run`](Self::run).
62    pub fn set_sink(&mut self, sink: Box<dyn FrameSink>) {
63        self.sink = Some(sink);
64    }
65
    /// Configure hardware acceleration. Call before [`run`](Self::run).
    ///
    /// The setting takes effect at the start of `run()`: when it differs from
    /// [`HardwareAccel::Auto`] and a video decode buffer exists, the buffer is
    /// rebuilt with this mode. If that rebuild fails, a warning is logged and
    /// the existing buffer is kept. [`HardwareAccel::Auto`]
    /// (the default) probes available backends and falls back to software.
    /// [`HardwareAccel::None`] forces CPU-only decoding.
    ///
    /// Returns `&mut Self` so configuration calls can be chained.
    pub fn set_hardware_accel(&mut self, accel: HardwareAccel) -> &mut Self {
        self.hw_accel = accel;
        self
    }
75
76    /// Returns the path currently being decoded (original or active proxy).
77    #[must_use]
78    pub fn active_source(&self) -> &Path {
79        &self.active_path
80    }
81
82    /// Enable an in-memory RGBA frame cache with the given byte budget.
83    ///
84    /// When the budget is set, frames decoded during playback are stored
85    /// and served on cache hit without re-decoding, enabling instant scrubbing.
86    /// The cache is invalidated automatically whenever a seek targets a PTS
87    /// outside the currently cached range.
88    ///
89    /// Example: `runner.with_frame_cache_budget(512 * 1024 * 1024)` for 512 MB.
90    #[must_use]
91    pub fn with_frame_cache_budget(mut self, bytes: usize) -> Self {
92        self.frame_cache = Some(FrameCache::new(bytes));
93        self
94    }
95
96    /// Container-reported duration, or `None` for live / streaming sources.
97    #[must_use]
98    pub fn duration(&self) -> Option<Duration> {
99        if self.duration_millis == u64::MAX {
100            None
101        } else {
102            Some(Duration::from_millis(self.duration_millis))
103        }
104    }
105
106    /// Activate a lower-resolution proxy if one exists in `proxy_dir`.
107    ///
108    /// Must be called before [`run`](Self::run). Returns `true` if a proxy was
109    /// found and activated; `false` if no proxy exists or activation failed.
110    ///
111    /// Proxy lookup order: `half` → `quarter` → `eighth`; first match wins.
112    pub fn use_proxy_if_available(&mut self, proxy_dir: &Path) -> bool {
113        let stem = self
114            .path
115            .file_stem()
116            .and_then(|s| s.to_str())
117            .unwrap_or("output")
118            .to_owned();
119
120        for suffix in ["half", "quarter", "eighth"] {
121            let candidate = proxy_dir.join(format!("{stem}_proxy_{suffix}.mp4"));
122            if candidate.exists() {
123                match self.activate_proxy(&candidate) {
124                    Ok(()) => {
125                        log::debug!("proxy activated path={}", candidate.display());
126                        return true;
127                    }
128                    Err(e) => {
129                        log::warn!(
130                            "proxy activation failed path={} error={e}",
131                            candidate.display()
132                        );
133                    }
134                }
135            }
136        }
137        false
138    }
139
140    /// A/V sync presentation loop.
141    ///
142    /// Blocks until a [`PlayerCommand::Stop`] is received, the end of file is
143    /// reached, or an unrecoverable decode error occurs.
144    ///
145    /// At the top of each frame, all pending commands are drained from the
146    /// channel. Consecutive [`PlayerCommand::Seek`] commands are coalesced —
147    /// only the last one executes.
148    ///
149    /// Emits [`PlayerEvent::SeekCompleted`] after each successful seek,
150    /// [`PlayerEvent::PositionUpdate`] after each presented video frame,
151    /// [`PlayerEvent::Error`] on non-fatal decode errors, and
152    /// [`PlayerEvent::Eof`] before returning.
153    ///
154    /// # Errors
155    ///
156    /// Returns [`PreviewError`] if a seek fails.
157    #[allow(clippy::too_many_lines)]
158    pub fn run(mut self) -> Result<(), PreviewError> {
159        let fps = self.fps.max(1.0);
160        let frame_period = Duration::from_secs_f64(1.0 / fps);
161
162        // Rebuild the decode buffer when the caller has explicitly configured a
163        // hardware acceleration mode other than the default (Auto). The initial
164        // buffer is always built with Auto by PreviewPlayer::open(); rebuilding
165        // here ensures the user's explicit setting is respected.
166        if self.hw_accel != HardwareAccel::Auto && self.decode_buf.is_some() {
167            match DecodeBuffer::open(&self.active_path)
168                .hardware_accel(self.hw_accel)
169                .build()
170            {
171                Ok(buf) => {
172                    self.decode_buf = Some(buf);
173                }
174                Err(e) => {
175                    log::warn!(
176                        "hwaccel decode buffer rebuild failed accel={} error={e}",
177                        self.hw_accel.name()
178                    );
179                }
180            }
181        }
182
183        self.clock.reset(Duration::ZERO);
184
185        // Audio stall detection state: tracks whether samples_consumed is
186        // advancing. When it stops for AUDIO_STALL_FRAMES consecutive
187        // presented frames, the audio track has ended before the video track
188        // and the wall-clock fallback is re-armed so pacing continues.
189        let mut prev_audio_samples: u64 = 0;
190        let mut audio_stall_frames: u32 = 0;
191
192        loop {
193            // ── Drain commands ────────────────────────────────────────────────
194            let mut pending_seek: Option<Duration> = None;
195            while let Ok(cmd) = self.cmd_rx.try_recv() {
196                match cmd {
197                    PlayerCommand::Seek(pts) => pending_seek = Some(pts),
198                    PlayerCommand::Play => {
199                        self.stopped.store(false, Ordering::Release);
200                        self.paused.store(false, Ordering::Release);
201                        // The cpal hardware callback advances `samples_consumed` even
202                        // while paused, so `MasterClock::Audio` drifts forward during
203                        // silence. Reset the clock to the last presented video frame so
204                        // frames are not immediately dropped as "late" on resume.
205                        if self.rate > 0.0 {
206                            let pts =
207                                Duration::from_micros(self.current_pts.load(Ordering::Relaxed));
208                            if self.clock.current_pts().saturating_sub(pts)
209                                > Duration::from_millis(100)
210                            {
211                                self.clock.reset(pts);
212                                self.restart_audio_from(pts);
213                            }
214                        }
215                    }
216                    PlayerCommand::Pause => {
217                        self.paused.store(true, Ordering::Release);
218                    }
219                    PlayerCommand::Stop => {
220                        self.stopped.store(true, Ordering::Release);
221                    }
222                    PlayerCommand::SetRate(r) => {
223                        if r != 0.0 {
224                            let was_negative = self.rate < 0.0;
225                            self.rate = r;
226                            if r > 0.0 {
227                                self.clock.set_rate(r);
228                                // Returning from reverse: the MasterClock kept advancing
229                                // forward during reverse playback, so its position is now
230                                // ahead of the video position. Reset it to the current
231                                // video position and re-seek the decode buffer so the
232                                // forward path resumes from the right frame.
233                                if was_negative {
234                                    let pts = Duration::from_micros(
235                                        self.current_pts.load(Ordering::Relaxed),
236                                    );
237                                    self.clock.reset(pts);
238                                    // Use coarse seek (no forward-decode discard) so the
239                                    // first video frame arrives before the audio clock
240                                    // has advanced past pts, preventing A/V drift.
241                                    if let Some(buf) = self.decode_buf.as_mut()
242                                        && let Err(e) = buf.seek_coarse(pts)
243                                    {
244                                        log::warn!(
245                                            "reverse→forward seek failed pts={pts:?} \
246                                             error={e}"
247                                        );
248                                    }
249                                    self.restart_audio_from(pts);
250                                }
251                            } else {
252                                // Entering reverse: mute audio by cancelling the decode thread
253                                // and clearing the buffer.
254                                if let Some(cancel) = &self.audio_cancel {
255                                    cancel.store(true, Ordering::Release);
256                                }
257                                if let Some(buf) = &self.audio_buf {
258                                    buf.lock()
259                                        .unwrap_or_else(std::sync::PoisonError::into_inner)
260                                        .clear();
261                                }
262                            }
263                        }
264                    }
265                    PlayerCommand::SetAvOffset(ms) => {
266                        const MAX_OFFSET_MS: i64 = 5_000;
267                        self.av_offset_ms = ms.clamp(-MAX_OFFSET_MS, MAX_OFFSET_MS);
268                    }
269                    #[cfg(feature = "timeline")]
270                    PlayerCommand::UpdateLayout(_) => {}
271                }
272            }
273
274            // ── Apply pending seek ────────────────────────────────────────────
275            let had_seek = pending_seek.is_some();
276            if let Some(pts) = pending_seek {
277                // Invalidate the frame cache when seeking outside its range.
278                if let Some(cache) = &mut self.frame_cache {
279                    let in_range = cache
280                        .pts_range()
281                        .is_some_and(|(lo, hi)| pts >= lo && pts <= hi);
282                    if !in_range {
283                        cache.invalidate();
284                    }
285                }
286                if let Some(buf) = self.decode_buf.as_mut() {
287                    buf.seek(pts)?;
288                }
289                self.clock.reset(pts);
290                self.restart_audio_from(pts);
291                let _ = self.event_tx.try_send(PlayerEvent::SeekCompleted(pts));
292            }
293
294            // When a seek arrives while paused, present one preview frame so
295            // the sink reflects the new position without resuming playback.
296            if had_seek
297                && self.paused.load(Ordering::Acquire)
298                && let Some(buf) = self.decode_buf.as_mut()
299            {
300                let deadline = std::time::Instant::now() + Duration::from_millis(300);
301                loop {
302                    match buf.pop_frame() {
303                        FrameResult::Frame(f) => {
304                            self.present_frame(&f);
305                            let pts = f.timestamp().as_duration();
306                            let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(pts));
307                            break;
308                        }
309                        FrameResult::Seeking(_) => {
310                            if std::time::Instant::now() > deadline {
311                                break;
312                            }
313                            thread::sleep(Duration::from_millis(2));
314                        }
315                        FrameResult::Eof => break,
316                    }
317                }
318            }
319
320            // Surface non-fatal decode errors from the background thread.
321            if let Some(buf) = self.decode_buf.as_ref() {
322                while let Ok(msg) = buf.error_events().try_recv() {
323                    let _ = self.event_tx.try_send(PlayerEvent::Error(msg));
324                }
325            }
326
327            if self.stopped.load(Ordering::Acquire) {
328                break;
329            }
330            if self.paused.load(Ordering::Acquire) {
331                thread::sleep(Duration::from_millis(5));
332                continue;
333            }
334
335            // ── Reverse playback path ─────────────────────────────────────────
336            if self.rate < 0.0 {
337                if let Some(buf) = self.decode_buf.as_mut() {
338                    let current = Duration::from_micros(self.current_pts.load(Ordering::Relaxed));
339                    // Step size = one frame at the requested speed.
340                    let step =
341                        Duration::from_secs_f64(self.rate.abs() / fps.max(f64::MIN_POSITIVE));
342                    let target = current.saturating_sub(step);
343
344                    if buf.seek_coarse(target).is_err() {
345                        break;
346                    }
347
348                    // Drain pop_frame until a decoded frame arrives (with timeout).
349                    let deadline = std::time::Instant::now() + Duration::from_millis(300);
350                    let frame = loop {
351                        match buf.pop_frame() {
352                            FrameResult::Frame(f) => break Some(f),
353                            FrameResult::Seeking(_) => {
354                                if std::time::Instant::now() > deadline {
355                                    break None;
356                                }
357                                thread::sleep(Duration::from_millis(2));
358                            }
359                            FrameResult::Eof => break None,
360                        }
361                    };
362
363                    if let Some(f) = frame {
364                        self.present_frame(&f);
365                        let pts = f.timestamp().as_duration();
366                        let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(pts));
367                    }
368
369                    if target == Duration::ZERO {
370                        // Reached the start of the clip — pause automatically.
371                        self.paused.store(true, Ordering::Release);
372                    }
373                }
374                thread::sleep(frame_period);
375                continue;
376            }
377
378            // ── Audio-only path ───────────────────────────────────────────────
379            if self.decode_buf.is_none() {
380                let poll_secs =
381                    (10.0_f64 / self.rate.max(f64::MIN_POSITIVE)).clamp(1.0, 50.0) / 1_000.0;
382                thread::sleep(Duration::from_secs_f64(poll_secs));
383                if let Some(audio_buf) = &self.audio_buf {
384                    let empty = audio_buf
385                        .lock()
386                        .unwrap_or_else(std::sync::PoisonError::into_inner)
387                        .is_empty();
388                    if empty
389                        && self
390                            .audio_handle
391                            .as_ref()
392                            .is_none_or(JoinHandle::is_finished)
393                    {
394                        break;
395                    }
396                } else {
397                    break;
398                }
399                continue;
400            }
401
402            // ── Frame cache hit ───────────────────────────────────────────────
403            let current = self.clock.current_pts();
404            let cache_hit = self
405                .frame_cache
406                .as_ref()
407                .and_then(|c| c.get(current))
408                .map(|f| (f.rgba.clone(), f.width, f.height));
409            if let Some((rgba, width, height)) = cache_hit {
410                if let Some(sink) = self.sink.as_mut() {
411                    sink.push_frame(&rgba, width, height, current);
412                }
413                self.current_pts.store(
414                    u64::try_from(current.as_micros()).unwrap_or(u64::MAX),
415                    Ordering::Relaxed,
416                );
417                let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(current));
418                continue;
419            }
420
421            // ── Video decode path ─────────────────────────────────────────────
422            let pop_result = if let Some(buf) = self.decode_buf.as_mut() {
423                buf.pop_frame()
424            } else {
425                FrameResult::Eof
426            };
427
428            match pop_result {
429                FrameResult::Eof => break,
430                FrameResult::Seeking(last) => {
431                    if let Some(ref f) = last {
432                        self.present_frame(f);
433                    }
434                }
435                FrameResult::Frame(frame) => {
436                    if self.clock.should_sync() {
437                        let video_pts = if frame.timestamp().is_valid() {
438                            frame.timestamp().as_duration()
439                        } else {
440                            Duration::ZERO
441                        };
442
443                        let offset_ms = self.av_offset_ms;
444                        let offset = Duration::from_millis(offset_ms.unsigned_abs());
445                        let adjusted_video_pts = if offset_ms >= 0 {
446                            video_pts.saturating_sub(offset)
447                        } else {
448                            video_pts + offset
449                        };
450
451                        let clock_pts = self.clock.current_pts();
452                        let diff = adjusted_video_pts.as_secs_f64() - clock_pts.as_secs_f64();
453                        let fp = frame_period.as_secs_f64();
454
455                        if diff > fp {
456                            let sleep_secs =
457                                (diff - fp / 2.0).max(0.0) / self.rate.max(f64::MIN_POSITIVE);
458                            // Cap at one scaled frame period so the loop still wakes up
459                            // when the audio clock freezes, but slow rates (< 1×) are
460                            // not artificially capped to a value shorter than their
461                            // required inter-frame sleep.
462                            let max_sleep = fp / self.rate.max(f64::MIN_POSITIVE);
463                            thread::sleep(Duration::from_secs_f64(sleep_secs.min(max_sleep)));
464                        } else if diff < -fp {
465                            log::debug!(
466                                "dropped late frame video_pts={video_pts:?} \
467                                 clock_pts={clock_pts:?}"
468                            );
469                            continue;
470                        }
471                    }
472
473                    self.present_frame(&frame);
474                    let pts = frame.timestamp().as_duration();
475                    let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(pts));
476
477                    // Grace period: after the first frame, arm the wall-clock fallback
478                    // if no audio consumer has started consuming samples yet.
479                    // This ensures real-time pacing even when pop_audio_samples() is
480                    // never called (e.g. no cpal stream attached to the handle).
481                    self.clock.activate_fallback_if_no_audio(pts);
482
483                    // Audio-EOF detection: if samples_consumed stops advancing for
484                    // AUDIO_STALL_FRAMES consecutive frames while non-zero (audio was
485                    // playing but has now ended), re-arm the wall-clock fallback so the
486                    // remaining video plays at its native frame rate.
487                    let cur_audio = self.clock.audio_samples_snapshot();
488                    if cur_audio > 0 && cur_audio == prev_audio_samples {
489                        audio_stall_frames = audio_stall_frames.saturating_add(1);
490                        if audio_stall_frames == AUDIO_STALL_FRAMES {
491                            self.clock.rearm_fallback_at(pts);
492                        }
493                    } else {
494                        prev_audio_samples = cur_audio;
495                        audio_stall_frames = 0;
496                    }
497
498                    // Populate cache after conversion (rgba_buf holds the converted frame).
499                    if let Some(cache) = &mut self.frame_cache
500                        && !self.rgba_buf.is_empty()
501                    {
502                        cache.insert(pts, self.rgba_buf.clone(), frame.width(), frame.height());
503                    }
504                }
505            }
506        }
507
508        let _ = self.event_tx.try_send(PlayerEvent::Eof);
509        if let Some(sink) = self.sink.as_mut() {
510            sink.flush();
511        }
512        Ok(())
513    }
514
515    fn present_frame(&mut self, frame: &ff_format::VideoFrame) {
516        let pts = frame.timestamp().as_duration();
517        self.current_pts.store(
518            u64::try_from(pts.as_micros()).unwrap_or(u64::MAX),
519            Ordering::Relaxed,
520        );
521        let Some(sink) = self.sink.as_mut() else {
522            return;
523        };
524        let width = frame.width();
525        let height = frame.height();
526        if self.sws.convert(frame, &mut self.rgba_buf) {
527            sink.push_frame(&self.rgba_buf, width, height, pts);
528        }
529    }
530
531    fn restart_audio_from(&mut self, pts: Duration) {
532        if let Some(buf) = &self.audio_buf {
533            buf.lock()
534                .unwrap_or_else(std::sync::PoisonError::into_inner)
535                .clear();
536        }
537        if let Some(cancel) = &self.audio_cancel {
538            cancel.store(true, Ordering::Release);
539        }
540        drop(self.audio_handle.take());
541        if let Some(buf) = &self.audio_buf {
542            let new_cancel = Arc::new(AtomicBool::new(false));
543            let handle = spawn_audio_thread(
544                self.active_path.clone(),
545                pts,
546                Arc::clone(buf),
547                Arc::clone(&new_cancel),
548            );
549            self.audio_cancel = Some(new_cancel);
550            self.audio_handle = Some(handle);
551        }
552    }
553
554    fn activate_proxy(&mut self, proxy_path: &Path) -> Result<(), PreviewError> {
555        let info = ff_probe::open(proxy_path)?;
556        let fps = info.frame_rate().unwrap_or(30.0).max(1.0);
557        let decode_buf = DecodeBuffer::open(proxy_path)
558            .hardware_accel(self.hw_accel)
559            .build()?;
560
561        if let Some(cancel) = &self.audio_cancel {
562            cancel.store(true, Ordering::Release);
563        }
564        if let Some(buf) = &self.audio_buf {
565            buf.lock()
566                .unwrap_or_else(std::sync::PoisonError::into_inner)
567                .clear();
568        }
569        drop(self.audio_handle.take());
570
571        let (clock, audio_buf, audio_cancel, audio_handle) = if info.has_audio() {
572            let buf = Arc::new(Mutex::new(VecDeque::<f32>::new()));
573            let cancel = Arc::new(AtomicBool::new(false));
574            let handle = spawn_audio_thread(
575                proxy_path.to_path_buf(),
576                Duration::ZERO,
577                Arc::clone(&buf),
578                Arc::clone(&cancel),
579            );
580            let clock = MasterClock::Audio {
581                samples_consumed: Arc::new(AtomicU64::new(0)),
582                sample_rate: DECODED_SAMPLE_RATE,
583                rate: 1.0,
584                samples_base: 0,
585                pts_base: Duration::ZERO,
586                fallback: None,
587            };
588            (clock, Some(buf), Some(cancel), Some(handle))
589        } else {
590            log::debug!(
591                "proxy has no audio, using system clock path={}",
592                proxy_path.display()
593            );
594            let clock = MasterClock::System {
595                started_at: Instant::now(),
596                base_pts: Duration::ZERO,
597                rate: 1.0,
598            };
599            (clock, None, None, None)
600        };
601
602        self.active_path = proxy_path.to_path_buf();
603        self.fps = fps;
604        self.decode_buf = Some(decode_buf);
605        self.clock = clock;
606        self.audio_buf = audio_buf;
607        self.audio_cancel = audio_cancel;
608        self.audio_handle = audio_handle;
609        Ok(())
610    }
611}
612
613impl Drop for PlayerRunner {
614    fn drop(&mut self) {
615        if let Some(cancel) = &self.audio_cancel {
616            cancel.store(true, Ordering::Release);
617        }
618        if let Some(h) = self.audio_handle.take() {
619            let _ = h.join();
620        }
621    }
622}
623
624// ── spawn_audio_thread ────────────────────────────────────────────────────────
625
626pub(crate) fn spawn_audio_thread(
627    path: PathBuf,
628    start_pts: Duration,
629    buf: Arc<Mutex<VecDeque<f32>>>,
630    cancel: Arc<AtomicBool>,
631) -> JoinHandle<()> {
632    thread::spawn(move || {
633        let mut decoder = match AudioDecoder::open(&path)
634            .output_format(SampleFormat::F32)
635            .output_sample_rate(DECODED_SAMPLE_RATE)
636            .output_channels(2)
637            .build()
638        {
639            Ok(d) => d,
640            Err(e) => {
641                log::warn!("audio decode thread open failed error={e}");
642                return;
643            }
644        };
645
646        if start_pts != Duration::ZERO
647            && let Err(e) = decoder.seek(start_pts, SeekMode::Backward)
648        {
649            log::warn!("audio seek failed pts={start_pts:?} error={e}");
650        }
651
652        loop {
653            if cancel.load(Ordering::Acquire) {
654                break;
655            }
656
657            match decoder.decode_one() {
658                Ok(Some(frame)) => {
659                    let samples = super::playback_inner::audio_frame_to_f32(&frame);
660                    // Push ALL samples without dropping. When the ring buffer is
661                    // full, wait for cpal to drain space before continuing.
662                    // Using take(space) instead would silently discard samples on
663                    // platforms where sleep(1ms) sleeps much longer (e.g. ~10ms on
664                    // Windows), causing audio to play at ~2x speed (issue #18).
665                    let mut offset = 0;
666                    while offset < samples.len() {
667                        if cancel.load(Ordering::Acquire) {
668                            return;
669                        }
670                        let mut guard = buf
671                            .lock()
672                            .unwrap_or_else(std::sync::PoisonError::into_inner);
673                        let space = AUDIO_MAX_BUF.saturating_sub(guard.len());
674                        if space == 0 {
675                            drop(guard);
676                            thread::sleep(Duration::from_millis(1));
677                            continue;
678                        }
679                        let take = space.min(samples.len() - offset);
680                        guard.extend(samples[offset..offset + take].iter().copied());
681                        offset += take;
682                    }
683                }
684                Ok(None) => break,
685                Err(e) => {
686                    log::warn!("audio decode error error={e}");
687                    break;
688                }
689            }
690        }
691    })
692}
693
694// ── Tests ─────────────────────────────────────────────────────────────────────
695
696#[cfg(test)]
697#[path = "player_runner_tests.rs"]
698mod tests;