// oxigdal_streaming/core/backpressure.rs

use crate::error::{Result, StreamingError};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
9
/// How the manager reacts when the buffer has reached capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackpressureStrategy {
    /// Signal the producer to block/retry until space frees up.
    Block,
    /// Drop the oldest buffered element to make room for the new one.
    DropOldest,
    /// Drop the incoming (newest) element instead of buffering it.
    DropNewest,
    /// Reject the element with a `BufferFull` error.
    Fail,
    /// Decide dynamically from current load (falls back to blocking
    /// while utilization is at or above the high watermark).
    Adaptive,
}
28
/// Tuning parameters for a `BackpressureManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Strategy applied when the buffer is full.
    pub strategy: BackpressureStrategy,
    /// Utilization fraction (0.0..=1.0) at or above which backpressure engages.
    pub high_watermark: f64,
    /// Utilization fraction at or below which backpressure can be released.
    pub low_watermark: f64,
    /// Latency ceiling consulted by adaptive capacity growth.
    pub max_latency: Duration,
    /// Minimum interval between metrics samples.
    pub sample_window: Duration,
    /// Whether adaptive behavior is enabled.
    // NOTE(review): this flag is not read anywhere in this module — confirm
    // it is consumed by callers or wire it into the Adaptive strategy path.
    pub adaptive: bool,
}
50
51impl Default for BackpressureConfig {
52 fn default() -> Self {
53 Self {
54 strategy: BackpressureStrategy::Block,
55 high_watermark: 0.8,
56 low_watermark: 0.2,
57 max_latency: Duration::from_secs(1),
58 sample_window: Duration::from_secs(10),
59 adaptive: true,
60 }
61 }
62}
63
/// Point-in-time load statistics sampled by the backpressure manager.
///
/// `Default` is derived: every field's type defaults to zero
/// (`0.0`, `0`, `Duration::ZERO`), which matches the previous
/// hand-written impl exactly.
#[derive(Debug, Clone, Default)]
pub struct LoadMetrics {
    /// Fraction of buffer capacity currently in use (0.0..=1.0).
    pub buffer_utilization: f64,
    /// Exponentially-weighted moving average of processing latency.
    pub avg_latency: Duration,
    /// Highest single-element latency observed so far.
    pub peak_latency: Duration,
    /// Elements processed per second since the manager was created.
    pub throughput: f64,
    /// Total elements dropped due to backpressure.
    pub dropped_elements: u64,
    /// Total number of backpressure activations.
    pub backpressure_events: u64,
}
98
/// Tracks buffer occupancy and load metrics, and decides when to apply
/// backpressure according to the configured `BackpressureStrategy`.
pub struct BackpressureManager {
    // Static configuration: strategy, watermarks, sampling window.
    config: BackpressureConfig,
    // Most recently sampled load statistics (refreshed once per sample window).
    metrics: Arc<RwLock<LoadMetrics>>,
    // Current capacity; atomic so adaptive resizing can mutate it in place.
    buffer_capacity: AtomicUsize,
    // Number of elements currently counted as buffered.
    buffer_size: AtomicUsize,
    // Lifetime count of processed elements (basis for throughput).
    elements_processed: AtomicU64,
    // Lifetime count of elements dropped under backpressure.
    elements_dropped: AtomicU64,
    // Lifetime count of backpressure activations.
    backpressure_events: AtomicU64,
    // Instant of the most recent metrics sample.
    last_sample: Arc<RwLock<Instant>>,
    // Creation time; throughput is computed over the elapsed time since this.
    sample_start: Instant,
}
111
112impl BackpressureManager {
113 pub fn new(config: BackpressureConfig, buffer_capacity: usize) -> Self {
115 Self {
116 config,
117 metrics: Arc::new(RwLock::new(LoadMetrics::default())),
118 buffer_capacity: AtomicUsize::new(buffer_capacity),
119 buffer_size: AtomicUsize::new(0),
120 elements_processed: AtomicU64::new(0),
121 elements_dropped: AtomicU64::new(0),
122 backpressure_events: AtomicU64::new(0),
123 last_sample: Arc::new(RwLock::new(Instant::now())),
124 sample_start: Instant::now(),
125 }
126 }
127
128 pub async fn should_apply_backpressure(&self) -> bool {
130 let utilization = self.buffer_utilization();
131 utilization >= self.config.high_watermark
132 }
133
134 pub async fn can_release_backpressure(&self) -> bool {
136 let utilization = self.buffer_utilization();
137 utilization <= self.config.low_watermark
138 }
139
140 pub async fn handle_element_arrival(&self) -> Result<bool> {
142 let current_size = self.buffer_size.load(Ordering::Relaxed);
143 let capacity = self.buffer_capacity.load(Ordering::Relaxed);
144
145 if current_size >= capacity {
146 self.backpressure_events.fetch_add(1, Ordering::Relaxed);
147
148 match self.config.strategy {
149 BackpressureStrategy::Block => {
150 return Ok(false);
151 }
152 BackpressureStrategy::DropOldest | BackpressureStrategy::DropNewest => {
153 self.elements_dropped.fetch_add(1, Ordering::Relaxed);
154 return Ok(true);
155 }
156 BackpressureStrategy::Fail => {
157 return Err(StreamingError::BufferFull);
158 }
159 BackpressureStrategy::Adaptive => {
160 if self.should_apply_backpressure().await {
161 return Ok(false);
162 }
163 }
164 }
165 }
166
167 self.buffer_size.fetch_add(1, Ordering::Relaxed);
168 Ok(true)
169 }
170
171 pub async fn handle_element_processed(&self, latency: Duration) {
173 self.buffer_size.fetch_sub(1, Ordering::Relaxed);
174 self.elements_processed.fetch_add(1, Ordering::Relaxed);
175
176 self.update_metrics(latency).await;
178 }
179
180 async fn update_metrics(&self, latency: Duration) {
182 let now = Instant::now();
183 let last_sample = *self.last_sample.read().await;
184
185 if now.duration_since(last_sample) >= self.config.sample_window {
186 let mut metrics = self.metrics.write().await;
187 let mut last = self.last_sample.write().await;
188
189 metrics.buffer_utilization = self.buffer_utilization();
190 metrics.dropped_elements = self.elements_dropped.load(Ordering::Relaxed);
191 metrics.backpressure_events = self.backpressure_events.load(Ordering::Relaxed);
192
193 let elapsed = now.duration_since(self.sample_start).as_secs_f64();
194 let processed = self.elements_processed.load(Ordering::Relaxed);
195 metrics.throughput = processed as f64 / elapsed;
196
197 if latency > metrics.peak_latency {
198 metrics.peak_latency = latency;
199 }
200
201 let alpha = 0.1;
203 let new_latency_secs = latency.as_secs_f64();
204 let old_latency_secs = metrics.avg_latency.as_secs_f64();
205 let avg_latency_secs = alpha * new_latency_secs + (1.0 - alpha) * old_latency_secs;
206 metrics.avg_latency = Duration::from_secs_f64(avg_latency_secs);
207
208 *last = now;
209 }
210 }
211
212 fn buffer_utilization(&self) -> f64 {
214 let size = self.buffer_size.load(Ordering::Relaxed);
215 let capacity = self.buffer_capacity.load(Ordering::Relaxed);
216
217 if capacity == 0 {
218 0.0
219 } else {
220 size as f64 / capacity as f64
221 }
222 }
223
224 pub async fn metrics(&self) -> LoadMetrics {
226 self.metrics.read().await.clone()
227 }
228
229 pub fn set_capacity(&self, capacity: usize) {
231 self.buffer_capacity.store(capacity, Ordering::Relaxed);
232 }
233
234 pub fn capacity(&self) -> usize {
236 self.buffer_capacity.load(Ordering::Relaxed)
237 }
238
239 pub fn size(&self) -> usize {
241 self.buffer_size.load(Ordering::Relaxed)
242 }
243
244 pub async fn reset_metrics(&self) {
246 let mut metrics = self.metrics.write().await;
247 *metrics = LoadMetrics::default();
248
249 self.elements_processed.store(0, Ordering::Relaxed);
250 self.elements_dropped.store(0, Ordering::Relaxed);
251 self.backpressure_events.store(0, Ordering::Relaxed);
252 }
253
254 pub async fn adjust_capacity_adaptive(&self) {
256 let utilization = self.buffer_utilization();
258 let metrics = self.metrics().await;
259
260 if utilization > self.config.high_watermark && metrics.avg_latency < self.config.max_latency
261 {
262 let current = self.buffer_capacity.load(Ordering::Relaxed);
263 let new_capacity = (current as f64 * 1.2) as usize;
264 self.buffer_capacity.store(new_capacity, Ordering::Relaxed);
265 } else if utilization < self.config.low_watermark {
266 let current = self.buffer_capacity.load(Ordering::Relaxed);
267 let new_capacity = ((current as f64 * 0.8) as usize).max(64);
268 self.buffer_capacity.store(new_capacity, Ordering::Relaxed);
269 }
270 }
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives `count` elements through the arrival path, panicking if any
    /// arrival is rejected.
    async fn push_elements(manager: &BackpressureManager, count: usize) {
        for _ in 0..count {
            manager
                .handle_element_arrival()
                .await
                .expect("backpressure element arrival should succeed");
        }
    }

    #[tokio::test]
    async fn test_backpressure_manager_creation() {
        let manager = BackpressureManager::new(BackpressureConfig::default(), 1000);

        assert_eq!(manager.capacity(), 1000);
        assert_eq!(manager.size(), 0);
    }

    #[tokio::test]
    async fn test_buffer_utilization() {
        let manager = BackpressureManager::new(BackpressureConfig::default(), 100);
        assert_eq!(manager.buffer_utilization(), 0.0);

        push_elements(&manager, 50).await;

        // Half full: utilization should be ~0.5.
        assert!((manager.buffer_utilization() - 0.5).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_backpressure_application() {
        let config = BackpressureConfig {
            high_watermark: 0.5,
            ..Default::default()
        };
        let manager = BackpressureManager::new(config, 100);

        // 55 of 100 slots puts utilization above the 0.5 watermark.
        push_elements(&manager, 55).await;

        assert!(manager.should_apply_backpressure().await);
    }

    #[tokio::test]
    async fn test_adaptive_capacity_adjustment() {
        let manager = BackpressureManager::new(BackpressureConfig::default(), 100);
        let initial_capacity = manager.capacity();

        // Fill to 95% — above the default 0.8 high watermark.
        push_elements(&manager, 95).await;

        manager.adjust_capacity_adaptive().await;

        assert!(manager.capacity() > initial_capacity);
    }
}