1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use tokio::sync::mpsc;
5use tracing::{error, info, warn};
6
7use crate::metrics::bus::MetricsBus;
8use crate::metrics::events::{ChatEvent, ForwardEvent, MetricsEvent, SystemEvent};
9use crate::metrics::storage::{MetricsStorage, ToolCallCompletion};
10use crate::metrics::types::ForwardStatus;
11
12pub struct MetricsWorker {
14 storage: Arc<dyn MetricsStorage>,
15 running: Arc<AtomicBool>,
16}
17
18impl MetricsWorker {
19 pub fn new(storage: Arc<dyn MetricsStorage>) -> Self {
21 Self {
22 storage,
23 running: Arc::new(AtomicBool::new(false)),
24 }
25 }
26
27 pub fn spawn(
31 &self,
32 mut receiver: mpsc::Receiver<MetricsEvent>,
33 bus: MetricsBus,
34 ) -> Arc<AtomicBool> {
35 let storage = Arc::clone(&self.storage);
36 let running = Arc::clone(&self.running);
37 running.store(true, Ordering::SeqCst);
38
39 let running_clone = Arc::clone(&running);
40
41 tokio::spawn(async move {
42 info!("MetricsWorker started");
43 bus.emit(MetricsEvent::System(SystemEvent::WorkerStarted));
44
45 while running.load(Ordering::SeqCst) {
46 match receiver.recv().await {
47 Some(event) => {
48 if let Err(e) = Self::handle_event(&storage, &event).await {
49 warn!("Failed to handle metrics event: {}", e);
50 bus.emit(MetricsEvent::System(SystemEvent::StorageError {
51 error: e.to_string(),
52 event_type: event_type_name(&event),
53 }));
54 }
55 }
56 None => {
57 info!("MetricsWorker channel closed");
58 break;
59 }
60 }
61 }
62
63 info!("MetricsWorker stopped");
64 bus.emit(MetricsEvent::System(SystemEvent::WorkerStopped));
65 });
66
67 running_clone
68 }
69
70 async fn handle_event(
72 storage: &Arc<dyn MetricsStorage>,
73 event: &MetricsEvent,
74 ) -> anyhow::Result<()> {
75 match event {
76 MetricsEvent::Chat(chat_event) => Self::handle_chat_event(storage, chat_event).await,
77 MetricsEvent::Forward(forward_event) => {
78 Self::handle_forward_event(storage, forward_event).await
79 }
80 MetricsEvent::System(system_event) => {
81 match system_event {
83 SystemEvent::WorkerStarted => info!("System: WorkerStarted"),
84 SystemEvent::WorkerStopped => info!("System: WorkerStopped"),
85 SystemEvent::MetricsDropped { count, reason } => {
86 warn!(
87 "System: MetricsDropped - {} events, reason: {}",
88 count, reason
89 );
90 }
91 SystemEvent::StorageError { error, event_type } => {
92 error!("System: StorageError for {} - {}", event_type, error);
93 }
94 }
95 Ok(())
96 }
97 }
98 }
99
100 async fn handle_chat_event(
102 storage: &Arc<dyn MetricsStorage>,
103 event: &ChatEvent,
104 ) -> anyhow::Result<()> {
105 match event {
106 ChatEvent::SessionStarted {
107 session_id,
108 model,
109 meta,
110 ..
111 } => {
112 storage
113 .upsert_session_start(session_id, model, meta.occurred_at)
114 .await?;
115 info!("Chat: SessionStarted - {}", session_id);
116 }
117 ChatEvent::SessionCompleted {
118 session_id,
119 status,
120 meta,
121 } => {
122 storage
123 .complete_session(session_id, *status, meta.occurred_at)
124 .await?;
125 info!("Chat: SessionCompleted - {} ({:?})", session_id, status);
126 }
127 ChatEvent::RoundStarted {
128 round_id,
129 session_id,
130 model,
131 meta,
132 } => {
133 storage
134 .insert_round_start(round_id, session_id, model, meta.occurred_at)
135 .await?;
136 info!(
137 "Chat: RoundStarted - {} in session {}",
138 round_id, session_id
139 );
140 }
141 ChatEvent::RoundCompleted {
142 round_id,
143 status,
144 usage,
145 error,
146 meta,
147 ..
148 } => {
149 storage
150 .complete_round(
151 round_id,
152 meta.occurred_at,
153 *status,
154 *usage,
155 0,
156 0,
157 error.clone(),
158 )
159 .await?;
160 info!(
161 "Chat: RoundCompleted - {} ({:?}) - {} tokens",
162 round_id, status, usage.total_tokens
163 );
164 }
165 ChatEvent::ToolCalled {
166 tool_call_id,
167 round_id,
168 session_id,
169 tool_name,
170 latency_ms,
171 success,
172 meta,
173 } => {
174 storage
176 .insert_tool_start(
177 tool_call_id,
178 round_id,
179 session_id,
180 tool_name,
181 meta.occurred_at,
182 )
183 .await?;
184
185 let completion = ToolCallCompletion {
187 completed_at: meta.occurred_at,
188 success: *success,
189 error: if *success {
190 None
191 } else {
192 Some(format!("Tool failed after {}ms", latency_ms))
193 },
194 };
195 storage.complete_tool_call(tool_call_id, completion).await?;
196 info!(
197 "Chat: ToolCalled - {} ({}) - {}ms",
198 tool_name,
199 if *success { "success" } else { "failed" },
200 latency_ms
201 );
202 }
203 ChatEvent::MessageCountUpdated {
204 session_id,
205 message_count,
206 meta,
207 } => {
208 storage
209 .update_session_message_count(session_id, *message_count, meta.occurred_at)
210 .await?;
211 }
212 }
213 Ok(())
214 }
215
216 async fn handle_forward_event(
218 storage: &Arc<dyn MetricsStorage>,
219 event: &ForwardEvent,
220 ) -> anyhow::Result<()> {
221 match event {
222 ForwardEvent::RequestStarted {
223 request_id,
224 endpoint,
225 model,
226 is_stream,
227 meta,
228 } => {
229 storage
230 .insert_forward_start(request_id, endpoint, model, *is_stream, meta.occurred_at)
231 .await?;
232 info!(
233 "Forward: RequestStarted - {} to {} (stream: {})",
234 request_id, endpoint, is_stream
235 );
236 }
237 ForwardEvent::RequestCompleted {
238 request_id,
239 status_code,
240 status,
241 usage,
242 latency_ms,
243 error,
244 meta,
245 } => {
246 storage
247 .complete_forward(
248 request_id,
249 meta.occurred_at,
250 Some(*status_code),
251 *status,
252 *usage,
253 error.clone(),
254 )
255 .await?;
256 info!(
257 "Forward: RequestCompleted - {} ({} {}) - {}ms - {} tokens",
258 request_id,
259 status_code,
260 match status {
261 ForwardStatus::Pending => "pending",
262 ForwardStatus::Success => "success",
263 ForwardStatus::Error => "error",
264 },
265 latency_ms,
266 usage.as_ref().map(|u| u.total_tokens).unwrap_or(0)
267 );
268 }
269 }
270 Ok(())
271 }
272
273 pub fn stop(&self) {
275 self.running.store(false, Ordering::SeqCst);
276 }
277}
278
279fn event_type_name(event: &MetricsEvent) -> String {
280 match event {
281 MetricsEvent::Chat(e) => match e {
282 ChatEvent::SessionStarted { .. } => "Chat::SessionStarted",
283 ChatEvent::SessionCompleted { .. } => "Chat::SessionCompleted",
284 ChatEvent::RoundStarted { .. } => "Chat::RoundStarted",
285 ChatEvent::RoundCompleted { .. } => "Chat::RoundCompleted",
286 ChatEvent::ToolCalled { .. } => "Chat::ToolCalled",
287 ChatEvent::MessageCountUpdated { .. } => "Chat::MessageCountUpdated",
288 }
289 .to_string(),
290 MetricsEvent::Forward(e) => match e {
291 ForwardEvent::RequestStarted { .. } => "Forward::RequestStarted",
292 ForwardEvent::RequestCompleted { .. } => "Forward::RequestCompleted",
293 }
294 .to_string(),
295 MetricsEvent::System(e) => match e {
296 SystemEvent::WorkerStarted => "System::WorkerStarted",
297 SystemEvent::WorkerStopped => "System::WorkerStopped",
298 SystemEvent::MetricsDropped { .. } => "System::MetricsDropped",
299 SystemEvent::StorageError { .. } => "System::StorageError",
300 }
301 .to_string(),
302 }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::events::EventMeta;
    use crate::metrics::types::{RoundStatus, TokenUsage};
    use std::time::Duration;
    use tempfile::{tempdir, TempDir};

    /// Upper bound on how long a test waits for the worker to persist its events.
    const PERSIST_TIMEOUT: Duration = Duration::from_secs(5);
    /// Delay between storage polls while waiting for the worker.
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Creates an initialised SQLite storage inside a fresh temporary directory.
    ///
    /// The returned [`TempDir`] owns the directory and deletes it on drop. Bind it to a named
    /// variable (e.g. `_dir`, not `_`) so it lives for the whole test.
    async fn create_test_storage() -> (Arc<dyn MetricsStorage>, TempDir) {
        let dir = tempdir().expect("temp dir");
        let db_path = dir.path().join("metrics.db");
        let storage = Arc::new(crate::metrics::storage::SqliteMetricsStorage::new(&db_path));
        storage.init().await.expect("init storage");
        (storage, dir)
    }

    #[tokio::test]
    async fn test_worker_handles_chat_events() {
        let (storage, _dir) = create_test_storage().await;
        let worker = MetricsWorker::new(Arc::clone(&storage));
        let (bus, rx) = MetricsBus::new(100);

        let running = worker.spawn(rx, bus.clone());

        bus.emit(MetricsEvent::Chat(ChatEvent::SessionStarted {
            meta: EventMeta::new(),
            session_id: "test-session".to_string(),
            model: "gpt-4".to_string(),
        }));

        bus.emit(MetricsEvent::Chat(ChatEvent::RoundStarted {
            meta: EventMeta::new(),
            round_id: "test-round".to_string(),
            session_id: "test-session".to_string(),
            model: "gpt-4".to_string(),
        }));

        bus.emit(MetricsEvent::Chat(ChatEvent::RoundCompleted {
            meta: EventMeta::new(),
            round_id: "test-round".to_string(),
            session_id: "test-session".to_string(),
            status: RoundStatus::Success,
            usage: TokenUsage {
                prompt_tokens: 10,
                completion_tokens: 20,
                total_tokens: 30,
            },
            latency_ms: 1000,
            error: None,
        }));

        // The worker persists asynchronously; poll with a deadline instead of a fixed sleep,
        // which is both slower on fast machines and flaky on loaded ones.
        let deadline = tokio::time::Instant::now() + PERSIST_TIMEOUT;
        let summary = loop {
            let summary = storage
                .summary(crate::metrics::types::MetricsDateFilter::default())
                .await
                .expect("get summary");
            if summary.total_sessions == 1 || tokio::time::Instant::now() >= deadline {
                break summary;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        };
        assert_eq!(summary.total_sessions, 1);

        worker.stop();
        assert!(!running.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn test_worker_handles_forward_events() {
        let (storage, _dir) = create_test_storage().await;
        let worker = MetricsWorker::new(Arc::clone(&storage));
        let (bus, rx) = MetricsBus::new(100);

        let running = worker.spawn(rx, bus.clone());

        bus.emit(MetricsEvent::Forward(ForwardEvent::RequestStarted {
            meta: EventMeta::new(),
            request_id: "req-123".to_string(),
            endpoint: "openai.chat_completions".to_string(),
            model: "gpt-4".to_string(),
            is_stream: true,
        }));

        bus.emit(MetricsEvent::Forward(ForwardEvent::RequestCompleted {
            meta: EventMeta::new(),
            request_id: "req-123".to_string(),
            status_code: 200,
            status: ForwardStatus::Success,
            usage: Some(TokenUsage {
                prompt_tokens: 50,
                completion_tokens: 100,
                total_tokens: 150,
            }),
            latency_ms: 500,
            error: None,
        }));

        // Poll with a deadline rather than sleeping a fixed amount (see the chat test).
        let deadline = tokio::time::Instant::now() + PERSIST_TIMEOUT;
        let summary = loop {
            let summary = storage
                .forward_summary(crate::metrics::types::ForwardMetricsFilter::default())
                .await
                .expect("get forward summary");
            if summary.total_requests == 1 || tokio::time::Instant::now() >= deadline {
                break summary;
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        };
        assert_eq!(summary.total_requests, 1);

        worker.stop();
        assert!(!running.load(Ordering::SeqCst));
    }
}