1use std::collections::{BTreeMap, HashMap};
48use std::fmt;
49
50#[derive(Debug, Clone)]
54pub struct MediaSegment {
55 pub segment_id: String,
57 pub stream_id: String,
59 pub sequence: u64,
61 pub duration_secs: f32,
63 pub data: Vec<u8>,
65 pub content_type: String,
67}
68
69impl MediaSegment {
70 #[inline]
72 pub fn byte_len(&self) -> u64 {
73 self.data.len() as u64
74 }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, Hash)]
79pub struct SegmentRef {
80 pub stream_id: String,
82 pub sequence: u64,
84}
85
86#[derive(Debug, Clone)]
90pub struct SegmentCacheConfig {
91 pub max_segments: usize,
93 pub max_bytes: u64,
95 pub prefetch_ahead: u8,
97 pub evict_played: bool,
100}
101
102impl Default for SegmentCacheConfig {
103 fn default() -> Self {
104 Self {
105 max_segments: 512,
106 max_bytes: 256 * 1024 * 1024, prefetch_ahead: 3,
108 evict_played: true,
109 }
110 }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq)]
117pub enum SegmentCacheError {
118 CacheFull,
120 SegmentTooLarge,
122 DuplicateSegment,
124}
125
126impl fmt::Display for SegmentCacheError {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 match self {
129 SegmentCacheError::CacheFull => write!(f, "segment cache is full"),
130 SegmentCacheError::SegmentTooLarge => {
131 write!(f, "segment exceeds cache byte budget")
132 }
133 SegmentCacheError::DuplicateSegment => {
134 write!(f, "segment with this (stream_id, sequence) already cached")
135 }
136 }
137 }
138}
139
140impl std::error::Error for SegmentCacheError {}
141
142#[derive(Debug, Clone, Default)]
146pub struct SegmentCacheStats {
147 pub total_segments: usize,
149 pub total_bytes: u64,
151 pub hit_count: u64,
153 pub miss_count: u64,
155}
156
157struct StreamMeta {
161 sequences: BTreeMap<u64, PlayState>,
163}
164
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166enum PlayState {
167 Unplayed,
168 Played,
169}
170
171impl StreamMeta {
172 fn new() -> Self {
173 Self {
174 sequences: BTreeMap::new(),
175 }
176 }
177
178 fn oldest_sequence(&self) -> Option<u64> {
179 self.sequences.keys().next().copied()
180 }
181}
182
183pub struct SegmentCache {
193 config: SegmentCacheConfig,
194 segments: HashMap<SegmentRef, MediaSegment>,
196 streams: HashMap<String, StreamMeta>,
198 total_bytes: u64,
200 hit_count: u64,
201 miss_count: u64,
202}
203
204impl SegmentCache {
205 pub fn new(config: SegmentCacheConfig) -> Self {
207 Self {
208 config,
209 segments: HashMap::new(),
210 streams: HashMap::new(),
211 total_bytes: 0,
212 hit_count: 0,
213 miss_count: 0,
214 }
215 }
216
217 pub fn insert(&mut self, segment: MediaSegment) -> Result<(), SegmentCacheError> {
229 let byte_len = segment.byte_len();
230
231 if byte_len > self.config.max_bytes {
233 return Err(SegmentCacheError::SegmentTooLarge);
234 }
235
236 let key = SegmentRef {
237 stream_id: segment.stream_id.clone(),
238 sequence: segment.sequence,
239 };
240
241 if self.segments.contains_key(&key) {
243 return Err(SegmentCacheError::DuplicateSegment);
244 }
245
246 let mut eviction_attempts = 0usize;
248 loop {
249 let count_ok = self.segments.len() < self.config.max_segments;
250 let bytes_ok = self.total_bytes + byte_len <= self.config.max_bytes;
251 if count_ok && bytes_ok {
252 break;
253 }
254 let freed = self.evict_one();
255 if freed == 0 {
256 return Err(SegmentCacheError::CacheFull);
257 }
258 eviction_attempts += 1;
259 if eviction_attempts > self.config.max_segments + 1 {
262 return Err(SegmentCacheError::CacheFull);
263 }
264 }
265
266 self.streams
268 .entry(segment.stream_id.clone())
269 .or_insert_with(StreamMeta::new)
270 .sequences
271 .insert(segment.sequence, PlayState::Unplayed);
272
273 self.total_bytes += byte_len;
274 self.segments.insert(key, segment);
275 Ok(())
276 }
277
278 pub fn get(&mut self, ref_: &SegmentRef) -> Option<&MediaSegment> {
282 match self.segments.get(ref_) {
283 Some(seg) => {
284 self.hit_count += 1;
285 Some(seg)
286 }
287 None => {
288 self.miss_count += 1;
289 None
290 }
291 }
292 }
293
294 pub fn mark_played(&mut self, ref_: &SegmentRef) {
301 if let Some(meta) = self.streams.get_mut(&ref_.stream_id) {
302 if let Some(state) = meta.sequences.get_mut(&ref_.sequence) {
303 *state = PlayState::Played;
304 }
305 }
306 }
307
308 pub fn prefetch_hints(&self, current_seq: u64, stream_id: &str) -> Vec<SegmentRef> {
313 let ahead = self.config.prefetch_ahead as u64;
314 let mut hints = Vec::with_capacity(ahead as usize);
315 for delta in 1..=ahead {
316 let seq = match current_seq.checked_add(delta) {
317 Some(s) => s,
318 None => break,
319 };
320 let ref_ = SegmentRef {
321 stream_id: stream_id.to_string(),
322 sequence: seq,
323 };
324 if !self.segments.contains_key(&ref_) {
325 hints.push(ref_);
326 }
327 }
328 hints
329 }
330
331 pub fn evict_oldest_stream(&mut self) -> usize {
337 self.evict_one()
338 }
339
340 pub fn stats(&self) -> SegmentCacheStats {
342 SegmentCacheStats {
343 total_segments: self.segments.len(),
344 total_bytes: self.total_bytes,
345 hit_count: self.hit_count,
346 miss_count: self.miss_count,
347 }
348 }
349
350 pub fn total_bytes(&self) -> u64 {
352 self.total_bytes
353 }
354
355 pub fn segment_count(&self) -> usize {
357 self.segments.len()
358 }
359
360 fn evict_one(&mut self) -> usize {
370 if let Some(target) = self.find_eviction_target() {
371 return self.remove_segment(&target);
372 }
373 0
374 }
375
376 fn find_eviction_target(&self) -> Option<SegmentRef> {
378 if self.config.evict_played {
380 if let Some(r) = self.oldest_played() {
381 return Some(r);
382 }
383 }
384 self.globally_oldest()
385 }
386
387 fn oldest_played(&self) -> Option<SegmentRef> {
389 let mut best: Option<(u64, &str)> = None; for (stream_id, meta) in &self.streams {
391 for (seq, state) in &meta.sequences {
392 if *state == PlayState::Played {
393 match best {
394 None => best = Some((*seq, stream_id.as_str())),
395 Some((best_seq, _)) if *seq < best_seq => {
396 best = Some((*seq, stream_id.as_str()));
397 }
398 _ => {}
399 }
400 }
401 }
402 }
403 best.map(|(seq, sid)| SegmentRef {
404 stream_id: sid.to_string(),
405 sequence: seq,
406 })
407 }
408
409 fn globally_oldest(&self) -> Option<SegmentRef> {
411 let mut best: Option<(u64, &str)> = None;
412 for (stream_id, meta) in &self.streams {
413 if let Some(seq) = meta.oldest_sequence() {
414 match best {
415 None => best = Some((seq, stream_id.as_str())),
416 Some((best_seq, _)) if seq < best_seq => {
417 best = Some((seq, stream_id.as_str()));
418 }
419 _ => {}
420 }
421 }
422 }
423 best.map(|(seq, sid)| SegmentRef {
424 stream_id: sid.to_string(),
425 sequence: seq,
426 })
427 }
428
429 fn remove_segment(&mut self, ref_: &SegmentRef) -> usize {
433 if let Some(seg) = self.segments.remove(ref_) {
434 let freed = seg.data.len();
435 self.total_bytes = self.total_bytes.saturating_sub(freed as u64);
436
437 if let Some(meta) = self.streams.get_mut(&ref_.stream_id) {
438 meta.sequences.remove(&ref_.sequence);
439 if meta.sequences.is_empty() {
440 self.streams.remove(&ref_.stream_id);
441 }
442 }
443 freed
444 } else {
445 0
446 }
447 }
448}
449
450#[cfg(test)]
453mod tests {
454 use super::*;
455
456 fn seg(stream_id: &str, seq: u64, bytes: usize) -> MediaSegment {
457 MediaSegment {
458 segment_id: format!("{stream_id}-{seq:04}"),
459 stream_id: stream_id.to_string(),
460 sequence: seq,
461 duration_secs: 6.0,
462 data: vec![0u8; bytes],
463 content_type: "video/mp2t".to_string(),
464 }
465 }
466
467 fn default_config() -> SegmentCacheConfig {
468 SegmentCacheConfig {
469 max_segments: 16,
470 max_bytes: 1024 * 1024, prefetch_ahead: 3,
472 evict_played: true,
473 }
474 }
475
476 #[test]
477 fn test_insert_and_get() {
478 let mut cache = SegmentCache::new(default_config());
479 cache.insert(seg("s1", 0, 100)).expect("insert");
480 let r = SegmentRef {
481 stream_id: "s1".to_string(),
482 sequence: 0,
483 };
484 assert!(cache.get(&r).is_some());
485 assert_eq!(cache.stats().hit_count, 1);
486 }
487
488 #[test]
489 fn test_miss_increments_miss_count() {
490 let mut cache = SegmentCache::new(default_config());
491 let r = SegmentRef {
492 stream_id: "s1".to_string(),
493 sequence: 99,
494 };
495 assert!(cache.get(&r).is_none());
496 assert_eq!(cache.stats().miss_count, 1);
497 }
498
499 #[test]
500 fn test_duplicate_segment_rejected() {
501 let mut cache = SegmentCache::new(default_config());
502 cache.insert(seg("s1", 0, 100)).expect("first insert");
503 let err = cache.insert(seg("s1", 0, 100)).expect_err("should fail");
504 assert_eq!(err, SegmentCacheError::DuplicateSegment);
505 }
506
507 #[test]
508 fn test_segment_too_large_rejected() {
509 let config = SegmentCacheConfig {
510 max_bytes: 500,
511 ..default_config()
512 };
513 let mut cache = SegmentCache::new(config);
514 let err = cache.insert(seg("s1", 0, 1000)).expect_err("too large");
515 assert_eq!(err, SegmentCacheError::SegmentTooLarge);
516 }
517
518 #[test]
519 fn test_byte_limit_evicts_oldest() {
520 let config = SegmentCacheConfig {
521 max_segments: 100,
522 max_bytes: 1000,
523 prefetch_ahead: 2,
524 evict_played: false,
525 };
526 let mut cache = SegmentCache::new(config);
527 for i in 0..5u64 {
529 cache.insert(seg("s1", i, 200)).expect("insert");
530 }
531 cache.insert(seg("s1", 5, 200)).expect("insert 6th");
533 let r0 = SegmentRef {
535 stream_id: "s1".to_string(),
536 sequence: 0,
537 };
538 assert!(cache.get(&r0).is_none());
539 assert!(cache.total_bytes() <= 1000);
540 }
541
542 #[test]
543 fn test_segment_count_limit_evicts() {
544 let config = SegmentCacheConfig {
545 max_segments: 3,
546 max_bytes: 10 * 1024 * 1024,
547 prefetch_ahead: 1,
548 evict_played: false,
549 };
550 let mut cache = SegmentCache::new(config);
551 for i in 0..4u64 {
552 cache.insert(seg("s1", i, 10)).expect("insert");
553 }
554 assert_eq!(cache.segment_count(), 3);
555 let r0 = SegmentRef {
557 stream_id: "s1".to_string(),
558 sequence: 0,
559 };
560 assert!(cache.get(&r0).is_none());
561 }
562
563 #[test]
564 fn test_played_eviction_prioritised() {
565 let config = SegmentCacheConfig {
566 max_segments: 3,
567 max_bytes: 10 * 1024 * 1024,
568 prefetch_ahead: 1,
569 evict_played: true,
570 };
571 let mut cache = SegmentCache::new(config);
572 for i in 0..3u64 {
574 cache.insert(seg("s1", i, 100)).expect("insert");
575 }
576 let r2 = SegmentRef {
577 stream_id: "s1".to_string(),
578 sequence: 2,
579 };
580 cache.mark_played(&r2);
581
582 cache.insert(seg("s1", 3, 100)).expect("insert");
584 assert!(cache.get(&r2).is_none(), "played segment should be evicted");
585 let r0 = SegmentRef {
586 stream_id: "s1".to_string(),
587 sequence: 0,
588 };
589 assert!(cache.get(&r0).is_some(), "unplayed seq 0 should remain");
590 }
591
592 #[test]
593 fn test_prefetch_hints_excludes_cached() {
594 let mut cache = SegmentCache::new(default_config());
595 cache.insert(seg("stream", 1, 50)).expect("insert");
597 cache.insert(seg("stream", 3, 50)).expect("insert");
598
599 let hints = cache.prefetch_hints(0, "stream");
600 assert_eq!(hints.len(), 1);
606 assert_eq!(hints[0].sequence, 2);
607 }
608
609 #[test]
610 fn test_prefetch_hints_respects_ahead_count() {
611 let config = SegmentCacheConfig {
612 prefetch_ahead: 5,
613 ..default_config()
614 };
615 let cache = SegmentCache::new(config);
616 let hints = cache.prefetch_hints(10, "stream");
617 assert_eq!(hints.len(), 5);
618 for (i, h) in hints.iter().enumerate() {
619 assert_eq!(h.sequence, 11 + i as u64);
620 }
621 }
622
623 #[test]
624 fn test_evict_oldest_stream_returns_bytes_freed() {
625 let mut cache = SegmentCache::new(default_config());
626 cache.insert(seg("s1", 0, 512)).expect("insert");
627 let freed = cache.evict_oldest_stream();
628 assert_eq!(freed, 512);
629 assert_eq!(cache.segment_count(), 0);
630 assert_eq!(cache.total_bytes(), 0);
631 }
632
633 #[test]
634 fn test_stats_total_bytes() {
635 let mut cache = SegmentCache::new(default_config());
636 cache.insert(seg("s1", 0, 100)).expect("insert");
637 cache.insert(seg("s1", 1, 200)).expect("insert");
638 let s = cache.stats();
639 assert_eq!(s.total_segments, 2);
640 assert_eq!(s.total_bytes, 300);
641 }
642
643 #[test]
644 fn test_multi_stream_eviction_fairness() {
645 let config = SegmentCacheConfig {
646 max_segments: 4,
647 max_bytes: 10 * 1024 * 1024,
648 prefetch_ahead: 2,
649 evict_played: false,
650 };
651 let mut cache = SegmentCache::new(config);
652 cache.insert(seg("streamA", 0, 10)).expect("insert");
653 cache.insert(seg("streamB", 0, 10)).expect("insert");
654 cache.insert(seg("streamA", 1, 10)).expect("insert");
655 cache.insert(seg("streamB", 1, 10)).expect("insert");
656 cache.insert(seg("streamA", 2, 10)).expect("insert 5th");
660 assert_eq!(cache.segment_count(), 4);
661 }
662}