replication_engine/
resilience.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Resilience utilities: retry logic, backoff, rate limiting, bulkheads.
//!
//! This module provides patterns to protect services from overload:
//!
//! - [`RetryConfig`]: Exponential backoff for transient failures
//! - [`RateLimiter`]: Token bucket to prevent thundering herd
//! - [`Bulkhead`]: Semaphore to limit concurrent operations
//!
//! # Example
//!
//! ```rust,no_run
//! # async fn example() -> Result<(), replication_engine::resilience::BulkheadFull> {
//! use replication_engine::resilience::{RateLimiter, Bulkhead, RateLimitConfig};
//!
//! // Rate limit: max 1000 events/sec with burst of 100
//! let limiter = RateLimiter::new(RateLimitConfig::default());
//! limiter.acquire().await; // Waits (asynchronously) while over the limit
//!
//! // Bulkhead: max 10 concurrent connections
//! let bulkhead = Bulkhead::new(10);
//! let _permit = bulkhead.acquire().await?;
//! // permit dropped = slot released
//! # Ok(())
//! # }
//! ```

use std::fmt;
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Duration;

use governor::{
    clock::DefaultClock,
    middleware::NoOpMiddleware,
    state::{InMemoryState, NotKeyed},
    Quota, RateLimiter as GovLimiter,
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Configuration for connection retry behavior.
///
/// Delays grow geometrically from [`initial_delay`](Self::initial_delay) by
/// [`backoff_factor`](Self::backoff_factor) per attempt and are capped at
/// [`max_delay`](Self::max_delay); see [`RetryConfig::delay_for_attempt`].
#[derive(Debug, Clone, PartialEq)]
pub struct RetryConfig {
    /// Maximum number of retry attempts.
    /// Set to `usize::MAX` for infinite retries (daemon mode).
    pub max_attempts: usize,

    /// Initial delay before first retry.
    pub initial_delay: Duration,

    /// Maximum delay between retries (ceiling for exponential backoff).
    pub max_delay: Duration,

    /// Backoff multiplier (e.g., 2.0 = double delay each retry).
    pub backoff_factor: f64,

    /// Timeout for each individual connection attempt.
    pub connection_timeout: Duration,
}

impl Default for RetryConfig {
    fn default() -> Self {
        Self {
            max_attempts: 10,
            initial_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(30),
            backoff_factor: 2.0,
            connection_timeout: Duration::from_secs(5),
        }
    }
}

impl RetryConfig {
    /// Bounded retry for the initial startup connection.
    ///
    /// Makes up to 20 attempts with 1.5x exponential backoff, starting at
    /// 500ms and capped at 30s. The backoff delays from
    /// [`RetryConfig::delay_for_attempt`] add up to roughly six minutes. Each
    /// attempt may also spend up to 10s in its connection timeout. This gives
    /// a slow-starting dependency time to come up while still eventually
    /// surfacing configuration errors during daemon startup.
    ///
    /// # Backoff Schedule
    ///
    /// ```text
    /// Attempt  Delay     Cumulative
    /// -------  -----     ----------
    /// 1        500ms     500ms
    /// 2        750ms     1.25s
    /// 3        1.125s    2.375s
    /// ...
    /// 11       ~28.8s    ~85.5s
    /// 12-20    30s       ~355.5s (total)
    /// ```
    pub fn startup() -> Self {
        Self {
            max_attempts: 20,
            initial_delay: Duration::from_millis(500),
            max_delay: Duration::from_secs(30),
            backoff_factor: 1.5,
            connection_timeout: Duration::from_secs(10),
        }
    }

    /// Infinite retry for long-running daemon (never give up!).
    ///
    /// Retries forever with exponential backoff capped at 5 minutes.
    /// Use this for runtime reconnection after initial startup succeeds.
    ///
    /// # Backoff Schedule
    ///
    /// ```text
    /// Attempt  Delay     Reasoning
    /// -------  -----     ---------
    /// 1        1s        Immediate transient retry
    /// 2        2s        Brief network blip
    /// 3        4s        DNS propagation
    /// 4        8s        Container restart
    /// 5        16s       Service recovery
    /// 6        32s       Load balancer failover
    /// 7        64s       Datacenter maintenance
    /// 8        128s      Extended outage
    /// 9        256s      Multi-hour incident
    /// 10+      300s      Cap at 5 minutes, retry forever
    /// ```
    ///
    /// Real-world example: DNS outages can last 24+ hours. This ensures
    /// the daemon automatically recovers without manual restart.
    /// [`RetryConfig::delay_for_attempt`] saturates at the cap, so arbitrarily
    /// high attempt numbers are safe.
    pub fn daemon() -> Self {
        Self {
            max_attempts: usize::MAX, // Infinite retries
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(300), // Cap at 5 minutes
            backoff_factor: 2.0,
            connection_timeout: Duration::from_secs(30),
        }
    }

    /// Fast-fail retry for tests.
    ///
    /// Fails quickly to avoid slow tests.
    pub fn testing() -> Self {
        Self {
            max_attempts: 3,
            initial_delay: Duration::from_millis(10),
            max_delay: Duration::from_millis(100),
            backoff_factor: 2.0,
            connection_timeout: Duration::from_millis(500), // Fast timeout for tests
        }
    }

    /// Calculate the delay to wait for a given attempt number (1-indexed).
    ///
    /// Returns `initial_delay * backoff_factor^(attempt - 1)`, capped at
    /// `max_delay`. Attempt `0` is treated like attempt `1`.
    ///
    /// This never panics. In the following cases the result saturates to
    /// `max_delay`:
    ///
    /// - the exponential term outgrows what a [`Duration`] can hold (e.g.
    ///   around attempt 65 with a 1s base and factor 2.0, which daemon mode
    ///   reaches during a multi-hour outage);
    /// - `backoff_factor` makes the delay negative or NaN.
    pub fn delay_for_attempt(&self, attempt: usize) -> Duration {
        // Attempts 0 and 1 both use the un-multiplied base delay. A zero base
        // delay stays zero however large the multiplier grows (this also avoids
        // `0 * inf = NaN` below).
        if attempt <= 1 || self.initial_delay.is_zero() {
            return self.initial_delay.min(self.max_delay);
        }

        // Exponents beyond i32::MAX saturate instead of wrapping negative; for
        // factors above 1, powi then yields inf, handled by the conversion below.
        let exponent = i32::try_from(attempt - 1).unwrap_or(i32::MAX);
        let delay_secs = self.initial_delay.as_secs_f64() * self.backoff_factor.powi(exponent);

        // The panicking `from_secs_f64` would abort on these inputs:
        // negative, NaN, infinite, or out-of-range values.
        // `try_from_secs_f64` rejects them instead. Any such value means the
        // backoff ran far past the cap (or the factor is nonsensical), so
        // saturate to `max_delay`.
        Duration::try_from_secs_f64(delay_secs)
            .map_or(self.max_delay, |delay| delay.min(self.max_delay))
    }
}

// =============================================================================
// Rate Limiting
// =============================================================================

/// Configuration for rate limiting.
///
/// Uses a token bucket algorithm: tokens refill at `refill_rate` per second,
/// up to `burst_size` tokens. Each operation consumes one token.
///
/// A value of `0` in either field is not meaningful. [`RateLimiter::new`]
/// treats it as `1`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitConfig {
    /// Maximum tokens that can be accumulated (burst capacity).
    pub burst_size: u32,

    /// Tokens added per second (sustained rate).
    pub refill_rate: u32,
}

