/// A single throughput observation: how many bytes were transferred, how long
/// the transfer took, and a monotonic clock reading attached to the sample.
#[derive(Clone, Debug)]
pub struct BandwidthSample {
    /// Number of bytes transferred.
    pub bytes: u64,
    /// Time the transfer took, in milliseconds.
    pub duration_ms: u64,
    /// Monotonic clock reading associated with this sample.
    pub timestamp: std::time::Instant,
}
13
14#[derive(Debug)]
19pub struct AbrBandwidthEstimator {
20 window: std::collections::VecDeque<BandwidthSample>,
21 window_size: usize,
22 smoothing_factor: f64,
23 current_estimate_bps: f64,
24}
25
26impl AbrBandwidthEstimator {
27 #[must_use]
32 pub fn new(window_size: usize) -> Self {
33 Self {
34 window: std::collections::VecDeque::with_capacity(window_size.max(1)),
35 window_size: window_size.max(1),
36 smoothing_factor: 0.3,
37 current_estimate_bps: 0.0,
38 }
39 }
40
41 #[must_use]
43 pub fn estimate_bps(&self) -> f64 {
44 self.current_estimate_bps
45 }
46
47 #[must_use]
49 pub fn estimate_kbps(&self) -> f64 {
50 self.current_estimate_bps / 1_000.0
51 }
52
53 #[must_use]
55 pub fn estimate_mbps(&self) -> f64 {
56 self.current_estimate_bps / 1_000_000.0
57 }
58
59 pub fn add_sample(&mut self, bytes: u64, duration_ms: u64) {
61 let sample_bps = if duration_ms == 0 {
63 0.0
64 } else {
65 (bytes as f64 * 8.0 * 1_000.0) / duration_ms as f64
66 };
67
68 if self.current_estimate_bps <= 0.0 {
69 self.current_estimate_bps = sample_bps;
70 } else {
71 self.current_estimate_bps = self.smoothing_factor * sample_bps
72 + (1.0 - self.smoothing_factor) * self.current_estimate_bps;
73 }
74
75 let sample = BandwidthSample {
76 bytes,
77 duration_ms,
78 timestamp: std::time::Instant::now(),
79 };
80
81 if self.window.len() >= self.window_size {
82 self.window.pop_front();
83 }
84 self.window.push_back(sample);
85 }
86
87 #[must_use]
93 pub fn percentile_bps(&self, percentile: f64) -> f64 {
94 if self.window.is_empty() {
95 return 0.0;
96 }
97
98 let mut rates: Vec<f64> = self
99 .window
100 .iter()
101 .map(|s| {
102 if s.duration_ms == 0 {
103 0.0
104 } else {
105 (s.bytes as f64 * 8.0 * 1_000.0) / s.duration_ms as f64
106 }
107 })
108 .collect();
109 rates.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
110
111 let p = percentile.clamp(0.0, 1.0);
112 let idx = ((rates.len() as f64 - 1.0) * p) as usize;
113 rates[idx.min(rates.len() - 1)]
114 }
115
116 #[must_use]
118 pub fn sample_count(&self) -> usize {
119 self.window.len()
120 }
121}
122
/// Description of one selectable rendition of a stream.
#[derive(Clone, Debug)]
pub struct AbrVariant {
    /// Declared bitrate of the rendition. The controller in this file compares
    /// it against a bits-per-second throughput estimate.
    pub bandwidth: u64,
    /// Frame width (presumably in pixels).
    pub width: u32,
    /// Frame height (presumably in pixels).
    pub height: u32,
    /// Codec identifier string, e.g. `avc1.42c01e,mp4a.40.2`.
    pub codecs: String,
    /// URI of the rendition.
    pub uri: String,
    /// Human-readable label for the rendition.
    pub name: String,
    /// Frame rate, when known.
    pub frame_rate: Option<f64>,
    /// HDCP level associated with the rendition, when specified.
    pub hdcp_level: Option<String>,
}
143
/// Why a variant switch was made.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AbrSwitchReason {
    /// The bandwidth estimate allows a higher-bitrate variant.
    BandwidthIncrease,
    /// The bandwidth estimate no longer supports the current variant.
    BandwidthDecrease,
    /// The playback buffer ran low.
    BufferStarvation,
    /// The switch was explicitly requested.
    UserRequested,
}
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq)]
159pub enum SelectionResult {
160 Stay {
162 variant: usize,
164 },
165 SwitchUp {
167 from: usize,
169 to: usize,
171 reason: AbrSwitchReason,
173 },
174 SwitchDown {
176 from: usize,
178 to: usize,
180 reason: AbrSwitchReason,
182 },
183 EmergencySwitch {
185 from: usize,
187 to: usize,
189 },
190}
191
192impl SelectionResult {
193 #[must_use]
195 pub const fn variant_index(&self) -> usize {
196 match self {
197 Self::Stay { variant } => *variant,
198 Self::SwitchUp { to, .. } => *to,
199 Self::SwitchDown { to, .. } => *to,
200 Self::EmergencySwitch { to, .. } => *to,
201 }
202 }
203
204 #[must_use]
206 pub const fn is_switch(&self) -> bool {
207 !matches!(self, Self::Stay { .. })
208 }
209
210 #[must_use]
212 pub const fn is_emergency(&self) -> bool {
213 matches!(self, Self::EmergencySwitch { .. })
214 }
215}
216
217#[derive(Debug)]
222pub struct AbrController {
223 variants: Vec<AbrVariant>,
225 current_index: usize,
227 bandwidth_estimator: AbrBandwidthEstimator,
229 buffer_duration_s: f64,
231 min_buffer_s: f64,
233 panic_buffer_s: f64,
235 safety_factor: f64,
237 switch_cooldown_segments: u32,
239 segments_since_switch: u32,
241}
242
243impl AbrController {
244 pub fn new(mut variants: Vec<AbrVariant>) -> Result<Self, String> {
249 if variants.is_empty() {
250 return Err("AbrController requires at least one variant".into());
251 }
252 variants.sort_by_key(|v| v.bandwidth);
253 Ok(Self {
254 bandwidth_estimator: AbrBandwidthEstimator::new(10),
255 variants,
256 current_index: 0,
257 buffer_duration_s: 0.0,
258 min_buffer_s: 15.0,
259 panic_buffer_s: 5.0,
260 safety_factor: 0.8,
261 switch_cooldown_segments: 3,
262 segments_since_switch: 0,
263 })
264 }
265
266 #[must_use]
268 pub fn current_variant(&self) -> &AbrVariant {
269 &self.variants[self.current_index]
270 }
271
272 #[must_use]
274 pub fn variant_count(&self) -> usize {
275 self.variants.len()
276 }
277
278 pub fn update_bandwidth(&mut self, bytes: u64, duration_ms: u64) {
280 self.bandwidth_estimator.add_sample(bytes, duration_ms);
281 }
282
283 pub fn update_buffer(&mut self, buffer_duration_s: f64) {
285 self.buffer_duration_s = buffer_duration_s;
286 }
287
288 pub fn select_variant(&mut self) -> SelectionResult {
298 let old = self.current_index;
299
300 if self.buffer_duration_s < self.panic_buffer_s && old > 0 {
302 self.current_index = 0;
303 self.segments_since_switch = 0;
304 return SelectionResult::EmergencySwitch { from: old, to: 0 };
305 }
306
307 if self.segments_since_switch < self.switch_cooldown_segments {
309 self.segments_since_switch += 1;
310 return SelectionResult::Stay { variant: old };
311 }
312
313 let safe_bw = self.bandwidth_estimator.estimate_bps() * self.safety_factor;
315 let mut target = 0usize;
316 for (i, v) in self.variants.iter().enumerate() {
317 if v.bandwidth as f64 <= safe_bw {
318 target = i;
319 }
320 }
321
322 let result = if target > old {
324 if self.buffer_duration_s >= self.min_buffer_s {
325 let next = (old + 1).min(target);
327 self.current_index = next;
328 self.segments_since_switch = 0;
329 SelectionResult::SwitchUp {
330 from: old,
331 to: next,
332 reason: AbrSwitchReason::BandwidthIncrease,
333 }
334 } else {
335 self.segments_since_switch += 1;
337 SelectionResult::Stay { variant: old }
338 }
339 } else if target < old {
340 self.current_index = target;
341 self.segments_since_switch = 0;
342 SelectionResult::SwitchDown {
343 from: old,
344 to: target,
345 reason: AbrSwitchReason::BandwidthDecrease,
346 }
347 } else {
348 self.segments_since_switch += 1;
349 SelectionResult::Stay { variant: old }
350 };
351
352 result
353 }
354
355 pub fn force_variant(&mut self, index: usize) -> Result<(), String> {
357 if index >= self.variants.len() {
358 return Err(format!(
359 "Variant index {index} out of range (max {})",
360 self.variants.len() - 1
361 ));
362 }
363 self.current_index = index;
364 self.segments_since_switch = 0;
365 Ok(())
366 }
367}
368
/// A downloaded media segment waiting in the playback buffer.
#[derive(Clone, Debug)]
pub struct BufferedSegment {
    /// Sequence number of the segment.
    pub sequence: u64,
    /// Index of the variant this segment belongs to.
    pub variant_index: usize,
    /// Segment payload bytes.
    pub data: Vec<u8>,
    /// Playback duration of the segment, in seconds.
    pub duration_s: f64,
    /// How long the download took, in milliseconds.
    pub download_time_ms: u64,
}
383
384#[derive(Debug)]
386pub struct SegmentFetcher {
387 controller: AbrController,
389 segment_duration_s: f64,
391 max_buffer_segments: usize,
393 buffered_segments: std::collections::VecDeque<BufferedSegment>,
395}
396
397impl SegmentFetcher {
398 #[must_use]
400 pub fn new(controller: AbrController, segment_duration_s: f64) -> Self {
401 Self {
402 controller,
403 segment_duration_s,
404 max_buffer_segments: 30,
405 buffered_segments: std::collections::VecDeque::new(),
406 }
407 }
408
409 #[must_use]
411 pub fn buffer_level_s(&self) -> f64 {
412 self.buffered_segments.iter().map(|s| s.duration_s).sum()
413 }
414
415 pub fn next_variant(&mut self) -> &AbrVariant {
420 let _result = self.controller.select_variant();
421 let buf = self.buffer_level_s();
422 self.controller.update_buffer(buf);
423 self.controller.current_variant()
424 }
425
426 pub fn record_download(
432 &mut self,
433 sequence: u64,
434 bytes: u64,
435 duration_ms: u64,
436 segment_duration_s: f64,
437 ) {
438 self.controller.update_bandwidth(bytes, duration_ms);
439 let variant_index = self.controller.current_index;
440 let seg = BufferedSegment {
441 sequence,
442 variant_index,
443 data: Vec::new(), duration_s: segment_duration_s,
445 download_time_ms: duration_ms,
446 };
447 if self.buffered_segments.len() >= self.max_buffer_segments {
448 self.buffered_segments.pop_front();
449 }
450 self.buffered_segments.push_back(seg);
451 }
452
453 pub fn pop_segment(&mut self) -> Option<BufferedSegment> {
455 self.buffered_segments.pop_front()
456 }
457
458 #[must_use]
460 pub fn buffered_count(&self) -> usize {
461 self.buffered_segments.len()
462 }
463}
464
/// Unit tests for the bandwidth estimator, the ABR controller and the segment
/// fetcher.
#[cfg(test)]
mod streaming_abr_tests {
    use super::*;

