1use anyhow::{anyhow, Result};
13use chrono::{DateTime, Duration as ChronoDuration, Utc};
14use serde::{Deserialize, Serialize};
15use std::collections::VecDeque;
16use std::sync::Arc;
17use tokio::sync::{Mutex, Semaphore};
18use tracing::{debug, warn};
19
/// Strategy applied when the event buffer reaches capacity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackpressureStrategy {
    /// Evict the oldest buffered event to make room for the incoming one.
    DropOldest,
    /// Reject the incoming event and keep everything already buffered.
    DropNewest,
    /// Wait (via a semaphore permit) until a buffer slot frees up.
    Block,
    /// Retry the enqueue with exponentially growing delays, up to a retry cap.
    ExponentialBackoff {
        /// Delay before the first retry, in milliseconds.
        initial_delay_ms: u64,
        /// Upper bound on the per-retry delay, in milliseconds.
        max_delay_ms: u64,
        /// Factor multiplied into the delay after each failed attempt.
        multiplier: f64,
    },
    /// Throttle producers based on measured throughput versus a target rate.
    Adaptive {
        /// Desired throughput in events per second.
        target_throughput: f64,
        /// Scales how long producers are delayed when over target (ms per
        /// unit of relative overshoot).
        adjustment_factor: f64,
    },
}
43
/// Signal advertised to producers describing how they should behave.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlowControlSignal {
    /// Buffer utilization is below the low water mark; produce freely.
    Proceed,
    /// Utilization is between the low and high water marks; reduce the rate.
    SlowDown,
    /// Utilization is at or above the high water mark; stop producing.
    Stop,
}
54
/// Tuning knobs for a [`BackpressureController`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackpressureConfig {
    /// Maximum number of events held in the buffer at once.
    pub max_buffer_size: usize,
    /// What to do with incoming events once the buffer is full.
    pub strategy: BackpressureStrategy,
    /// Utilization fraction (0.0..=1.0) at or above which `Stop` is signaled.
    pub high_water_mark: f64,
    /// Utilization fraction at or above which `SlowDown` is signaled.
    pub low_water_mark: f64,
    // NOTE(review): not referenced by the visible controller logic — confirm
    // whether callers elsewhere read this flag.
    pub enable_adaptive: bool,
    /// Sliding window used when measuring processing throughput.
    pub measurement_window: ChronoDuration,
}
71
72impl Default for BackpressureConfig {
73 fn default() -> Self {
74 Self {
75 max_buffer_size: 10000,
76 strategy: BackpressureStrategy::Block,
77 high_water_mark: 0.8,
78 low_water_mark: 0.2,
79 enable_adaptive: true,
80 measurement_window: ChronoDuration::seconds(10),
81 }
82 }
83}
84
/// Point-in-time metrics snapshot for a [`BackpressureController`].
#[derive(Debug, Clone, Default)]
pub struct BackpressureStats {
    // Total events offered to the controller.
    pub events_received: u64,
    // Total events successfully pulled via `poll`.
    pub events_processed: u64,
    // Events discarded by drop strategies or after exhausted retries.
    pub events_dropped: u64,
    // Retry attempts made by the exponential-backoff strategy.
    pub events_blocked: u64,
    // Current number of buffered events.
    pub buffer_size: usize,
    // buffer_size / max_buffer_size, in 0.0..=1.0.
    pub buffer_utilization: f64,
    // Measured throughput (events/sec); filled in by `stats()`.
    pub current_throughput: f64,
    // Number of transitions into SlowDown/Stop flow-control states.
    pub backpressure_events: u64,
    // Exponentially weighted moving average of enqueue-to-poll latency (ms).
    pub avg_latency_ms: f64,
}
98
/// Shared FIFO of buffered events paired with their enqueue time
/// (the timestamp feeds the latency average computed in `poll`).
type TimestampedBuffer<T> = Arc<Mutex<VecDeque<(T, DateTime<Utc>)>>>;

