Skip to main content

oximedia_cache/
segment_cache.rs

1//! Segment-aware cache optimized for media streaming (HLS/DASH segments).
2//!
3//! Media streaming protocols like HLS and DASH split content into short
4//! *segments* — individually addressable byte ranges that are requested
5//! sequentially per stream.  A general-purpose cache is sub-optimal for this
6//! workload because:
7//!
8//! * Segments from the same stream must be evicted together or in sequence-
9//!   order; random-key eviction damages buffering performance.
10//! * Played (consumed) segments are rarely re-requested and can be evicted
11//!   early to free room for upcoming segments.
12//! * Clients typically need lookahead prefetch hints so the application layer
13//!   can start fetching segments before they are requested.
14//!
15//! [`SegmentCache`] addresses all of these requirements with a byte-budget and
16//! per-stream sequence ordering.
17//!
18//! # Example
19//!
20//! ```rust
21//! use oximedia_cache::segment_cache::{
22//!     MediaSegment, SegmentCache, SegmentCacheConfig, SegmentRef,
23//! };
24//!
25//! let config = SegmentCacheConfig {
26//!     max_segments: 128,
27//!     max_bytes: 64 * 1024 * 1024,
28//!     prefetch_ahead: 3,
29//!     evict_played: true,
30//! };
31//! let mut cache = SegmentCache::new(config);
32//!
33//! let seg = MediaSegment {
34//!     segment_id: "stream1-0000".to_string(),
35//!     stream_id:  "stream1".to_string(),
36//!     sequence:   0,
37//!     duration_secs: 6.0,
38//!     data:       vec![0u8; 1024],
39//!     content_type: "video/mp2t".to_string(),
40//! };
41//! cache.insert(seg).expect("insert");
42//!
43//! let r = SegmentRef { stream_id: "stream1".to_string(), sequence: 0 };
44//! assert!(cache.get(&r).is_some());
45//! ```
46
47use std::collections::{BTreeMap, HashMap};
48use std::fmt;
49
50// ── Public data types ─────────────────────────────────────────────────────────
51
/// One media segment held by the cache: identity, stream position and payload.
#[derive(Clone, Debug)]
pub struct MediaSegment {
    /// Identifier unique to this segment (for example `"stream1-0003"`).
    pub segment_id: String,
    /// Logical stream that owns the segment (for example a rendition URL prefix).
    pub stream_id: String,
    /// Position within the stream; sequence numbers increase monotonically.
    pub sequence: u64,
    /// How long the segment plays for, in seconds.
    pub duration_secs: f32,
    /// Encoded media payload.
    pub data: Vec<u8>,
    /// MIME type of the payload (for example `"video/mp2t"` or `"video/mp4"`).
    pub content_type: String,
}

impl MediaSegment {
    /// Number of payload bytes in [`MediaSegment::data`], widened to `u64`.
    #[inline]
    pub fn byte_len(&self) -> u64 {
        let payload_len = self.data.len();
        payload_len as u64
    }
}
76
/// Cache look-up key: the `(stream_id, sequence)` pair naming one segment.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct SegmentRef {
    /// Identifier of the stream the segment belongs to.
    pub stream_id: String,
    /// Sequence number of the segment within that stream.
    pub sequence: u64,
}
85
86// ── Configuration ─────────────────────────────────────────────────────────────
87
/// Tunables controlling capacity, prefetching and eviction of a [`SegmentCache`].
#[derive(Clone, Debug)]
pub struct SegmentCacheConfig {
    /// Upper bound on the number of cached segments, summed over all streams.
    pub max_segments: usize,
    /// Upper bound on the total payload bytes, summed over all segments.
    pub max_bytes: u64,
    /// Number of sequences after `current_seq` considered for prefetch hints.
    pub prefetch_ahead: u8,
    /// If `true`, segments already marked as played are chosen for eviction
    /// before any unplayed segment at the next capacity check.
    pub evict_played: bool,
}

impl Default for SegmentCacheConfig {
    /// Defaults: 512 segments, 256 MiB, lookahead of 3, played-first eviction.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        SegmentCacheConfig {
            max_segments: 512,
            max_bytes: 256 * MIB,
            prefetch_ahead: 3,
            evict_played: true,
        }
    }
}
112
113// ── Error type ────────────────────────────────────────────────────────────────
114
/// Failure modes of segment cache operations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SegmentCacheError {
    /// The cache is full and nothing could be evicted to make room.
    CacheFull,
    /// The segment alone is bigger than the cache's whole byte budget.
    SegmentTooLarge,
    /// A segment with the same `(stream_id, sequence)` is already cached.
    DuplicateSegment,
}

