Skip to main content

oximedia_edit/
frame_prefetch.rs

1//! Predictive pre-fetch for frames near the playhead position.
2//!
3//! Anticipates which frames will be needed next based on playback
4//! direction and speed, and requests them in advance to reduce
5//! latency during playback.
6
7use std::collections::VecDeque;
8
9/// Direction of playback for prefetch prediction.
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum PlayDirection {
12    /// Forward playback (normal).
13    Forward,
14    /// Reverse playback.
15    Reverse,
16    /// Stationary (paused / scrubbing).
17    Stationary,
18}
19
20/// Configuration for the prefetch engine.
21#[derive(Debug, Clone)]
22pub struct PrefetchConfig {
23    /// Number of frames to prefetch ahead of the playhead.
24    pub lookahead: usize,
25    /// Number of frames to keep behind the playhead (for reverse scrub).
26    pub lookbehind: usize,
27    /// Playback speed multiplier (affects stride).
28    pub speed: f64,
29    /// Frame duration in timebase units (e.g. 33 for ~30fps at ms timebase).
30    pub frame_duration: i64,
31    /// Whether prefetch is enabled.
32    pub enabled: bool,
33}
34
35impl Default for PrefetchConfig {
36    fn default() -> Self {
37        Self {
38            lookahead: 30,
39            lookbehind: 5,
40            speed: 1.0,
41            frame_duration: 33,
42            enabled: true,
43        }
44    }
45}
46
47impl PrefetchConfig {
48    /// Create a config for real-time playback at given FPS.
49    #[must_use]
50    #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
51    pub fn for_playback(fps: f64, lookahead_seconds: f64) -> Self {
52        let frame_dur = if fps > 0.0 {
53            (1000.0 / fps).round() as i64
54        } else {
55            33
56        };
57        Self {
58            lookahead: (fps * lookahead_seconds).ceil() as usize,
59            lookbehind: 5,
60            speed: 1.0,
61            frame_duration: frame_dur,
62            enabled: true,
63        }
64    }
65}
66
67/// A request to prefetch a specific frame.
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
69pub struct PrefetchRequest {
70    /// Timeline position of the frame to prefetch.
71    pub position: i64,
72    /// Priority (lower = higher priority, 0 = immediate).
73    pub priority: u32,
74}
75
76impl PrefetchRequest {
77    /// Create a new prefetch request.
78    #[must_use]
79    pub fn new(position: i64, priority: u32) -> Self {
80        Self { position, priority }
81    }
82}
83
84/// Prefetch engine that generates frame requests based on playhead movement.
85#[derive(Debug)]
86pub struct PrefetchEngine {
87    /// Configuration.
88    config: PrefetchConfig,
89    /// Current playhead position.
90    playhead: i64,
91    /// Current play direction.
92    direction: PlayDirection,
93    /// Positions that are already cached or pending.
94    cached_positions: VecDeque<i64>,
95    /// Maximum timeline position.
96    max_position: i64,
97    /// History of recent playhead positions (for direction detection).
98    position_history: VecDeque<i64>,
99    /// Maximum history length.
100    history_limit: usize,
101}
102
103impl PrefetchEngine {
104    /// Create a new prefetch engine.
105    #[must_use]
106    pub fn new(config: PrefetchConfig, max_position: i64) -> Self {
107        Self {
108            config,
109            playhead: 0,
110            direction: PlayDirection::Stationary,
111            cached_positions: VecDeque::new(),
112            max_position,
113            position_history: VecDeque::new(),
114            history_limit: 10,
115        }
116    }
117
118    /// Update the playhead position and get new prefetch requests.
119    ///
120    /// Call this every time the playhead moves. Returns a list of
121    /// positions to prefetch, sorted by priority (most urgent first).
122    pub fn update(&mut self, new_position: i64) -> Vec<PrefetchRequest> {
123        if !self.config.enabled {
124            return Vec::new();
125        }
126
127        let old_position = self.playhead;
128        self.playhead = new_position;
129
130        // Track direction
131        self.position_history.push_back(new_position);
132        if self.position_history.len() > self.history_limit {
133            self.position_history.pop_front();
134        }
135        self.direction = self.detect_direction();
136
137        // Remove cached positions that are now far from playhead
138        let keep_range = self.keep_range();
139        self.cached_positions
140            .retain(|&pos| pos >= keep_range.0 && pos <= keep_range.1);
141
142        // Generate requests
143        let mut requests = Vec::new();
144
145        match self.direction {
146            PlayDirection::Forward => {
147                self.generate_forward_requests(&mut requests);
148            }
149            PlayDirection::Reverse => {
150                self.generate_reverse_requests(&mut requests);
151            }
152            PlayDirection::Stationary => {
153                // Prefetch a small window around the playhead
154                self.generate_bidirectional_requests(&mut requests);
155            }
156        }
157
158        // Sort by priority
159        requests.sort_by_key(|r| r.priority);
160
161        // Mark these as pending
162        for req in &requests {
163            if !self.cached_positions.contains(&req.position) {
164                self.cached_positions.push_back(req.position);
165            }
166        }
167
168        let _ = old_position; // suppress unused
169        requests
170    }
171
172    /// Mark a position as cached (already decoded).
173    pub fn mark_cached(&mut self, position: i64) {
174        if !self.cached_positions.contains(&position) {
175            self.cached_positions.push_back(position);
176        }
177    }
178
179    /// Invalidate a cached position.
180    pub fn invalidate(&mut self, position: i64) {
181        self.cached_positions.retain(|&p| p != position);
182    }
183
184    /// Invalidate all cached positions.
185    pub fn invalidate_all(&mut self) {
186        self.cached_positions.clear();
187    }
188
189    /// Get the current detected play direction.
190    #[must_use]
191    pub fn direction(&self) -> PlayDirection {
192        self.direction
193    }
194
195    /// Get the current playhead position.
196    #[must_use]
197    pub fn playhead(&self) -> i64 {
198        self.playhead
199    }
200
201    /// Get count of cached/pending positions.
202    #[must_use]
203    pub fn cached_count(&self) -> usize {
204        self.cached_positions.len()
205    }
206
207    // ── Internal helpers ─────────────────────────────────────────────────
208
209    fn detect_direction(&self) -> PlayDirection {
210        if self.position_history.len() < 2 {
211            return PlayDirection::Stationary;
212        }
213        let len = self.position_history.len();
214        let recent = self.position_history[len - 1];
215        let prev = self.position_history[len - 2];
216        let delta = recent - prev;
217
218        if delta > 0 {
219            PlayDirection::Forward
220        } else if delta < 0 {
221            PlayDirection::Reverse
222        } else {
223            PlayDirection::Stationary
224        }
225    }
226
227    fn keep_range(&self) -> (i64, i64) {
228        let behind = self.config.lookbehind as i64 * self.config.frame_duration;
229        let ahead = self.config.lookahead as i64 * self.config.frame_duration;
230        (
231            (self.playhead - behind).max(0),
232            (self.playhead + ahead).min(self.max_position),
233        )
234    }
235
236    #[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
237    fn generate_forward_requests(&self, requests: &mut Vec<PrefetchRequest>) {
238        let stride = (self.config.frame_duration as f64 * self.config.speed).round() as i64;
239        let stride = stride.max(1);
240
241        for i in 0..self.config.lookahead {
242            let pos = self.playhead + (i as i64 + 1) * stride;
243            if pos > self.max_position {
244                break;
245            }
246            if !self.cached_positions.contains(&pos) {
247                requests.push(PrefetchRequest::new(pos, i as u32));
248            }
249        }
250    }
251
252    #[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
253    fn generate_reverse_requests(&self, requests: &mut Vec<PrefetchRequest>) {
254        let stride = (self.config.frame_duration as f64 * self.config.speed).round() as i64;
255        let stride = stride.max(1);
256
257        for i in 0..self.config.lookbehind {
258            let pos = self.playhead - (i as i64 + 1) * stride;
259            if pos < 0 {
260                break;
261            }
262            if !self.cached_positions.contains(&pos) {
263                requests.push(PrefetchRequest::new(pos, i as u32));
264            }
265        }
266    }
267
268    fn generate_bidirectional_requests(&self, requests: &mut Vec<PrefetchRequest>) {
269        let half_ahead = self.config.lookahead / 2;
270        let stride = self.config.frame_duration;
271
272        for i in 0..half_ahead {
273            let forward = self.playhead + (i as i64 + 1) * stride;
274            let backward = self.playhead - (i as i64 + 1) * stride;
275
276            if forward <= self.max_position && !self.cached_positions.contains(&forward) {
277                requests.push(PrefetchRequest::new(forward, i as u32));
278            }
279            if backward >= 0 && !self.cached_positions.contains(&backward) {
280                requests.push(PrefetchRequest::new(backward, (i + half_ahead) as u32));
281            }
282        }
283    }
284}
285
286// ─────────────────────────────────────────────────────────────────────────────
287// Tests
288// ─────────────────────────────────────────────────────────────────────────────
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293
294    #[test]
295    fn test_prefetch_config_default() {
296        let cfg = PrefetchConfig::default();
297        assert_eq!(cfg.lookahead, 30);
298        assert_eq!(cfg.lookbehind, 5);
299        assert!(cfg.enabled);
300    }
301
302    #[test]
303    fn test_prefetch_config_for_playback() {
304        let cfg = PrefetchConfig::for_playback(30.0, 1.0);
305        assert_eq!(cfg.lookahead, 30);
306        assert_eq!(cfg.frame_duration, 33);
307    }
308
309    #[test]
310    fn test_prefetch_request() {
311        let req = PrefetchRequest::new(1000, 0);
312        assert_eq!(req.position, 1000);
313        assert_eq!(req.priority, 0);
314    }
315
316    #[test]
317    fn test_engine_disabled() {
318        let cfg = PrefetchConfig {
319            enabled: false,
320            ..Default::default()
321        };
322        let mut engine = PrefetchEngine::new(cfg, 10000);
323        let requests = engine.update(500);
324        assert!(requests.is_empty());
325    }
326
327    #[test]
328    fn test_engine_forward_playback() {
329        let cfg = PrefetchConfig {
330            lookahead: 5,
331            lookbehind: 2,
332            frame_duration: 33,
333            speed: 1.0,
334            enabled: true,
335        };
336        let mut engine = PrefetchEngine::new(cfg, 10000);
337
338        // First update at 0
339        let _r1 = engine.update(0);
340
341        // Second update at 33 (forward)
342        let r2 = engine.update(33);
343        assert_eq!(engine.direction(), PlayDirection::Forward);
344        assert!(!r2.is_empty(), "should generate forward prefetch requests");
345
346        // Check requests are ahead of playhead
347        for req in &r2 {
348            assert!(req.position > 33, "prefetch should be ahead of playhead");
349        }
350    }
351
352    #[test]
353    fn test_engine_reverse_playback() {
354        let cfg = PrefetchConfig {
355            lookahead: 5,
356            lookbehind: 5,
357            frame_duration: 33,
358            speed: 1.0,
359            enabled: true,
360        };
361        let mut engine = PrefetchEngine::new(cfg, 10000);
362
363        engine.update(5000);
364        let requests = engine.update(4967); // moved backward
365
366        assert_eq!(engine.direction(), PlayDirection::Reverse);
367        // Reverse requests should be behind the playhead
368        for req in &requests {
369            assert!(req.position < 4967);
370        }
371    }
372
373    #[test]
374    fn test_engine_stationary() {
375        let cfg = PrefetchConfig {
376            lookahead: 10,
377            lookbehind: 2,
378            frame_duration: 33,
379            speed: 1.0,
380            enabled: true,
381        };
382        let mut engine = PrefetchEngine::new(cfg, 10000);
383
384        engine.update(5000);
385        let requests = engine.update(5000); // no movement
386
387        assert_eq!(engine.direction(), PlayDirection::Stationary);
388        // Should generate bidirectional requests
389        let has_forward = requests.iter().any(|r| r.position > 5000);
390        let has_backward = requests.iter().any(|r| r.position < 5000);
391        assert!(has_forward || has_backward);
392    }
393
394    #[test]
395    fn test_engine_mark_cached() {
396        let cfg = PrefetchConfig::default();
397        let mut engine = PrefetchEngine::new(cfg, 10000);
398        engine.mark_cached(100);
399        engine.mark_cached(200);
400        assert_eq!(engine.cached_count(), 2);
401    }
402
403    #[test]
404    fn test_engine_invalidate() {
405        let cfg = PrefetchConfig::default();
406        let mut engine = PrefetchEngine::new(cfg, 10000);
407        engine.mark_cached(100);
408        engine.mark_cached(200);
409        engine.invalidate(100);
410        assert_eq!(engine.cached_count(), 1);
411        engine.invalidate_all();
412        assert_eq!(engine.cached_count(), 0);
413    }
414
415    #[test]
416    fn test_engine_does_not_exceed_max_position() {
417        let cfg = PrefetchConfig {
418            lookahead: 100,
419            lookbehind: 2,
420            frame_duration: 33,
421            speed: 1.0,
422            enabled: true,
423        };
424        let mut engine = PrefetchEngine::new(cfg, 1000);
425        engine.update(0);
426        let requests = engine.update(33);
427
428        for req in &requests {
429            assert!(req.position <= 1000, "should not exceed max position");
430        }
431    }
432
433    #[test]
434    fn test_engine_does_not_go_below_zero() {
435        let cfg = PrefetchConfig {
436            lookahead: 5,
437            lookbehind: 100,
438            frame_duration: 33,
439            speed: 1.0,
440            enabled: true,
441        };
442        let mut engine = PrefetchEngine::new(cfg, 10000);
443        engine.update(100);
444        let requests = engine.update(67); // reverse
445
446        for req in &requests {
447            assert!(req.position >= 0, "should not go below zero");
448        }
449    }
450
451    #[test]
452    fn test_cached_positions_not_re_requested() {
453        let cfg = PrefetchConfig {
454            lookahead: 3,
455            lookbehind: 0,
456            frame_duration: 100,
457            speed: 1.0,
458            enabled: true,
459        };
460        let mut engine = PrefetchEngine::new(cfg, 10000);
461        engine.update(0);
462        let r1 = engine.update(100);
463
464        // Update again at same position
465        let r2 = engine.update(200);
466
467        // Requests from r1 that were marked as pending should not appear in r2
468        // (unless they've been evicted from the keep range)
469        for req in &r2 {
470            // Positions from r1 should not be re-requested if still in range
471            let was_in_r1 = r1.iter().any(|r| r.position == req.position);
472            if was_in_r1 {
473                // This is fine if position was evicted; just checking the mechanism works
474            }
475        }
476        // The engine should track cached positions
477        assert!(engine.cached_count() > 0);
478    }
479}