    /// Three-rung ladder (360p / 720p / 1080p) in ascending bandwidth order.
    fn make_variants() -> Vec<AbrVariant> {
        vec![
            AbrVariant {
                bandwidth: 500_000,
                width: 640,
                height: 360,
                codecs: "avc1.42c01e,mp4a.40.2".into(),
                uri: "low.m3u8".into(),
                name: "360p".into(),
                frame_rate: Some(30.0),
                hdcp_level: None,
            },
            AbrVariant {
                bandwidth: 1_500_000,
                width: 1280,
                height: 720,
                codecs: "avc1.42c01e,mp4a.40.2".into(),
                uri: "mid.m3u8".into(),
                name: "720p".into(),
                frame_rate: Some(30.0),
                hdcp_level: None,
            },
            AbrVariant {
                bandwidth: 4_000_000,
                width: 1920,
                height: 1080,
                codecs: "avc1.640028,mp4a.40.2".into(),
                uri: "high.m3u8".into(),
                name: "1080p".into(),
                frame_rate: Some(60.0),
                hdcp_level: None,
            },
        ]
    }

    #[test]
    fn test_bandwidth_estimator_basic() {
        let mut est = AbrBandwidthEstimator::new(10);
        est.add_sample(1_000_000, 1_000);
        est.add_sample(2_000_000, 1_000);
        est.add_sample(1_500_000, 1_000);
        assert!(est.estimate_bps() > 0.0, "estimate should be positive");
        assert_eq!(est.sample_count(), 3);
    }

