Skip to main content

oximedia_net/abr/
streaming.rs

1//! Streaming bandwidth measurement and quality adaptation utilities.
2
3/// A single bandwidth measurement sample.
4#[derive(Debug, Clone)]
5pub struct BandwidthSample {
6    /// Bytes downloaded in this measurement.
7    pub bytes: u64,
8    /// Wall-clock download time in milliseconds.
9    pub duration_ms: u64,
10    /// Instant when the sample was recorded.
11    pub timestamp: std::time::Instant,
12}
13
14/// Exponentially-weighted moving average (EWMA) bandwidth estimator.
15///
16/// Maintains a sliding window of [`BandwidthSample`]s and computes an EWMA
17/// estimate as well as order-statistic (percentile) estimates.
18#[derive(Debug)]
19pub struct AbrBandwidthEstimator {
20    window: std::collections::VecDeque<BandwidthSample>,
21    window_size: usize,
22    smoothing_factor: f64,
23    current_estimate_bps: f64,
24}
25
26impl AbrBandwidthEstimator {
27    /// Creates a new estimator.
28    ///
29    /// * `window_size`      — maximum number of samples retained (default 10).
30    /// * `smoothing_factor` — EWMA alpha ∈ `[0, 1]` (default 0.3, lower is smoother).
31    #[must_use]
32    pub fn new(window_size: usize) -> Self {
33        Self {
34            window: std::collections::VecDeque::with_capacity(window_size.max(1)),
35            window_size: window_size.max(1),
36            smoothing_factor: 0.3,
37            current_estimate_bps: 0.0,
38        }
39    }
40
41    /// Returns the current EWMA bandwidth estimate in bits per second.
42    #[must_use]
43    pub fn estimate_bps(&self) -> f64 {
44        self.current_estimate_bps
45    }
46
47    /// Returns the current estimate in kilobits per second.
48    #[must_use]
49    pub fn estimate_kbps(&self) -> f64 {
50        self.current_estimate_bps / 1_000.0
51    }
52
53    /// Returns the current estimate in megabits per second.
54    #[must_use]
55    pub fn estimate_mbps(&self) -> f64 {
56        self.current_estimate_bps / 1_000_000.0
57    }
58
59    /// Adds a new download measurement and updates the EWMA estimate.
60    pub fn add_sample(&mut self, bytes: u64, duration_ms: u64) {
61        // Compute instantaneous rate in bits per second.
62        let sample_bps = if duration_ms == 0 {
63            0.0
64        } else {
65            (bytes as f64 * 8.0 * 1_000.0) / duration_ms as f64
66        };
67
68        if self.current_estimate_bps <= 0.0 {
69            self.current_estimate_bps = sample_bps;
70        } else {
71            self.current_estimate_bps = self.smoothing_factor * sample_bps
72                + (1.0 - self.smoothing_factor) * self.current_estimate_bps;
73        }
74
75        let sample = BandwidthSample {
76            bytes,
77            duration_ms,
78            timestamp: std::time::Instant::now(),
79        };
80
81        if self.window.len() >= self.window_size {
82            self.window.pop_front();
83        }
84        self.window.push_back(sample);
85    }
86
87    /// Returns the `percentile`-th order statistic of the sample rates,
88    /// where `percentile` ∈ `[0.0, 1.0]`.
89    ///
90    /// For example `percentile_bps(0.15)` gives the 15th-percentile rate —
91    /// a conservative estimate suitable for cautious ABR logic.
92    #[must_use]
93    pub fn percentile_bps(&self, percentile: f64) -> f64 {
94        if self.window.is_empty() {
95            return 0.0;
96        }
97
98        let mut rates: Vec<f64> = self
99            .window
100            .iter()
101            .map(|s| {
102                if s.duration_ms == 0 {
103                    0.0
104                } else {
105                    (s.bytes as f64 * 8.0 * 1_000.0) / s.duration_ms as f64
106                }
107            })
108            .collect();
109        rates.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
110
111        let p = percentile.clamp(0.0, 1.0);
112        let idx = ((rates.len() as f64 - 1.0) * p) as usize;
113        rates[idx.min(rates.len() - 1)]
114    }
115
116    /// Returns the number of samples currently retained.
117    #[must_use]
118    pub fn sample_count(&self) -> usize {
119        self.window.len()
120    }
121}
122
123/// One rendition of an adaptive stream (a single quality level).
124#[derive(Debug, Clone)]
125pub struct AbrVariant {
126    /// Peak bandwidth in bits per second.
127    pub bandwidth: u64,
128    /// Frame width in pixels.
129    pub width: u32,
130    /// Frame height in pixels.
131    pub height: u32,
132    /// Codec string (e.g. `"avc1.42c01e,mp4a.40.2"`).
133    pub codecs: String,
134    /// Playlist / segment URI for this rendition.
135    pub uri: String,
136    /// Human-readable name such as `"1080p"`.
137    pub name: String,
138    /// Frame rate, if known.
139    pub frame_rate: Option<f64>,
140    /// HDCP level, if specified.
141    pub hdcp_level: Option<String>,
142}
143
144/// Reason why a quality switch was made.
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub enum AbrSwitchReason {
147    /// Estimated bandwidth increased enough to warrant a higher rendition.
148    BandwidthIncrease,
149    /// Estimated bandwidth decreased — switching down to stay sustainable.
150    BandwidthDecrease,
151    /// Buffer fell below the panic threshold.
152    BufferStarvation,
153    /// The application requested a specific rendition.
154    UserRequested,
155}
156
157/// Result returned by [`AbrController::select_variant`].
158#[derive(Debug, Clone, Copy, PartialEq, Eq)]
159pub enum SelectionResult {
160    /// Staying on the current variant.
161    Stay {
162        /// Index of the current variant.
163        variant: usize,
164    },
165    /// Switching to a higher-quality variant.
166    SwitchUp {
167        /// Index of the old variant.
168        from: usize,
169        /// Index of the new variant.
170        to: usize,
171        /// Reason for the switch.
172        reason: AbrSwitchReason,
173    },
174    /// Switching to a lower-quality variant.
175    SwitchDown {
176        /// Index of the old variant.
177        from: usize,
178        /// Index of the new variant.
179        to: usize,
180        /// Reason for the switch.
181        reason: AbrSwitchReason,
182    },
183    /// Emergency switch to the lowest available variant.
184    EmergencySwitch {
185        /// Index of the old variant.
186        from: usize,
187        /// Index of the new (lowest) variant.
188        to: usize,
189    },
190}
191
192impl SelectionResult {
193    /// Returns the variant index that should be used after this decision.
194    #[must_use]
195    pub const fn variant_index(&self) -> usize {
196        match self {
197            Self::Stay { variant } => *variant,
198            Self::SwitchUp { to, .. } => *to,
199            Self::SwitchDown { to, .. } => *to,
200            Self::EmergencySwitch { to, .. } => *to,
201        }
202    }
203
204    /// Returns `true` if this decision involves changing the active variant.
205    #[must_use]
206    pub const fn is_switch(&self) -> bool {
207        !matches!(self, Self::Stay { .. })
208    }
209
210    /// Returns `true` if this is an emergency switch.
211    #[must_use]
212    pub const fn is_emergency(&self) -> bool {
213        matches!(self, Self::EmergencySwitch { .. })
214    }
215}
216
217/// Adaptive bitrate controller for segment-based streaming.
218///
219/// Maintains a sorted list of [`AbrVariant`]s and selects the most appropriate
220/// one at each segment boundary based on measured bandwidth and buffer level.
221#[derive(Debug)]
222pub struct AbrController {
223    /// All renditions, sorted by bandwidth ascending.
224    variants: Vec<AbrVariant>,
225    /// Index into `variants` of the currently active rendition.
226    current_index: usize,
227    /// Bandwidth estimator.
228    bandwidth_estimator: AbrBandwidthEstimator,
229    /// Current buffer level in seconds.
230    buffer_duration_s: f64,
231    /// Minimum buffer required before attempting a switch-up (seconds).
232    min_buffer_s: f64,
233    /// Buffer level below which an emergency switch-down is triggered (seconds).
234    panic_buffer_s: f64,
235    /// Fraction of estimated bandwidth to use for selection (headroom).
236    safety_factor: f64,
237    /// Number of segments to wait between quality switches.
238    switch_cooldown_segments: u32,
239    /// Segments downloaded since the last quality switch.
240    segments_since_switch: u32,
241}
242
243impl AbrController {
244    /// Creates a new controller from a list of variants.
245    ///
246    /// Variants are sorted by bandwidth ascending so that index 0 is always
247    /// the lowest quality.  Returns `Err` if `variants` is empty.
248    pub fn new(mut variants: Vec<AbrVariant>) -> Result<Self, String> {
249        if variants.is_empty() {
250            return Err("AbrController requires at least one variant".into());
251        }
252        variants.sort_by_key(|v| v.bandwidth);
253        Ok(Self {
254            bandwidth_estimator: AbrBandwidthEstimator::new(10),
255            variants,
256            current_index: 0,
257            buffer_duration_s: 0.0,
258            min_buffer_s: 15.0,
259            panic_buffer_s: 5.0,
260            safety_factor: 0.8,
261            switch_cooldown_segments: 3,
262            segments_since_switch: 0,
263        })
264    }
265
266    /// Returns a reference to the currently active variant.
267    #[must_use]
268    pub fn current_variant(&self) -> &AbrVariant {
269        &self.variants[self.current_index]
270    }
271
272    /// Returns the total number of variants.
273    #[must_use]
274    pub fn variant_count(&self) -> usize {
275        self.variants.len()
276    }
277
278    /// Feeds a new segment download measurement into the bandwidth estimator.
279    pub fn update_bandwidth(&mut self, bytes: u64, duration_ms: u64) {
280        self.bandwidth_estimator.add_sample(bytes, duration_ms);
281    }
282
283    /// Updates the current buffer level.
284    pub fn update_buffer(&mut self, buffer_duration_s: f64) {
285        self.buffer_duration_s = buffer_duration_s;
286    }
287
288    /// Runs the core ABR logic and returns a [`SelectionResult`].
289    ///
290    /// Decision rules (in priority order):
291    /// 1. Buffer below panic threshold → emergency switch to index 0.
292    /// 2. Cooldown active → stay.
293    /// 3. Compute `safe_bw = estimate * safety_factor`.
294    /// 4. Find highest variant whose bandwidth ≤ safe_bw.
295    /// 5. If buffer ≥ min_buffer_s allow switching up one step; otherwise
296    ///    only allow switching down.
297    pub fn select_variant(&mut self) -> SelectionResult {
298        let old = self.current_index;
299
300        // Rule 1: emergency.
301        if self.buffer_duration_s < self.panic_buffer_s && old > 0 {
302            self.current_index = 0;
303            self.segments_since_switch = 0;
304            return SelectionResult::EmergencySwitch { from: old, to: 0 };
305        }
306
307        // Rule 2: cooldown.
308        if self.segments_since_switch < self.switch_cooldown_segments {
309            self.segments_since_switch += 1;
310            return SelectionResult::Stay { variant: old };
311        }
312
313        // Rule 3-4: find best variant by bandwidth.
314        let safe_bw = self.bandwidth_estimator.estimate_bps() * self.safety_factor;
315        let mut target = 0usize;
316        for (i, v) in self.variants.iter().enumerate() {
317            if v.bandwidth as f64 <= safe_bw {
318                target = i;
319            }
320        }
321
322        // Rule 5: buffer-gated upswitch.
323        let result = if target > old {
324            if self.buffer_duration_s >= self.min_buffer_s {
325                // Allow at most one step up.
326                let next = (old + 1).min(target);
327                self.current_index = next;
328                self.segments_since_switch = 0;
329                SelectionResult::SwitchUp {
330                    from: old,
331                    to: next,
332                    reason: AbrSwitchReason::BandwidthIncrease,
333                }
334            } else {
335                // Buffer too low to switch up → stay.
336                self.segments_since_switch += 1;
337                SelectionResult::Stay { variant: old }
338            }
339        } else if target < old {
340            self.current_index = target;
341            self.segments_since_switch = 0;
342            SelectionResult::SwitchDown {
343                from: old,
344                to: target,
345                reason: AbrSwitchReason::BandwidthDecrease,
346            }
347        } else {
348            self.segments_since_switch += 1;
349            SelectionResult::Stay { variant: old }
350        };
351
352        result
353    }
354
355    /// Forces a specific variant index, bypassing ABR logic.
356    pub fn force_variant(&mut self, index: usize) -> Result<(), String> {
357        if index >= self.variants.len() {
358            return Err(format!(
359                "Variant index {index} out of range (max {})",
360                self.variants.len() - 1
361            ));
362        }
363        self.current_index = index;
364        self.segments_since_switch = 0;
365        Ok(())
366    }
367}
368
369/// A segment that has been downloaded and placed in the playback buffer.
370#[derive(Debug, Clone)]
371pub struct BufferedSegment {
372    /// Sequence number of this segment.
373    pub sequence: u64,
374    /// Variant index this segment was downloaded at.
375    pub variant_index: usize,
376    /// Raw segment bytes.
377    pub data: Vec<u8>,
378    /// Playback duration of this segment in seconds.
379    pub duration_s: f64,
380    /// Download time in milliseconds.
381    pub download_time_ms: u64,
382}
383
384/// Drives an [`AbrController`] and maintains an in-memory segment buffer.
385#[derive(Debug)]
386pub struct SegmentFetcher {
387    /// Underlying ABR controller.
388    controller: AbrController,
389    /// Default segment playback duration in seconds.
390    segment_duration_s: f64,
391    /// Maximum number of segments to keep buffered.
392    max_buffer_segments: usize,
393    /// The buffered segment queue (oldest at front).
394    buffered_segments: std::collections::VecDeque<BufferedSegment>,
395}
396
397impl SegmentFetcher {
398    /// Creates a new fetcher wrapping the given controller.
399    #[must_use]
400    pub fn new(controller: AbrController, segment_duration_s: f64) -> Self {
401        Self {
402            controller,
403            segment_duration_s,
404            max_buffer_segments: 30,
405            buffered_segments: std::collections::VecDeque::new(),
406        }
407    }
408
409    /// Returns the total playback duration currently buffered, in seconds.
410    #[must_use]
411    pub fn buffer_level_s(&self) -> f64 {
412        self.buffered_segments.iter().map(|s| s.duration_s).sum()
413    }
414
415    /// Selects the variant to use for the next segment download.
416    ///
417    /// Calls [`AbrController::select_variant`] then updates the buffer level
418    /// inside the controller so the next call has accurate state.
419    pub fn next_variant(&mut self) -> &AbrVariant {
420        let _result = self.controller.select_variant();
421        let buf = self.buffer_level_s();
422        self.controller.update_buffer(buf);
423        self.controller.current_variant()
424    }
425
426    /// Records a completed segment download.
427    ///
428    /// Updates bandwidth estimation and appends a [`BufferedSegment`] to the
429    /// buffer.  If the buffer exceeds `max_buffer_segments`, the oldest entry
430    /// is silently dropped.
431    pub fn record_download(
432        &mut self,
433        sequence: u64,
434        bytes: u64,
435        duration_ms: u64,
436        segment_duration_s: f64,
437    ) {
438        self.controller.update_bandwidth(bytes, duration_ms);
439        let variant_index = self.controller.current_index;
440        let seg = BufferedSegment {
441            sequence,
442            variant_index,
443            data: Vec::new(), // caller fills in real data separately if needed
444            duration_s: segment_duration_s,
445            download_time_ms: duration_ms,
446        };
447        if self.buffered_segments.len() >= self.max_buffer_segments {
448            self.buffered_segments.pop_front();
449        }
450        self.buffered_segments.push_back(seg);
451    }
452
453    /// Removes and returns the oldest buffered segment (simulating playback).
454    pub fn pop_segment(&mut self) -> Option<BufferedSegment> {
455        self.buffered_segments.pop_front()
456    }
457
458    /// Returns the number of segments currently buffered.
459    #[must_use]
460    pub fn buffered_count(&self) -> usize {
461        self.buffered_segments.len()
462    }
463}
464
465// ─────────────────────────────────────────────────────────────────────────────
466// Tests for streaming ABR types
467// ─────────────────────────────────────────────────────────────────────────────
468
469#[cfg(test)]
470mod streaming_abr_tests {
471    use super::*;
472
473    fn make_variants() -> Vec<AbrVariant> {
474        vec![
475            AbrVariant {
476                bandwidth: 500_000,
477                width: 640,
478                height: 360,
479                codecs: "avc1.42c01e,mp4a.40.2".into(),
480                uri: "low.m3u8".into(),
481                name: "360p".into(),
482                frame_rate: Some(30.0),
483                hdcp_level: None,
484            },
485            AbrVariant {
486                bandwidth: 1_500_000,
487                width: 1280,
488                height: 720,
489                codecs: "avc1.42c01e,mp4a.40.2".into(),
490                uri: "mid.m3u8".into(),
491                name: "720p".into(),
492                frame_rate: Some(30.0),
493                hdcp_level: None,
494            },
495            AbrVariant {
496                bandwidth: 4_000_000,
497                width: 1920,
498                height: 1080,
499                codecs: "avc1.640028,mp4a.40.2".into(),
500                uri: "high.m3u8".into(),
501                name: "1080p".into(),
502                frame_rate: Some(60.0),
503                hdcp_level: None,
504            },
505        ]
506    }
507
508    // ── BandwidthEstimator tests ──────────────────────────────────────────
509
510    #[test]
511    fn test_bandwidth_estimator_basic() {
512        let mut est = AbrBandwidthEstimator::new(10);
513        est.add_sample(1_000_000, 1_000); // 8 Mbps
514        est.add_sample(2_000_000, 1_000); // 16 Mbps
515        est.add_sample(1_500_000, 1_000); // 12 Mbps
516        assert!(est.estimate_bps() > 0.0, "estimate should be positive");
517        assert_eq!(est.sample_count(), 3);
518    }
519
520    #[test]
521    fn test_bandwidth_estimator_percentile() {
522        let mut est = AbrBandwidthEstimator::new(20);
523        // 5 slow samples at 1 Mbps then 5 fast samples at 10 Mbps
524        for _ in 0..5 {
525            est.add_sample(125_000, 1_000); // 1 Mbps
526        }
527        for _ in 0..5 {
528            est.add_sample(1_250_000, 1_000); // 10 Mbps
529        }
530        let p15 = est.percentile_bps(0.15);
531        let p85 = est.percentile_bps(0.85);
532        assert!(p15 < p85, "15th percentile should be lower than 85th");
533        assert!(p15 > 0.0, "percentile should be positive");
534    }
535
536    // ── AbrController tests ───────────────────────────────────────────────
537
538    #[test]
539    fn test_abr_controller_creation() {
540        // Provide variants out of bandwidth order; controller must sort them.
541        let mut variants = make_variants();
542        variants.reverse(); // highest bandwidth first
543        let ctrl = AbrController::new(variants).expect("should create controller");
544        assert_eq!(ctrl.variant_count(), 3);
545        // After sorting, index 0 must be the lowest bandwidth.
546        assert_eq!(ctrl.current_variant().bandwidth, 500_000);
547    }
548
549    #[test]
550    fn test_abr_stay_on_low_buffer() {
551        let mut ctrl = AbrController::new(make_variants()).expect("should succeed in test");
552        // Force to highest variant.
553        ctrl.force_variant(2).expect("should succeed in test");
554        // Feed some bandwidth samples so estimate is non-zero.
555        ctrl.update_bandwidth(500_000, 1_000); // 4 Mbps
556                                               // Panic-level buffer.
557        ctrl.update_buffer(2.0);
558        // Reset cooldown so decision runs.
559        ctrl.segments_since_switch = ctrl.switch_cooldown_segments;
560
561        let result = ctrl.select_variant();
562        assert!(
563            result.is_emergency(),
564            "expected emergency switch, got {result:?}"
565        );
566        assert_eq!(
567            result.variant_index(),
568            0,
569            "emergency switch must go to index 0"
570        );
571    }
572
573    #[test]
574    fn test_abr_switch_up_good_bandwidth() {
575        let mut ctrl = AbrController::new(make_variants()).expect("should succeed in test");
576        // Start at index 0, simulate 40 Mbps link.
577        // 5_000_000 bytes in 1000 ms = 40 Mbps
578        ctrl.update_bandwidth(5_000_000, 1_000);
579        // Healthy buffer.
580        ctrl.update_buffer(20.0);
581        // Ensure cooldown has expired.
582        ctrl.segments_since_switch = ctrl.switch_cooldown_segments;
583
584        let result = ctrl.select_variant();
585        assert!(
586            result.is_switch(),
587            "expected a switch with excellent bandwidth"
588        );
589        assert!(
590            result.variant_index() > 0,
591            "should switch up from index 0, got {}",
592            result.variant_index()
593        );
594    }
595
596    #[test]
597    fn test_abr_cooldown() {
598        let mut ctrl = AbrController::new(make_variants()).expect("should succeed in test");
599        // Feed strong bandwidth.
600        ctrl.update_bandwidth(5_000_000, 1_000);
601        ctrl.update_buffer(20.0);
602        // Expire cooldown for the first call.
603        ctrl.segments_since_switch = ctrl.switch_cooldown_segments;
604
605        let first = ctrl.select_variant();
606        // First call may switch up.
607        let _ = first;
608
609        // Immediately call again — cooldown should prevent another switch.
610        let second = ctrl.select_variant();
611        assert!(
612            matches!(second, SelectionResult::Stay { .. }),
613            "cooldown should prevent immediate second switch, got {second:?}"
614        );
615    }
616
617    // ── SelectionResult tests ─────────────────────────────────────────────
618
619    #[test]
620    fn test_selection_result_accessors() {
621        let stay = SelectionResult::Stay { variant: 1 };
622        assert_eq!(stay.variant_index(), 1);
623        assert!(!stay.is_switch());
624        assert!(!stay.is_emergency());
625
626        let up = SelectionResult::SwitchUp {
627            from: 0,
628            to: 1,
629            reason: AbrSwitchReason::BandwidthIncrease,
630        };
631        assert_eq!(up.variant_index(), 1);
632        assert!(up.is_switch());
633        assert!(!up.is_emergency());
634
635        let down = SelectionResult::SwitchDown {
636            from: 2,
637            to: 1,
638            reason: AbrSwitchReason::BandwidthDecrease,
639        };
640        assert_eq!(down.variant_index(), 1);
641        assert!(down.is_switch());
642        assert!(!down.is_emergency());
643
644        let emergency = SelectionResult::EmergencySwitch { from: 2, to: 0 };
645        assert_eq!(emergency.variant_index(), 0);
646        assert!(emergency.is_switch());
647        assert!(emergency.is_emergency());
648    }
649
650    // ── SegmentFetcher tests ──────────────────────────────────────────────
651
652    #[test]
653    fn test_segment_fetcher_buffer_level() {
654        let ctrl = AbrController::new(make_variants()).expect("should succeed in test");
655        let mut fetcher = SegmentFetcher::new(ctrl, 4.0);
656
657        fetcher.record_download(0, 500_000, 1_000, 4.0);
658        fetcher.record_download(1, 500_000, 1_000, 4.0);
659        fetcher.record_download(2, 500_000, 1_000, 4.0);
660
661        let level = fetcher.buffer_level_s();
662        assert!(
663            (level - 12.0).abs() < f64::EPSILON,
664            "3 × 4 s segments = 12 s, got {level}"
665        );
666        assert_eq!(fetcher.buffered_count(), 3);
667    }
668
669    #[test]
670    fn test_segment_fetcher_pop() {
671        let ctrl = AbrController::new(make_variants()).expect("should succeed in test");
672        let mut fetcher = SegmentFetcher::new(ctrl, 6.0);
673
674        fetcher.record_download(0, 750_000, 800, 6.0);
675        fetcher.record_download(1, 750_000, 800, 6.0);
676
677        assert_eq!(fetcher.buffered_count(), 2);
678
679        let seg = fetcher.pop_segment().expect("should return a segment");
680        assert_eq!(seg.sequence, 0);
681        assert_eq!(fetcher.buffered_count(), 1);
682
683        let level = fetcher.buffer_level_s();
684        assert!(
685            (level - 6.0).abs() < f64::EPSILON,
686            "after pop, 1 × 6 s segment remains, got {level}"
687        );
688    }
689}