impl fmt::Display for SegmentCacheError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the fixed message first, then emit it in a single write.
        let message = match self {
            Self::CacheFull => "segment cache is full",
            Self::SegmentTooLarge => "segment exceeds cache byte budget",
            Self::DuplicateSegment => "segment with this (stream_id, sequence) already cached",
        };
        f.write_str(message)
    }
}

impl std::error::Error for SegmentCacheError {}
141
142// ── Stats ─────────────────────────────────────────────────────────────────────
143
/// Point-in-time counters reported by [`SegmentCache`].
#[derive(Clone, Debug, Default)]
pub struct SegmentCacheStats {
    /// How many segments the cache holds right now.
    pub total_segments: usize,
    /// Sum of the payload sizes of all cached segments, in bytes.
    pub total_bytes: u64,
    /// Number of lookups so far that found their segment.
    pub hit_count: u64,
    /// Number of lookups so far that found nothing.
    pub miss_count: u64,
}
156
157// ── Internal per-stream state ─────────────────────────────────────────────────
158
/// Bookkeeping for a single stream: which sequence numbers are cached and the
/// play state recorded for each.
struct StreamMeta {
    /// Cached sequence numbers in ascending order, mapped to their play state.
    sequences: BTreeMap<u64, PlayState>,
}

/// Playback marker attached to every cached sequence number.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PlayState {
    Unplayed,
    Played,
}

impl StreamMeta {
    /// Start with no sequences recorded.
    fn new() -> Self {
        StreamMeta {
            sequences: BTreeMap::new(),
        }
    }

