1use std::collections::VecDeque;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::sync::Mutex;
10use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
11use std::thread::{self, JoinHandle};
12use std::time::{Duration, Instant};
13
14use ff_decode::{AudioDecoder, SeekMode};
15use ff_format::SampleFormat;
16
17use super::clock::MasterClock;
18use super::decode_buffer::{DecodeBuffer, FrameResult, SeekEvent};
19use super::sink::FrameSink;
20use crate::error::PreviewError;
21
/// Fill level, in interleaved `f32` samples, at which the audio decode thread
/// stops decoding and waits for the consumer to drain the queue.
///
/// The decode thread requests 48 kHz, 2-channel output, so 96 000 samples
/// correspond to one second of audio.
const AUDIO_MAX_BUF: usize = 96_000;
27
/// Real-time preview player that pairs a video decode buffer with an optional
/// background audio decode thread and paces frames against a master clock.
pub struct PreviewPlayer {
    /// Path passed to [`PreviewPlayer::open`]; its file stem is used to look
    /// up proxy files.
    path: PathBuf,
    /// Source of decoded video frames and seek events.
    decode_buf: DecodeBuffer,
    /// Frame rate reported by the probe (30 when unknown, never below 1).
    fps: f64,
    /// Receiver of converted RGBA frames; frames are discarded while `None`.
    sink: Option<Box<dyn FrameSink>>,
    /// While set, `run` idles and `pop_audio_samples` returns nothing.
    paused: AtomicBool,
    /// Shared stop flag; `run` exits once it is set. Clones are handed out by
    /// `stop_handle`.
    stopped: Arc<AtomicBool>,
    /// Master clock: audio-driven when the source has audio, otherwise based
    /// on the system clock.
    clock: MasterClock,
    /// A/V offset in milliseconds, kept within ±5000 by `set_av_offset`.
    av_offset_ms: AtomicI64,
    /// Queue of decoded samples shared with the audio decode thread; `None`
    /// when the active source has no audio.
    audio_buf: Option<Arc<Mutex<VecDeque<f32>>>>,
    /// Flag that tells the current audio decode thread to exit.
    audio_cancel: Option<Arc<AtomicBool>>,
    /// Handle of the current audio decode thread, joined on drop.
    audio_handle: Option<JoinHandle<()>>,
    /// Converter used by `present_frame` to produce RGBA pixels.
    sws: super::playback_inner::SwsRgbaConverter,
    /// Scratch buffer the converter writes into; reused across frames.
    rgba_buf: Vec<u8>,
    /// File currently being decoded: the original `path` or an activated proxy.
    active_path: PathBuf,
    /// Set by `play`; once set, `use_proxy_if_available` refuses to switch
    /// sources.
    started: AtomicBool,
}
86
87impl PreviewPlayer {
88 pub fn open(path: &Path) -> Result<Self, PreviewError> {
98 let info = ff_probe::open(path)?;
99
100 let fps = info.frame_rate().unwrap_or(30.0).max(1.0);
101
102 let clock = if info.has_audio() {
103 let sample_rate = info.sample_rate().unwrap_or(48_000);
104 MasterClock::Audio {
105 samples_consumed: Arc::new(AtomicU64::new(0)),
106 sample_rate,
107 }
108 } else {
109 log::debug!(
110 "using system clock fallback path={} no_audio=true",
111 path.display()
112 );
113 MasterClock::System {
114 started_at: Instant::now(),
115 base_pts: Duration::ZERO,
116 }
117 };
118
119 let decode_buf = DecodeBuffer::open(path).build()?;
120
121 let (audio_buf, audio_cancel, audio_handle) = if let MasterClock::Audio { .. } = &clock {
123 let buf = Arc::new(Mutex::new(VecDeque::<f32>::new()));
124 let cancel = Arc::new(AtomicBool::new(false));
125 let handle = spawn_audio_thread(
126 path.to_path_buf(),
127 Duration::ZERO,
128 Arc::clone(&buf),
129 Arc::clone(&cancel),
130 );
131 (Some(buf), Some(cancel), Some(handle))
132 } else {
133 (None, None, None)
134 };
135
136 Ok(PreviewPlayer {
137 path: path.to_path_buf(),
138 decode_buf,
139 fps,
140 sink: None,
141 paused: AtomicBool::new(false),
142 stopped: Arc::new(AtomicBool::new(false)),
143 clock,
144 av_offset_ms: AtomicI64::new(0),
145 audio_buf,
146 audio_cancel,
147 audio_handle,
148 sws: super::playback_inner::SwsRgbaConverter::new(),
149 rgba_buf: Vec::new(),
150 active_path: path.to_path_buf(),
151 started: AtomicBool::new(false),
152 })
153 }
154
    /// Installs the sink that receives converted RGBA frames, replacing any
    /// previously installed one.
    pub fn set_sink(&mut self, sink: Box<dyn FrameSink>) {
        self.sink = Some(sink);
    }
159
    /// Marks the player as started and clears both the paused and the stopped
    /// flags.
    ///
    /// Once the player has been started, `use_proxy_if_available` refuses to
    /// switch sources.
    pub fn play(&mut self) {
        self.started.store(true, Ordering::Release);
        self.paused.store(false, Ordering::Release);
        self.stopped.store(false, Ordering::Release);
    }
169
    /// Sets the paused flag. While it is set, `run` idles in short sleeps and
    /// `pop_audio_samples` returns no samples.
    pub fn pause(&mut self) {
        self.paused.store(true, Ordering::Release);
    }
175
    /// Sets the stopped flag, which makes `run` leave its loop the next time
    /// it checks the flag.
    pub fn stop(&mut self) {
        self.stopped.store(true, Ordering::Release);
    }
182
    /// Returns a clone of the shared stop flag.
    ///
    /// Storing `true` into the returned flag has the same effect as calling
    /// `stop`, without needing access to the player itself.
    pub fn stop_handle(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.stopped)
    }
