// cc_sdk/perf_utils.rs

//! Performance utilities for the Claude Code SDK
2
use crate::{errors::Result, types::Message};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time::{sleep, timeout};
use tracing::{debug, warn};
9
/// Configuration for retry logic with exponential backoff and jitter.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retry attempts (not counting the initial try)
    pub max_retries: u32,
    /// Initial delay between retries
    pub initial_delay: Duration,
    /// Maximum delay between retries (cap applied by the exponential backoff)
    pub max_delay: Duration,
    /// Multiplier for exponential backoff
    pub backoff_multiplier: f64,
    /// Jitter factor (0.0 to 1.0) to add randomness to delays
    pub jitter_factor: f64,
}
24
25impl Default for RetryConfig {
26    fn default() -> Self {
27        Self {
28            max_retries: 3,
29            initial_delay: Duration::from_millis(100),
30            max_delay: Duration::from_secs(30),
31            backoff_multiplier: 2.0,
32            jitter_factor: 0.1,
33        }
34    }
35}
36
37impl RetryConfig {
38    /// Execute a future with retry logic
39    pub async fn retry<F, Fut, T>(&self, mut f: F) -> Result<T>
40    where
41        F: FnMut() -> Fut,
42        Fut: std::future::Future<Output = Result<T>>,
43    {
44        let mut retries = 0;
45        let mut delay = self.initial_delay;
46
47        loop {
48            match f().await {
49                Ok(result) => return Ok(result),
50                Err(e) if retries < self.max_retries => {
51                    retries += 1;
52
53                    // Add jitter to delay
54                    let jitter = if self.jitter_factor > 0.0 {
55                        let jitter_range = delay.as_secs_f64() * self.jitter_factor;
56                        let jitter = rand::random::<f64>() * jitter_range - (jitter_range / 2.0);
57                        Duration::from_secs_f64(jitter.abs())
58                    } else {
59                        Duration::ZERO
60                    };
61
62                    let actual_delay = delay + jitter;
63                    warn!(
64                        "Attempt {} failed, retrying in {:?}: {}",
65                        retries, actual_delay, e
66                    );
67
68                    sleep(actual_delay).await;
69
70                    // Calculate next delay with exponential backoff
71                    delay = Duration::from_secs_f64(
72                        (delay.as_secs_f64() * self.backoff_multiplier)
73                            .min(self.max_delay.as_secs_f64()),
74                    );
75                }
76                Err(e) => return Err(e),
77            }
78        }
79    }
80}
81
/// Message batcher for efficient processing.
///
/// Collects messages from an input channel and forwards them downstream as
/// `Vec<Message>` batches, bounded by a size limit and a wait time.
pub struct MessageBatcher {
    /// Buffer holding the messages of the batch currently being assembled
    buffer: VecDeque<Message>,
    /// Maximum batch size before a batch is emitted
    max_batch_size: usize,
    /// Maximum wait time for a batch before flushing what is buffered
    max_wait_time: Duration,
    /// Channel for incoming messages
    input_rx: mpsc::Receiver<Message>,
    /// Channel for outgoing batches
    output_tx: mpsc::Sender<Vec<Message>>,
}
95
96impl MessageBatcher {
97    /// Create a new message batcher
98    pub fn new(
99        max_batch_size: usize,
100        max_wait_time: Duration,
101    ) -> (Self, mpsc::Sender<Message>, mpsc::Receiver<Vec<Message>>) {
102        let (input_tx, input_rx) = mpsc::channel(100);
103        let (output_tx, output_rx) = mpsc::channel(10);
104
105        let batcher = Self {
106            buffer: VecDeque::new(),
107            max_batch_size,
108            max_wait_time,
109            input_rx,
110            output_tx,
111        };
112
113        (batcher, input_tx, output_rx)
114    }
115
116    /// Run the batcher
117    pub async fn run(mut self) {
118        loop {
119            // Wait for messages with timeout
120            let timeout_result = timeout(self.max_wait_time, self.input_rx.recv()).await;
121
122            match timeout_result {
123                Ok(Some(msg)) => {
124                    self.buffer.push_back(msg);
125
126                    // Check if we should emit a batch
127                    if self.buffer.len() >= self.max_batch_size {
128                        self.emit_batch().await;
129                    }
130                }
131                Ok(None) => {
132                    // Channel closed, emit remaining messages and exit
133                    if !self.buffer.is_empty() {
134                        self.emit_batch().await;
135                    }
136                    break;
137                }
138                Err(_) => {
139                    // Timeout, emit batch if we have messages
140                    if !self.buffer.is_empty() {
141                        self.emit_batch().await;
142                    }
143                }
144            }
145        }
146    }
147
148    /// Emit a batch of messages
149    async fn emit_batch(&mut self) {
150        if self.buffer.is_empty() {
151            return;
152        }
153
154        let batch: Vec<Message> = self.buffer.drain(..).collect();
155        debug!("Emitting batch of {} messages", batch.len());
156
157        if self.output_tx.send(batch).await.is_err() {
158            warn!("Failed to send batch, receiver dropped");
159        }
160    }
161}
162
/// Performance metrics collector.
///
/// Plain counters updated via `record_success` / `record_failure`; derived
/// statistics are computed on demand by the accessor methods.
#[derive(Debug, Default, Clone)]
pub struct PerformanceMetrics {
    /// Total number of requests (successes + failures)
    pub total_requests: u64,
    /// Number of successful requests
    pub successful_requests: u64,
    /// Number of failed requests
    pub failed_requests: u64,
    /// Total latency in milliseconds, summed over successful requests only
    pub total_latency_ms: u64,
    /// Maximum latency in milliseconds
    pub max_latency_ms: u64,
    /// Minimum latency in milliseconds (0 until the first success is recorded)
    pub min_latency_ms: u64,
}
179
180impl PerformanceMetrics {
181    /// Record a successful request
182    pub fn record_success(&mut self, latency_ms: u64) {
183        self.total_requests += 1;
184        self.successful_requests += 1;
185        self.total_latency_ms += latency_ms;
186        self.max_latency_ms = self.max_latency_ms.max(latency_ms);
187        self.min_latency_ms = if self.min_latency_ms == 0 {
188            latency_ms
189        } else {
190            self.min_latency_ms.min(latency_ms)
191        };
192    }
193
194    /// Record a failed request
195    pub fn record_failure(&mut self) {
196        self.total_requests += 1;
197        self.failed_requests += 1;
198    }
199
200    /// Get average latency
201    pub fn average_latency_ms(&self) -> f64 {
202        if self.successful_requests == 0 {
203            0.0
204        } else {
205            self.total_latency_ms as f64 / self.successful_requests as f64
206        }
207    }
208
209    /// Get success rate
210    pub fn success_rate(&self) -> f64 {
211        if self.total_requests == 0 {
212            0.0
213        } else {
214            self.successful_requests as f64 / self.total_requests as f64
215        }
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    /// All default fields should match the documented values.
    #[test]
    fn test_retry_config_default() {
        let config = RetryConfig::default();
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.initial_delay, Duration::from_millis(100));
        assert_eq!(config.max_delay, Duration::from_secs(30));
        assert_eq!(config.backoff_multiplier, 2.0);
        assert_eq!(config.jitter_factor, 0.1);
    }

    /// Mixed successes and a failure: counters, averages, and the
    /// min/max latency trackers must all agree.
    #[test]
    fn test_performance_metrics() {
        let mut metrics = PerformanceMetrics::default();

        metrics.record_success(100);
        metrics.record_success(200);
        metrics.record_failure();

        assert_eq!(metrics.total_requests, 3);
        assert_eq!(metrics.successful_requests, 2);
        assert_eq!(metrics.failed_requests, 1);
        assert_eq!(metrics.average_latency_ms(), 150.0);
        assert_eq!(metrics.min_latency_ms, 100);
        assert_eq!(metrics.max_latency_ms, 200);
        // Exact expected ratio instead of a loose 0.666 literal.
        assert!((metrics.success_rate() - 2.0 / 3.0).abs() < 1e-9);
    }

    /// With no requests recorded, derived statistics must be 0 rather
    /// than NaN from a division by zero.
    #[test]
    fn test_performance_metrics_empty() {
        let metrics = PerformanceMetrics::default();
        assert_eq!(metrics.average_latency_ms(), 0.0);
        assert_eq!(metrics.success_rate(), 0.0);
    }
}