// foxtive_worker/backends/resilient.rs

1use std::sync::Arc;
2use std::time::{Duration, Instant};
3use tokio::sync::RwLock;
4use tracing::{error, info, warn};
5
6use crate::backends::{MessageBackend, ReceiveResult};
7use crate::error::WorkerResult;
8
9/// Reconnection strategy for backends
10#[derive(Debug, Clone)]
11pub enum ReconnectStrategy {
12    /// Fixed delay between reconnection attempts
13    Fixed(Duration),
14
15    /// Exponential backoff with optional max delay and jitter
16    Exponential {
17        initial: Duration,
18        max: Duration,
19        multiplier: f64,
20        /// Add random jitter to prevent thundering herd (0.0 - 1.0)
21        jitter_factor: f64,
22    },
23}
24
25impl ReconnectStrategy {
26    fn delay_for_attempt(&self, attempt: u32) -> Duration {
27        match self {
28            ReconnectStrategy::Fixed(d) => *d,
29            ReconnectStrategy::Exponential {
30                initial,
31                max,
32                multiplier,
33                jitter_factor,
34            } => {
35                // Calculate exponential backoff
36                let base_delay = initial.mul_f64(multiplier.powi(attempt as i32));
37                let clamped = base_delay.min(*max);
38
39                // Add jitter to prevent thundering herd problem
40                if *jitter_factor > 0.0 {
41                    let jitter_range = clamped.mul_f64(*jitter_factor);
42                    let jitter = jitter_range.mul_f64(rand::random::<f64>());
43                    clamped + jitter
44                } else {
45                    clamped
46                }
47            }
48        }
49    }
50}
51
52impl Default for ReconnectStrategy {
53    fn default() -> Self {
54        ReconnectStrategy::Exponential {
55            initial: Duration::from_secs(1),
56            max: Duration::from_secs(60),
57            multiplier: 2.0,
58            jitter_factor: 0.1, // 10% jitter
59        }
60    }
61}
62
63/// Wrapper that adds automatic reconnection to any backend.
64///
65/// This wrapper implements the "refuse to die" philosophy - it will retry
66/// operations indefinitely with exponential backoff until they succeed.
67/// The only way to stop it is through explicit shutdown.
68///
69/// # Example
70/// ```rust,no_run
71/// use foxtive_worker::{ResilientBackend, MessageBackend};
72/// use std::sync::Arc;
73///
74/// #[tokio::main]
75/// async fn main() {
76///     // Wrap any backend in ResilientBackend
77///     let backend = Arc::new(foxtive_worker::MemoryBackend::new());
78///     let resilient = ResilientBackend::new(backend);
79///     
80///     // This will retry forever if connection drops
81///     let msg = resilient.receive().await;
82/// }
83/// ```
84pub struct ResilientBackend {
85    inner: Arc<dyn MessageBackend>,
86    strategy: ReconnectStrategy,
87    reconnect_attempts: Arc<RwLock<u32>>,
88    last_success: Arc<RwLock<Instant>>,
89    is_connected: Arc<RwLock<bool>>,
90    /// Track consecutive failures for circuit breaker
91    consecutive_failures: Arc<RwLock<u32>>,
92}
93
94impl ResilientBackend {
95    /// Create a new resilient backend wrapper
96    pub fn new(inner: Arc<dyn MessageBackend>) -> Self {
97        Self {
98            inner,
99            strategy: ReconnectStrategy::default(),
100            reconnect_attempts: Arc::new(RwLock::new(0)),
101            last_success: Arc::new(RwLock::new(Instant::now())),
102            is_connected: Arc::new(RwLock::new(true)),
103            consecutive_failures: Arc::new(RwLock::new(0)),
104        }
105    }
106
107    /// Create with custom reconnection strategy
108    pub fn with_strategy(inner: Arc<dyn MessageBackend>, strategy: ReconnectStrategy) -> Self {
109        Self {
110            inner,
111            strategy,
112            reconnect_attempts: Arc::new(RwLock::new(0)),
113            last_success: Arc::new(RwLock::new(Instant::now())),
114            is_connected: Arc::new(RwLock::new(true)),
115            consecutive_failures: Arc::new(RwLock::new(0)),
116        }
117    }
118
119    /// Get the inner backend
120    pub fn inner(&self) -> &Arc<dyn MessageBackend> {
121        &self.inner
122    }
123
124    /// Check if currently connected
125    pub async fn is_connected(&self) -> bool {
126        *self.is_connected.read().await
127    }
128
129    /// Get current reconnection attempt count
130    pub async fn reconnect_attempts(&self) -> u32 {
131        *self.reconnect_attempts.read().await
132    }
133
134    /// Get current consecutive failure count (for circuit breaker logic)
135    pub async fn consecutive_failures(&self) -> u32 {
136        *self.consecutive_failures.read().await
137    }
138
139    /// Execute an operation with automatic reconnection on failure.
140    ///
141    /// This method implements the "refuse to die" philosophy - it will retry
142    /// indefinitely until the operation succeeds or shutdown is requested.
143    async fn execute_with_retry<T, F, Fut>(&self, operation_name: &str, op: F) -> WorkerResult<T>
144    where
145        F: Fn() -> Fut,
146        Fut: std::future::Future<Output = WorkerResult<T>>,
147    {
148        let mut attempt = 0;
149
150        loop {
151            match op().await {
152                Ok(result) => {
153                    // Reset reconnection state on success
154                    if attempt > 0 {
155                        info!("{} succeeded after {} attempts", operation_name, attempt);
156                    }
157                    *self.reconnect_attempts.write().await = 0;
158                    *self.consecutive_failures.write().await = 0;
159                    *self.last_success.write().await = Instant::now();
160                    *self.is_connected.write().await = true;
161                    return Ok(result);
162                }
163                Err(e) => {
164                    attempt += 1;
165                    *self.reconnect_attempts.write().await = attempt;
166                    let failures = {
167                        let mut f = self.consecutive_failures.write().await;
168                        *f += 1;
169                        *f
170                    };
171                    *self.is_connected.write().await = false;
172
173                    warn!(
174                        "{} failed (attempt {}, consecutive failures: {}): {}. Retrying...",
175                        operation_name, attempt, failures, e
176                    );
177
178                    // Try to recover connection
179                    if let Err(recover_err) = self.try_recover().await {
180                        error!("Recovery attempt failed: {}", recover_err);
181                    }
182
183                    // Calculate delay with exponential backoff and jitter
184                    let delay = self.strategy.delay_for_attempt(attempt - 1);
185
186                    // Log every 10th attempt to avoid spam
187                    if attempt % 10 == 0 || attempt <= 3 {
188                        warn!(
189                            "Still trying {} (attempt {}) - next retry in {:?}",
190                            operation_name, attempt, delay
191                        );
192                    }
193
194                    tokio::time::sleep(delay).await;
195
196                    // Never give up - keep retrying forever
197                    // The only way out is through explicit shutdown
198                }
199            }
200        }
201    }
202
203    /// Attempt to recover the connection
204    async fn try_recover(&self) -> WorkerResult<()> {
205        // Try health check to see if we can reconnect
206        match self.inner.health_check().await {
207            Ok(_) => {
208                info!("Connection recovered");
209                *self.consecutive_failures.write().await = 0;
210                Ok(())
211            }
212            Err(e) => {
213                warn!("Health check failed during recovery: {}", e);
214                // Some backends need explicit reconnect logic here
215                // For now, we rely on the backend's own reconnection (deadpool, etc.)
216                Err(e)
217            }
218        }
219    }
220}
221
222#[async_trait::async_trait]
223impl MessageBackend for ResilientBackend {
224    async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
225        self.execute_with_retry("receive", || async { self.inner.receive().await })
226            .await
227    }
228
229    async fn ack(&self, message_id: &str) -> WorkerResult<()> {
230        // Ack operations are usually idempotent, so we don't retry indefinitely
231        // Just try once and let the caller handle failures
232        self.inner.ack(message_id).await
233    }
234
235    async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()> {
236        // Nack operations are critical - we should retry to avoid message loss
237        self.execute_with_retry("nack", || async {
238            self.inner.nack(message_id, requeue).await
239        })
240        .await
241    }
242
243    async fn health_check(&self) -> WorkerResult<()> {
244        self.inner.health_check().await
245    }
246
247    async fn shutdown(&self) -> WorkerResult<()> {
248        self.inner.shutdown().await
249    }
250}
251
252/// Builder for configuring resilient backends
253pub struct ResilientBackendBuilder {
254    inner: Arc<dyn MessageBackend>,
255    strategy: ReconnectStrategy,
256}
257
258impl ResilientBackendBuilder {
259    /// Create a new builder
260    pub fn new(inner: Arc<dyn MessageBackend>) -> Self {
261        Self {
262            inner,
263            strategy: ReconnectStrategy::default(),
264        }
265    }
266
267    /// Set reconnection strategy
268    pub fn with_strategy(mut self, strategy: ReconnectStrategy) -> Self {
269        self.strategy = strategy;
270        self
271    }
272
273    /// Build the resilient backend
274    pub fn build(self) -> ResilientBackend {
275        ResilientBackend::with_strategy(self.inner, self.strategy)
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backends::{MemoryBackend, ReceiveResult};
    use crate::error::WorkerError;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Wraps `inner` with a 1ms fixed retry delay so tests that exercise the
    /// retry loop do not sleep for whole seconds (the default strategy starts
    /// at 1s and doubles).
    fn fast_resilient(inner: Arc<dyn MessageBackend>) -> ResilientBackend {
        ResilientBackend::with_strategy(inner, ReconnectStrategy::Fixed(Duration::from_millis(1)))
    }

    /// Mock backend whose `receive` fails for the first `succeed_after` calls
    /// and returns `ReceiveResult::Shutdown` afterwards.
    struct FailingBackend {
        /// Number of `receive` calls that returned an error
        fail_count: Arc<AtomicUsize>,
        /// Number of `receive` calls made so far
        total_calls: Arc<AtomicUsize>,
        /// How many `receive` calls fail before the first success
        succeed_after: usize,
    }

    impl FailingBackend {
        /// Returns the backend together with handles to its failure and call counters.
        fn new(succeed_after: usize) -> (Arc<Self>, Arc<AtomicUsize>, Arc<AtomicUsize>) {
            let fail_count = Arc::new(AtomicUsize::new(0));
            let total_calls = Arc::new(AtomicUsize::new(0));
            (
                Arc::new(Self {
                    fail_count: fail_count.clone(),
                    total_calls: total_calls.clone(),
                    succeed_after,
                }),
                fail_count,
                total_calls,
            )
        }
    }

    #[async_trait::async_trait]
    impl MessageBackend for FailingBackend {
        async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>> {
            let calls = self.total_calls.fetch_add(1, Ordering::SeqCst);
            if calls < self.succeed_after {
                self.fail_count.fetch_add(1, Ordering::SeqCst);
                Err(WorkerError::BackendError(
                    "Simulated network failure".to_string(),
                ))
            } else {
                Ok(ReceiveResult::Shutdown)
            }
        }

        async fn ack(&self, _message_id: &str) -> WorkerResult<()> {
            Ok(())
        }

        async fn nack(&self, _message_id: &str, _requeue: bool) -> WorkerResult<()> {
            Ok(())
        }

        // Healthy only once enough `receive` calls have been made to reach
        // the succeeding phase.
        async fn health_check(&self) -> WorkerResult<()> {
            let calls = self.total_calls.load(Ordering::SeqCst);
            if calls < self.succeed_after {
                Err(WorkerError::BackendError("Health check failed".to_string()))
            } else {
                Ok(())
            }
        }

        async fn shutdown(&self) -> WorkerResult<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_resilient_backend_wraps_successfully() {
        let inner = Arc::new(MemoryBackend::new());
        let resilient = ResilientBackend::new(inner.clone());

        assert!(resilient.is_connected().await);
        assert_eq!(resilient.reconnect_attempts().await, 0);
        assert_eq!(resilient.consecutive_failures().await, 0);
    }

    #[tokio::test]
    async fn test_resilient_backend_receive() {
        let inner = MemoryBackend::new();
        let backend_arc = Arc::new(inner);
        let resilient = ResilientBackend::new(backend_arc.clone());

        // Add a message (MemoryBackend uses enqueue, not add_message)
        backend_arc.enqueue(serde_json::json!({"test": "data"}));

        // Receive through resilient wrapper
        let result = resilient.receive().await.unwrap();
        assert!(result.is_message());
        if let ReceiveResult::Message(msg) = result {
            assert_eq!(msg.message.payload["test"], "data");
        } else {
            panic!("Expected Message variant");
        }
    }

    #[tokio::test]
    async fn test_resilient_backend_with_custom_strategy() {
        let inner = Arc::new(MemoryBackend::new());
        let strategy = ReconnectStrategy::Fixed(Duration::from_secs(1));
        let resilient = ResilientBackend::with_strategy(inner, strategy);

        assert!(resilient.is_connected().await);
    }

    #[test]
    fn test_exponential_backoff_calculation() {
        let strategy = ReconnectStrategy::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(1),
            multiplier: 2.0,
            jitter_factor: 0.0, // No jitter for predictable testing
        };

        // Test exponential growth
        assert_eq!(strategy.delay_for_attempt(0).as_millis(), 100); // 100ms
        assert_eq!(strategy.delay_for_attempt(1).as_millis(), 200); // 200ms
        assert_eq!(strategy.delay_for_attempt(2).as_millis(), 400); // 400ms
        assert_eq!(strategy.delay_for_attempt(3).as_millis(), 800); // 800ms
        assert_eq!(strategy.delay_for_attempt(4).as_millis(), 1000); // Capped at 1s
        assert_eq!(strategy.delay_for_attempt(5).as_millis(), 1000); // Still capped
    }

    #[test]
    fn test_exponential_backoff_with_jitter() {
        let strategy = ReconnectStrategy::Exponential {
            initial: Duration::from_millis(100),
            max: Duration::from_secs(1),
            multiplier: 2.0,
            jitter_factor: 0.5, // 50% jitter
        };

        // With 50% jitter, delay should be between base and base * 1.5
        let delay = strategy.delay_for_attempt(0);
        let base = 100;
        assert!(delay.as_millis() >= base as u128);
        assert!(delay.as_millis() <= (base as f64 * 1.5) as u128);
    }

    #[test]
    fn test_fixed_delay_strategy() {
        let strategy = ReconnectStrategy::Fixed(Duration::from_secs(2));

        // Should always return the same delay regardless of attempt
        assert_eq!(strategy.delay_for_attempt(0).as_secs(), 2);
        assert_eq!(strategy.delay_for_attempt(5).as_secs(), 2);
        assert_eq!(strategy.delay_for_attempt(100).as_secs(), 2);
    }

    #[tokio::test]
    async fn test_reconnection_on_failure() {
        // Backend fails first 2 times, succeeds on 3rd
        let (backend, fail_count, total_calls) = FailingBackend::new(2);
        let resilient = fast_resilient(backend);

        // This should retry until success
        let result = resilient.receive().await;

        assert!(result.is_ok());
        if let Ok(receive_result) = result {
            assert!(receive_result.is_shutdown()); // FailingBackend returns Shutdown on success
        }
        assert_eq!(fail_count.load(Ordering::SeqCst), 2); // Failed twice
        assert_eq!(total_calls.load(Ordering::SeqCst), 3); // Called 3 times total
        assert_eq!(resilient.reconnect_attempts().await, 0); // Reset after success
        assert_eq!(resilient.consecutive_failures().await, 0); // Reset after success
        assert!(resilient.is_connected().await);
    }

    #[tokio::test]
    async fn test_connection_state_tracking() {
        // Backend fails first time, then succeeds
        let (backend, _, _) = FailingBackend::new(1);
        let resilient = fast_resilient(backend);

        // Initial state
        assert!(resilient.is_connected().await);
        assert_eq!(resilient.reconnect_attempts().await, 0);

        // First call will fail and trigger reconnection
        let _ = resilient.receive().await;

        // After recovery, should be connected again
        assert!(resilient.is_connected().await);
        assert_eq!(resilient.reconnect_attempts().await, 0); // Reset after success
    }

    #[tokio::test]
    async fn test_consecutive_failure_tracking() {
        // Backend fails 3 times before succeeding
        let (backend, _, _) = FailingBackend::new(3);
        let resilient = fast_resilient(backend);

        // Start receiving - this will retry internally
        let _ = resilient.receive().await;

        // After success, failures should be reset
        assert_eq!(resilient.consecutive_failures().await, 0);
    }

    #[tokio::test]
    async fn test_ack_operations_dont_retry_indefinitely() {
        let inner = Arc::new(MemoryBackend::new());
        let resilient = ResilientBackend::new(inner.clone());

        // Ack operations should complete immediately without retry logic
        let result = resilient.ack("non-existent-id").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_health_check_passthrough() {
        let inner = Arc::new(MemoryBackend::new());
        let resilient = ResilientBackend::new(inner.clone());

        // Health check should pass through to inner backend
        let result = resilient.health_check().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_shutdown_passthrough() {
        let inner = Arc::new(MemoryBackend::new());
        let resilient = ResilientBackend::new(inner.clone());

        // Shutdown should pass through
        let result = resilient.shutdown().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_builder_pattern() {
        let inner = Arc::new(MemoryBackend::new());
        let strategy = ReconnectStrategy::Exponential {
            initial: Duration::from_millis(500),
            max: Duration::from_secs(30),
            multiplier: 2.5,
            jitter_factor: 0.2,
        };

        let resilient = ResilientBackendBuilder::new(inner)
            .with_strategy(strategy)
            .build();

        assert!(resilient.is_connected().await);
    }

    #[tokio::test]
    async fn test_multiple_receive_operations() {
        let inner = MemoryBackend::new();
        let backend_arc = Arc::new(inner);
        let resilient = ResilientBackend::new(backend_arc.clone());

        // Add multiple messages
        backend_arc.enqueue(serde_json::json!({"msg": 1}));
        backend_arc.enqueue(serde_json::json!({"msg": 2}));
        backend_arc.enqueue(serde_json::json!({"msg": 3}));

        // Receive all messages through resilient wrapper
        for expected in 1..=3 {
            let result = resilient.receive().await.unwrap();
            if let ReceiveResult::Message(msg) = result {
                assert_eq!(msg.message.payload["msg"], expected);
            } else {
                panic!("Expected Message variant, got {:?}", result);
            }
        }

        // All operations should succeed without retries
        assert_eq!(resilient.reconnect_attempts().await, 0);
    }

    #[test]
    fn test_default_reconnect_strategy() {
        let strategy = ReconnectStrategy::default();

        // Verify default values
        match strategy {
            ReconnectStrategy::Exponential {
                initial,
                max,
                multiplier,
                jitter_factor,
            } => {
                assert_eq!(initial, Duration::from_secs(1));
                assert_eq!(max, Duration::from_secs(60));
                assert_eq!(multiplier, 2.0);
                assert_eq!(jitter_factor, 0.1);
            }
            ReconnectStrategy::Fixed(_) => panic!("Default should be Exponential"),
        }
    }
}