200
    /// Returns the next result from the video decode buffer without any
    /// clock synchronisation or sink delivery.
    pub fn pop_frame(&mut self) -> FrameResult {
        self.decode_buf.pop_frame()
    }
208
    /// Asks the video decode buffer to seek to `target_pts`.
    ///
    /// The clock and the audio thread are not touched here; `run` resets both
    /// when it observes the matching `SeekEvent::Completed`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the decode buffer's `seek`.
    pub fn seek(&mut self, target_pts: Duration) -> Result<(), PreviewError> {
        self.decode_buf.seek(target_pts)
    }
219
220 pub fn use_proxy_if_available(&mut self, proxy_dir: &Path) -> bool {
233 if self.started.load(Ordering::Acquire) {
234 log::warn!("use_proxy_if_available called after play; ignored");
235 return false;
236 }
237 let stem = self
238 .path
239 .file_stem()
240 .and_then(|s| s.to_str())
241 .unwrap_or("output")
242 .to_owned();
243
244 for suffix in ["half", "quarter", "eighth"] {
245 let candidate = proxy_dir.join(format!("{stem}_proxy_{suffix}.mp4"));
246 if candidate.exists() {
247 match self.activate_proxy(&candidate) {
248 Ok(()) => {
249 log::debug!("proxy activated path={}", candidate.display());
250 return true;
251 }
252 Err(e) => {
253 log::warn!(
254 "proxy activation failed path={} error={e}",
255 candidate.display()
256 );
257 }
258 }
259 }
260 }
261 false
262 }
263
    /// Returns the path of the file currently being decoded: the originally
    /// opened file, or the proxy once one has been activated.
    pub fn active_source(&self) -> &Path {
        &self.active_path
    }