impl Default for RateLimitConfig {
    /// Default: 1000 ops/sec with burst of 100.
    fn default() -> Self {
        Self {
            burst_size: 100,
            refill_rate: 1000,
        }
    }
}

impl RateLimitConfig {
    /// Strict limit (100 ops/sec, burst of 10) for testing or constrained
    /// environments.
    pub fn conservative() -> Self {
        Self {
            burst_size: 10,
            refill_rate: 100,
        }
    }

    /// High throughput (10,000 ops/sec, burst of 500) for production on
    /// beefy hardware.
    pub fn high_throughput() -> Self {
        Self {
            burst_size: 500,
            refill_rate: 10_000,
        }
    }

    /// Effectively no rate limiting: burst and refill rate are both `u32::MAX`.
    pub fn unlimited() -> Self {
        Self {
            burst_size: u32::MAX,
            refill_rate: u32::MAX,
        }
    }
}

211/// Token bucket rate limiter.
212///
213/// Prevents thundering herd by limiting the rate of operations.
214/// Thread-safe and async-aware.
215///
216/// # Example
217///
218/// ```rust,no_run
219/// # async fn example() {
220/// use replication_engine::resilience::{RateLimiter, RateLimitConfig};
221/// let limiter = RateLimiter::new(RateLimitConfig::default());
222///
223/// // In hot path loop:
224/// let events = vec![1, 2, 3];
225/// for event in events {
226///     limiter.acquire().await; // Blocks if over limit
227///     // process(event);
228/// }
229/// # }
230/// ```
231pub struct RateLimiter {
232    limiter: GovLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>,
233    config: RateLimitConfig,
234}
235
236impl RateLimiter {
237    /// Create a new rate limiter with the given configuration.
238    pub fn new(config: RateLimitConfig) -> Self {
239        // Create quota: `burst_size` tokens, refilling at `refill_rate` per second
240        let quota = Quota::per_second(NonZeroU32::new(config.refill_rate).unwrap_or(NonZeroU32::MIN))
241            .allow_burst(NonZeroU32::new(config.burst_size).unwrap_or(NonZeroU32::MIN));
242
243        let limiter = GovLimiter::direct(quota);
244
245        Self { limiter, config }
246    }
247
248    /// Acquire a permit, blocking until one is available.
249    ///
250    /// This method is cancel-safe.
251    pub async fn acquire(&self) {
252        self.limiter.until_ready().await;
253    }
254
255    /// Try to acquire a permit without blocking.
256    ///
257    /// Returns `true` if acquired, `false` if rate limit exceeded.
258    pub fn try_acquire(&self) -> bool {
259        self.limiter.check().is_ok()
260    }
261
262    /// Acquire multiple permits at once.
263    ///
264    /// Useful for batched operations where you want to rate limit
265    /// the batch as a whole rather than each item.
266    pub async fn acquire_many(&self, n: u32) {
267        if n == 0 {
268            return;
269        }
270        // For multiple permits, we loop and acquire one at a time
271        // This is simpler than trying to batch with governor
272        for _ in 0..n {
273            self.limiter.until_ready().await;
274        }
275    }
276
277    /// Get the current configuration.
278    pub fn config(&self) -> &RateLimitConfig {
279        &self.config
280    }
281}
282
283// =============================================================================
284// Bulkhead (Concurrency Limiter)
285// =============================================================================
286
287/// Error when bulkhead is full.
288#[derive(Debug, Clone, thiserror::Error)]
289#[error("bulkhead full: max {max_concurrent} concurrent operations")]
290pub struct BulkheadFull {
291    /// Maximum concurrent operations allowed.
292    pub max_concurrent: usize,
293}
294
295/// Bulkhead pattern: limits concurrent operations to prevent resource exhaustion.
296///
297/// Uses a semaphore to limit how many operations can run simultaneously.
298/// When the bulkhead is "full", new operations either wait or fail fast.
299///
300/// # Use Cases
301///
302/// - Limit concurrent peer connections
303/// - Limit concurrent sync-engine calls
304/// - Prevent thread/connection pool exhaustion
305///
306/// # Example
307///
308/// ```rust,no_run
309/// # async fn example() -> Result<(), replication_engine::resilience::BulkheadFull> {
310/// use replication_engine::resilience::Bulkhead;
311/// let bulkhead = Bulkhead::new(10); // Max 10 concurrent
312///
313/// // Blocking acquire
314/// let permit = bulkhead.acquire().await?;
315/// // do_work().await;
316/// drop(permit); // Release slot
317///
318/// // Non-blocking (fail fast)
319/// if let Some(permit) = bulkhead.try_acquire() {
320///     // do_work().await;
321///     drop(permit);
322/// } else {
323///     // return Err("service overloaded");
324/// }
325/// # Ok(())
326/// # }
327/// ```
328#[derive(Debug)]
329pub struct Bulkhead {
330    semaphore: Arc<Semaphore>,
331    max_concurrent: usize,
332}
333
334impl Bulkhead {
335    /// Create a new bulkhead with the given concurrency limit.
336    pub fn new(max_concurrent: usize) -> Self {
337        Self {
338            semaphore: Arc::new(Semaphore::new(max_concurrent)),
339            max_concurrent,
340        }
341    }
342
343    /// Create a bulkhead for limiting peer connections.
344    ///
345    /// Default: 50 concurrent connections.
346    pub fn for_peers() -> Self {
347        Self::new(50)
348    }
349
350    /// Create a bulkhead for limiting sync-engine operations.
351    ///
352    /// Default: 100 concurrent operations.
353    pub fn for_sync_engine() -> Self {
354        Self::new(100)
355    }
356
357    /// Acquire a permit, waiting if necessary.
358    ///
359    /// Returns a permit that releases the slot when dropped.
360    pub async fn acquire(&self) -> Result<OwnedSemaphorePermit, BulkheadFull> {
361        self.semaphore
362            .clone()
363            .acquire_owned()
364            .await
365            .map_err(|_| BulkheadFull {
366                max_concurrent: self.max_concurrent,
367            })
368    }
369
370    /// Try to acquire a permit without waiting.
371    ///
372    /// Returns `None` if the bulkhead is full.
373    pub fn try_acquire(&self) -> Option<OwnedSemaphorePermit> {
374        self.semaphore.clone().try_acquire_owned().ok()
375    }
376
377    /// Get the number of available permits.
378    pub fn available(&self) -> usize {
379        self.semaphore.available_permits()
380    }
381
382    /// Get the maximum concurrent operations allowed.
383    pub fn max_concurrent(&self) -> usize {
384        self.max_concurrent
385    }
386
387    /// Check if the bulkhead is full (no permits available).
388    pub fn is_full(&self) -> bool {
389        self.semaphore.available_permits() == 0
390    }
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_daemon_config() {
        let config = RetryConfig::daemon();
        assert_eq!(config.max_attempts, usize::MAX);
        assert_eq!(config.max_delay, Duration::from_secs(300));
    }

    #[test]
    fn test_startup_config() {
        let config = RetryConfig::startup();
        assert_eq!(config.max_attempts, 20);
        assert_eq!(config.initial_delay, Duration::from_millis(500));
    }

    #[test]
    fn test_delay_for_attempt() {
        let config = RetryConfig {
            max_attempts: 10,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(30),
            backoff_factor: 2.0,
            connection_timeout: Duration::from_secs(5),
        };

        assert_eq!(config.delay_for_attempt(1), Duration::from_secs(1));
        assert_eq!(config.delay_for_attempt(2), Duration::from_secs(2));
        assert_eq!(config.delay_for_attempt(3), Duration::from_secs(4));
        assert_eq!(config.delay_for_attempt(4), Duration::from_secs(8));
        assert_eq!(config.delay_for_attempt(5), Duration::from_secs(16));
        // Should cap at max_delay
        assert_eq!(config.delay_for_attempt(10), Duration::from_secs(30));
    }

    // =========================================================================
    // Rate Limiter Tests
    // =========================================================================

    #[test]
    fn test_rate_limit_config_presets() {
        let default = RateLimitConfig::default();
        assert_eq!(default.burst_size, 100);
        assert_eq!(default.refill_rate, 1000);

        let conservative = RateLimitConfig::conservative();
        assert_eq!(conservative.burst_size, 10);
        assert_eq!(conservative.refill_rate, 100);

        let high = RateLimitConfig::high_throughput();
        assert_eq!(high.burst_size, 500);
        assert_eq!(high.refill_rate, 10_000);
    }

    #[test]
    fn test_rate_limiter_try_acquire_burst() {
        // A slow refill (1 token/sec) guarantees no token comes back between
        // draining the burst and the final check, even on a loaded CI machine.
        // At 1000/sec a 1ms preemption was enough to make this test flaky.
        let limiter = RateLimiter::new(RateLimitConfig {
            burst_size: 5,
            refill_rate: 1,
        });

        // Should be able to acquire burst_size permits immediately
        for _ in 0..5 {
            assert!(limiter.try_acquire(), "should acquire within burst");
        }

        // Next acquire should fail (burst exhausted)
        assert!(!limiter.try_acquire(), "should fail after burst exhausted");
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_blocks() {
        let limiter = RateLimiter::new(RateLimitConfig {
            burst_size: 1,
            refill_rate: 1000, // 1ms per token
        });

        // Exhaust burst
        limiter.acquire().await;

        // Next acquire should complete quickly (high refill rate)
        let start = std::time::Instant::now();
        limiter.acquire().await;
        let elapsed = start.elapsed();

        // Should be roughly 1ms (1000/sec = 1 per ms), but allow for scheduling
        assert!(elapsed < Duration::from_millis(100), "should refill quickly");
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_many() {
        let limiter = RateLimiter::new(RateLimitConfig {
            burst_size: 10,
            refill_rate: 10_000,
        });

        // Should acquire all 10 quickly
        let start = std::time::Instant::now();
        limiter.acquire_many(10).await;
        let elapsed = start.elapsed();

        assert!(elapsed < Duration::from_millis(50), "batch acquire should be fast");
    }

    // =========================================================================
    // Bulkhead Tests
    // =========================================================================

    #[test]
    fn test_bulkhead_new() {
        let bulkhead = Bulkhead::new(10);
        assert_eq!(bulkhead.max_concurrent(), 10);
        assert_eq!(bulkhead.available(), 10);
        assert!(!bulkhead.is_full());
    }

    #[test]
    fn test_bulkhead_presets() {
        let peers = Bulkhead::for_peers();
        assert_eq!(peers.max_concurrent(), 50);

        let sync = Bulkhead::for_sync_engine();
        assert_eq!(sync.max_concurrent(), 100);
    }

    #[test]
    fn test_bulkhead_try_acquire() {
        let bulkhead = Bulkhead::new(2);

        let p1 = bulkhead.try_acquire();
        assert!(p1.is_some());
        assert_eq!(bulkhead.available(), 1);

        let p2 = bulkhead.try_acquire();
        assert!(p2.is_some());
        assert_eq!(bulkhead.available(), 0);
        assert!(bulkhead.is_full());

        // Should fail - bulkhead full
        let p3 = bulkhead.try_acquire();
        assert!(p3.is_none());

        // Drop one permit
        drop(p1);
        assert_eq!(bulkhead.available(), 1);
        assert!(!bulkhead.is_full());

        // Now should succeed
        let p4 = bulkhead.try_acquire();
        assert!(p4.is_some());
    }

    #[tokio::test]
    async fn test_bulkhead_acquire_waits() {
        let bulkhead = Arc::new(Bulkhead::new(1));
        let bulkhead2 = Arc::clone(&bulkhead);

        // Acquire the only permit
        let permit = bulkhead.acquire().await.unwrap();
        assert!(bulkhead.is_full());

        // Spawn a task that will wait for the permit
        let handle = tokio::spawn(async move {
            let start = std::time::Instant::now();
            let _p = bulkhead2.acquire().await.unwrap();
            start.elapsed()
        });

        // Wait a bit, then release
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(permit);

        // The waiting task should complete
        let wait_time = handle.await.unwrap();
        assert!(wait_time >= Duration::from_millis(40), "should have waited");
    }

    #[test]
    fn test_bulkhead_full_error() {
        let err = BulkheadFull { max_concurrent: 10 };
        assert_eq!(
            err.to_string(),
            "bulkhead full: max 10 concurrent operations"
        );
    }

    // =========================================================================
    // Additional RetryConfig Tests
    // =========================================================================

    #[test]
    fn test_retry_config_testing_preset() {
        let config = RetryConfig::testing();
        assert_eq!(config.max_attempts, 3);
        assert_eq!(config.initial_delay, Duration::from_millis(10));
        assert_eq!(config.max_delay, Duration::from_millis(100));
        assert_eq!(config.connection_timeout, Duration::from_millis(500));
    }

    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_attempts, 10);
        assert_eq!(config.initial_delay, Duration::from_millis(100));
        assert_eq!(config.max_delay, Duration::from_secs(30));
        assert_eq!(config.backoff_factor, 2.0);
        assert_eq!(config.connection_timeout, Duration::from_secs(5));
    }

    #[test]
    fn test_delay_for_attempt_zero() {
        let config = RetryConfig::default();
        // Attempt 0 should return initial_delay
        assert_eq!(config.delay_for_attempt(0), config.initial_delay);
    }

    #[test]
    fn test_delay_for_attempt_caps_at_max() {
        let config = RetryConfig {
            max_attempts: 100,
            initial_delay: Duration::from_secs(1),
            max_delay: Duration::from_secs(10),
            backoff_factor: 2.0,
            connection_timeout: Duration::from_secs(5),
        };
        // After enough attempts, should cap at max_delay
        assert_eq!(config.delay_for_attempt(10), Duration::from_secs(10));
        assert_eq!(config.delay_for_attempt(20), Duration::from_secs(10));
    }

    #[test]
    fn test_retry_config_clone() {
        let config = RetryConfig::daemon();
        let cloned = config.clone();
        assert_eq!(cloned.max_attempts, config.max_attempts);
        assert_eq!(cloned.max_delay, config.max_delay);
    }

    #[test]
    fn test_retry_config_debug() {
        let config = RetryConfig::testing();
        let debug = format!("{:?}", config);
        assert!(debug.contains("RetryConfig"));
        assert!(debug.contains("max_attempts"));
    }

    // =========================================================================
    // Additional RateLimitConfig Tests
    // =========================================================================

    #[test]
    fn test_rate_limit_config_unlimited() {
        let config = RateLimitConfig::unlimited();
        assert_eq!(config.burst_size, u32::MAX);
        assert_eq!(config.refill_rate, u32::MAX);
    }

    #[test]
    fn test_rate_limit_config_clone() {
        let config = RateLimitConfig::conservative();
        let cloned = config.clone();
        assert_eq!(cloned.burst_size, config.burst_size);
        assert_eq!(cloned.refill_rate, config.refill_rate);
    }

    #[test]
    fn test_rate_limit_config_debug() {
        let config = RateLimitConfig::default();
        let debug = format!("{:?}", config);
        assert!(debug.contains("RateLimitConfig"));
        assert!(debug.contains("burst_size"));
        assert!(debug.contains("refill_rate"));
    }

    #[test]
    fn test_rate_limiter_config_accessor() {
        let config = RateLimitConfig::conservative();
        let limiter = RateLimiter::new(config.clone());
        let retrieved = limiter.config();
        assert_eq!(retrieved.burst_size, config.burst_size);
        assert_eq!(retrieved.refill_rate, config.refill_rate);
    }

    #[tokio::test]
    async fn test_rate_limiter_acquire_many_zero() {
        let limiter = RateLimiter::new(RateLimitConfig::default());
        // Acquiring 0 should not wait. The bound is generous because a 1ms
        // budget can be blown by OS preemption alone on a busy machine.
        let start = std::time::Instant::now();
        limiter.acquire_many(0).await;
        assert!(
            start.elapsed() < Duration::from_millis(50),
            "zero permits should not wait"
        );
    }

    // =========================================================================
    // Additional Bulkhead Tests
    // =========================================================================

    #[test]
    fn test_bulkhead_debug() {
        let bulkhead = Bulkhead::new(5);
        let debug = format!("{:?}", bulkhead);
        assert!(debug.contains("Bulkhead"));
    }

    #[test]
    fn test_bulkhead_full_error_clone() {
        let err = BulkheadFull { max_concurrent: 25 };
        let cloned = err.clone();
        assert_eq!(cloned.max_concurrent, 25);
    }

    #[test]
    fn test_bulkhead_full_error_debug() {
        let err = BulkheadFull { max_concurrent: 42 };
        let debug = format!("{:?}", err);
        assert!(debug.contains("BulkheadFull"));
        assert!(debug.contains("42"));
    }

    #[tokio::test]
    async fn test_bulkhead_available_after_release() {
        let bulkhead = Bulkhead::new(3);

        let p1 = bulkhead.acquire().await.unwrap();
        let p2 = bulkhead.acquire().await.unwrap();
        assert_eq!(bulkhead.available(), 1);

        drop(p1);
        assert_eq!(bulkhead.available(), 2);

        drop(p2);
        assert_eq!(bulkhead.available(), 3);
    }
}