/// Rolling log of `(timestamp, count)` processing records used to
/// estimate throughput over the configured measurement window.
type ThroughputHistory = Arc<Mutex<VecDeque<(DateTime<Utc>, u64)>>>;
104
/// Bounded event buffer with pluggable backpressure strategies,
/// flow-control signaling, and throughput/latency statistics.
pub struct BackpressureController<T> {
    // Immutable configuration chosen at construction time.
    config: BackpressureConfig,
    // FIFO of pending events with enqueue timestamps.
    buffer: TimestampedBuffer<T>,
    // Mutable metrics, updated by offer/poll paths.
    stats: Arc<Mutex<BackpressureStats>>,
    // Latest signal advertised to producers.
    flow_control: Arc<Mutex<FlowControlSignal>>,
    // Capacity accounting for the `Block` strategy (one permit per slot).
    semaphore: Arc<Semaphore>,
    // Recent processing timestamps for throughput measurement.
    throughput_history: ThroughputHistory,
}
114
115impl<T: Clone + Send> BackpressureController<T> {
116 pub fn new(config: BackpressureConfig) -> Self {
118 let max_permits = config.max_buffer_size;
119
120 Self {
121 config,
122 buffer: Arc::new(Mutex::new(VecDeque::new())),
123 stats: Arc::new(Mutex::new(BackpressureStats::default())),
124 flow_control: Arc::new(Mutex::new(FlowControlSignal::Proceed)),
125 semaphore: Arc::new(Semaphore::new(max_permits)),
126 throughput_history: Arc::new(Mutex::new(VecDeque::new())),
127 }
128 }
129
130 pub async fn offer(&self, event: T) -> Result<()> {
132 let mut stats = self.stats.lock().await;
133 stats.events_received += 1;
134 drop(stats);
135
136 match &self.config.strategy {
137 BackpressureStrategy::DropOldest => self.offer_drop_oldest(event).await,
138 BackpressureStrategy::DropNewest => self.offer_drop_newest(event).await,
139 BackpressureStrategy::Block => self.offer_blocking(event).await,
140 BackpressureStrategy::ExponentialBackoff {
141 initial_delay_ms,
142 max_delay_ms,
143 multiplier,
144 } => {
145 self.offer_with_backoff(event, *initial_delay_ms, *max_delay_ms, *multiplier)
146 .await
147 }
148 BackpressureStrategy::Adaptive {
149 target_throughput,
150 adjustment_factor,
151 } => {
152 self.offer_adaptive(event, *target_throughput, *adjustment_factor)
153 .await
154 }
155 }
156 }
157
158 async fn offer_drop_oldest(&self, event: T) -> Result<()> {
160 let mut buffer = self.buffer.lock().await;
161
162 if buffer.len() >= self.config.max_buffer_size {
163 buffer.pop_front();
165
166 let mut stats = self.stats.lock().await;
167 stats.events_dropped += 1;
168 drop(stats);
169
170 warn!("Buffer full, dropped oldest event");
171 }
172
173 buffer.push_back((event, Utc::now()));
174 self.update_flow_control(buffer.len()).await;
175
176 Ok(())
177 }
178
179 async fn offer_drop_newest(&self, event: T) -> Result<()> {
181 let mut buffer = self.buffer.lock().await;
182
183 if buffer.len() >= self.config.max_buffer_size {
184 let mut stats = self.stats.lock().await;
185 stats.events_dropped += 1;
186 drop(stats);
187
188 warn!("Buffer full, dropped newest event");
189 return Ok(());
190 }
191
192 buffer.push_back((event, Utc::now()));
193 self.update_flow_control(buffer.len()).await;
194
195 Ok(())
196 }
197
198 async fn offer_blocking(&self, event: T) -> Result<()> {
200 let _permit = self
202 .semaphore
203 .acquire()
204 .await
205 .map_err(|e| anyhow!("Failed to acquire semaphore: {}", e))?;
206
207 let mut buffer = self.buffer.lock().await;
208 buffer.push_back((event, Utc::now()));
209
210 let buffer_size = buffer.len();
211 drop(buffer);
212
213 self.update_flow_control(buffer_size).await;
214
215 Ok(())
216 }
217
218 async fn offer_with_backoff(
220 &self,
221 event: T,
222 initial_delay_ms: u64,
223 max_delay_ms: u64,
224 multiplier: f64,
225 ) -> Result<()> {
226 let mut delay_ms = initial_delay_ms;
227 let mut retries = 0;
228 const MAX_RETRIES: u32 = 10;
229
230 loop {
231 let buffer = self.buffer.lock().await;
232 let buffer_size = buffer.len();
233 drop(buffer);
234
235 if buffer_size < self.config.max_buffer_size {
236 let mut buffer = self.buffer.lock().await;
237 buffer.push_back((event, Utc::now()));
238 drop(buffer);
239
240 self.update_flow_control(buffer_size + 1).await;
241 return Ok(());
242 }
243
244 if retries >= MAX_RETRIES {
245 let mut stats = self.stats.lock().await;
246 stats.events_dropped += 1;
247 return Err(anyhow!("Max retries exceeded, dropping event"));
248 }
249
250 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
252
253 delay_ms = ((delay_ms as f64 * multiplier) as u64).min(max_delay_ms);
254 retries += 1;
255
256 let mut stats = self.stats.lock().await;
257 stats.events_blocked += 1;
258 drop(stats);
259 }
260 }
261
262 async fn offer_adaptive(
264 &self,
265 event: T,
266 target_throughput: f64,
267 adjustment_factor: f64,
268 ) -> Result<()> {
269 let current_throughput = self.measure_throughput().await;
271
272 if current_throughput > target_throughput {
274 let delay_ms =
275 ((current_throughput / target_throughput - 1.0) * adjustment_factor) as u64;
276 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
277 }
278
279 let mut buffer = self.buffer.lock().await;
281
282 if buffer.len() >= self.config.max_buffer_size {
283 let mut stats = self.stats.lock().await;
284 stats.events_dropped += 1;
285 drop(stats);
286
287 return Err(anyhow!("Buffer full even with adaptive throttling"));
288 }
289
290 buffer.push_back((event, Utc::now()));
291 let buffer_size = buffer.len();
292 drop(buffer);
293
294 self.update_flow_control(buffer_size).await;
295
296 Ok(())
297 }
298
299 pub async fn poll(&self) -> Result<Option<T>> {
301 let mut buffer = self.buffer.lock().await;
302
303 if let Some((event, timestamp)) = buffer.pop_front() {
304 let buffer_size = buffer.len();
305 drop(buffer);
306
307 self.semaphore.add_permits(1);
309
310 let mut stats = self.stats.lock().await;
312 stats.events_processed += 1;
313
314 let latency = (Utc::now() - timestamp).num_milliseconds() as f64;
315 let alpha = 0.1;
316 stats.avg_latency_ms = alpha * latency + (1.0 - alpha) * stats.avg_latency_ms;
317
318 drop(stats);
319
320 self.update_flow_control(buffer_size).await;
321 self.record_throughput().await;
322
323 Ok(Some(event))
324 } else {
325 Ok(None)
326 }
327 }
328
329 async fn update_flow_control(&self, buffer_size: usize) {
331 let utilization = buffer_size as f64 / self.config.max_buffer_size as f64;
332
333 let signal = if utilization >= self.config.high_water_mark {
334 FlowControlSignal::Stop
335 } else if utilization >= self.config.low_water_mark {
336 FlowControlSignal::SlowDown
337 } else {
338 FlowControlSignal::Proceed
339 };
340
341 let mut flow_control = self.flow_control.lock().await;
342 if *flow_control != signal {
343 debug!(
344 "Flow control signal changed: {:?} -> {:?}",
345 *flow_control, signal
346 );
347
348 if signal != FlowControlSignal::Proceed {
349 let mut stats = self.stats.lock().await;
350 stats.backpressure_events += 1;
351 }
352 }
353 *flow_control = signal;
354
355 let mut stats = self.stats.lock().await;
357 stats.buffer_size = buffer_size;
358 stats.buffer_utilization = utilization;
359 }
360
361 async fn record_throughput(&self) {
363 let now = Utc::now();
364 let mut history = self.throughput_history.lock().await;
365
366 history.push_back((now, 1));
367
368 let window_start = now - self.config.measurement_window;
370 while let Some((timestamp, _)) = history.front() {
371 if *timestamp < window_start {
372 history.pop_front();
373 } else {
374 break;
375 }
376 }
377 }
378
379 async fn measure_throughput(&self) -> f64 {
381 let now = Utc::now();
382 let history = self.throughput_history.lock().await;
383
384 if history.is_empty() {
385 return 0.0;
386 }
387
388 let window_start = now - self.config.measurement_window;
389 let count: u64 = history
390 .iter()
391 .filter(|(timestamp, _)| *timestamp >= window_start)
392 .map(|(_, count)| count)
393 .sum();
394
395 let elapsed_seconds = self.config.measurement_window.num_seconds() as f64;
396 count as f64 / elapsed_seconds
397 }
398
399 pub async fn flow_control_signal(&self) -> FlowControlSignal {
401 *self.flow_control.lock().await
402 }
403
404 pub async fn stats(&self) -> BackpressureStats {
406 let stats = self.stats.lock().await;
407 let mut result = stats.clone();
408
409 drop(stats);
411 result.current_throughput = self.measure_throughput().await;
412
413 result
414 }
415
416 pub async fn buffer_size(&self) -> usize {
418 self.buffer.lock().await.len()
419 }
420
421 pub async fn clear(&self) {
423 let mut buffer = self.buffer.lock().await;
424 let cleared_count = buffer.len();
425 buffer.clear();
426
427 self.semaphore.add_permits(cleared_count);
429
430 let mut stats = self.stats.lock().await;
431 stats.buffer_size = 0;
432 stats.buffer_utilization = 0.0;
433 }
434}
435
436pub struct RateLimiter {
438 tokens: Arc<Mutex<f64>>,
439 max_tokens: f64,
440 refill_rate: f64, last_refill: Arc<Mutex<DateTime<Utc>>>,
442}
443
444impl RateLimiter {
445 pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
447 Self {
448 tokens: Arc::new(Mutex::new(max_tokens)),
449 max_tokens,
450 refill_rate,
451 last_refill: Arc::new(Mutex::new(Utc::now())),
452 }
453 }
454
455 pub async fn try_acquire(&self) -> bool {
457 self.refill_tokens().await;
458
459 let mut tokens = self.tokens.lock().await;
460 if *tokens >= 1.0 {
461 *tokens -= 1.0;
462 true
463 } else {
464 false
465 }
466 }
467
468 pub async fn acquire(&self) -> Result<()> {
470 loop {
471 if self.try_acquire().await {
472 return Ok(());
473 }
474
475 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
477 }
478 }
479
480 async fn refill_tokens(&self) {
482 let now = Utc::now();
483 let mut last_refill = self.last_refill.lock().await;
484
485 let elapsed = (now - *last_refill).num_milliseconds() as f64 / 1000.0;
486 let new_tokens = elapsed * self.refill_rate;
487
488 if new_tokens > 0.0 {
489 let mut tokens = self.tokens.lock().await;
490 *tokens = (*tokens + new_tokens).min(self.max_tokens);
491 *last_refill = now;
492 }
493 }
494
495 pub async fn available_tokens(&self) -> f64 {
497 self.refill_tokens().await;
498 *self.tokens.lock().await
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// With `DropOldest` and capacity 3, offering 5 events keeps the 3 newest.
    #[tokio::test]
    async fn test_backpressure_drop_oldest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropOldest,
            ..Default::default()
        };
        let controller = BackpressureController::new(config);

        for event in 0..5 {
            controller.offer(event).await.unwrap();
        }

        assert_eq!(controller.buffer_size().await, 3);
        // Events 0 and 1 were evicted, so the head of the queue is 2.
        assert_eq!(controller.poll().await.unwrap().unwrap(), 2);
    }

    /// With `DropNewest` and capacity 3, the first 3 events survive.
    #[tokio::test]
    async fn test_backpressure_drop_newest() {
        let config = BackpressureConfig {
            max_buffer_size: 3,
            strategy: BackpressureStrategy::DropNewest,
            ..Default::default()
        };
        let controller = BackpressureController::new(config);

        for event in 0..5 {
            controller.offer(event).await.unwrap();
        }

        assert_eq!(controller.buffer_size().await, 3);
        // Events 3 and 4 were rejected, so the head of the queue is 0.
        assert_eq!(controller.poll().await.unwrap().unwrap(), 0);
    }

    /// The signal tracks utilization across the 20% / 80% water marks.
    #[tokio::test]
    async fn test_flow_control_signals() {
        let config = BackpressureConfig {
            max_buffer_size: 100,
            high_water_mark: 0.8,
            low_water_mark: 0.2,
            ..Default::default()
        };
        let controller = BackpressureController::new(config);

        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Proceed
        );

        // 30/100 = 0.3 utilization — between the water marks.
        for event in 0..30 {
            controller.offer(event).await.unwrap();
        }
        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::SlowDown
        );

        // 85/100 = 0.85 utilization — above the high water mark.
        for event in 30..85 {
            controller.offer(event).await.unwrap();
        }
        assert_eq!(
            controller.flow_control_signal().await,
            FlowControlSignal::Stop
        );
    }

    /// A bucket of 10 tokens at 10 tokens/sec empties after 10 grabs and
    /// refills after a short wait.
    #[tokio::test]
    async fn test_rate_limiter() {
        let limiter = RateLimiter::new(10.0, 10.0);

        for _ in 0..10 {
            assert!(limiter.try_acquire().await);
        }
        assert!(!limiter.try_acquire().await);

        // 200 ms at 10 tokens/sec credits roughly 2 tokens.
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
        assert!(limiter.try_acquire().await);
    }
}