269
270 fn activate_proxy(&mut self, proxy_path: &Path) -> Result<(), PreviewError> {
273 let info = ff_probe::open(proxy_path)?;
274 let fps = info.frame_rate().unwrap_or(30.0).max(1.0);
275 let decode_buf = DecodeBuffer::open(proxy_path).build()?;
276
277 if let Some(cancel) = &self.audio_cancel {
279 cancel.store(true, Ordering::Release);
280 }
281 if let Some(buf) = &self.audio_buf {
282 buf.lock()
283 .unwrap_or_else(std::sync::PoisonError::into_inner)
284 .clear();
285 }
286 drop(self.audio_handle.take());
288
289 let (clock, audio_buf, audio_cancel, audio_handle) = if info.has_audio() {
290 let sample_rate = info.sample_rate().unwrap_or(48_000);
291 let buf = Arc::new(Mutex::new(VecDeque::<f32>::new()));
292 let cancel = Arc::new(AtomicBool::new(false));
293 let handle = spawn_audio_thread(
294 proxy_path.to_path_buf(),
295 Duration::ZERO,
296 Arc::clone(&buf),
297 Arc::clone(&cancel),
298 );
299 let clock = MasterClock::Audio {
300 samples_consumed: Arc::new(AtomicU64::new(0)),
301 sample_rate,
302 };
303 (clock, Some(buf), Some(cancel), Some(handle))
304 } else {
305 log::debug!(
306 "proxy has no audio, using system clock path={}",
307 proxy_path.display()
308 );
309 let clock = MasterClock::System {
310 started_at: Instant::now(),
311 base_pts: Duration::ZERO,
312 };
313 (clock, None, None, None)
314 };
315
316 self.active_path = proxy_path.to_path_buf();
317 self.fps = fps;
318 self.decode_buf = decode_buf;
319 self.clock = clock;
320 self.audio_buf = audio_buf;
321 self.audio_cancel = audio_cancel;
322 self.audio_handle = audio_handle;
323 Ok(())
324 }
325
326 pub fn set_av_offset(&self, ms: i64) {
336 const MAX_OFFSET_MS: i64 = 5_000;
337 let clamped = if ms.abs() > MAX_OFFSET_MS {
338 log::warn!("av_offset clamped value={ms}");
339 ms.clamp(-MAX_OFFSET_MS, MAX_OFFSET_MS)
340 } else {
341 ms
342 };
343 self.av_offset_ms.store(clamped, Ordering::Relaxed);
344 }
345
    /// Returns the current audio/video offset in milliseconds, as stored by
    /// `set_av_offset` (zero until it is called).
    pub fn av_offset(&self) -> i64 {
        self.av_offset_ms.load(Ordering::Relaxed)
    }
352
353 pub fn pop_audio_samples(&mut self, n_samples: usize) -> Vec<f32> {
371 if self.paused.load(Ordering::Relaxed) || self.stopped.load(Ordering::Relaxed) {
372 return Vec::new();
373 }
374 let MasterClock::Audio {
375 samples_consumed, ..
376 } = &self.clock
377 else {
378 return Vec::new();
379 };
380 if n_samples == 0 {
381 return Vec::new();
382 }
383 let Some(buf) = &self.audio_buf else {
384 return Vec::new();
385 };
386 let mut guard = buf
387 .lock()
388 .unwrap_or_else(std::sync::PoisonError::into_inner);
389 let take = n_samples.min(guard.len());
390 if take == 0 {
391 return Vec::new();
392 }
393 let samples: Vec<f32> = guard.drain(..take).collect();
394 samples_consumed.fetch_add((take / 2) as u64, Ordering::Relaxed);
397 samples
398 }
399
    /// Runs the playback loop on the calling thread until the stop flag is
    /// set or the decode buffer reports end of stream, then flushes the sink.
    ///
    /// Each decoded frame is paced against the master clock: frames that are
    /// early are delayed, frames that are more than one frame period late are
    /// dropped, and the rest are converted and pushed to the sink.
    ///
    /// As written, every path through this function returns `Ok(())`.
    pub fn run(&mut self) -> Result<(), PreviewError> {
        let fps = self.fps.max(1.0);
        let frame_period = Duration::from_secs_f64(1.0 / fps);

        // Playback always starts from position zero.
        self.clock.reset(Duration::ZERO);

        loop {
            if self.stopped.load(Ordering::Acquire) {
                break;
            }
            // While paused, poll the flags every 5 ms without consuming frames.
            if self.paused.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(5));
                continue;
            }

            match self.decode_buf.pop_frame() {
                FrameResult::Eof => break,
                // A seek is in flight: show the frame carried by the result,
                // if any, without clock synchronisation.
                FrameResult::Seeking(last) => {
                    if let Some(ref f) = last {
                        self.present_frame(f);
                    }
                }
                FrameResult::Frame(frame) => {
                    // Drain pending seek completions first so the clock and
                    // the audio thread are repositioned before this frame is
                    // compared against the clock.
                    while let Ok(SeekEvent::Completed { pts }) =
                        self.decode_buf.seek_events().try_recv()
                    {
                        self.clock.reset(pts);
                        self.restart_audio_from(pts);
                    }

                    // NOTE(review): `should_sync` is defined in `clock`;
                    // presumably it reports whether pacing is meaningful yet —
                    // confirm there.
                    if self.clock.should_sync() {
                        // Frames without a valid timestamp are treated as PTS 0.
                        let video_pts = if frame.timestamp().is_valid() {
                            frame.timestamp().as_duration()
                        } else {
                            Duration::ZERO
                        };

                        // Apply the user A/V offset: a non-negative offset is
                        // subtracted from the frame PTS (saturating at zero),
                        // a negative one is added.
                        let offset_ms = self.av_offset_ms.load(Ordering::Relaxed);
                        let offset = Duration::from_millis(offset_ms.unsigned_abs());
                        let adjusted_video_pts = if offset_ms >= 0 {
                            video_pts.saturating_sub(offset)
                        } else {
                            video_pts + offset
                        };

                        // Positive `diff`: the frame is ahead of the clock.
                        let clock_pts = self.clock.current_pts();
                        let diff = adjusted_video_pts.as_secs_f64() - clock_pts.as_secs_f64();
                        let fp = frame_period.as_secs_f64();

                        if diff > fp {
                            // More than one period early: sleep until half a
                            // period before the frame is due.
                            let sleep_secs = (diff - fp / 2.0).max(0.0);
                            thread::sleep(Duration::from_secs_f64(sleep_secs));
                        } else if diff < -fp {
                            // More than one period late: skip the frame to
                            // catch up with the clock.
                            log::debug!(
                                "dropped late frame video_pts={video_pts:?} \
                                 clock_pts={clock_pts:?}"
                            );
                            continue;
                        }
                    }

                    self.present_frame(&frame);
                }
            }
        }
        if let Some(sink) = self.sink.as_mut() {
            sink.flush();
        }
        Ok(())
    }