    #[test]
    fn test_bandwidth_estimator_percentile() {
        let mut est = AbrBandwidthEstimator::new(20);
        // Five slow samples followed by five fast ones.
        for _ in 0..5 {
            est.add_sample(125_000, 1_000);
        }
        for _ in 0..5 {
            est.add_sample(1_250_000, 1_000);
        }
        let p15 = est.percentile_bps(0.15);
        let p85 = est.percentile_bps(0.85);
        assert!(p15 < p85, "15th percentile should be lower than 85th");
        assert!(p15 > 0.0, "percentile should be positive");
    }

    #[test]
    fn test_abr_controller_creation() {
        // Hand the ladder over in descending order to prove the constructor sorts it.
        let mut variants = make_variants();
        variants.reverse();
        let ctrl = AbrController::new(variants).expect("should create controller");
        assert_eq!(ctrl.variant_count(), 3);
        assert_eq!(ctrl.current_variant().bandwidth, 500_000);
    }

    // Despite its name, this test exercises the emergency down-switch that
    // fires when the buffer is below the panic threshold.
    #[test]
    fn test_abr_stay_on_low_buffer() {
        let mut ctrl = AbrController::new(make_variants()).expect("should succeed in test");
        ctrl.force_variant(2).expect("should succeed in test");
        ctrl.update_bandwidth(500_000, 1_000);
        ctrl.update_buffer(2.0);
        ctrl.segments_since_switch = ctrl.switch_cooldown_segments;

        let result = ctrl.select_variant();
        assert!(
            result.is_emergency(),
            "expected emergency switch, got {result:?}"
        );
        assert_eq!(
            result.variant_index(),
            0,
            "emergency switch must go to index 0"
        );
    }

