alpine/stream/
recovery.rs1use crate::stream::network::NetworkConditions;
8
9const SUSTAINED_LOSS_THRESHOLD: f64 = 0.25;
10const RECOVERY_CLEAR_LOSS_THRESHOLD: f64 = 0.05;
11const BURST_LOSS_THRESHOLD: u64 = 3;
12const RECOVERY_CLEAR_BURST_THRESHOLD: u64 = 1;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum RecoveryReason {
17 SustainedLoss,
19 BurstLoss,
21}
22
23impl RecoveryReason {
24 pub(crate) fn as_str(&self) -> &'static str {
25 match self {
26 RecoveryReason::SustainedLoss => "sustained_loss",
27 RecoveryReason::BurstLoss => "burst_loss",
28 }
29 }
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34pub enum RecoveryEvent {
35 RecoveryStarted(RecoveryReason),
37 RecoveryComplete(RecoveryReason),
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42enum RecoveryState {
43 Idle,
44 Recovering(RecoveryReason),
45}
46
47#[derive(Debug)]
49pub struct RecoveryMonitor {
50 state: RecoveryState,
51}
52
53impl RecoveryMonitor {
54 pub fn new() -> Self {
56 Self {
57 state: RecoveryState::Idle,
58 }
59 }
60
61 pub fn feed(&mut self, conditions: &NetworkConditions) -> Option<RecoveryEvent> {
63 let metrics = conditions.metrics();
64 let gap = conditions.max_loss_gap();
65 match self.state {
66 RecoveryState::Idle => {
67 if gap >= BURST_LOSS_THRESHOLD {
68 self.state = RecoveryState::Recovering(RecoveryReason::BurstLoss);
69 return Some(RecoveryEvent::RecoveryStarted(RecoveryReason::BurstLoss));
70 }
71 if metrics.loss_ratio >= SUSTAINED_LOSS_THRESHOLD {
72 self.state = RecoveryState::Recovering(RecoveryReason::SustainedLoss);
73 return Some(RecoveryEvent::RecoveryStarted(
74 RecoveryReason::SustainedLoss,
75 ));
76 }
77 }
78 RecoveryState::Recovering(reason) => {
79 if metrics.loss_ratio <= RECOVERY_CLEAR_LOSS_THRESHOLD
80 && gap <= RECOVERY_CLEAR_BURST_THRESHOLD
81 {
82 self.state = RecoveryState::Idle;
83 return Some(RecoveryEvent::RecoveryComplete(reason));
84 }
85 }
86 }
87 None
88 }
89
90 pub fn is_recovering(&self) -> bool {
92 matches!(self.state, RecoveryState::Recovering(_))
93 }
94
95 pub fn active_reason(&self) -> Option<RecoveryReason> {
97 match self.state {
98 RecoveryState::Recovering(reason) => Some(reason),
99 RecoveryState::Idle => None,
100 }
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use crate::stream::network::NetworkConditions;
108
109 fn low_loss_conditions() -> NetworkConditions {
110 let mut cond = NetworkConditions::new();
111 cond.record_frame(10, 0, 1_000);
112 cond.record_frame(11, 1_000, 2_000);
113 cond.record_frame(12, 2_000, 3_000);
114 cond
115 }
116
117 #[test]
118 fn starts_and_completes_on_loss_ratio() {
119 let mut monitor = RecoveryMonitor::new();
120 let mut cond = NetworkConditions::new();
121 cond.record_frame(1, 0, 0);
122 cond.record_frame(2, 1_000, 0);
123 cond.record_frame(4, 2_000, 0);
124 let event = monitor.feed(&cond);
125 assert_eq!(
126 event,
127 Some(RecoveryEvent::RecoveryStarted(
128 RecoveryReason::SustainedLoss
129 ))
130 );
131 let complete = monitor.feed(&low_loss_conditions());
132 assert_eq!(
133 complete,
134 Some(RecoveryEvent::RecoveryComplete(
135 RecoveryReason::SustainedLoss
136 ))
137 );
138 }
139
140 #[test]
141 fn burst_gap_triggers_recovery() {
142 let mut monitor = RecoveryMonitor::new();
143 let mut cond = NetworkConditions::new();
144 cond.record_frame(1, 0, 0);
145 cond.record_frame(5, 1_000, 0);
146 let event = monitor.feed(&cond);
147 assert_eq!(
148 event,
149 Some(RecoveryEvent::RecoveryStarted(RecoveryReason::BurstLoss))
150 );
151 let complete = monitor.feed(&low_loss_conditions());
152 assert_eq!(
153 complete,
154 Some(RecoveryEvent::RecoveryComplete(RecoveryReason::BurstLoss))
155 );
156 }
157
158 #[test]
159 fn recovery_idempotent_until_cleared() {
160 let mut monitor = RecoveryMonitor::new();
161 let mut cond = NetworkConditions::new();
162 cond.record_frame(1, 0, 0);
163 cond.record_frame(4, 1_000, 0);
164 assert!(matches!(
165 monitor.feed(&cond),
166 Some(RecoveryEvent::RecoveryStarted(_))
167 ));
168 assert_eq!(monitor.feed(&cond), None);
169 }
170}