1use crate::{errors::Result, types::Message};
4use std::collections::VecDeque;
5use std::time::Duration;
6use tokio::sync::mpsc;
7use tokio::time::{sleep, timeout};
8use tracing::{debug, warn};
9
10#[derive(Debug, Clone)]
12pub struct RetryConfig {
13 pub max_retries: u32,
15 pub initial_delay: Duration,
17 pub max_delay: Duration,
19 pub backoff_multiplier: f64,
21 pub jitter_factor: f64,
23}
24
25impl Default for RetryConfig {
26 fn default() -> Self {
27 Self {
28 max_retries: 3,
29 initial_delay: Duration::from_millis(100),
30 max_delay: Duration::from_secs(30),
31 backoff_multiplier: 2.0,
32 jitter_factor: 0.1,
33 }
34 }
35}
36
37impl RetryConfig {
38 pub async fn retry<F, Fut, T>(&self, mut f: F) -> Result<T>
40 where
41 F: FnMut() -> Fut,
42 Fut: std::future::Future<Output = Result<T>>,
43 {
44 let mut retries = 0;
45 let mut delay = self.initial_delay;
46
47 loop {
48 match f().await {
49 Ok(result) => return Ok(result),
50 Err(e) if retries < self.max_retries => {
51 retries += 1;
52
53 let jitter = if self.jitter_factor > 0.0 {
55 let jitter_range = delay.as_secs_f64() * self.jitter_factor;
56 let jitter = rand::random::<f64>() * jitter_range - (jitter_range / 2.0);
57 Duration::from_secs_f64(jitter.abs())
58 } else {
59 Duration::ZERO
60 };
61
62 let actual_delay = delay + jitter;
63 warn!(
64 "Attempt {} failed, retrying in {:?}: {}",
65 retries, actual_delay, e
66 );
67
68 sleep(actual_delay).await;
69
70 delay = Duration::from_secs_f64(
72 (delay.as_secs_f64() * self.backoff_multiplier)
73 .min(self.max_delay.as_secs_f64()),
74 );
75 }
76 Err(e) => return Err(e),
77 }
78 }
79 }
80}
81
82pub struct MessageBatcher {
84 buffer: VecDeque<Message>,
86 max_batch_size: usize,
88 max_wait_time: Duration,
90 input_rx: mpsc::Receiver<Message>,
92 output_tx: mpsc::Sender<Vec<Message>>,
94}
95
96impl MessageBatcher {
97 pub fn new(
99 max_batch_size: usize,
100 max_wait_time: Duration,
101 ) -> (Self, mpsc::Sender<Message>, mpsc::Receiver<Vec<Message>>) {
102 let (input_tx, input_rx) = mpsc::channel(100);
103 let (output_tx, output_rx) = mpsc::channel(10);
104
105 let batcher = Self {
106 buffer: VecDeque::new(),
107 max_batch_size,
108 max_wait_time,
109 input_rx,
110 output_tx,
111 };
112
113 (batcher, input_tx, output_rx)
114 }
115
116 pub async fn run(mut self) {
118 loop {
119 let timeout_result = timeout(self.max_wait_time, self.input_rx.recv()).await;
121
122 match timeout_result {
123 Ok(Some(msg)) => {
124 self.buffer.push_back(msg);
125
126 if self.buffer.len() >= self.max_batch_size {
128 self.emit_batch().await;
129 }
130 }
131 Ok(None) => {
132 if !self.buffer.is_empty() {
134 self.emit_batch().await;
135 }
136 break;
137 }
138 Err(_) => {
139 if !self.buffer.is_empty() {
141 self.emit_batch().await;
142 }
143 }
144 }
145 }
146 }
147
148 async fn emit_batch(&mut self) {
150 if self.buffer.is_empty() {
151 return;
152 }
153
154 let batch: Vec<Message> = self.buffer.drain(..).collect();
155 debug!("Emitting batch of {} messages", batch.len());
156
157 if self.output_tx.send(batch).await.is_err() {
158 warn!("Failed to send batch, receiver dropped");
159 }
160 }
161}
162
163#[derive(Debug, Default, Clone)]
165pub struct PerformanceMetrics {
166 pub total_requests: u64,
168 pub successful_requests: u64,
170 pub failed_requests: u64,
172 pub total_latency_ms: u64,
174 pub max_latency_ms: u64,
176 pub min_latency_ms: u64,
178}
179
180impl PerformanceMetrics {
181 pub fn record_success(&mut self, latency_ms: u64) {
183 self.total_requests += 1;
184 self.successful_requests += 1;
185 self.total_latency_ms += latency_ms;
186 self.max_latency_ms = self.max_latency_ms.max(latency_ms);
187 self.min_latency_ms = if self.min_latency_ms == 0 {
188 latency_ms
189 } else {
190 self.min_latency_ms.min(latency_ms)
191 };
192 }
193
194 pub fn record_failure(&mut self) {
196 self.total_requests += 1;
197 self.failed_requests += 1;
198 }
199
200 pub fn average_latency_ms(&self) -> f64 {
202 if self.successful_requests == 0 {
203 0.0
204 } else {
205 self.total_latency_ms as f64 / self.successful_requests as f64
206 }
207 }
208
209 pub fn success_rate(&self) -> f64 {
211 if self.total_requests == 0 {
212 0.0
213 } else {
214 self.successful_requests as f64 / self.total_requests as f64
215 }
216 }
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222
223 #[test]
224 fn test_retry_config_default() {
225 let config = RetryConfig::default();
226 assert_eq!(config.max_retries, 3);
227 assert_eq!(config.initial_delay, Duration::from_millis(100));
228 assert_eq!(config.backoff_multiplier, 2.0);
229 }
230
231 #[test]
232 fn test_performance_metrics() {
233 let mut metrics = PerformanceMetrics::default();
234
235 metrics.record_success(100);
236 metrics.record_success(200);
237 metrics.record_failure();
238
239 assert_eq!(metrics.total_requests, 3);
240 assert_eq!(metrics.successful_requests, 2);
241 assert_eq!(metrics.failed_requests, 1);
242 assert_eq!(metrics.average_latency_ms(), 150.0);
243 assert!((metrics.success_rate() - 0.666).abs() < 0.01);
244 }
245}