// sync_engine/backpressure.rs

/// Discrete backpressure levels, declared from least to most severe.
///
/// The explicit discriminants (`0..=5`) and the derived `PartialOrd`/`Ord`
/// both follow declaration order, so levels can be compared directly
/// (`Normal < Warn < ... < Shutdown`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum BackpressureLevel {
    /// Normal operation.
    Normal = 0,
    /// Warning level: high memory usage, no restrictions reported.
    Warn = 1,
    /// Throttling level: rate limiting is active.
    Throttle = 2,
    /// Critical level: writes are rejected.
    Critical = 3,
    /// Emergency level: read-only mode.
    Emergency = 4,
    /// Shutdown level: graceful shutdown initiated; reads are refused too.
    Shutdown = 5,
}
45
46impl BackpressureLevel {
47 #[must_use]
49 pub fn from_pressure(pressure: f64) -> Self {
50 match pressure {
51 p if p < 0.70 => Self::Normal,
52 p if p < 0.80 => Self::Warn,
53 p if p < 0.90 => Self::Throttle,
54 p if p < 0.95 => Self::Critical,
55 p if p < 0.98 => Self::Emergency,
56 _ => Self::Shutdown,
57 }
58 }
59
60 #[must_use]
62 pub fn should_accept_writes(&self) -> bool {
63 matches!(self, Self::Normal | Self::Warn | Self::Throttle)
64 }
65
66 #[must_use]
68 pub fn should_accept_reads(&self) -> bool {
69 !matches!(self, Self::Shutdown)
70 }
71
72 #[must_use]
74 pub fn eviction_multiplier(&self) -> f64 {
75 match self {
76 Self::Normal => 1.0,
77 Self::Warn => 1.5,
78 Self::Throttle => 2.0,
79 Self::Critical => 3.0,
80 Self::Emergency => 5.0,
81 Self::Shutdown => 10.0,
82 }
83 }
84
85 #[must_use]
87 pub fn http_status_code(&self) -> Option<u16> {
88 match self {
89 Self::Normal | Self::Warn => None,
90 Self::Throttle => Some(429),
91 Self::Critical | Self::Emergency | Self::Shutdown => Some(503),
92 }
93 }
94
95 pub fn retry_after_secs(&self) -> Option<u64> {
97 match self {
98 Self::Throttle => Some(1),
99 Self::Critical => Some(5),
100 Self::Emergency | Self::Shutdown => Some(30),
101 _ => None,
102 }
103 }
104
105 pub fn description(&self) -> &'static str {
106 match self {
107 Self::Normal => "Normal operation",
108 Self::Warn => "Warning - high memory usage",
109 Self::Throttle => "Throttling - rate limiting active",
110 Self::Critical => "Critical - writes rejected",
111 Self::Emergency => "Emergency - read-only mode",
112 Self::Shutdown => "Shutdown - graceful shutdown initiated",
113 }
114 }
115}
116
117impl std::fmt::Display for BackpressureLevel {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 write!(f, "{:?}", self)
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// Each threshold is an exclusive upper bound: the boundary value itself
    /// belongs to the next, more severe level.
    #[test]
    fn test_pressure_level_thresholds() {
        assert_eq!(BackpressureLevel::from_pressure(0.0), BackpressureLevel::Normal);
        assert_eq!(BackpressureLevel::from_pressure(0.69), BackpressureLevel::Normal);
        assert_eq!(BackpressureLevel::from_pressure(0.70), BackpressureLevel::Warn);
        assert_eq!(BackpressureLevel::from_pressure(0.79), BackpressureLevel::Warn);
        assert_eq!(BackpressureLevel::from_pressure(0.80), BackpressureLevel::Throttle);
        assert_eq!(BackpressureLevel::from_pressure(0.89), BackpressureLevel::Throttle);
        assert_eq!(BackpressureLevel::from_pressure(0.90), BackpressureLevel::Critical);
        assert_eq!(BackpressureLevel::from_pressure(0.94), BackpressureLevel::Critical);
        assert_eq!(BackpressureLevel::from_pressure(0.95), BackpressureLevel::Emergency);
        assert_eq!(BackpressureLevel::from_pressure(0.97), BackpressureLevel::Emergency);
        assert_eq!(BackpressureLevel::from_pressure(0.98), BackpressureLevel::Shutdown);
        assert_eq!(BackpressureLevel::from_pressure(1.0), BackpressureLevel::Shutdown);
    }

    /// Writes are accepted up to and including `Throttle`.
    #[test]
    fn test_should_accept_writes() {
        assert!(BackpressureLevel::Normal.should_accept_writes());
        assert!(BackpressureLevel::Warn.should_accept_writes());
        assert!(BackpressureLevel::Throttle.should_accept_writes());
        assert!(!BackpressureLevel::Critical.should_accept_writes());
        assert!(!BackpressureLevel::Emergency.should_accept_writes());
        assert!(!BackpressureLevel::Shutdown.should_accept_writes());
    }

    /// Reads are refused only at `Shutdown`.
    #[test]
    fn test_should_accept_reads() {
        assert!(BackpressureLevel::Normal.should_accept_reads());
        assert!(BackpressureLevel::Warn.should_accept_reads());
        assert!(BackpressureLevel::Throttle.should_accept_reads());
        assert!(BackpressureLevel::Critical.should_accept_reads());
        assert!(BackpressureLevel::Emergency.should_accept_reads());
        assert!(!BackpressureLevel::Shutdown.should_accept_reads());
    }

    /// The multiplier must never decrease from one level to the next.
    #[test]
    fn test_eviction_multiplier_increases_with_pressure() {
        let levels = [
            BackpressureLevel::Normal,
            BackpressureLevel::Warn,
            BackpressureLevel::Throttle,
            BackpressureLevel::Critical,
            BackpressureLevel::Emergency,
            BackpressureLevel::Shutdown,
        ];

        // Compare each adjacent (lower, higher) pair without manual indexing.
        for pair in levels.windows(2) {
            assert!(
                pair[1].eviction_multiplier() >= pair[0].eviction_multiplier(),
                "eviction multiplier should increase with pressure"
            );
        }
    }

    #[test]
    fn test_http_status_codes() {
        assert_eq!(BackpressureLevel::Normal.http_status_code(), None);
        assert_eq!(BackpressureLevel::Warn.http_status_code(), None);
        assert_eq!(BackpressureLevel::Throttle.http_status_code(), Some(429));
        assert_eq!(BackpressureLevel::Critical.http_status_code(), Some(503));
        assert_eq!(BackpressureLevel::Emergency.http_status_code(), Some(503));
        assert_eq!(BackpressureLevel::Shutdown.http_status_code(), Some(503));
    }

    /// Covers every level, including `Warn` and `Shutdown`.
    #[test]
    fn test_retry_after_increases_with_severity() {
        assert_eq!(BackpressureLevel::Normal.retry_after_secs(), None);
        assert_eq!(BackpressureLevel::Warn.retry_after_secs(), None);
        assert_eq!(BackpressureLevel::Throttle.retry_after_secs(), Some(1));
        assert_eq!(BackpressureLevel::Critical.retry_after_secs(), Some(5));
        assert_eq!(BackpressureLevel::Emergency.retry_after_secs(), Some(30));
        assert_eq!(BackpressureLevel::Shutdown.retry_after_secs(), Some(30));
    }

    #[test]
    fn test_level_ordering() {
        assert!(BackpressureLevel::Normal < BackpressureLevel::Warn);
        assert!(BackpressureLevel::Warn < BackpressureLevel::Throttle);
        assert!(BackpressureLevel::Throttle < BackpressureLevel::Critical);
        assert!(BackpressureLevel::Critical < BackpressureLevel::Emergency);
        assert!(BackpressureLevel::Emergency < BackpressureLevel::Shutdown);
    }
}