Skip to main content

bamboo_engine/metrics/
worker.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use tokio::sync::mpsc;
5use tracing::{error, info, warn};
6
7use crate::metrics::bus::MetricsBus;
8use crate::metrics::events::{ChatEvent, ForwardEvent, MetricsEvent, SystemEvent};
9use crate::metrics::storage::{MetricsStorage, ToolCallCompletion};
10use crate::metrics::types::ForwardStatus;
11
12/// Worker that consumes metrics events from the bus and writes them to storage
13pub struct MetricsWorker {
14    storage: Arc<dyn MetricsStorage>,
15    running: Arc<AtomicBool>,
16}
17
18impl MetricsWorker {
19    /// Create a new metrics worker with the given storage backend
20    pub fn new(storage: Arc<dyn MetricsStorage>) -> Self {
21        Self {
22            storage,
23            running: Arc::new(AtomicBool::new(false)),
24        }
25    }
26
27    /// Spawn the worker task
28    ///
29    /// Returns a handle to stop the worker
30    pub fn spawn(
31        &self,
32        mut receiver: mpsc::Receiver<MetricsEvent>,
33        bus: MetricsBus,
34    ) -> Arc<AtomicBool> {
35        let storage = Arc::clone(&self.storage);
36        let running = Arc::clone(&self.running);
37        running.store(true, Ordering::SeqCst);
38
39        let running_clone = Arc::clone(&running);
40
41        tokio::spawn(async move {
42            info!("MetricsWorker started");
43            bus.emit(MetricsEvent::System(SystemEvent::WorkerStarted));
44
45            while running.load(Ordering::SeqCst) {
46                match receiver.recv().await {
47                    Some(event) => {
48                        if let Err(e) = Self::handle_event(&storage, &event).await {
49                            warn!("Failed to handle metrics event: {}", e);
50                            bus.emit(MetricsEvent::System(SystemEvent::StorageError {
51                                error: e.to_string(),
52                                event_type: event_type_name(&event),
53                            }));
54                        }
55                    }
56                    None => {
57                        info!("MetricsWorker channel closed");
58                        break;
59                    }
60                }
61            }
62
63            info!("MetricsWorker stopped");
64            bus.emit(MetricsEvent::System(SystemEvent::WorkerStopped));
65        });
66
67        running_clone
68    }
69
70    /// Handle a single metrics event
71    async fn handle_event(
72        storage: &Arc<dyn MetricsStorage>,
73        event: &MetricsEvent,
74    ) -> anyhow::Result<()> {
75        match event {
76            MetricsEvent::Chat(chat_event) => Self::handle_chat_event(storage, chat_event).await,
77            MetricsEvent::Forward(forward_event) => {
78                Self::handle_forward_event(storage, forward_event).await
79            }
80            MetricsEvent::System(system_event) => {
81                // Just log system events
82                match system_event {
83                    SystemEvent::WorkerStarted => info!("System: WorkerStarted"),
84                    SystemEvent::WorkerStopped => info!("System: WorkerStopped"),
85                    SystemEvent::MetricsDropped { count, reason } => {
86                        warn!(
87                            "System: MetricsDropped - {} events, reason: {}",
88                            count, reason
89                        );
90                    }
91                    SystemEvent::StorageError { error, event_type } => {
92                        error!("System: StorageError for {} - {}", event_type, error);
93                    }
94                }
95                Ok(())
96            }
97        }
98    }
99
100    /// Handle chat-related events
101    async fn handle_chat_event(
102        storage: &Arc<dyn MetricsStorage>,
103        event: &ChatEvent,
104    ) -> anyhow::Result<()> {
105        match event {
106            ChatEvent::SessionStarted {
107                session_id,
108                model,
109                meta,
110                ..
111            } => {
112                storage
113                    .upsert_session_start(session_id, model, meta.occurred_at)
114                    .await?;
115                info!("Chat: SessionStarted - {}", session_id);
116            }
117            ChatEvent::SessionCompleted {
118                session_id,
119                status,
120                meta,
121            } => {
122                storage
123                    .complete_session(session_id, *status, meta.occurred_at)
124                    .await?;
125                info!("Chat: SessionCompleted - {} ({:?})", session_id, status);
126            }
127            ChatEvent::RoundStarted {
128                round_id,
129                session_id,
130                model,
131                meta,
132            } => {
133                storage
134                    .insert_round_start(round_id, session_id, model, meta.occurred_at)
135                    .await?;
136                info!(
137                    "Chat: RoundStarted - {} in session {}",
138                    round_id, session_id
139                );
140            }
141            ChatEvent::RoundCompleted {
142                round_id,
143                status,
144                usage,
145                error,
146                meta,
147                ..
148            } => {
149                storage
150                    .complete_round(
151                        round_id,
152                        meta.occurred_at,
153                        *status,
154                        *usage,
155                        0,
156                        0,
157                        error.clone(),
158                    )
159                    .await?;
160                info!(
161                    "Chat: RoundCompleted - {} ({:?}) - {} tokens",
162                    round_id, status, usage.total_tokens
163                );
164            }
165            ChatEvent::ToolCalled {
166                tool_call_id,
167                round_id,
168                session_id,
169                tool_name,
170                latency_ms,
171                success,
172                meta,
173            } => {
174                // Insert tool start first
175                storage
176                    .insert_tool_start(
177                        tool_call_id,
178                        round_id,
179                        session_id,
180                        tool_name,
181                        meta.occurred_at,
182                    )
183                    .await?;
184
185                // Then complete it
186                let completion = ToolCallCompletion {
187                    completed_at: meta.occurred_at,
188                    success: *success,
189                    error: if *success {
190                        None
191                    } else {
192                        Some(format!("Tool failed after {}ms", latency_ms))
193                    },
194                };
195                storage.complete_tool_call(tool_call_id, completion).await?;
196                info!(
197                    "Chat: ToolCalled - {} ({}) - {}ms",
198                    tool_name,
199                    if *success { "success" } else { "failed" },
200                    latency_ms
201                );
202            }
203            ChatEvent::MessageCountUpdated {
204                session_id,
205                message_count,
206                meta,
207            } => {
208                storage
209                    .update_session_message_count(session_id, *message_count, meta.occurred_at)
210                    .await?;
211            }
212        }
213        Ok(())
214    }
215
216    /// Handle forward-related events
217    async fn handle_forward_event(
218        storage: &Arc<dyn MetricsStorage>,
219        event: &ForwardEvent,
220    ) -> anyhow::Result<()> {
221        match event {
222            ForwardEvent::RequestStarted {
223                request_id,
224                endpoint,
225                model,
226                is_stream,
227                meta,
228            } => {
229                storage
230                    .insert_forward_start(request_id, endpoint, model, *is_stream, meta.occurred_at)
231                    .await?;
232                info!(
233                    "Forward: RequestStarted - {} to {} (stream: {})",
234                    request_id, endpoint, is_stream
235                );
236            }
237            ForwardEvent::RequestCompleted {
238                request_id,
239                status_code,
240                status,
241                usage,
242                latency_ms,
243                error,
244                meta,
245            } => {
246                storage
247                    .complete_forward(
248                        request_id,
249                        meta.occurred_at,
250                        Some(*status_code),
251                        *status,
252                        *usage,
253                        error.clone(),
254                    )
255                    .await?;
256                info!(
257                    "Forward: RequestCompleted - {} ({} {}) - {}ms - {} tokens",
258                    request_id,
259                    status_code,
260                    match status {
261                        ForwardStatus::Pending => "pending",
262                        ForwardStatus::Success => "success",
263                        ForwardStatus::Error => "error",
264                    },
265                    latency_ms,
266                    usage.as_ref().map(|u| u.total_tokens).unwrap_or(0)
267                );
268            }
269        }
270        Ok(())
271    }
272
273    /// Stop the worker gracefully
274    pub fn stop(&self) {
275        self.running.store(false, Ordering::SeqCst);
276    }
277}
278
279fn event_type_name(event: &MetricsEvent) -> String {
280    match event {
281        MetricsEvent::Chat(e) => match e {
282            ChatEvent::SessionStarted { .. } => "Chat::SessionStarted",
283            ChatEvent::SessionCompleted { .. } => "Chat::SessionCompleted",
284            ChatEvent::RoundStarted { .. } => "Chat::RoundStarted",
285            ChatEvent::RoundCompleted { .. } => "Chat::RoundCompleted",
286            ChatEvent::ToolCalled { .. } => "Chat::ToolCalled",
287            ChatEvent::MessageCountUpdated { .. } => "Chat::MessageCountUpdated",
288        }
289        .to_string(),
290        MetricsEvent::Forward(e) => match e {
291            ForwardEvent::RequestStarted { .. } => "Forward::RequestStarted",
292            ForwardEvent::RequestCompleted { .. } => "Forward::RequestCompleted",
293        }
294        .to_string(),
295        MetricsEvent::System(e) => match e {
296            SystemEvent::WorkerStarted => "System::WorkerStarted",
297            SystemEvent::WorkerStopped => "System::WorkerStopped",
298            SystemEvent::MetricsDropped { .. } => "System::MetricsDropped",
299            SystemEvent::StorageError { .. } => "System::StorageError",
300        }
301        .to_string(),
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::events::EventMeta;
    use crate::metrics::types::{RoundStatus, TokenUsage};
    use tempfile::{tempdir, TempDir};

    /// Build a SQLite-backed storage inside a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned together with the storage: the caller
    /// keeps it alive for the duration of the test, and the directory is
    /// removed when the guard is dropped instead of being leaked on disk.
    async fn create_test_storage() -> (Arc<dyn MetricsStorage>, TempDir) {
        let dir = tempdir().expect("temp dir");
        let db_path = dir.path().join("metrics.db");
        let storage = Arc::new(crate::metrics::storage::SqliteMetricsStorage::new(&db_path));
        storage.init().await.expect("init storage");
        (storage, dir)
    }

    /// A session/round lifecycle emitted on the bus ends up as one recorded session.
    #[tokio::test]
    async fn test_worker_handles_chat_events() {
        // `_temp_dir` must outlive the storage calls below; it cleans up on drop.
        let (storage, _temp_dir) = create_test_storage().await;
        let worker = MetricsWorker::new(Arc::clone(&storage));
        let (bus, rx) = MetricsBus::new(100);

        let running = worker.spawn(rx, bus.clone());

        // Emit events
        bus.emit(MetricsEvent::Chat(ChatEvent::SessionStarted {
            meta: EventMeta::new(),
            session_id: "test-session".to_string(),
            model: "gpt-4".to_string(),
        }));

        bus.emit(MetricsEvent::Chat(ChatEvent::RoundStarted {
            meta: EventMeta::new(),
            round_id: "test-round".to_string(),
            session_id: "test-session".to_string(),
            model: "gpt-4".to_string(),
        }));

        bus.emit(MetricsEvent::Chat(ChatEvent::RoundCompleted {
            meta: EventMeta::new(),
            round_id: "test-round".to_string(),
            session_id: "test-session".to_string(),
            status: RoundStatus::Success,
            usage: TokenUsage {
                prompt_tokens: 10,
                completion_tokens: 20,
                total_tokens: 30,
            },
            latency_ms: 1000,
            error: None,
        }));

        // Give the worker task time to drain the channel
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Verify data was written
        let summary = storage
            .summary(crate::metrics::types::MetricsDateFilter::default())
            .await
            .expect("get summary");
        assert_eq!(summary.total_sessions, 1);

        // Stop worker
        running.store(false, Ordering::SeqCst);
    }

    /// A started + completed forward request ends up as one recorded request.
    #[tokio::test]
    async fn test_worker_handles_forward_events() {
        // `_temp_dir` must outlive the storage calls below; it cleans up on drop.
        let (storage, _temp_dir) = create_test_storage().await;
        let worker = MetricsWorker::new(Arc::clone(&storage));
        let (bus, rx) = MetricsBus::new(100);

        let running = worker.spawn(rx, bus.clone());

        // Emit forward events
        bus.emit(MetricsEvent::Forward(ForwardEvent::RequestStarted {
            meta: EventMeta::new(),
            request_id: "req-123".to_string(),
            endpoint: "openai.chat_completions".to_string(),
            model: "gpt-4".to_string(),
            is_stream: true,
        }));

        bus.emit(MetricsEvent::Forward(ForwardEvent::RequestCompleted {
            meta: EventMeta::new(),
            request_id: "req-123".to_string(),
            status_code: 200,
            status: ForwardStatus::Success,
            usage: Some(TokenUsage {
                prompt_tokens: 50,
                completion_tokens: 100,
                total_tokens: 150,
            }),
            latency_ms: 500,
            error: None,
        }));

        // Give the worker task time to drain the channel
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Verify data was written
        let summary = storage
            .forward_summary(crate::metrics::types::ForwardMetricsFilter::default())
            .await
            .expect("get forward summary");
        assert_eq!(summary.total_requests, 1);

        // Stop worker
        running.store(false, Ordering::SeqCst);
    }
}