500
501 fn present_frame(&mut self, frame: &ff_format::VideoFrame) {
503 let Some(sink) = self.sink.as_mut() else {
504 return;
505 };
506 let width = frame.width();
507 let height = frame.height();
508 let pts = frame.timestamp().as_duration();
509 if self.sws.convert(frame, &mut self.rgba_buf) {
510 sink.push_frame(&self.rgba_buf, width, height, pts);
511 }
512 }
513
514 fn restart_audio_from(&mut self, pts: Duration) {
521 if let Some(buf) = &self.audio_buf {
523 buf.lock()
524 .unwrap_or_else(std::sync::PoisonError::into_inner)
525 .clear();
526 }
527 if let Some(cancel) = &self.audio_cancel {
529 cancel.store(true, Ordering::Release);
530 }
531 drop(self.audio_handle.take());
533 if let Some(buf) = &self.audio_buf {
535 let new_cancel = Arc::new(AtomicBool::new(false));
536 let handle = spawn_audio_thread(
537 self.active_path.clone(),
538 pts,
539 Arc::clone(buf),
540 Arc::clone(&new_cancel),
541 );
542 self.audio_cancel = Some(new_cancel);
543 self.audio_handle = Some(handle);
544 }
545 }
546}
547
548impl Drop for PreviewPlayer {
549 fn drop(&mut self) {
550 if let Some(cancel) = &self.audio_cancel {
554 cancel.store(true, Ordering::Release);
555 }
556 if let Some(h) = self.audio_handle.take() {
557 let _ = h.join();
558 }
559 }
560}
561
562fn spawn_audio_thread(
571 path: PathBuf,
572 start_pts: Duration,
573 buf: Arc<Mutex<VecDeque<f32>>>,
574 cancel: Arc<AtomicBool>,
575) -> JoinHandle<()> {
576 thread::spawn(move || {
577 let mut decoder = match AudioDecoder::open(&path)
578 .output_format(SampleFormat::F32)
579 .output_sample_rate(48_000)
580 .output_channels(2)
581 .build()
582 {
583 Ok(d) => d,
584 Err(e) => {
585 log::warn!("audio decode thread open failed error={e}");
586 return;
587 }
588 };
589
590 if start_pts != Duration::ZERO
591 && let Err(e) = decoder.seek(start_pts, SeekMode::Backward)
592 {
593 log::warn!("audio seek failed pts={start_pts:?} error={e}");
594 }
595
596 loop {
597 if cancel.load(Ordering::Acquire) {
598 break;
599 }
600
601 let buf_len = buf
602 .lock()
603 .unwrap_or_else(std::sync::PoisonError::into_inner)
604 .len();
605 if buf_len >= AUDIO_MAX_BUF {
606 thread::sleep(Duration::from_millis(1));
607 continue;
608 }
609
610 match decoder.decode_one() {
611 Ok(Some(frame)) => {
612 let samples = super::playback_inner::audio_frame_to_f32(&frame);
613 if !samples.is_empty() {
614 let mut guard = buf
615 .lock()
616 .unwrap_or_else(std::sync::PoisonError::into_inner);
617 let space = AUDIO_MAX_BUF.saturating_sub(guard.len());
618 guard.extend(samples.into_iter().take(space));
619 }
620 }
621 Ok(None) => break, Err(e) => {
623 log::warn!("audio decode error error={e}");
624 break;
625 }
626 }
627 }
628 })
629}
630
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// Location of the sample video, relative to this crate's manifest.
    fn test_video_path() -> std::path::PathBuf {
        std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets/video/gameplay.mp4")
    }

    /// Opens the sample video, or prints a skip notice and returns `None`
    /// when it cannot be opened (for example when the asset is not checked
    /// out), so asset-dependent tests can bail out instead of failing.
    fn open_test_player() -> Option<PreviewPlayer> {
        match PreviewPlayer::open(&test_video_path()) {
            Ok(player) => Some(player),
            Err(e) => {
                println!("skipping: video file not available: {e}");
                None
            }
        }
    }

    #[test]
    fn preview_player_open_should_fail_for_nonexistent_file() {
        let result = PreviewPlayer::open(Path::new("nonexistent_preview.mp4"));
        assert!(
            result.is_err(),
            "open() must return Err for a non-existent file"
        );
    }

    #[test]
    fn preview_player_play_pause_stop_should_update_state() {
        let Some(mut player) = open_test_player() else {
            return;
        };

        // A freshly opened player is neither paused nor stopped.
        assert!(!player.paused.load(Ordering::Relaxed));
        assert!(!player.stopped.load(Ordering::Relaxed));

        player.pause();
        assert!(player.paused.load(Ordering::Relaxed));

        player.play();
        assert!(!player.paused.load(Ordering::Relaxed));
        assert!(!player.stopped.load(Ordering::Relaxed));

        player.stop();
        assert!(player.stopped.load(Ordering::Relaxed));
    }

    #[test]
    fn preview_player_run_should_deliver_frames_to_sink() {
        use std::sync::{Arc, Mutex};

        /// Sink that only counts how many frames it was given.
        struct CountingSink(Arc<Mutex<usize>>);
        impl FrameSink for CountingSink {
            fn push_frame(&mut self, _rgba: &[u8], _width: u32, _height: u32, _pts: Duration) {
                *self
                    .0
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner) += 1;
            }
        }

        let Some(mut player) = open_test_player() else {
            return;
        };

        let count = Arc::new(Mutex::new(0usize));
        player.set_sink(Box::new(CountingSink(Arc::clone(&count))));
        player.play();

        if let Err(e) = player.run() {
            println!("skipping: run() error: {e}");
            return;
        }

        let frames = *count
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        assert!(
            frames > 0,
            "run() must deliver at least one frame to the sink"
        );
    }

    #[test]
    fn pop_audio_samples_should_return_empty_when_paused() {
        let Some(mut player) = open_test_player() else {
            return;
        };
        player.pause();
        let samples = player.pop_audio_samples(1024);
        assert!(
            samples.is_empty(),
            "pop_audio_samples() must return empty while paused"
        );
    }

    #[test]
    fn pop_audio_samples_should_return_empty_when_stopped() {
        let Some(mut player) = open_test_player() else {
            return;
        };
        player.stop();
        let samples = player.pop_audio_samples(1024);
        assert!(
            samples.is_empty(),
            "pop_audio_samples() must return empty while stopped"
        );
    }

    #[test]
    fn pop_audio_samples_should_return_empty_for_zero_n_samples() {
        let Some(mut player) = open_test_player() else {
            return;
        };
        player.play();
        let samples = player.pop_audio_samples(0);
        assert!(
            samples.is_empty(),
            "pop_audio_samples(0) must always return empty"
        );
    }

    #[test]
    fn pop_audio_samples_clock_increment_should_equal_half_sample_count() {
        // Pure arithmetic check: two interleaved samples make one clock frame.
        let stereo_samples: usize = 9_600;
        let expected_frames: u64 = (stereo_samples / 2) as u64;
        assert_eq!(
            expected_frames, 4_800,
            "9600 stereo samples must yield 4800 clock frames"
        );
        let pts = Duration::from_secs_f64(f64::from(48_000u32).recip() * expected_frames as f64);
        assert!(
            (pts.as_secs_f64() - 0.1).abs() < 1e-6,
            "4800 frames at 48 kHz must equal 100 ms; got {pts:?}"
        );
    }

    #[test]
    fn av_offset_default_should_be_zero() {
        // Check the player itself; asserting on a standalone atomic would
        // pass regardless of how the player initialises its offset.
        let Some(player) = open_test_player() else {
            return;
        };
        assert_eq!(
            player.av_offset(),
            0,
            "a freshly opened player must report a zero A/V offset"
        );
    }

    #[test]
    fn set_av_offset_should_clamp_large_positive_value() {
        let Some(player) = open_test_player() else {
            return;
        };
        player.set_av_offset(10_000);
        assert_eq!(player.av_offset(), 5_000, "offset must be clamped to +5000");
    }

    #[test]
    fn set_av_offset_should_clamp_large_negative_value() {
        let Some(player) = open_test_player() else {
            return;
        };
        player.set_av_offset(-10_000);
        assert_eq!(
            player.av_offset(),
            -5_000,
            "offset must be clamped to -5000"
        );
    }

    #[test]
    fn positive_av_offset_should_reduce_adjusted_video_pts() {
        // Mirrors the offset arithmetic used by `run`.
        let video_pts = Duration::from_millis(1_000);
        let offset_ms: i64 = 200;
        let adjusted = if offset_ms >= 0 {
            let offset = Duration::from_millis(offset_ms as u64);
            video_pts.saturating_sub(offset)
        } else {
            let offset = Duration::from_millis(offset_ms.unsigned_abs());
            video_pts + offset
        };
        assert_eq!(
            adjusted,
            Duration::from_millis(800),
            "positive offset must reduce adjusted_video_pts by offset amount"
        );
    }

    #[test]
    fn negative_av_offset_should_increase_adjusted_video_pts() {
        // Mirrors the offset arithmetic used by `run`.
        let video_pts = Duration::from_millis(1_000);
        let offset_ms: i64 = -200;
        let adjusted = if offset_ms >= 0 {
            let offset = Duration::from_millis(offset_ms as u64);
            video_pts.saturating_sub(offset)
        } else {
            let offset = Duration::from_millis(offset_ms.unsigned_abs());
            video_pts + offset
        };
        assert_eq!(
            adjusted,
            Duration::from_millis(1_200),
            "negative offset must increase adjusted_video_pts by offset amount"
        );
    }

    #[test]
    fn positive_av_offset_at_zero_pts_should_saturate_to_zero() {
        let video_pts = Duration::ZERO;
        let offset_ms: i64 = 100;
        let adjusted = video_pts.saturating_sub(Duration::from_millis(offset_ms as u64));
        assert_eq!(
            adjusted,
            Duration::ZERO,
            "saturating_sub on zero pts must clamp to zero not underflow"
        );
    }

    #[test]
    fn use_proxy_if_available_should_return_false_when_no_proxy_in_dir() {
        let Some(mut player) = open_test_player() else {
            return;
        };
        let tmp = std::env::temp_dir().join("ff_preview_no_proxy_dir_test");
        let _ = std::fs::create_dir_all(&tmp);
        let found = player.use_proxy_if_available(&tmp);
        assert!(
            !found,
            "must return false when no proxy files exist in the directory"
        );
    }

    #[test]
    fn use_proxy_if_available_should_return_false_after_play() {
        let Some(mut player) = open_test_player() else {
            return;
        };
        player.play();
        let found = player.use_proxy_if_available(Path::new("."));
        assert!(!found, "must return false when called after play()");
    }

    #[test]
    fn active_source_should_return_original_path_before_proxy_activation() {
        let path = test_video_path();
        let Some(player) = open_test_player() else {
            return;
        };
        assert_eq!(
            player.active_source(),
            path.as_path(),
            "active_source() must equal the original path before any proxy activation"
        );
    }
}