sync_engine/
/// Severity ladder used to react to rising system pressure.
///
/// Variants are declared from least to most severe. The derived
/// `PartialOrd`/`Ord` implementations follow that declaration order, so
/// levels can be compared directly (`Normal < Warn < ... < Shutdown`).
/// The explicit discriminants pin the numeric value of each level.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum BackpressureLevel {
    /// Lowest level: nothing is restricted.
    Normal = 0,
    /// Elevated pressure; still fully operational.
    Warn = 1,
    /// Rate limiting is active.
    Throttle = 2,
    /// Writes are rejected.
    Critical = 3,
    /// Read-only operation.
    Emergency = 4,
    /// Highest level: shutdown has been initiated.
    Shutdown = 5,
}
48
49impl BackpressureLevel {
50 #[must_use]
52 pub fn from_pressure(pressure: f64) -> Self {
53 match pressure {
54 p if p < 0.70 => Self::Normal,
55 p if p < 0.80 => Self::Warn,
56 p if p < 0.90 => Self::Throttle,
57 p if p < 0.95 => Self::Critical,
58 p if p < 0.98 => Self::Emergency,
59 _ => Self::Shutdown,
60 }
61 }
62
63 #[must_use]
65 pub fn should_accept_writes(&self) -> bool {
66 matches!(self, Self::Normal | Self::Warn | Self::Throttle)
67 }
68
69 #[must_use]
71 pub fn should_accept_reads(&self) -> bool {
72 !matches!(self, Self::Shutdown)
73 }
74
75 #[must_use]
77 pub fn eviction_multiplier(&self) -> f64 {
78 match self {
79 Self::Normal => 1.0,
80 Self::Warn => 1.5,
81 Self::Throttle => 2.0,
82 Self::Critical => 3.0,
83 Self::Emergency => 5.0,
84 Self::Shutdown => 10.0,
85 }
86 }
87
88 #[must_use]
90 pub fn http_status_code(&self) -> Option<u16> {
91 match self {
92 Self::Normal | Self::Warn => None,
93 Self::Throttle => Some(429),
94 Self::Critical | Self::Emergency | Self::Shutdown => Some(503),
95 }
96 }
97
98 pub fn retry_after_secs(&self) -> Option<u64> {
100 match self {
101 Self::Throttle => Some(1),
102 Self::Critical => Some(5),
103 Self::Emergency | Self::Shutdown => Some(30),
104 _ => None,
105 }
106 }
107
108 pub fn description(&self) -> &'static str {
109 match self {
110 Self::Normal => "Normal operation",
111 Self::Warn => "Warning - high memory usage",
112 Self::Throttle => "Throttling - rate limiting active",
113 Self::Critical => "Critical - writes rejected",
114 Self::Emergency => "Emergency - read-only mode",
115 Self::Shutdown => "Shutdown - graceful shutdown initiated",
116 }
117 }
118}
119
120impl std::fmt::Display for BackpressureLevel {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 write!(f, "{:?}", self)
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// Every level, from least to most severe.
    const LEVELS: [BackpressureLevel; 6] = [
        BackpressureLevel::Normal,
        BackpressureLevel::Warn,
        BackpressureLevel::Throttle,
        BackpressureLevel::Critical,
        BackpressureLevel::Emergency,
        BackpressureLevel::Shutdown,
    ];

    #[test]
    fn test_pressure_level_thresholds() {
        // Each pair probes the bottom and the top of one band; a reading that
        // sits exactly on a threshold belongs to the more severe level.
        assert_eq!(BackpressureLevel::from_pressure(0.0), BackpressureLevel::Normal);
        assert_eq!(BackpressureLevel::from_pressure(0.69), BackpressureLevel::Normal);
        assert_eq!(BackpressureLevel::from_pressure(0.70), BackpressureLevel::Warn);
        assert_eq!(BackpressureLevel::from_pressure(0.79), BackpressureLevel::Warn);
        assert_eq!(BackpressureLevel::from_pressure(0.80), BackpressureLevel::Throttle);
        assert_eq!(BackpressureLevel::from_pressure(0.89), BackpressureLevel::Throttle);
        assert_eq!(BackpressureLevel::from_pressure(0.90), BackpressureLevel::Critical);
        assert_eq!(BackpressureLevel::from_pressure(0.94), BackpressureLevel::Critical);
        assert_eq!(BackpressureLevel::from_pressure(0.95), BackpressureLevel::Emergency);
        assert_eq!(BackpressureLevel::from_pressure(0.97), BackpressureLevel::Emergency);
        assert_eq!(BackpressureLevel::from_pressure(0.98), BackpressureLevel::Shutdown);
        assert_eq!(BackpressureLevel::from_pressure(1.0), BackpressureLevel::Shutdown);
    }

    #[test]
    fn test_should_accept_writes() {
        assert!(BackpressureLevel::Normal.should_accept_writes());
        assert!(BackpressureLevel::Warn.should_accept_writes());
        assert!(BackpressureLevel::Throttle.should_accept_writes());
        assert!(!BackpressureLevel::Critical.should_accept_writes());
        assert!(!BackpressureLevel::Emergency.should_accept_writes());
        assert!(!BackpressureLevel::Shutdown.should_accept_writes());
    }

    #[test]
    fn test_should_accept_reads() {
        assert!(BackpressureLevel::Normal.should_accept_reads());
        assert!(BackpressureLevel::Warn.should_accept_reads());
        assert!(BackpressureLevel::Throttle.should_accept_reads());
        assert!(BackpressureLevel::Critical.should_accept_reads());
        assert!(BackpressureLevel::Emergency.should_accept_reads());
        assert!(!BackpressureLevel::Shutdown.should_accept_reads());
    }

    #[test]
    fn test_eviction_multiplier_increases_with_pressure() {
        // Compare each level with its immediate successor.
        for pair in LEVELS.windows(2) {
            assert!(
                pair[1].eviction_multiplier() >= pair[0].eviction_multiplier(),
                "eviction multiplier should increase with pressure"
            );
        }
    }

    #[test]
    fn test_http_status_codes() {
        assert_eq!(BackpressureLevel::Normal.http_status_code(), None);
        assert_eq!(BackpressureLevel::Warn.http_status_code(), None);
        assert_eq!(BackpressureLevel::Throttle.http_status_code(), Some(429));
        assert_eq!(BackpressureLevel::Critical.http_status_code(), Some(503));
        assert_eq!(BackpressureLevel::Emergency.http_status_code(), Some(503));
        assert_eq!(BackpressureLevel::Shutdown.http_status_code(), Some(503));
    }

    #[test]
    fn test_retry_after_increases_with_severity() {
        // Exact value for every level, including the two ends of the table.
        assert_eq!(BackpressureLevel::Normal.retry_after_secs(), None);
        assert_eq!(BackpressureLevel::Warn.retry_after_secs(), None);
        assert_eq!(BackpressureLevel::Throttle.retry_after_secs(), Some(1));
        assert_eq!(BackpressureLevel::Critical.retry_after_secs(), Some(5));
        assert_eq!(BackpressureLevel::Emergency.retry_after_secs(), Some(30));
        assert_eq!(BackpressureLevel::Shutdown.retry_after_secs(), Some(30));

        // `None < Some(_)` for `Option`, so this also covers the step from
        // "no hint" to the first concrete delay.
        for pair in LEVELS.windows(2) {
            assert!(
                pair[1].retry_after_secs() >= pair[0].retry_after_secs(),
                "retry-after should not decrease as severity rises"
            );
        }
    }

    #[test]
    fn test_level_ordering() {
        assert!(BackpressureLevel::Normal < BackpressureLevel::Warn);
        assert!(BackpressureLevel::Warn < BackpressureLevel::Throttle);
        assert!(BackpressureLevel::Throttle < BackpressureLevel::Critical);
        assert!(BackpressureLevel::Critical < BackpressureLevel::Emergency);
        assert!(BackpressureLevel::Emergency < BackpressureLevel::Shutdown);
    }
}