1use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::{HashMap, VecDeque};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Mutex;
10use tracing::info;
11
/// A single handled request, as passed to [`RequestStats::record_request`].
///
/// The optional token counts are `None` when no usage was reported for the
/// request; [`RequestStats::record_request`] only adds the counts that are present.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestRecord {
    /// Timestamp attached to the request by the caller (UTC).
    /// NOTE(review): whether this marks the start or the completion of the
    /// request is not fixed here — confirm with the caller.
    pub timestamp: DateTime<Utc>,
    /// HTTP method, e.g. `"POST"`.
    pub method: String,
    /// Request URI, e.g. `"/v1/messages"`.
    pub uri: String,
    /// Name of the backend that served the request; an empty string is not
    /// tallied in the per-backend counts.
    pub backend: String,
    /// Model name, when known; tallied in the per-model counts.
    pub model: Option<String>,
    /// Response status code; bucketed as 2xx / 4xx / 500+ by the stats collector.
    pub status: u16,
    /// Request duration in milliseconds.
    pub duration_ms: u64,
    /// Input (prompt) tokens, if reported.
    pub input_tokens: Option<u64>,
    /// Output (completion) tokens, if reported.
    pub output_tokens: Option<u64>,
    /// Cache-read input tokens, if reported.
    pub cache_read_tokens: Option<u64>,
    /// Cache-creation input tokens, if reported.
    pub cache_creation_tokens: Option<u64>,
    /// Optional error description; kept with the record but not aggregated.
    pub error: Option<String>,
}
28
/// Token counts for one response, as extracted by [`TokenUsage::parse_from_sse`].
///
/// Counters that were not reported are 0 (also the `Default` value).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Input (prompt) tokens.
    pub input_tokens: u64,
    /// Output (completion) tokens.
    pub output_tokens: u64,
    /// Cache-read input tokens (JSON key `cache_read_input_tokens`).
    pub cache_read_tokens: u64,
    /// Cache-creation input tokens (JSON key `cache_creation_input_tokens`).
    pub cache_creation_tokens: u64,
}
37
38impl TokenUsage {
39 pub fn parse_from_sse(chunk: &str) -> Option<Self> {
41 for line in chunk.lines() {
42 if let Some(data) = line.strip_prefix("data: ") {
43 if data == "[DONE]" {
44 continue;
45 }
46 if let Ok(json) = serde_json::from_str::<serde_json::Value>(data) {
47 if let Some(usage) = json.get("message").and_then(|m| m.get("usage")) {
49 return Some(Self {
50 input_tokens: usage.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
51 output_tokens: usage.get("output_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
52 cache_read_tokens: usage.get("cache_read_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
53 cache_creation_tokens: usage.get("cache_creation_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
54 });
55 }
56
57 if let Some(usage) = json.get("usage") {
59 return Some(Self {
60 input_tokens: usage.get("input_tokens").or(usage.get("prompt_tokens")).and_then(|v| v.as_u64()).unwrap_or(0),
61 output_tokens: usage.get("output_tokens").or(usage.get("completion_tokens")).and_then(|v| v.as_u64()).unwrap_or(0),
62 cache_read_tokens: usage.get("cache_read_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
63 cache_creation_tokens: usage.get("cache_creation_input_tokens").and_then(|v| v.as_u64()).unwrap_or(0),
64 });
65 }
66 }
67 }
68 }
69 None
70 }
71}
72
/// Aggregate request statistics that are updated through `&self`.
///
/// Scalar totals are atomics. The per-model and per-backend maps and the
/// recent-request buffer sit behind `Mutex`es. Note that the derived `Default`
/// gives `max_recent == 0`; use [`RequestStats::new`] to choose a capacity.
#[derive(Debug, Default)]
pub struct RequestStats {
    /// Number of requests recorded.
    total_requests: AtomicU64,
    /// Requests with a 2xx status.
    success_count: AtomicU64,
    /// Requests with a 4xx status.
    client_error_count: AtomicU64,
    /// Requests with a status of 500 or above.
    server_error_count: AtomicU64,
    /// Sum of `duration_ms` over all recorded requests.
    total_duration_ms: AtomicU64,
    /// Sum of reported input tokens.
    total_input_tokens: AtomicU64,
    /// Sum of reported output tokens.
    total_output_tokens: AtomicU64,
    /// Sum of reported cache-read tokens.
    total_cache_read_tokens: AtomicU64,
    /// Sum of reported cache-creation tokens.
    total_cache_creation_tokens: AtomicU64,
    /// Request count per model name.
    model_counts: Mutex<HashMap<String, AtomicU64>>,
    /// Request count per backend name.
    backend_counts: Mutex<HashMap<String, AtomicU64>>,
    /// Recent request records; new ones are pushed at the back and old ones
    /// evicted from the front.
    recent_requests: Mutex<VecDeque<RequestRecord>>,
    /// Bound used when evicting old entries from `recent_requests`.
    max_recent: usize,
}
90
91impl RequestStats {
92 pub fn new(max_recent: usize) -> Self {
94 Self {
95 max_recent,
96 ..Default::default()
97 }
98 }
99
100 pub fn record_request(&self, record: RequestRecord) {
102 self.total_requests.fetch_add(1, Ordering::Relaxed);
103 self.total_duration_ms.fetch_add(record.duration_ms, Ordering::Relaxed);
104
105 if record.status >= 200 && record.status < 300 {
107 self.success_count.fetch_add(1, Ordering::Relaxed);
108 } else if record.status >= 400 && record.status < 500 {
109 self.client_error_count.fetch_add(1, Ordering::Relaxed);
110 } else if record.status >= 500 {
111 self.server_error_count.fetch_add(1, Ordering::Relaxed);
112 }
113
114 if let Some(t) = record.input_tokens {
116 self.total_input_tokens.fetch_add(t, Ordering::Relaxed);
117 }
118 if let Some(t) = record.output_tokens {
119 self.total_output_tokens.fetch_add(t, Ordering::Relaxed);
120 }
121 if let Some(t) = record.cache_read_tokens {
122 self.total_cache_read_tokens.fetch_add(t, Ordering::Relaxed);
123 }
124 if let Some(t) = record.cache_creation_tokens {
125 self.total_cache_creation_tokens.fetch_add(t, Ordering::Relaxed);
126 }
127
128 if let Some(ref model) = record.model
130 && let Ok(mut counts) = self.model_counts.lock() {
131 counts
132 .entry(model.clone())
133 .or_insert_with(|| AtomicU64::new(0))
134 .fetch_add(1, Ordering::Relaxed);
135 }
136
137 if !record.backend.is_empty()
139 && let Ok(mut counts) = self.backend_counts.lock() {
140 counts
141 .entry(record.backend.clone())
142 .or_insert_with(|| AtomicU64::new(0))
143 .fetch_add(1, Ordering::Relaxed);
144 }
145
146 if let Ok(mut recent) = self.recent_requests.lock() {
148 if recent.len() >= self.max_recent {
149 recent.pop_front();
150 }
151 recent.push_back(record);
152 }
153 }
154
155 pub fn summary(&self) -> StatsSummary {
157 let total = self.total_requests.load(Ordering::Relaxed);
158 let success = self.success_count.load(Ordering::Relaxed);
159 let client_err = self.client_error_count.load(Ordering::Relaxed);
160 let server_err = self.server_error_count.load(Ordering::Relaxed);
161 let total_duration = self.total_duration_ms.load(Ordering::Relaxed);
162
163 StatsSummary {
164 total_requests: total,
165 success_count: success,
166 client_error_count: client_err,
167 server_error_count: server_err,
168 success_rate: if total > 0 { success as f64 / total as f64 * 100.0 } else { 0.0 },
169 avg_duration_ms: if total > 0 { total_duration as f64 / total as f64 } else { 0.0 },
170 total_input_tokens: self.total_input_tokens.load(Ordering::Relaxed),
171 total_output_tokens: self.total_output_tokens.load(Ordering::Relaxed),
172 total_cache_read_tokens: self.total_cache_read_tokens.load(Ordering::Relaxed),
173 total_cache_creation_tokens: self.total_cache_creation_tokens.load(Ordering::Relaxed),
174 }
175 }
176
177 pub fn export_json(&self) -> serde_json::Value {
179 let summary = self.summary();
180
181 let model_counts: HashMap<String, u64> = match self.model_counts.lock() {
182 Ok(counts) => counts.iter().map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed))).collect(),
183 Err(poisoned) => {
184 tracing::warn!("Stats lock poisoned, using default: {}", poisoned);
185 HashMap::new()
186 }
187 };
188
189 let backend_counts: HashMap<String, u64> = match self.backend_counts.lock() {
190 Ok(counts) => counts.iter().map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed))).collect(),
191 Err(poisoned) => {
192 tracing::warn!("Backend counts lock poisoned: {}", poisoned);
193 HashMap::new()
194 }
195 };
196
197 serde_json::json!({
198 "summary": {
199 "total_requests": summary.total_requests,
200 "success_count": summary.success_count,
201 "client_error_count": summary.client_error_count,
202 "server_error_count": summary.server_error_count,
203 "success_rate_percent": format!("{:.2}", summary.success_rate),
204 "avg_duration_ms": format!("{:.2}", summary.avg_duration_ms),
205 "tokens": {
206 "input": summary.total_input_tokens,
207 "output": summary.total_output_tokens,
208 "cache_read": summary.total_cache_read_tokens,
209 "cache_creation": summary.total_cache_creation_tokens,
210 }
211 },
212 "model_counts": model_counts,
213 "backend_counts": backend_counts,
214 })
215 }
216
217 pub fn print_summary(&self) {
219 let summary = self.summary();
220 info!(
221 "Statistics: total={}, success={}, client_err={}, server_err={}, rate={:.1}%, avg={:.0}ms",
222 summary.total_requests,
223 summary.success_count,
224 summary.client_error_count,
225 summary.server_error_count,
226 summary.success_rate,
227 summary.avg_duration_ms
228 );
229 info!(
230 "Tokens: input={}, output={}, cache_read={}, cache_creation={}",
231 summary.total_input_tokens,
232 summary.total_output_tokens,
233 summary.total_cache_read_tokens,
234 summary.total_cache_creation_tokens
235 );
236 }
237
238 pub fn reset(&self) {
240 self.total_requests.store(0, Ordering::Relaxed);
241 self.success_count.store(0, Ordering::Relaxed);
242 self.client_error_count.store(0, Ordering::Relaxed);
243 self.server_error_count.store(0, Ordering::Relaxed);
244 self.total_duration_ms.store(0, Ordering::Relaxed);
245 self.total_input_tokens.store(0, Ordering::Relaxed);
246 self.total_output_tokens.store(0, Ordering::Relaxed);
247 self.total_cache_read_tokens.store(0, Ordering::Relaxed);
248 self.total_cache_creation_tokens.store(0, Ordering::Relaxed);
249
250 if let Ok(mut counts) = self.model_counts.lock() {
251 counts.clear();
252 }
253 if let Ok(mut counts) = self.backend_counts.lock() {
254 counts.clear();
255 }
256 if let Ok(mut recent) = self.recent_requests.lock() {
257 recent.clear();
258 }
259 }
260}
261
/// Snapshot of the [`RequestStats`] counters, produced by [`RequestStats::summary`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatsSummary {
    /// Number of requests recorded.
    pub total_requests: u64,
    /// Requests with a 2xx status.
    pub success_count: u64,
    /// Requests with a 4xx status.
    pub client_error_count: u64,
    /// Requests with a status of 500 or above.
    pub server_error_count: u64,
    /// `success_count / total_requests` as a percentage; 0 when no requests.
    pub success_rate: f64,
    /// Mean request duration in milliseconds; 0 when no requests.
    pub avg_duration_ms: f64,
    /// Sum of reported input tokens.
    pub total_input_tokens: u64,
    /// Sum of reported output tokens.
    pub total_output_tokens: u64,
    /// Sum of reported cache-read tokens.
    pub total_cache_read_tokens: u64,
    /// Sum of reported cache-creation tokens.
    pub total_cache_creation_tokens: u64,
}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful request with 100 input and 50 output tokens.
    fn successful_opus_request() -> RequestRecord {
        RequestRecord {
            timestamp: Utc::now(),
            method: "POST".to_string(),
            uri: "/v1/messages".to_string(),
            backend: "anthropic".to_string(),
            model: Some("claude-3-opus".to_string()),
            status: 200,
            duration_ms: 1000,
            input_tokens: Some(100),
            output_tokens: Some(50),
            cache_read_tokens: None,
            cache_creation_tokens: None,
            error: None,
        }
    }

    #[test]
    fn test_token_usage_parse() {
        let message_start = r#"data: {"type":"message_start","message":{"id":"msg_xxx","usage":{"input_tokens":100,"output_tokens":0}}}"#;

        let TokenUsage { input_tokens, output_tokens, .. } =
            TokenUsage::parse_from_sse(message_start).expect("message_start should carry usage");

        assert_eq!(input_tokens, 100);
        assert_eq!(output_tokens, 0);
    }

    #[test]
    fn test_stats_record() {
        let stats = RequestStats::new(100);
        stats.record_request(successful_opus_request());

        let StatsSummary {
            total_requests,
            success_count,
            total_input_tokens,
            total_output_tokens,
            ..
        } = stats.summary();

        assert_eq!(total_requests, 1);
        assert_eq!(success_count, 1);
        assert_eq!(total_input_tokens, 100);
        assert_eq!(total_output_tokens, 50);
    }
}