1use std::collections::VecDeque;
6use std::path::{Path, PathBuf};
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, Mutex, mpsc};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant};
11
12use ff_decode::{AudioDecoder, HardwareAccel, SeekMode};
13use ff_format::SampleFormat;
14
15use super::decode_buffer::{DecodeBuffer, FrameResult};
16use super::master_clock::MasterClock;
17use super::player::{DECODED_SAMPLE_RATE, PlayerCommand};
18use super::sink::FrameSink;
19use crate::cache::FrameCache;
20use crate::error::PreviewError;
21use crate::event::PlayerEvent;
22
23const AUDIO_MAX_BUF: usize = 96_000;
26const AUDIO_STALL_FRAMES: u32 = 5;
27
28pub struct PlayerRunner {
37 pub(crate) path: PathBuf,
38 pub(crate) cmd_rx: mpsc::Receiver<PlayerCommand>,
39 pub(crate) event_tx: mpsc::SyncSender<PlayerEvent>,
40 pub(crate) decode_buf: Option<DecodeBuffer>,
41 pub(crate) fps: f64,
42 pub(crate) sink: Option<Box<dyn FrameSink>>,
43 pub(crate) clock: MasterClock,
44 pub(crate) audio_buf: Option<Arc<Mutex<VecDeque<f32>>>>,
45 pub(crate) audio_cancel: Option<Arc<AtomicBool>>,
46 pub(crate) audio_handle: Option<JoinHandle<()>>,
47 pub(crate) sws: super::playback_inner::SwsRgbaConverter,
48 pub(crate) rgba_buf: Vec<u8>,
49 pub(crate) active_path: PathBuf,
50 pub(crate) current_pts: Arc<AtomicU64>,
51 pub(crate) paused: Arc<AtomicBool>,
52 pub(crate) stopped: Arc<AtomicBool>,
53 pub(crate) av_offset_ms: i64,
54 pub(crate) rate: f64,
55 pub(crate) duration_millis: u64,
56 pub(crate) frame_cache: Option<FrameCache>,
57 pub(crate) hw_accel: HardwareAccel,
58}
59
60impl PlayerRunner {
61 pub fn set_sink(&mut self, sink: Box<dyn FrameSink>) {
63 self.sink = Some(sink);
64 }
65
66 pub fn set_hardware_accel(&mut self, accel: HardwareAccel) -> &mut Self {
72 self.hw_accel = accel;
73 self
74 }
75
76 #[must_use]
78 pub fn active_source(&self) -> &Path {
79 &self.active_path
80 }
81
82 #[must_use]
91 pub fn with_frame_cache_budget(mut self, bytes: usize) -> Self {
92 self.frame_cache = Some(FrameCache::new(bytes));
93 self
94 }
95
96 #[must_use]
98 pub fn duration(&self) -> Option<Duration> {
99 if self.duration_millis == u64::MAX {
100 None
101 } else {
102 Some(Duration::from_millis(self.duration_millis))
103 }
104 }
105
106 pub fn use_proxy_if_available(&mut self, proxy_dir: &Path) -> bool {
113 let stem = self
114 .path
115 .file_stem()
116 .and_then(|s| s.to_str())
117 .unwrap_or("output")
118 .to_owned();
119
120 for suffix in ["half", "quarter", "eighth"] {
121 let candidate = proxy_dir.join(format!("{stem}_proxy_{suffix}.mp4"));
122 if candidate.exists() {
123 match self.activate_proxy(&candidate) {
124 Ok(()) => {
125 log::debug!("proxy activated path={}", candidate.display());
126 return true;
127 }
128 Err(e) => {
129 log::warn!(
130 "proxy activation failed path={} error={e}",
131 candidate.display()
132 );
133 }
134 }
135 }
136 }
137 false
138 }
139
140 #[allow(clippy::too_many_lines)]
158 pub fn run(mut self) -> Result<(), PreviewError> {
159 let fps = self.fps.max(1.0);
160 let frame_period = Duration::from_secs_f64(1.0 / fps);
161
162 if self.hw_accel != HardwareAccel::Auto && self.decode_buf.is_some() {
167 match DecodeBuffer::open(&self.active_path)
168 .hardware_accel(self.hw_accel)
169 .build()
170 {
171 Ok(buf) => {
172 self.decode_buf = Some(buf);
173 }
174 Err(e) => {
175 log::warn!(
176 "hwaccel decode buffer rebuild failed accel={} error={e}",
177 self.hw_accel.name()
178 );
179 }
180 }
181 }
182
183 self.clock.reset(Duration::ZERO);
184
185 let mut prev_audio_samples: u64 = 0;
190 let mut audio_stall_frames: u32 = 0;
191
192 loop {
193 let mut pending_seek: Option<Duration> = None;
195 while let Ok(cmd) = self.cmd_rx.try_recv() {
196 match cmd {
197 PlayerCommand::Seek(pts) => pending_seek = Some(pts),
198 PlayerCommand::Play => {
199 self.stopped.store(false, Ordering::Release);
200 self.paused.store(false, Ordering::Release);
201 if self.rate > 0.0 {
206 let pts =
207 Duration::from_micros(self.current_pts.load(Ordering::Relaxed));
208 if self.clock.current_pts().saturating_sub(pts)
209 > Duration::from_millis(100)
210 {
211 self.clock.reset(pts);
212 self.restart_audio_from(pts);
213 }
214 }
215 }
216 PlayerCommand::Pause => {
217 self.paused.store(true, Ordering::Release);
218 }
219 PlayerCommand::Stop => {
220 self.stopped.store(true, Ordering::Release);
221 }
222 PlayerCommand::SetRate(r) => {
223 if r != 0.0 {
224 let was_negative = self.rate < 0.0;
225 self.rate = r;
226 if r > 0.0 {
227 self.clock.set_rate(r);
228 if was_negative {
234 let pts = Duration::from_micros(
235 self.current_pts.load(Ordering::Relaxed),
236 );
237 self.clock.reset(pts);
238 if let Some(buf) = self.decode_buf.as_mut()
242 && let Err(e) = buf.seek_coarse(pts)
243 {
244 log::warn!(
245 "reverse→forward seek failed pts={pts:?} \
246 error={e}"
247 );
248 }
249 self.restart_audio_from(pts);
250 }
251 } else {
252 if let Some(cancel) = &self.audio_cancel {
255 cancel.store(true, Ordering::Release);
256 }
257 if let Some(buf) = &self.audio_buf {
258 buf.lock()
259 .unwrap_or_else(std::sync::PoisonError::into_inner)
260 .clear();
261 }
262 }
263 }
264 }
265 PlayerCommand::SetAvOffset(ms) => {
266 const MAX_OFFSET_MS: i64 = 5_000;
267 self.av_offset_ms = ms.clamp(-MAX_OFFSET_MS, MAX_OFFSET_MS);
268 }
269 #[cfg(feature = "timeline")]
270 PlayerCommand::UpdateLayout(_) => {}
271 }
272 }
273
274 let had_seek = pending_seek.is_some();
276 if let Some(pts) = pending_seek {
277 if let Some(cache) = &mut self.frame_cache {
279 let in_range = cache
280 .pts_range()
281 .is_some_and(|(lo, hi)| pts >= lo && pts <= hi);
282 if !in_range {
283 cache.invalidate();
284 }
285 }
286 if let Some(buf) = self.decode_buf.as_mut() {
287 buf.seek(pts)?;
288 }
289 self.clock.reset(pts);
290 self.restart_audio_from(pts);
291 let _ = self.event_tx.try_send(PlayerEvent::SeekCompleted(pts));
292 }
293
294 if had_seek
297 && self.paused.load(Ordering::Acquire)
298 && let Some(buf) = self.decode_buf.as_mut()
299 {
300 let deadline = std::time::Instant::now() + Duration::from_millis(300);
301 loop {
302 match buf.pop_frame() {
303 FrameResult::Frame(f) => {
304 self.present_frame(&f);
305 let pts = f.timestamp().as_duration();
306 let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(pts));
307 break;
308 }
309 FrameResult::Seeking(_) => {
310 if std::time::Instant::now() > deadline {
311 break;
312 }
313 thread::sleep(Duration::from_millis(2));
314 }
315 FrameResult::Eof => break,
316 }
317 }
318 }
319
320 if let Some(buf) = self.decode_buf.as_ref() {
322 while let Ok(msg) = buf.error_events().try_recv() {
323 let _ = self.event_tx.try_send(PlayerEvent::Error(msg));
324 }
325 }
326
327 if self.stopped.load(Ordering::Acquire) {
328 break;
329 }
330 if self.paused.load(Ordering::Acquire) {
331 thread::sleep(Duration::from_millis(5));
332 continue;
333 }
334
335 if self.rate < 0.0 {
337 if let Some(buf) = self.decode_buf.as_mut() {
338 let current = Duration::from_micros(self.current_pts.load(Ordering::Relaxed));
339 let step =
341 Duration::from_secs_f64(self.rate.abs() / fps.max(f64::MIN_POSITIVE));
342 let target = current.saturating_sub(step);
343
344 if buf.seek_coarse(target).is_err() {
345 break;
346 }
347
348 let deadline = std::time::Instant::now() + Duration::from_millis(300);
350 let frame = loop {
351 match buf.pop_frame() {
352 FrameResult::Frame(f) => break Some(f),
353 FrameResult::Seeking(_) => {
354 if std::time::Instant::now() > deadline {
355 break None;
356 }
357 thread::sleep(Duration::from_millis(2));
358 }
359 FrameResult::Eof => break None,
360 }
361 };
362
363 if let Some(f) = frame {
364 self.present_frame(&f);
365 let pts = f.timestamp().as_duration();
366 let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(pts));
367 }
368
369 if target == Duration::ZERO {
370 self.paused.store(true, Ordering::Release);
372 }
373 }
374 thread::sleep(frame_period);
375 continue;
376 }
377
378 if self.decode_buf.is_none() {
380 let poll_secs =
381 (10.0_f64 / self.rate.max(f64::MIN_POSITIVE)).clamp(1.0, 50.0) / 1_000.0;
382 thread::sleep(Duration::from_secs_f64(poll_secs));
383 if let Some(audio_buf) = &self.audio_buf {
384 let empty = audio_buf
385 .lock()
386 .unwrap_or_else(std::sync::PoisonError::into_inner)
387 .is_empty();
388 if empty
389 && self
390 .audio_handle
391 .as_ref()
392 .is_none_or(JoinHandle::is_finished)
393 {
394 break;
395 }
396 } else {
397 break;
398 }
399 continue;
400 }
401
402 let current = self.clock.current_pts();
404 let cache_hit = self
405 .frame_cache
406 .as_ref()
407 .and_then(|c| c.get(current))
408 .map(|f| (f.rgba.clone(), f.width, f.height));
409 if let Some((rgba, width, height)) = cache_hit {
410 if let Some(sink) = self.sink.as_mut() {
411 sink.push_frame(&rgba, width, height, current);
412 }
413 self.current_pts.store(
414 u64::try_from(current.as_micros()).unwrap_or(u64::MAX),
415 Ordering::Relaxed,
416 );
417 let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(current));
418 continue;
419 }
420
421 let pop_result = if let Some(buf) = self.decode_buf.as_mut() {
423 buf.pop_frame()
424 } else {
425 FrameResult::Eof
426 };
427
428 match pop_result {
429 FrameResult::Eof => break,
430 FrameResult::Seeking(last) => {
431 if let Some(ref f) = last {
432 self.present_frame(f);
433 }
434 }
435 FrameResult::Frame(frame) => {
436 if self.clock.should_sync() {
437 let video_pts = if frame.timestamp().is_valid() {
438 frame.timestamp().as_duration()
439 } else {
440 Duration::ZERO
441 };
442
443 let offset_ms = self.av_offset_ms;
444 let offset = Duration::from_millis(offset_ms.unsigned_abs());
445 let adjusted_video_pts = if offset_ms >= 0 {
446 video_pts.saturating_sub(offset)
447 } else {
448 video_pts + offset
449 };
450
451 let clock_pts = self.clock.current_pts();
452 let diff = adjusted_video_pts.as_secs_f64() - clock_pts.as_secs_f64();
453 let fp = frame_period.as_secs_f64();
454
455 if diff > fp {
456 let sleep_secs =
457 (diff - fp / 2.0).max(0.0) / self.rate.max(f64::MIN_POSITIVE);
458 let max_sleep = fp / self.rate.max(f64::MIN_POSITIVE);
463 thread::sleep(Duration::from_secs_f64(sleep_secs.min(max_sleep)));
464 } else if diff < -fp {
465 log::debug!(
466 "dropped late frame video_pts={video_pts:?} \
467 clock_pts={clock_pts:?}"
468 );
469 continue;
470 }
471 }
472
473 self.present_frame(&frame);
474 let pts = frame.timestamp().as_duration();
475 let _ = self.event_tx.try_send(PlayerEvent::PositionUpdate(pts));
476
477 self.clock.activate_fallback_if_no_audio(pts);
482
483 let cur_audio = self.clock.audio_samples_snapshot();
488 if cur_audio > 0 && cur_audio == prev_audio_samples {
489 audio_stall_frames = audio_stall_frames.saturating_add(1);
490 if audio_stall_frames == AUDIO_STALL_FRAMES {
491 self.clock.rearm_fallback_at(pts);
492 }
493 } else {
494 prev_audio_samples = cur_audio;
495 audio_stall_frames = 0;
496 }
497
498 if let Some(cache) = &mut self.frame_cache
500 && !self.rgba_buf.is_empty()
501 {
502 cache.insert(pts, self.rgba_buf.clone(), frame.width(), frame.height());
503 }
504 }
505 }
506 }
507
508 let _ = self.event_tx.try_send(PlayerEvent::Eof);
509 if let Some(sink) = self.sink.as_mut() {
510 sink.flush();
511 }
512 Ok(())
513 }
514
515 fn present_frame(&mut self, frame: &ff_format::VideoFrame) {
516 let pts = frame.timestamp().as_duration();
517 self.current_pts.store(
518 u64::try_from(pts.as_micros()).unwrap_or(u64::MAX),
519 Ordering::Relaxed,
520 );
521 let Some(sink) = self.sink.as_mut() else {
522 return;
523 };
524 let width = frame.width();
525 let height = frame.height();
526 if self.sws.convert(frame, &mut self.rgba_buf) {
527 sink.push_frame(&self.rgba_buf, width, height, pts);
528 }
529 }
530
531 fn restart_audio_from(&mut self, pts: Duration) {
532 if let Some(buf) = &self.audio_buf {
533 buf.lock()
534 .unwrap_or_else(std::sync::PoisonError::into_inner)
535 .clear();
536 }
537 if let Some(cancel) = &self.audio_cancel {
538 cancel.store(true, Ordering::Release);
539 }
540 drop(self.audio_handle.take());
541 if let Some(buf) = &self.audio_buf {
542 let new_cancel = Arc::new(AtomicBool::new(false));
543 let handle = spawn_audio_thread(
544 self.active_path.clone(),
545 pts,
546 Arc::clone(buf),
547 Arc::clone(&new_cancel),
548 );
549 self.audio_cancel = Some(new_cancel);
550 self.audio_handle = Some(handle);
551 }
552 }
553
554 fn activate_proxy(&mut self, proxy_path: &Path) -> Result<(), PreviewError> {
555 let info = ff_probe::open(proxy_path)?;
556 let fps = info.frame_rate().unwrap_or(30.0).max(1.0);
557 let decode_buf = DecodeBuffer::open(proxy_path)
558 .hardware_accel(self.hw_accel)
559 .build()?;
560
561 if let Some(cancel) = &self.audio_cancel {
562 cancel.store(true, Ordering::Release);
563 }
564 if let Some(buf) = &self.audio_buf {
565 buf.lock()
566 .unwrap_or_else(std::sync::PoisonError::into_inner)
567 .clear();
568 }
569 drop(self.audio_handle.take());
570
571 let (clock, audio_buf, audio_cancel, audio_handle) = if info.has_audio() {
572 let buf = Arc::new(Mutex::new(VecDeque::<f32>::new()));
573 let cancel = Arc::new(AtomicBool::new(false));
574 let handle = spawn_audio_thread(
575 proxy_path.to_path_buf(),
576 Duration::ZERO,
577 Arc::clone(&buf),
578 Arc::clone(&cancel),
579 );
580 let clock = MasterClock::Audio {
581 samples_consumed: Arc::new(AtomicU64::new(0)),
582 sample_rate: DECODED_SAMPLE_RATE,
583 rate: 1.0,
584 samples_base: 0,
585 pts_base: Duration::ZERO,
586 fallback: None,
587 };
588 (clock, Some(buf), Some(cancel), Some(handle))
589 } else {
590 log::debug!(
591 "proxy has no audio, using system clock path={}",
592 proxy_path.display()
593 );
594 let clock = MasterClock::System {
595 started_at: Instant::now(),
596 base_pts: Duration::ZERO,
597 rate: 1.0,
598 };
599 (clock, None, None, None)
600 };
601
602 self.active_path = proxy_path.to_path_buf();
603 self.fps = fps;
604 self.decode_buf = Some(decode_buf);
605 self.clock = clock;
606 self.audio_buf = audio_buf;
607 self.audio_cancel = audio_cancel;
608 self.audio_handle = audio_handle;
609 Ok(())
610 }
611}
612
613impl Drop for PlayerRunner {
614 fn drop(&mut self) {
615 if let Some(cancel) = &self.audio_cancel {
616 cancel.store(true, Ordering::Release);
617 }
618 if let Some(h) = self.audio_handle.take() {
619 let _ = h.join();
620 }
621 }
622}
623
624pub(crate) fn spawn_audio_thread(
627 path: PathBuf,
628 start_pts: Duration,
629 buf: Arc<Mutex<VecDeque<f32>>>,
630 cancel: Arc<AtomicBool>,
631) -> JoinHandle<()> {
632 thread::spawn(move || {
633 let mut decoder = match AudioDecoder::open(&path)
634 .output_format(SampleFormat::F32)
635 .output_sample_rate(DECODED_SAMPLE_RATE)
636 .output_channels(2)
637 .build()
638 {
639 Ok(d) => d,
640 Err(e) => {
641 log::warn!("audio decode thread open failed error={e}");
642 return;
643 }
644 };
645
646 if start_pts != Duration::ZERO
647 && let Err(e) = decoder.seek(start_pts, SeekMode::Backward)
648 {
649 log::warn!("audio seek failed pts={start_pts:?} error={e}");
650 }
651
652 loop {
653 if cancel.load(Ordering::Acquire) {
654 break;
655 }
656
657 match decoder.decode_one() {
658 Ok(Some(frame)) => {
659 let samples = super::playback_inner::audio_frame_to_f32(&frame);
660 let mut offset = 0;
666 while offset < samples.len() {
667 if cancel.load(Ordering::Acquire) {
668 return;
669 }
670 let mut guard = buf
671 .lock()
672 .unwrap_or_else(std::sync::PoisonError::into_inner);
673 let space = AUDIO_MAX_BUF.saturating_sub(guard.len());
674 if space == 0 {
675 drop(guard);
676 thread::sleep(Duration::from_millis(1));
677 continue;
678 }
679 let take = space.min(samples.len() - offset);
680 guard.extend(samples[offset..offset + take].iter().copied());
681 offset += take;
682 }
683 }
684 Ok(None) => break,
685 Err(e) => {
686 log::warn!("audio decode error error={e}");
687 break;
688 }
689 }
690 }
691 })
692}
693
694#[cfg(test)]
697#[path = "player_runner_tests.rs"]
698mod tests;