1use std::num::NonZeroU32;
31use std::sync::Arc;
32use std::time::Duration;
33use governor::{Quota, RateLimiter as GovLimiter, state::{InMemoryState, NotKeyed}, clock::DefaultClock, middleware::NoOpMiddleware};
34use tokio::sync::{Semaphore, OwnedSemaphorePermit};
35
/// Parameters describing how an operation should be retried with
/// exponential backoff.
///
/// The backoff schedule itself is computed by `delay_for_attempt`; the other
/// fields are only stored here for whoever drives the retry loop.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// Attempt budget. Nothing in this type enforces it — presumably the
    /// retry loop consuming the config does (that loop is not in this module).
    pub max_attempts: usize,

    /// Base delay from which the backoff schedule grows.
    pub initial_delay: Duration,

    /// Ceiling applied to the computed backoff delays.
    pub max_delay: Duration,

    /// Multiplier applied to the delay for each additional attempt.
    pub backoff_factor: f64,

    /// Timeout for a connection attempt. Not read in this module — presumably
    /// applied per attempt by the caller; confirm against the call site.
    pub connection_timeout: Duration,
}
56
57impl Default for RetryConfig {
58 fn default() -> Self {
59 Self {
60 max_attempts: 10,
61 initial_delay: Duration::from_millis(100),
62 max_delay: Duration::from_secs(30),
63 backoff_factor: 2.0,
64 connection_timeout: Duration::from_secs(5),
65 }
66 }
67}
68
69impl RetryConfig {
70 pub fn startup() -> Self {
88 Self {
89 max_attempts: 20,
90 initial_delay: Duration::from_millis(500),
91 max_delay: Duration::from_secs(30),
92 backoff_factor: 1.5,
93 connection_timeout: Duration::from_secs(10),
94 }
95 }
96
97 pub fn daemon() -> Self {
122 Self {
123 max_attempts: usize::MAX, initial_delay: Duration::from_secs(1),
125 max_delay: Duration::from_secs(300), backoff_factor: 2.0,
127 connection_timeout: Duration::from_secs(30),
128 }
129 }
130
131 pub fn testing() -> Self {
135 Self {
136 max_attempts: 3,
137 initial_delay: Duration::from_millis(10),
138 max_delay: Duration::from_millis(100),
139 backoff_factor: 2.0,
140 connection_timeout: Duration::from_millis(500), }
142 }
143
144 pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
146 if attempt == 0 {
147 return self.initial_delay;
148 }
149
150 let multiplier = self.backoff_factor.powi((attempt - 1) as i32);
151 let delay_secs = self.initial_delay.as_secs_f64() * multiplier;
152 let delay = Duration::from_secs_f64(delay_secs);
153
154 std::cmp::min(delay, self.max_delay)
155 }
156}
157
/// Settings from which a `RateLimiter` builds its quota.
#[derive(Clone, Debug)]
pub struct RateLimitConfig {
    /// Burst allowance handed to the quota (`Quota::allow_burst`).
    /// A value of `0` is treated as `1` when the limiter is built.
    pub burst_size: u32,