    #[test]
    fn test_abr_switch_up_good_bandwidth() {
        let mut ctrl = AbrController::new(make_variants()).expect("should succeed in test");
        ctrl.update_bandwidth(5_000_000, 1_000);
        ctrl.update_buffer(20.0);
        // Skip the initial cooldown so the first pass may switch.
        ctrl.segments_since_switch = ctrl.switch_cooldown_segments;

        let result = ctrl.select_variant();
        assert!(
            result.is_switch(),
            "expected a switch with excellent bandwidth"
        );
        assert!(
            result.variant_index() > 0,
            "should switch up from index 0, got {}",
            result.variant_index()
        );
    }

    #[test]
    fn test_abr_cooldown() {
        let mut ctrl = AbrController::new(make_variants()).expect("should succeed in test");
        ctrl.update_bandwidth(5_000_000, 1_000);
        ctrl.update_buffer(20.0);
        ctrl.segments_since_switch = ctrl.switch_cooldown_segments;

        // The first pass must actually switch; otherwise the `Stay` below
        // would say nothing about the cooldown.
        let first = ctrl.select_variant();
        assert!(
            first.is_switch(),
            "precondition: first selection should switch, got {first:?}"
        );

        let second = ctrl.select_variant();
        assert!(
            matches!(second, SelectionResult::Stay { .. }),
            "cooldown should prevent immediate second switch, got {second:?}"
        );
    }

    #[test]
    fn test_selection_result_accessors() {
        let stay = SelectionResult::Stay { variant: 1 };
        assert_eq!(stay.variant_index(), 1);
        assert!(!stay.is_switch());
        assert!(!stay.is_emergency());

        let up = SelectionResult::SwitchUp {
            from: 0,
            to: 1,
            reason: AbrSwitchReason::BandwidthIncrease,
        };
        assert_eq!(up.variant_index(), 1);
        assert!(up.is_switch());
        assert!(!up.is_emergency());

        let down = SelectionResult::SwitchDown {
            from: 2,
            to: 1,
            reason: AbrSwitchReason::BandwidthDecrease,
        };
        assert_eq!(down.variant_index(), 1);
        assert!(down.is_switch());
        assert!(!down.is_emergency());

        let emergency = SelectionResult::EmergencySwitch { from: 2, to: 0 };
        assert_eq!(emergency.variant_index(), 0);
        assert!(emergency.is_switch());
        assert!(emergency.is_emergency());
    }

    #[test]
    fn test_segment_fetcher_buffer_level() {
        let ctrl = AbrController::new(make_variants()).expect("should succeed in test");
        let mut fetcher = SegmentFetcher::new(ctrl, 4.0);

        fetcher.record_download(0, 500_000, 1_000, 4.0);
        fetcher.record_download(1, 500_000, 1_000, 4.0);
        fetcher.record_download(2, 500_000, 1_000, 4.0);

        let level = fetcher.buffer_level_s();
        assert!(
            (level - 12.0).abs() < f64::EPSILON,
            "3 × 4 s segments = 12 s, got {level}"
        );
        assert_eq!(fetcher.buffered_count(), 3);
    }

    #[test]
    fn test_segment_fetcher_pop() {
        let ctrl = AbrController::new(make_variants()).expect("should succeed in test");
        let mut fetcher = SegmentFetcher::new(ctrl, 6.0);

        fetcher.record_download(0, 750_000, 800, 6.0);
        fetcher.record_download(1, 750_000, 800, 6.0);

        assert_eq!(fetcher.buffered_count(), 2);

        let seg = fetcher.pop_segment().expect("should return a segment");
        assert_eq!(seg.sequence, 0);
        assert_eq!(fetcher.buffered_count(), 1);

        let level = fetcher.buffer_level_s();
        assert!(
            (level - 6.0).abs() < f64::EPSILON,
            "after pop, 1 × 6 s segment remains, got {level}"
        );
    }
}