use super::{AUDIO_ONLY_BPS, HIGH_MIN_BPS, LOW_MIN_BPS, MEDIUM_MIN_BPS, UPGRADE_STREAK};
use crate::ids::SfuRid;
#[must_use = "PacerAction must be applied to the subscriber's forwarding state"]
#[cfg_attr(docsrs, doc(cfg(feature = "pacer")))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PacerAction {
NoChange,
ChangeLayer(SfuRid),
GoAudioOnly,
RestoreVideo,
}
#[derive(Debug)]
pub(crate) struct SubscriberPacer {
current_layer: SfuRid,
audio_only: bool,
upgrade_streak: u8,
}
impl SubscriberPacer {
pub(crate) fn new() -> Self {
Self {
current_layer: SfuRid::LOW,
audio_only: false,
upgrade_streak: 0,
}
}
pub(crate) fn update(&mut self, bps: u64) -> PacerAction {
if self.audio_only {
if bps >= LOW_MIN_BPS {
self.audio_only = false;
self.current_layer = SfuRid::LOW;
self.upgrade_streak = 0;
return PacerAction::RestoreVideo;
}
return PacerAction::NoChange;
}
if bps < AUDIO_ONLY_BPS {
self.audio_only = true;
self.upgrade_streak = 0;
return PacerAction::GoAudioOnly;
}
let target = layer_for_bps(bps);
if rank(target) < rank(self.current_layer) {
self.current_layer = target;
self.upgrade_streak = 0;
return PacerAction::ChangeLayer(target);
}
if rank(target) > rank(self.current_layer) {
self.upgrade_streak += 1;
if self.upgrade_streak >= UPGRADE_STREAK {
self.current_layer = target;
self.upgrade_streak = 0;
return PacerAction::ChangeLayer(target);
}
} else {
self.upgrade_streak = 0;
}
PacerAction::NoChange
}
#[cfg(test)]
fn layer(&self) -> SfuRid {
self.current_layer
}
#[cfg(test)]
fn audio_only(&self) -> bool {
self.audio_only
}
}
fn rank(r: SfuRid) -> u8 {
if r == SfuRid::LOW {
0
} else if r == SfuRid::MEDIUM {
1
} else if r == SfuRid::HIGH {
2
} else {
unreachable!("unhandled SfuRid in pacer rank")
}
}
fn layer_for_bps(bps: u64) -> SfuRid {
if bps >= HIGH_MIN_BPS {
SfuRid::HIGH
} else if bps >= MEDIUM_MIN_BPS {
SfuRid::MEDIUM
} else {
SfuRid::LOW
}
}
#[cfg(test)]
#[allow(unused_must_use)] mod tests {
use super::*;
fn pump(p: &mut SubscriberPacer, bps: u64, n: u8) -> PacerAction {
let mut last = PacerAction::NoChange;
for _ in 0..n {
last = p.update(bps);
}
last
}
#[test]
fn starts_at_low() {
let p = SubscriberPacer::new();
assert_eq!(p.layer(), SfuRid::LOW);
assert!(!p.audio_only());
}
#[test]
fn upgrade_requires_3_consecutive_ticks() {
let mut p = SubscriberPacer::new();
let bps = MEDIUM_MIN_BPS + 1_000;
let _ = pump(&mut p, bps, 2);
assert_eq!(p.layer(), SfuRid::LOW, "should not upgrade after 2 ticks");
let a = p.update(bps);
assert_eq!(a, PacerAction::ChangeLayer(SfuRid::MEDIUM));
assert_eq!(p.layer(), SfuRid::MEDIUM);
}
#[test]
fn downgrade_is_immediate() {
let mut p = SubscriberPacer::new();
let _ = pump(&mut p, HIGH_MIN_BPS + 100_000, 6);
assert_eq!(p.layer(), SfuRid::HIGH);
let a = p.update(MEDIUM_MIN_BPS - 10_000);
assert_eq!(a, PacerAction::ChangeLayer(SfuRid::LOW));
assert_eq!(p.layer(), SfuRid::LOW);
}
#[test]
fn streak_resets_on_interruption() {
let mut p = SubscriberPacer::new();
let hi = MEDIUM_MIN_BPS + 1_000;
let lo = LOW_MIN_BPS + 1_000;
p.update(hi); p.update(hi); p.update(lo); p.update(hi); p.update(hi); assert_eq!(
p.layer(),
SfuRid::LOW,
"should NOT have upgraded --- streak reset"
);
}
#[test]
fn audio_only_below_threshold() {
let mut p = SubscriberPacer::new();
let a = p.update(AUDIO_ONLY_BPS - 1_000);
assert_eq!(a, PacerAction::GoAudioOnly);
assert!(p.audio_only());
assert_eq!(p.update(100_000), PacerAction::NoChange);
let a = p.update(LOW_MIN_BPS + 1_000);
assert_eq!(a, PacerAction::RestoreVideo);
assert!(!p.audio_only());
}
#[test]
fn no_change_at_correct_layer() {
let mut p = SubscriberPacer::new();
for _ in 0..10 {
assert_eq!(p.update(LOW_MIN_BPS + 50_000), PacerAction::NoChange);
}
}
#[test]
fn exact_audio_only_boundary_is_video_mode() {
let mut p = SubscriberPacer::new();
let action = p.update(AUDIO_ONLY_BPS); assert_ne!(
action,
PacerAction::GoAudioOnly,
"exactly AUDIO_ONLY_BPS should remain in video mode (condition is strictly <)"
);
}
#[test]
fn no_double_go_audio_only() {
let mut p = SubscriberPacer::new();
let first = p.update(AUDIO_ONLY_BPS - 1);
assert_eq!(first, PacerAction::GoAudioOnly);
let second = p.update(1_000); assert_eq!(second, PacerAction::NoChange,
"GoAudioOnly must not be emitted twice; second call while audio-only must return NoChange");
}
#[test]
fn restore_video_resets_streak_for_upgrade() {
let mut p = SubscriberPacer::new();
p.update(AUDIO_ONLY_BPS - 1); p.update(LOW_MIN_BPS + 1); p.update(MEDIUM_MIN_BPS + 1);
p.update(MEDIUM_MIN_BPS + 1);
assert_eq!(
p.layer(),
SfuRid::LOW,
"after RestoreVideo, still need 3 ticks to upgrade"
);
let action = p.update(MEDIUM_MIN_BPS + 1);
assert_eq!(action, PacerAction::ChangeLayer(SfuRid::MEDIUM));
}
#[test]
fn exact_low_min_boundary_triggers_restore_video() {
let mut p = SubscriberPacer::new();
p.update(AUDIO_ONLY_BPS - 1); let action = p.update(LOW_MIN_BPS); assert_eq!(
action,
PacerAction::RestoreVideo,
"exactly LOW_MIN_BPS while audio-only should trigger RestoreVideo (condition is >=)"
);
}
#[test]
fn grey_zone_while_audio_only_is_no_change() {
let mut p = SubscriberPacer::new();
p.update(AUDIO_ONLY_BPS - 1); for bps in [AUDIO_ONLY_BPS, AUDIO_ONLY_BPS + 1, LOW_MIN_BPS - 1] {
assert_eq!(
p.update(bps),
PacerAction::NoChange,
"bps={bps} in grey zone should be NoChange while audio-only"
);
}
}
#[test]
fn downgrade_from_medium_resets_streak_so_re_upgrade_needs_3_ticks() {
let mut p = SubscriberPacer::new();
for _ in 0..3 {
p.update(MEDIUM_MIN_BPS + 1);
}
assert_eq!(p.layer(), SfuRid::MEDIUM);
p.update(LOW_MIN_BPS + 1);
assert_eq!(p.layer(), SfuRid::LOW);
p.update(MEDIUM_MIN_BPS + 1);
p.update(MEDIUM_MIN_BPS + 1);
assert_eq!(
p.layer(),
SfuRid::LOW,
"streak must have reset on downgrade"
);
p.update(MEDIUM_MIN_BPS + 1);
assert_eq!(p.layer(), SfuRid::MEDIUM);
}
}