    /// Sustained rate in permits per second (`Quota::per_second`).
    /// A value of `0` is treated as `1` when the limiter is built.
    pub refill_rate: u32,
}
174
175impl Default for RateLimitConfig {
176 fn default() -> Self {
178 Self {
179 burst_size: 100,
180 refill_rate: 1000,
181 }
182 }
183}
184
185impl RateLimitConfig {
186 pub fn conservative() -> Self {
188 Self {
189 burst_size: 10,
190 refill_rate: 100,
191 }
192 }
193
194 pub fn high_throughput() -> Self {
196 Self {
197 burst_size: 500,
198 refill_rate: 10_000,
199 }
200 }
201
202 pub fn unlimited() -> Self {
204 Self {
205 burst_size: u32::MAX,
206 refill_rate: u32::MAX,
207 }
208 }
209}
210
211pub struct RateLimiter {
232 limiter: GovLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>,
233 config: RateLimitConfig,
234}
235
236impl RateLimiter {
237 pub fn new(config: RateLimitConfig) -> Self {
239 let quota = Quota::per_second(NonZeroU32::new(config.refill_rate).unwrap_or(NonZeroU32::MIN))
241 .allow_burst(NonZeroU32::new(config.burst_size).unwrap_or(NonZeroU32::MIN));
242
243 let limiter = GovLimiter::direct(quota);
244
245 Self { limiter, config }
246 }
247
248 pub async fn acquire(&self) {
252 self.limiter.until_ready().await;
253 }
254
255 pub fn try_acquire(&self) -> bool {
259 self.limiter.check().is_ok()
260 }
261
262 pub async fn acquire_many(&self, n: u32) {
267 if n == 0 {
268 return;
269 }
270 for _ in 0..n {
273 self.limiter.until_ready().await;
274 }
275 }
276
277 pub fn config(&self) -> &RateLimitConfig {
279 &self.config
280 }
281}
282
283#[derive(Debug, Clone, thiserror::Error)]
289#[error("bulkhead full: max {max_concurrent} concurrent operations")]
290pub struct BulkheadFull {
291 pub max_concurrent: usize,
293}
294
295#[derive(Debug)]
329pub struct Bulkhead {
330 semaphore: Arc<Semaphore>,
331 max_concurrent: usize,
332}
333
334impl Bulkhead {
335 pub fn new(max_concurrent: usize) -> Self {
337 Self {
338 semaphore: Arc::new(Semaphore::new(max_concurrent)),
339 max_concurrent,
340 }
341 }
342
343 pub fn for_peers() -> Self {
347 Self::new(50)
348 }
349
350 pub fn for_sync_engine() -> Self {
354 Self::new(100)
355 }
356
357 pub async fn acquire(&self) -> Result<OwnedSemaphorePermit, BulkheadFull> {
361 self.semaphore
362 .clone()
363 .acquire_owned()
364 .await
365 .map_err(|_| BulkheadFull {
366 max_concurrent: self.max_concurrent,
367 })
368 }
369
370 pub fn try_acquire(&self) -> Option<OwnedSemaphorePermit> {
374 self.semaphore.clone().try_acquire_owned().ok()
375 }
376
377 pub fn available(&self) -> usize {
379 self.semaphore.available_permits()
380 }
381
382 pub fn max_concurrent(&self) -> usize {
384 self.max_concurrent
385 }
386
387 pub fn is_full(&self) -> bool {
389 self.semaphore.available_permits() == 0
390 }
391}
392
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    // ---------------------------------------------------------------
    // RetryConfig
    // ---------------------------------------------------------------

    #[test]
    fn test_daemon_config() {
        let config = RetryConfig::daemon();
        assert_eq!(config.max_attempts, usize::MAX);
        assert_eq!(config.max_delay, Duration::from_secs(300));
    }

    #[test]
    fn test_startup_config() {
        let config = RetryConfig::startup();
        assert_eq!(config.max_attempts, 20);
        assert_eq!(config.initial_delay, Duration::from_millis(500));
    }

    #[test]
    fn test_delay_for_attempt() {
        let config = RetryConfig {
            max_attempts: 10,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(30),
            backoff_factor: 2.0,
            connection_timeout: Duration::from_secs(5),
        };

        // Doubling schedule: 1s, 2s, 4s, 8s, 16s, then capped at max_delay.
        assert_eq!(config.delay_for_attempt(1), Duration::from_secs(1));
        assert_eq!(config.delay_for_attempt(2), Duration::from_secs(2));
        assert_eq!(config.delay_for_attempt(3), Duration::from_secs(4));
        assert_eq!(config.delay_for_attempt(4), Duration::from_secs(8));
        assert_eq!(config.delay_for_attempt(5), Duration::from_secs(16));
        assert_eq!(config.delay_for_attempt(10), Duration::from_secs(30));
    }

    #[test]
    fn test_retry_config_testing_preset() {
        let config = RetryConfig::testing();
        assert_eq!(config.max_attempts, 3);
        assert_eq!(config.initial_delay, Duration::from_millis(10));
        assert_eq!(config.max_delay, Duration::from_millis(100));
        assert_eq!(config.connection_timeout, Duration::from_millis(500));
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_attempts, 10);
        assert_eq!(config.initial_delay, Duration::from_millis(100));
        assert_eq!(config.max_delay, Duration::from_secs(30));
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.connection_timeout, Duration::from_secs(5));
    }

    #[test]
    fn test_delay_for_attempt_zero() {
        let config = RetryConfig::default();
        assert_eq!(config.delay_for_attempt(0), config.initial_delay);
    }

    #[test]
    fn test_delay_for_attempt_caps_at_max() {
        let config = RetryConfig {
            max_attempts: 100,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(10),
            backoff_factor: 2.0,
            connection_timeout: Duration::from_secs(5),
        };
        assert_eq!(config.delay_for_attempt(10), Duration::from_secs(10));
        assert_eq!(config.delay_for_attempt(20), Duration::from_secs(10));
    }

    #[test]
    fn test_retry_config_clone() {
        let config = RetryConfig::daemon();
        let cloned = config.clone();
        assert_eq!(cloned.max_attempts, config.max_attempts);
        assert_eq!(cloned.max_delay, config.max_delay);
    }

    #[test]
    fn test_retry_config_debug() {
        let config = RetryConfig::testing();
        let debug = format!("{:?}", config);
        assert!(debug.contains("RetryConfig"));
        assert!(debug.contains("max_attempts"));
    }

    // ---------------------------------------------------------------
    // RateLimitConfig / RateLimiter
    // ---------------------------------------------------------------

    #[test]
    fn test_rate_limit_config_presets() {
        let default = RateLimitConfig::default();
        assert_eq!(default.burst_size, 100);
        assert_eq!(default.refill_rate, 1000);

        let conservative = RateLimitConfig::conservative();
        assert_eq!(conservative.burst_size, 10);
        assert_eq!(conservative.refill_rate, 100);

        let high = RateLimitConfig::high_throughput();
        assert_eq!(high.burst_size, 500);
        assert_eq!(high.refill_rate, 10_000);
    }

    #[test]
    fn test_rate_limit_config_unlimited() {
        let config = RateLimitConfig::unlimited();
        assert_eq!(config.burst_size, u32::MAX);
        assert_eq!(config.refill_rate, u32::MAX);
    }

    #[test]
    fn test_rate_limit_config_clone() {
        let config = RateLimitConfig::conservative();
        let cloned = config.clone();
        assert_eq!(cloned.burst_size, config.burst_size);
        assert_eq!(cloned.refill_rate, config.refill_rate);
    }

    #[test]
    fn test_rate_limit_config_debug() {
        let config = RateLimitConfig::default();
        let debug = format!("{:?}", config);
        assert!(debug.contains("RateLimitConfig"));
        assert!(debug.contains("burst_size"));
        assert!(debug.contains("refill_rate"));
    }

    #[test]
    fn test_rate_limiter_try_acquire_burst() {
        // Refill slowly (one permit per second) so that a scheduling hiccup
        // between the calls below cannot replenish a permit and make the
        // final "exhausted" assertion pass spuriously.
        let limiter = RateLimiter::new(RateLimitConfig {
            burst_size: 5,
            refill_rate: 1,
        });

        for _ in 0..5 {
            assert!(limiter.try_acquire(), "should acquire within burst");
        }

        assert!(!limiter.try_acquire(), "should fail after burst exhausted");
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_blocks() {
        let limiter = RateLimiter::new(RateLimitConfig {
            burst_size: 1,
            refill_rate: 1000,
        });

        // Use up the single burst permit.
        limiter.acquire().await;

        // The next permit needs a refill, which at 1000/s is about 1 ms away.
        let start = Instant::now();
        limiter.acquire().await;
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(100), "should refill quickly");
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_many() {
        let limiter = RateLimiter::new(RateLimitConfig {
            burst_size: 10,
            refill_rate: 10_000,
        });

        let start = Instant::now();
        limiter.acquire_many(10).await;
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(50), "batch acquire should be fast");
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_many_zero() {
        let limiter = RateLimiter::new(RateLimitConfig::default());
        let start = Instant::now();
        limiter.acquire_many(0).await;
        // A sub-millisecond wall-clock bound is at the mercy of the scheduler;
        // the point is only that no rate-limit wait happened.
        assert!(
            start.elapsed() < Duration::from_millis(50),
            "acquiring zero permits should not wait"
        );
    }

    #[test]
    fn test_rate_limiter_config_accessor() {
        let config = RateLimitConfig::conservative();
        let limiter = RateLimiter::new(config.clone());
        let retrieved = limiter.config();
        assert_eq!(retrieved.burst_size, config.burst_size);
        assert_eq!(retrieved.refill_rate, config.refill_rate);
    }

    // ---------------------------------------------------------------
    // Bulkhead / BulkheadFull
    // ---------------------------------------------------------------

    #[test]
    fn test_bulkhead_new() {
        let bulkhead = Bulkhead::new(10);
        assert_eq!(bulkhead.max_concurrent(), 10);
        assert_eq!(bulkhead.available(), 10);
        assert!(!bulkhead.is_full());
    }

    #[test]
    fn test_bulkhead_presets() {
        let peers = Bulkhead::for_peers();
        assert_eq!(peers.max_concurrent(), 50);

        let sync = Bulkhead::for_sync_engine();
        assert_eq!(sync.max_concurrent(), 100);
    }

    #[test]
    fn test_bulkhead_try_acquire() {
        let bulkhead = Bulkhead::new(2);

        let p1 = bulkhead.try_acquire();
        assert!(p1.is_some());
        assert_eq!(bulkhead.available(), 1);

        let p2 = bulkhead.try_acquire();
        assert!(p2.is_some());
        assert_eq!(bulkhead.available(), 0);
        assert!(bulkhead.is_full());

        // No permits left: the non-waiting acquire must refuse.
        let p3 = bulkhead.try_acquire();
        assert!(p3.is_none());

        // Dropping a permit hands it back to the bulkhead.
        drop(p1);
        assert_eq!(bulkhead.available(), 1);
        assert!(!bulkhead.is_full());

        let p4 = bulkhead.try_acquire();
        assert!(p4.is_some());
    }

    #[tokio::test]
    async fn test_bulkhead_acquire_waits() {
        let bulkhead = Arc::new(Bulkhead::new(1));
        let bulkhead2 = Arc::clone(&bulkhead);

        let permit = bulkhead.acquire().await.unwrap();
        assert!(bulkhead.is_full());

        // The spawned task has to wait until the permit above is released.
        let handle = tokio::spawn(async move {
            let start = Instant::now();
            let _p = bulkhead2.acquire().await.unwrap();
            start.elapsed()
        });

        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(permit);

        let wait_time = handle.await.unwrap();
        assert!(wait_time >= Duration::from_millis(40), "should have waited");
    }

    #[tokio::test]
    async fn test_bulkhead_available_after_release() {
        let bulkhead = Bulkhead::new(3);

        let p1 = bulkhead.acquire().await.unwrap();
        let p2 = bulkhead.acquire().await.unwrap();
        assert_eq!(bulkhead.available(), 1);

        drop(p1);
        assert_eq!(bulkhead.available(), 2);

        drop(p2);
        assert_eq!(bulkhead.available(), 3);
    }

    #[test]
    fn test_bulkhead_debug() {
        let bulkhead = Bulkhead::new(5);
        let debug = format!("{:?}", bulkhead);
        assert!(debug.contains("Bulkhead"));
    }

    #[test]
    fn test_bulkhead_full_error() {
        let err = BulkheadFull { max_concurrent: 10 };
        assert_eq!(
            err.to_string(),
            "bulkhead full: max 10 concurrent operations"
        );
    }

    #[test]
    fn test_bulkhead_full_error_clone() {
        let err = BulkheadFull { max_concurrent: 25 };
        let cloned = err.clone();
        assert_eq!(cloned.max_concurrent, 25);
    }

    #[test]
    fn test_bulkhead_full_error_debug() {
        let err = BulkheadFull { max_concurrent: 42 };
        let debug = format!("{:?}", err);
        assert!(debug.contains("BulkheadFull"));
        assert!(debug.contains("42"));
    }
}