feagi_agent/core/
reconnect.rs1use crate::core::error::Result;
7use std::time::Duration;
8use tracing::{info, warn};
9
/// Exponential-backoff bookkeeping for reconnection attempts.
///
/// The first delay handed out is `base_backoff_ms`; every following delay
/// doubles the previous one and is capped at `max_backoff_ms`. A
/// `max_attempts` of `0` disables the attempt limit (retry forever).
#[derive(Debug, Clone)]
pub struct ReconnectionStrategy {
    /// Delay returned for the first attempt, in milliseconds.
    base_backoff_ms: u64,

    /// Upper bound applied to the doubled delays, in milliseconds.
    max_backoff_ms: u64,

    /// Number of delays handed out since construction or the last `reset`.
    current_attempt: u32,

    /// Maximum number of delays to hand out; `0` means unlimited.
    max_attempts: u32,
}

impl ReconnectionStrategy {
    /// Cap used for the doubled delays when none is configured (60 seconds).
    const DEFAULT_MAX_BACKOFF_MS: u64 = 60_000;

    /// Creates a strategy that starts at `base_backoff_ms` milliseconds and
    /// hands out at most `max_attempts` delays (`0` = unlimited).
    ///
    /// The delay cap is fixed at 60 000 ms.
    pub fn new(base_backoff_ms: u64, max_attempts: u32) -> Self {
        Self {
            base_backoff_ms,
            max_backoff_ms: Self::DEFAULT_MAX_BACKOFF_MS,
            current_attempt: 0,
            max_attempts,
        }
    }

    /// Registers one more attempt and returns the delay to wait before it.
    ///
    /// Returns `None` once the attempt limit has been reached; the attempt
    /// counter is left untouched in that case.
    ///
    /// NOTE(review): the first delay is returned as-is and is therefore not
    /// limited by the cap when `base_backoff_ms` exceeds it — kept for
    /// backward compatibility, confirm whether that is intended.
    pub fn next_backoff(&mut self) -> Option<Duration> {
        if self.is_exhausted() {
            return None;
        }

        // Saturate so an unlimited strategy can never wrap its counter.
        self.current_attempt = self.current_attempt.saturating_add(1);

        let backoff_ms = if self.current_attempt == 1 {
            self.base_backoff_ms
        } else {
            // Both the power and the product saturate: for large attempt
            // numbers a plain `*` overflows (panic in debug builds, a wrapped
            // and therefore wrong delay in release builds) before the cap
            // gets a chance to apply.
            let exp = 2u64.saturating_pow(self.current_attempt - 1);
            self.base_backoff_ms
                .saturating_mul(exp)
                .min(self.max_backoff_ms)
        };

        Some(Duration::from_millis(backoff_ms))
    }

    /// Clears the attempt counter so the next delay starts from the base again.
    pub fn reset(&mut self) {
        self.current_attempt = 0;
    }

    /// Returns how many delays have been handed out since the last reset.
    pub fn attempt_number(&self) -> u32 {
        self.current_attempt
    }

    /// Returns `true` when a limit is set and it has been reached.
    ///
    /// Always `false` for an unlimited strategy (`max_attempts == 0`).
    pub fn is_exhausted(&self) -> bool {
        self.max_attempts > 0 && self.current_attempt >= self.max_attempts
    }
}
75
76pub fn retry_with_backoff<F, T>(
93 mut operation: F,
94 strategy: &mut ReconnectionStrategy,
95 operation_name: &str,
96) -> Result<T>
97where
98 F: FnMut() -> Result<T>,
99{
100 loop {
101 match operation() {
102 Ok(result) => {
103 if strategy.attempt_number() > 0 {
104 info!(
105 "[RECONNECT] ✓ {} succeeded after {} attempts",
106 operation_name,
107 strategy.attempt_number()
108 );
109 }
110 strategy.reset();
111 return Ok(result);
112 }
113 Err(e) if e.is_retryable() => {
114 if let Some(backoff) = strategy.next_backoff() {
115 warn!(
116 "[RECONNECT] ⚠ {} failed (attempt {}): {} - retrying in {:?}",
117 operation_name,
118 strategy.attempt_number(),
119 e,
120 backoff
121 );
122 std::thread::sleep(backoff);
123 } else {
124 warn!(
125 "[RECONNECT] ✗ {} failed after {} attempts - giving up",
126 operation_name,
127 strategy.attempt_number()
128 );
129 return Err(e);
130 }
131 }
132 Err(e) => {
133 return Err(e);
135 }
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// Delays double from the base and stop once the attempt limit is hit.
    #[test]
    fn test_exponential_backoff() {
        let mut strategy = ReconnectionStrategy::new(100, 5);

        for expected_ms in [100u64, 200, 400, 800, 1600].iter().copied() {
            assert_eq!(
                strategy.next_backoff(),
                Some(Duration::from_millis(expected_ms))
            );
        }

        // Five delays were handed out, so the sixth request is refused.
        assert_eq!(strategy.next_backoff(), None);
    }

    /// After enough doublings the delay is held at the 60 second cap.
    #[test]
    fn test_backoff_capped() {
        const WARM_UP_ATTEMPTS: usize = 10;
        let mut strategy = ReconnectionStrategy::new(1000, 20);

        // Burn through the early attempts; only the next delay matters.
        for _ in 0..WARM_UP_ATTEMPTS {
            let _ = strategy.next_backoff();
        }

        let capped = strategy
            .next_backoff()
            .expect("attempt limit not reached yet");
        assert_eq!(capped, Duration::from_millis(60_000));
    }

    /// `reset` puts the attempt counter back to zero.
    #[test]
    fn test_reset() {
        let mut strategy = ReconnectionStrategy::new(100, 5);

        for _ in 0..2 {
            let _ = strategy.next_backoff();
        }
        assert_eq!(strategy.attempt_number(), 2);

        strategy.reset();
        assert_eq!(strategy.attempt_number(), 0);
    }

    /// The strategy reports exhaustion only after the last permitted attempt.
    #[test]
    fn test_is_exhausted() {
        let mut strategy = ReconnectionStrategy::new(100, 2);
        assert!(!strategy.is_exhausted());

        let _ = strategy.next_backoff();
        assert!(!strategy.is_exhausted());

        let _ = strategy.next_backoff();
        assert!(strategy.is_exhausted());
    }

    /// A limit of zero means the strategy never runs out of attempts.
    #[test]
    fn test_infinite_retries() {
        const ROUNDS: usize = 20;
        let mut strategy = ReconnectionStrategy::new(100, 0);

        for _ in 0..ROUNDS {
            assert!(strategy.next_backoff().is_some());
            assert!(!strategy.is_exhausted());
        }
    }
}