    /// Lowest sequence number recorded for this stream, if any.
    fn oldest_sequence(&self) -> Option<u64> {
        // The map iterates in ascending key order, so the first entry is the oldest.
        self.sequences.iter().next().map(|(&seq, _)| seq)
    }
}
182
183// ── SegmentCache ──────────────────────────────────────────────────────────────
184
185/// Segment-aware cache for HLS/DASH media segments.
186///
187/// Capacity is managed by two independent budgets: segment count
188/// (`max_segments`) and total byte size (`max_bytes`).  When either budget is
189/// exceeded, the cache evicts the **oldest played** segment first (if
190/// `evict_played` is set), then falls back to the globally oldest segment
191/// across all streams.
192pub struct SegmentCache {
193    config: SegmentCacheConfig,
194    /// Primary segment storage, keyed by `(stream_id, sequence)`.
195    segments: HashMap<SegmentRef, MediaSegment>,
196    /// Per-stream metadata.
197    streams: HashMap<String, StreamMeta>,
198    /// Cumulative byte count across all stored segments.
199    total_bytes: u64,
200    hit_count: u64,
201    miss_count: u64,
202}
203
204impl SegmentCache {
205    /// Create a new `SegmentCache` with the given configuration.
206    pub fn new(config: SegmentCacheConfig) -> Self {
207        Self {
208            config,
209            segments: HashMap::new(),
210            streams: HashMap::new(),
211            total_bytes: 0,
212            hit_count: 0,
213            miss_count: 0,
214        }
215    }
216
217    /// Insert a segment into the cache.
218    ///
219    /// # Errors
220    ///
221    /// * [`SegmentCacheError::SegmentTooLarge`] — segment's byte size exceeds
222    ///   the entire `max_bytes` budget.
223    /// * [`SegmentCacheError::DuplicateSegment`] — a segment with the same
224    ///   `(stream_id, sequence)` is already present.
225    /// * [`SegmentCacheError::CacheFull`] — the cache cannot be freed enough
226    ///   to accommodate the new segment (should not happen under normal
227    ///   operation but is returned as a safeguard).
228    pub fn insert(&mut self, segment: MediaSegment) -> Result<(), SegmentCacheError> {
229        let byte_len = segment.byte_len();
230
231        // Hard reject: segment is larger than the entire byte budget.
232        if byte_len > self.config.max_bytes {
233            return Err(SegmentCacheError::SegmentTooLarge);
234        }
235
236        let key = SegmentRef {
237            stream_id: segment.stream_id.clone(),
238            sequence: segment.sequence,
239        };
240
241        // Reject duplicates.
242        if self.segments.contains_key(&key) {
243            return Err(SegmentCacheError::DuplicateSegment);
244        }
245
246        // Evict until both budgets have room.
247        let mut eviction_attempts = 0usize;
248        loop {
249            let count_ok = self.segments.len() < self.config.max_segments;
250            let bytes_ok = self.total_bytes + byte_len <= self.config.max_bytes;
251            if count_ok && bytes_ok {
252                break;
253            }
254            let freed = self.evict_one();
255            if freed == 0 {
256                return Err(SegmentCacheError::CacheFull);
257            }
258            eviction_attempts += 1;
259            // Safety valve: if we have evicted more than the total number of
260            // segments we started with something is deeply wrong.
261            if eviction_attempts > self.config.max_segments + 1 {
262                return Err(SegmentCacheError::CacheFull);
263            }
264        }
265
266        // Register in per-stream metadata.
267        self.streams
268            .entry(segment.stream_id.clone())
269            .or_insert_with(StreamMeta::new)
270            .sequences
271            .insert(segment.sequence, PlayState::Unplayed);
272
273        self.total_bytes += byte_len;
274        self.segments.insert(key, segment);
275        Ok(())
276    }
277
278    /// Retrieve the segment identified by `ref_`.
279    ///
280    /// Returns `None` if the segment is not in the cache.
281    pub fn get(&mut self, ref_: &SegmentRef) -> Option<&MediaSegment> {
282        match self.segments.get(ref_) {
283            Some(seg) => {
284                self.hit_count += 1;
285                Some(seg)
286            }
287            None => {
288                self.miss_count += 1;
289                None
290            }
291        }
292    }
293
294    /// Mark `ref_` as played/consumed.
295    ///
296    /// Played segments remain in the cache until eviction but are prioritised
297    /// for removal when `evict_played` is enabled in the configuration.
298    ///
299    /// No-ops if the segment is not in the cache.
300    pub fn mark_played(&mut self, ref_: &SegmentRef) {
301        if let Some(meta) = self.streams.get_mut(&ref_.stream_id) {
302            if let Some(state) = meta.sequences.get_mut(&ref_.sequence) {
303                *state = PlayState::Played;
304            }
305        }
306    }
307
308    /// Generate prefetch hints for `stream_id` starting at `current_seq + 1`.
309    ///
310    /// Returns up to `prefetch_ahead` [`SegmentRef`]s for sequences that are
311    /// **not yet cached**.  The caller uses these to drive background fetches.
312    pub fn prefetch_hints(&self, current_seq: u64, stream_id: &str) -> Vec<SegmentRef> {
313        let ahead = self.config.prefetch_ahead as u64;
314        let mut hints = Vec::with_capacity(ahead as usize);
315        for delta in 1..=ahead {
316            let seq = match current_seq.checked_add(delta) {
317                Some(s) => s,
318                None => break,
319            };
320            let ref_ = SegmentRef {
321                stream_id: stream_id.to_string(),
322                sequence: seq,
323            };
324            if !self.segments.contains_key(&ref_) {
325                hints.push(ref_);
326            }
327        }
328        hints
329    }
330
331    /// Evict the oldest sequence from the stream that has the most bytes
332    /// invested in played segments (or globally oldest if no played segments
333    /// exist).
334    ///
335    /// Returns the number of bytes freed.  Returns `0` if the cache is empty.
336    pub fn evict_oldest_stream(&mut self) -> usize {
337        self.evict_one()
338    }
339
340    /// Return a statistics snapshot.
341    pub fn stats(&self) -> SegmentCacheStats {
342        SegmentCacheStats {
343            total_segments: self.segments.len(),
344            total_bytes: self.total_bytes,
345            hit_count: self.hit_count,
346            miss_count: self.miss_count,
347        }
348    }
349
350    /// Current total bytes used by all cached segments.
351    pub fn total_bytes(&self) -> u64 {
352        self.total_bytes
353    }
354
355    /// Current number of segments in the cache.
356    pub fn segment_count(&self) -> usize {
357        self.segments.len()
358    }
359
360    // ── private helpers ───────────────────────────────────────────────────────
361
362    /// Attempt to evict a single segment.
363    ///
364    /// Priority order (highest to lowest):
365    ///  1. If `evict_played` is set: oldest played segment across all streams.
366    ///  2. Oldest unplayed segment across all streams.
367    ///
368    /// Returns bytes freed (0 if cache is empty).
369    fn evict_one(&mut self) -> usize {
370        if let Some(target) = self.find_eviction_target() {
371            return self.remove_segment(&target);
372        }
373        0
374    }
375
376    /// Find the best eviction candidate.
377    fn find_eviction_target(&self) -> Option<SegmentRef> {
378        // Prefer played segments when `evict_played` is enabled.
379        if self.config.evict_played {
380            if let Some(r) = self.oldest_played() {
381                return Some(r);
382            }
383        }
384        self.globally_oldest()
385    }
386
387    /// Find the oldest played segment across all streams.
388    fn oldest_played(&self) -> Option<SegmentRef> {
389        let mut best: Option<(u64, &str)> = None; // (sequence, stream_id)
390        for (stream_id, meta) in &self.streams {
391            for (seq, state) in &meta.sequences {
392                if *state == PlayState::Played {
393                    match best {
394                        None => best = Some((*seq, stream_id.as_str())),
395                        Some((best_seq, _)) if *seq < best_seq => {
396                            best = Some((*seq, stream_id.as_str()));
397                        }
398                        _ => {}
399                    }
400                }
401            }
402        }
403        best.map(|(seq, sid)| SegmentRef {
404            stream_id: sid.to_string(),
405            sequence: seq,
406        })
407    }
408
409    /// Find the globally oldest segment across all streams.
410    fn globally_oldest(&self) -> Option<SegmentRef> {
411        let mut best: Option<(u64, &str)> = None;
412        for (stream_id, meta) in &self.streams {
413            if let Some(seq) = meta.oldest_sequence() {
414                match best {
415                    None => best = Some((seq, stream_id.as_str())),
416                    Some((best_seq, _)) if seq < best_seq => {
417                        best = Some((seq, stream_id.as_str()));
418                    }
419                    _ => {}
420                }
421            }
422        }
423        best.map(|(seq, sid)| SegmentRef {
424            stream_id: sid.to_string(),
425            sequence: seq,
426        })
427    }
428
429    /// Remove the segment at `ref_` and update all tracking structures.
430    ///
431    /// Returns bytes freed.
432    fn remove_segment(&mut self, ref_: &SegmentRef) -> usize {
433        if let Some(seg) = self.segments.remove(ref_) {
434            let freed = seg.data.len();
435            self.total_bytes = self.total_bytes.saturating_sub(freed as u64);
436
437            if let Some(meta) = self.streams.get_mut(&ref_.stream_id) {
438                meta.sequences.remove(&ref_.sequence);
439                if meta.sequences.is_empty() {
440                    self.streams.remove(&ref_.stream_id);
441                }
442            }
443            freed
444        } else {
445            0
446        }
447    }
448}
449
450// ── Tests ─────────────────────────────────────────────────────────────────────
451
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a segment of `bytes` zeroed payload bytes for `stream_id`/`seq`.
    fn seg(stream_id: &str, seq: u64, bytes: usize) -> MediaSegment {
        MediaSegment {
            segment_id: format!("{stream_id}-{seq:04}"),
            stream_id: stream_id.to_string(),
            sequence: seq,
            duration_secs: 6.0,
            data: vec![0u8; bytes],
            content_type: "video/mp2t".to_string(),
        }
    }

    /// Small cache configuration shared by most tests (16 segments, 1 MiB).
    fn default_config() -> SegmentCacheConfig {
        SegmentCacheConfig {
            max_segments: 16,
            max_bytes: 1024 * 1024,
            prefetch_ahead: 3,
            evict_played: true,
        }
    }

    /// Look-up key for `(stream_id, sequence)`.
    fn key(stream_id: &str, sequence: u64) -> SegmentRef {
        SegmentRef {
            stream_id: stream_id.to_string(),
            sequence,
        }
    }

    #[test]
    fn test_insert_and_get() {
        let mut cache = SegmentCache::new(default_config());
        cache.insert(seg("s1", 0, 100)).expect("insert");

        assert!(cache.get(&key("s1", 0)).is_some());
        assert_eq!(cache.stats().hit_count, 1);
    }

    #[test]
    fn test_miss_increments_miss_count() {
        let mut cache = SegmentCache::new(default_config());

        assert!(cache.get(&key("s1", 99)).is_none());
        assert_eq!(cache.stats().miss_count, 1);
    }

    #[test]
    fn test_duplicate_segment_rejected() {
        let mut cache = SegmentCache::new(default_config());
        cache.insert(seg("s1", 0, 100)).expect("first insert");

        let err = cache.insert(seg("s1", 0, 100)).expect_err("should fail");
        assert_eq!(err, SegmentCacheError::DuplicateSegment);
    }

    #[test]
    fn test_segment_too_large_rejected() {
        let mut cache = SegmentCache::new(SegmentCacheConfig {
            max_bytes: 500,
            ..default_config()
        });

        let err = cache.insert(seg("s1", 0, 1000)).expect_err("too large");
        assert_eq!(err, SegmentCacheError::SegmentTooLarge);
    }

    #[test]
    fn test_byte_limit_evicts_oldest() {
        let mut cache = SegmentCache::new(SegmentCacheConfig {
            max_segments: 100,
            max_bytes: 1000,
            prefetch_ahead: 2,
            evict_played: false,
        });

        // Five 200-byte segments fill the 1000-byte budget exactly.
        for sequence in 0..5u64 {
            cache.insert(seg("s1", sequence, 200)).expect("insert");
        }
        // A sixth one only fits after the oldest segment is evicted.
        cache.insert(seg("s1", 5, 200)).expect("insert 6th");

        // Sequence 0 was the victim.
        assert!(cache.get(&key("s1", 0)).is_none());
        assert!(cache.total_bytes() <= 1000);
    }

    #[test]
    fn test_segment_count_limit_evicts() {
        let mut cache = SegmentCache::new(SegmentCacheConfig {
            max_segments: 3,
            max_bytes: 10 * 1024 * 1024,
            prefetch_ahead: 1,
            evict_played: false,
        });

        for sequence in 0..4u64 {
            cache.insert(seg("s1", sequence, 10)).expect("insert");
        }

        assert_eq!(cache.segment_count(), 3);
        // The oldest segment (sequence 0) made room for the fourth insert.
        assert!(cache.get(&key("s1", 0)).is_none());
    }

    #[test]
    fn test_played_eviction_prioritised() {
        let mut cache = SegmentCache::new(SegmentCacheConfig {
            max_segments: 3,
            max_bytes: 10 * 1024 * 1024,
            prefetch_ahead: 1,
            evict_played: true,
        });

        // Fill the cache with sequences 0..=2, then mark only sequence 2 played.
        for sequence in 0..3u64 {
            cache.insert(seg("s1", sequence, 100)).expect("insert");
        }
        let played = key("s1", 2);
        cache.mark_played(&played);

        // The next insert must sacrifice the played segment, not the older one.
        cache.insert(seg("s1", 3, 100)).expect("insert");
        assert!(
            cache.get(&played).is_none(),
            "played segment should be evicted"
        );
        assert!(
            cache.get(&key("s1", 0)).is_some(),
            "unplayed seq 0 should remain"
        );
    }

    #[test]
    fn test_prefetch_hints_excludes_cached() {
        let mut cache = SegmentCache::new(default_config());
        // Sequences 1 and 3 are cached up front; sequence 2 is the only gap.
        cache.insert(seg("stream", 1, 50)).expect("insert");
        cache.insert(seg("stream", 3, 50)).expect("insert");

        let hints = cache.prefetch_hints(0, "stream");

        // With prefetch_ahead = 3 the window is 1..=3:
        //   1 -> cached, skipped
        //   2 -> missing, hinted
        //   3 -> cached, skipped
        assert_eq!(hints.len(), 1);
        assert_eq!(hints[0].sequence, 2);
    }

    #[test]
    fn test_prefetch_hints_respects_ahead_count() {
        let cache = SegmentCache::new(SegmentCacheConfig {
            prefetch_ahead: 5,
            ..default_config()
        });

        let hints = cache.prefetch_hints(10, "stream");

        assert_eq!(hints.len(), 5);
        for (offset, hint) in hints.iter().enumerate() {
            assert_eq!(hint.sequence, 11 + offset as u64);
        }
    }

    #[test]
    fn test_evict_oldest_stream_returns_bytes_freed() {
        let mut cache = SegmentCache::new(default_config());
        cache.insert(seg("s1", 0, 512)).expect("insert");

        let freed = cache.evict_oldest_stream();

        assert_eq!(freed, 512);
        assert_eq!(cache.segment_count(), 0);
        assert_eq!(cache.total_bytes(), 0);
    }

    #[test]
    fn test_stats_total_bytes() {
        let mut cache = SegmentCache::new(default_config());
        cache.insert(seg("s1", 0, 100)).expect("insert");
        cache.insert(seg("s1", 1, 200)).expect("insert");

        let snapshot = cache.stats();

        assert_eq!(snapshot.total_segments, 2);
        assert_eq!(snapshot.total_bytes, 300);
    }

    #[test]
    fn test_multi_stream_eviction_fairness() {
        let mut cache = SegmentCache::new(SegmentCacheConfig {
            max_segments: 4,
            max_bytes: 10 * 1024 * 1024,
            prefetch_ahead: 2,
            evict_played: false,
        });

        for (stream_id, sequence) in [("streamA", 0), ("streamB", 0), ("streamA", 1), ("streamB", 1)] {
            cache.insert(seg(stream_id, sequence, 10)).expect("insert");
        }

        // All four slots are taken. The fifth insert evicts one of the two
        // sequence-0 segments; this test does not pin which one and only
        // checks that the cache stays within capacity.
        cache.insert(seg("streamA", 2, 10)).expect("insert 5th");
        assert_eq!(cache.segment_count(), 4);
    }
}