Skip to main content

bamboo_engine/metrics/
storage.rs

1//! Metrics Storage System
2//!
3//! This module provides persistent storage for agent metrics using SQLite as the backend.
4//! It implements a comprehensive metrics collection and query system for monitoring
5//! agent performance, resource usage, and behavior patterns.
6//!
7//! # Architecture
8//!
9//! The storage system is built around the [`MetricsStorage`] trait, which defines
10//! the interface for storing and retrieving metrics data. The primary implementation
11//! is [`SqliteMetricsStorage`], which uses SQLite with WAL mode for reliable,
12//! concurrent access.
13//!
14//! # Data Model
15//!
16//! Metrics are organized into three main categories:
17//!
18//! ## Session Metrics
19//! Track complete conversation sessions from start to finish, including:
20//! - Total rounds and token usage
21//! - Tool call counts and breakdown
22//! - Session duration and status
23//!
24//! ## Round Metrics
25//! Track individual request-response cycles within sessions:
26//! - Per-round token consumption
27//! - Round status and errors
28//! - Associated tool calls
29//!
30//! ## Forward Metrics
31//! Track HTTP proxy operations to upstream APIs:
32//! - Request/response tracking
33//! - Endpoint-specific metrics
34//! - Token usage per provider
35//!
36//! # Storage Schema
37//!
38//! The SQLite database contains the following tables:
39//! - `session_metrics`: Aggregated session-level metrics
40//! - `round_metrics`: Individual round metrics linked to sessions
41//! - `tool_call_metrics`: Tool invocation details linked to rounds
42//! - `forward_request_metrics`: HTTP proxy request tracking
43//!
44//! # Usage
45//!
46//! ```rust,ignore
47//! use bamboo_agent::agent::metrics::storage::{SqliteMetricsStorage, MetricsStorage};
48//!
49//! #[tokio::main]
50//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
51//!     // Initialize storage
52//!     let storage = SqliteMetricsStorage::new("metrics.db");
53//!     storage.init().await?;
54//!
55//!     // Record session start
56//!     storage.upsert_session_start(
57//!         "session-123",
58//!         "gpt-4",
59//!         chrono::Utc::now()
60//!     ).await?;
61//!
62//!     // Query metrics
63//!     let summary = storage.summary(Default::default()).await?;
64//!     println!("Total sessions: {}", summary.total_sessions);
65//!
66//!     Ok(())
67//! }
68//! ```
69//!
70//! # Performance
71//!
72//! The storage system is optimized for:
73//! - **Concurrent writes**: Uses WAL mode and spawn_blocking for async compatibility
74//! - **Efficient queries**: Indexed by timestamps, models, and endpoints
75//! - **Aggregate caching**: Session metrics are pre-aggregated for fast queries
76//!
77//! # Thread Safety
78//!
//! All storage operations are thread-safe and can be called from multiple
//! async tasks concurrently. Each operation opens its own SQLite connection
//! and runs on Tokio's blocking thread pool (`spawn_blocking`), so database
//! work never stalls the async runtime's worker threads.
82
83use std::collections::HashMap;
84use std::path::{Path, PathBuf};
85
86use async_trait::async_trait;
87use chrono::{DateTime, NaiveDate, Utc};
88use rusqlite::{params, params_from_iter, Connection, OptionalExtension};
89use thiserror::Error;
90
91use crate::metrics::types::{
92    DailyMetrics, ForwardEndpointMetrics, ForwardMetricsFilter, ForwardMetricsSummary,
93    ForwardRequestMetrics, ForwardStatus, MetricsDateFilter, MetricsSummary, ModelMetrics,
94    RoundMetrics, RoundStatus, SessionDetail, SessionMetrics, SessionMetricsFilter, SessionStatus,
95    TokenUsage, ToolCallMetrics,
96};
97
98/// Result type for metrics storage operations.
99///
100/// This is a specialized Result type that uses [`MetricsError`] as the error type,
101/// providing a consistent return type across all storage operations.
102pub type MetricsResult<T> = Result<T, MetricsError>;
103
104/// Errors that can occur during metrics storage operations.
105///
106/// This enum covers all the error cases that can arise when working with
107/// the metrics storage system, from database errors to data validation issues.
108#[derive(Debug, Error)]
109pub enum MetricsError {
110    /// SQLite database operation failed.
111    ///
112    /// This can occur due to SQL syntax errors, constraint violations,
113    /// database corruption, or connection issues.
114    #[error("sqlite error: {0}")]
115    Sqlite(#[from] rusqlite::Error),
116
117    /// Timestamp parsing failed.
118    ///
119    /// This occurs when reading timestamps from the database that don't
120    /// conform to the expected RFC3339 format.
121    #[error("time parse error: {0}")]
122    Chrono(#[from] chrono::ParseError),
123
124    /// I/O operation failed.
125    ///
126    /// This can occur when creating the database file, directory, or
127    /// during other file system operations.
128    #[error("io error: {0}")]
129    Io(#[from] std::io::Error),
130
131    /// Async task failed to complete.
132    ///
133    /// This occurs when a spawned blocking task panics or is cancelled,
134    /// typically indicating a serious system issue.
135    #[error("storage task join error: {0}")]
136    Task(String),
137
138    /// Data validation failed.
139    ///
140    /// This occurs when retrieved data doesn't match expected constraints,
141    /// such as invalid enum values or malformed data.
142    #[error("invalid metrics data: {0}")]
143    InvalidData(String),
144}
145
146/// Information about a completed tool call.
147///
148/// This structure contains the completion details for a tool invocation,
149/// including when it finished, whether it succeeded, and any error information.
150///
151/// # Fields
152///
153/// - `completed_at`: Timestamp when the tool finished execution
154/// - `success`: Whether the tool executed successfully
155/// - `error`: Error message if the tool failed, None on success
156///
157/// # Example
158///
159/// ```rust,ignore
160/// use bamboo_agent::agent::metrics::storage::ToolCallCompletion;
161/// use chrono::Utc;
162///
163/// let completion = ToolCallCompletion {
164///     completed_at: Utc::now(),
165///     success: true,
166///     error: None,
167/// };
168/// ```
169#[derive(Debug, Clone)]
170pub struct ToolCallCompletion {
171    /// Timestamp when the tool call completed
172    pub completed_at: DateTime<Utc>,
173    /// Whether the tool execution succeeded
174    pub success: bool,
175    /// Error message if execution failed, None on success
176    pub error: Option<String>,
177}
178
179/// Trait defining the interface for metrics storage backends.
180///
181/// This trait provides an abstract interface for storing and querying metrics data.
182/// The primary implementation is [`SqliteMetricsStorage`], but this trait allows
183/// for alternative backends (e.g., PostgreSQL, TimescaleDB) to be implemented.
184///
185/// # Async Operations
186///
187/// All methods are async to support non-blocking I/O operations. The SQLite
188/// implementation uses `spawn_blocking` to avoid blocking the async runtime
189/// with database operations.
190///
191/// # Thread Safety
192///
193/// Implementations must be `Send + Sync` to allow sharing across async tasks.
194///
195/// # Data Consistency
196///
197/// The storage system maintains referential integrity between:
198/// - Sessions → Rounds → Tool Calls
199/// - Sessions aggregate data from child entities
200/// - Deletions cascade appropriately
201///
202/// # Example
203///
204/// ```rust,ignore
205/// use bamboo_agent::agent::metrics::storage::{MetricsStorage, SqliteMetricsStorage};
206/// use bamboo_agent::agent::metrics::types::{SessionStatus, TokenUsage};
207/// use chrono::Utc;
208///
209/// async fn example(storage: &dyn MetricsStorage) -> Result<(), Box<dyn std::error::Error>> {
210///     // Start a session
211///     storage.upsert_session_start("s1", "gpt-4", Utc::now()).await?;
212///
213///     // Add a round
214///     storage.insert_round_start("r1", "s1", "gpt-4", Utc::now()).await?;
215///
216///     // Complete the round
217///     storage.complete_round(
218///         "r1",
219///         Utc::now(),
220///         bamboo::agent::metrics::types::RoundStatus::Success,
221///         TokenUsage { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
222///         None
223///     ).await?;
224///
225///     // Complete the session
226///     storage.complete_session("s1", SessionStatus::Completed, Utc::now()).await?;
227///
228///     Ok(())
229/// }
230/// ```
231#[async_trait]
232pub trait MetricsStorage: Send + Sync {
233    /// Initializes the storage backend.
234    ///
235    /// This must be called before any other storage operations.
236    /// For SQLite, this creates the database schema if it doesn't exist.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the database cannot be created or initialized.
241    async fn init(&self) -> MetricsResult<()>;
242
243    /// Records the start of a new chat session.
244    ///
245    /// If a session with the same ID already exists, it will be reset to
246    /// running status (useful for session recovery scenarios).
247    ///
248    /// # Arguments
249    ///
250    /// * `session_id` - Unique identifier for the session
251    /// * `model` - AI model being used (e.g., "gpt-4", "claude-3")
252    /// * `started_at` - Timestamp when the session started
253    ///
254    /// # Example
255    ///
256    /// ```rust,ignore
257    /// storage.upsert_session_start("session-123", "gpt-4", Utc::now()).await?;
258    /// ```
259    async fn upsert_session_start(
260        &self,
261        session_id: &str,
262        model: &str,
263        started_at: DateTime<Utc>,
264    ) -> MetricsResult<()>;
265
266    /// Updates the message count for a session.
267    ///
268    /// This should be called whenever messages are added to the conversation.
269    ///
270    /// # Arguments
271    ///
272    /// * `session_id` - Session to update
273    /// * `message_count` - New total message count
274    /// * `updated_at` - Timestamp of the update
275    async fn update_session_message_count(
276        &self,
277        session_id: &str,
278        message_count: u32,
279        updated_at: DateTime<Utc>,
280    ) -> MetricsResult<()>;
281
282    /// Marks a session as completed with a final status.
283    ///
284    /// This triggers a final aggregation of all session metrics before closing.
285    ///
286    /// # Arguments
287    ///
288    /// * `session_id` - Session to complete
289    /// * `status` - Final session status (completed, failed, or cancelled)
290    /// * `completed_at` - Timestamp when the session ended
291    async fn complete_session(
292        &self,
293        session_id: &str,
294        status: SessionStatus,
295        completed_at: DateTime<Utc>,
296    ) -> MetricsResult<()>;
297
298    /// Records the start of a new round within a session.
299    ///
300    /// A round represents a single request-response cycle. This also
301    /// triggers an update to the parent session's aggregate counters.
302    ///
303    /// # Arguments
304    ///
305    /// * `round_id` - Unique identifier for this round
306    /// * `session_id` - Parent session this round belongs to
307    /// * `model` - AI model being used for this round
308    /// * `started_at` - Timestamp when the round started
309    async fn insert_round_start(
310        &self,
311        round_id: &str,
312        session_id: &str,
313        model: &str,
314        started_at: DateTime<Utc>,
315    ) -> MetricsResult<()>;
316
317    /// Completes a round with final metrics and status.
318    ///
319    /// This records the round's completion and triggers an update to
320    /// the parent session's aggregated metrics.
321    ///
322    /// # Arguments
323    ///
324    /// * `round_id` - Round to complete
325    /// * `completed_at` - Timestamp when the round finished
326    /// * `status` - Final round status (success or failed)
327    /// * `usage` - Token consumption during this round
328    /// * `error` - Error message if the round failed, None on success
329    async fn complete_round(
330        &self,
331        round_id: &str,
332        completed_at: DateTime<Utc>,
333        status: RoundStatus,
334        usage: TokenUsage,
335        prompt_cached_tool_outputs: u32,
336        error: Option<String>,
337    ) -> MetricsResult<()>;
338
339    /// Records a context-compression event against a round and refreshes the parent session aggregates.
340    async fn record_round_compression(
341        &self,
342        round_id: &str,
343        compressed_at: DateTime<Utc>,
344        tokens_saved: u32,
345    ) -> MetricsResult<()>;
346
347    /// Records the start of a tool invocation.
348    ///
349    /// Tools are called during rounds to perform specific actions
350    /// (e.g., reading files, executing commands).
351    ///
352    /// # Arguments
353    ///
354    /// * `tool_call_id` - Unique identifier for this tool call
355    /// * `round_id` - Round this tool call belongs to
356    /// * `session_id` - Session this tool call belongs to
357    /// * `tool_name` - Name of the tool being invoked
358    /// * `started_at` - Timestamp when the tool was invoked
359    async fn insert_tool_start(
360        &self,
361        tool_call_id: &str,
362        round_id: &str,
363        session_id: &str,
364        tool_name: &str,
365        started_at: DateTime<Utc>,
366    ) -> MetricsResult<()>;
367
368    /// Records the completion of a tool call.
369    ///
370    /// This updates the tool call record with completion details and
371    /// triggers an update to the parent session's tool call count.
372    ///
373    /// # Arguments
374    ///
375    /// * `tool_call_id` - Tool call to complete
376    /// * `completion` - Completion details including success status and timing
377    async fn complete_tool_call(
378        &self,
379        tool_call_id: &str,
380        completion: ToolCallCompletion,
381    ) -> MetricsResult<()>;
382
383    // Forward request metrics methods
384
385    /// Records the start of a forwarded HTTP request to an upstream API.
386    ///
387    /// This tracks requests proxied to external API providers like OpenAI or Anthropic.
388    ///
389    /// # Arguments
390    ///
391    /// * `forward_id` - Unique identifier for this forwarded request
392    /// * `endpoint` - API endpoint identifier (e.g., "openai.chat_completions")
393    /// * `model` - AI model being requested
394    /// * `is_stream` - Whether this is a streaming (SSE) request
395    /// * `started_at` - Timestamp when the request was initiated
396    async fn insert_forward_start(
397        &self,
398        forward_id: &str,
399        endpoint: &str,
400        model: &str,
401        is_stream: bool,
402        started_at: DateTime<Utc>,
403    ) -> MetricsResult<()>;
404
405    /// Completes a forwarded request with response details.
406    ///
407    /// This records the response from the upstream API, including status,
408    /// token usage, and any errors.
409    ///
410    /// # Arguments
411    ///
412    /// * `forward_id` - Forwarded request to complete
413    /// * `completed_at` - Timestamp when the response was received
414    /// * `status_code` - HTTP status code from the upstream API
415    /// * `status` - Classified status (success, error, or timeout)
416    /// * `usage` - Token usage if provided in the response
417    /// * `error` - Error message if the request failed
418    async fn complete_forward(
419        &self,
420        forward_id: &str,
421        completed_at: DateTime<Utc>,
422        status_code: Option<u16>,
423        status: ForwardStatus,
424        usage: Option<TokenUsage>,
425        error: Option<String>,
426    ) -> MetricsResult<()>;
427
428    /// Retrieves aggregated summary statistics for forwarded requests.
429    ///
430    /// Returns counts of total/successful/failed requests, token usage,
431    /// and average latency for requests matching the filter criteria.
432    ///
433    /// # Arguments
434    ///
435    /// * `filter` - Filter criteria for date range, endpoint, and model
436    ///
437    /// # Example
438    ///
439    /// ```rust,ignore
440    /// use bamboo_agent::agent::metrics::types::ForwardMetricsFilter;
441    ///
442    /// let filter = ForwardMetricsFilter {
443    ///     start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 1).unwrap()),
444    ///     end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 28).unwrap()),
445    ///     endpoint: Some("openai.chat_completions".to_string()),
446    ///     model: None,
447    ///     limit: None,
448    /// };
449    ///
450    /// let summary = storage.forward_summary(filter).await?;
451    /// println!("Total requests: {}", summary.total_requests);
452    /// println!("Success rate: {:.2}%",
453    ///     (summary.successful_requests as f64 / summary.total_requests as f64) * 100.0);
454    /// ```
455    async fn forward_summary(
456        &self,
457        filter: ForwardMetricsFilter,
458    ) -> MetricsResult<ForwardMetricsSummary>;
459
460    /// Retrieves metrics grouped by endpoint.
461    ///
462    /// Returns per-endpoint statistics including request counts,
463    /// success rates, token usage, and average latency.
464    ///
465    /// # Arguments
466    ///
467    /// * `filter` - Filter criteria (endpoint filter is ignored, grouped by all endpoints)
468    ///
469    /// # Example
470    ///
471    /// ```rust,ignore
472    /// let endpoints = storage.forward_by_endpoint(filter).await?;
473    /// for endpoint in endpoints {
474    ///     println!("{}: {} requests, {:.2}ms avg",
475    ///         endpoint.endpoint,
476    ///         endpoint.requests,
477    ///         endpoint.avg_duration_ms.unwrap_or(0) as f64
478    ///     );
479    /// }
480    /// ```
481    async fn forward_by_endpoint(
482        &self,
483        filter: ForwardMetricsFilter,
484    ) -> MetricsResult<Vec<ForwardEndpointMetrics>>;
485
486    /// Retrieves individual forward request records.
487    ///
488    /// Returns detailed information about each forwarded request,
489    /// including timing, status, and token usage.
490    ///
491    /// # Arguments
492    ///
493    /// * `filter` - Filter criteria including pagination via `limit`
494    ///
495    /// # Example
496    ///
497    /// ```rust,ignore
498    /// let filter = ForwardMetricsFilter {
499    ///     limit: Some(50),
500    ///     ..Default::default()
501    /// };
502    ///
503    /// let requests = storage.forward_requests(filter).await?;
504    /// for req in requests {
505    ///     println!("{}: {} - {:?}", req.forward_id, req.endpoint, req.status);
506    /// }
507    /// ```
508    async fn forward_requests(
509        &self,
510        filter: ForwardMetricsFilter,
511    ) -> MetricsResult<Vec<ForwardRequestMetrics>>;
512
513    /// Retrieves daily aggregated metrics for forwarded requests.
514    ///
515    /// Returns per-day statistics for the specified date range,
516    /// useful for trend analysis and reporting.
517    ///
518    /// # Arguments
519    ///
520    /// * `days` - Number of days to include
521    /// * `end_date` - End date for the range (defaults to today)
522    ///
523    /// # Example
524    ///
525    /// ```rust,ignore
526    /// let daily = storage.forward_daily_metrics(7, None).await?;
527    /// for day in daily {
528    ///     println!("{}: {} requests, {} tokens",
529    ///         day.date,
530    ///         day.total_sessions,
531    ///         day.total_token_usage.total_tokens
532    ///     );
533    /// }
534    /// ```
535    async fn forward_daily_metrics(
536        &self,
537        days: u32,
538        end_date: Option<NaiveDate>,
539    ) -> MetricsResult<Vec<DailyMetrics>>;
540
541    /// Retrieves aggregated summary statistics for chat sessions.
542    ///
543    /// Returns total sessions, token usage, tool call counts, and
544    /// active session count for sessions matching the filter.
545    ///
546    /// # Arguments
547    ///
548    /// * `filter` - Date range filter criteria
549    ///
550    /// # Example
551    ///
552    /// ```rust,ignore
553    /// use bamboo_agent::agent::metrics::types::MetricsDateFilter;
554    ///
555    /// let filter = MetricsDateFilter {
556    ///     start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 1).unwrap()),
557    ///     end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 28).unwrap()),
558    /// };
559    ///
560    /// let summary = storage.summary(filter).await?;
561    /// println!("Active sessions: {}", summary.active_sessions);
562    /// println!("Total tokens: {}", summary.total_tokens.total_tokens);
563    /// ```
564    async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary>;
565
566    /// Retrieves metrics grouped by AI model.
567    ///
568    /// Returns per-model statistics including session counts,
569    /// rounds, token usage, and tool calls.
570    ///
571    /// # Arguments
572    ///
573    /// * `filter` - Date range filter criteria
574    ///
575    /// # Example
576    ///
577    /// ```rust,ignore
578    /// let models = storage.by_model(filter).await?;
579    /// for model in models {
580    ///     println!("{}: {} sessions, {} tokens",
581    ///         model.model,
582    ///         model.sessions,
583    ///         model.tokens.total_tokens
584    ///     );
585    /// }
586    /// ```
587    async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>>;
588
589    /// Retrieves session metrics with filtering and pagination.
590    ///
591    /// Returns detailed information about sessions matching the filter criteria,
592    /// including token usage, tool breakdown, and status.
593    ///
594    /// # Arguments
595    ///
596    /// * `filter` - Filter criteria including date range, model, and pagination
597    ///
598    /// # Example
599    ///
600    /// ```rust,ignore
601    /// use bamboo_agent::agent::metrics::types::SessionMetricsFilter;
602    ///
603    /// let filter = SessionMetricsFilter {
604    ///     model: Some("gpt-4".to_string()),
605    ///     limit: Some(100),
606    ///     ..Default::default()
607    /// };
608    ///
609    /// let sessions = storage.sessions(filter).await?;
610    /// for session in sessions {
611    ///     println!("{}: {} rounds, {} tools",
612    ///         session.session_id,
613    ///         session.total_rounds,
614    ///         session.tool_call_count
615    ///     );
616    /// }
617    /// ```
618    async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>>;
619
620    /// Retrieves complete details for a specific session.
621    ///
622    /// Returns the session metrics along with all associated rounds
623    /// and their tool calls for detailed analysis.
624    ///
625    /// # Arguments
626    ///
627    /// * `session_id` - Session to retrieve
628    ///
629    /// # Returns
630    ///
631    /// Returns `Ok(None)` if the session doesn't exist.
632    ///
633    /// # Example
634    ///
635    /// ```rust,ignore
636    /// if let Some(detail) = storage.session_detail("session-123").await? {
637    ///     println!("Session: {}", detail.session.session_id);
638    ///     for round in detail.rounds {
639    ///         println!("  Round {}: {} tokens, {} tools",
640    ///             round.round_id,
641    ///             round.token_usage.total_tokens,
642    ///             round.tool_calls.len()
643    ///         );
644    ///     }
645    /// }
646    /// ```
647    async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>>;
648
649    /// Increments the execute sync mismatch counter for a specific stable reason label.
650    async fn increment_execute_sync_mismatch(
651        &self,
652        reason: &str,
653        occurred_at: DateTime<Utc>,
654    ) -> MetricsResult<()>;
655
656    /// Retrieves daily aggregated metrics for chat sessions.
657    ///
658    /// Returns per-day statistics including session counts, token usage,
659    /// model breakdown, and tool breakdown for trend analysis.
660    ///
661    /// # Arguments
662    ///
663    /// * `days` - Number of days to include
664    /// * `end_date` - End date for the range (defaults to today)
665    ///
666    /// # Example
667    ///
668    /// ```rust,ignore
669    /// let daily = storage.daily_metrics(30, None).await?;
670    /// for day in daily {
671    ///     println!("{}: {} sessions, {} tokens",
672    ///         day.date,
673    ///         day.total_sessions,
674    ///         day.total_token_usage.total_tokens
675    ///     );
676    ///
677    ///     // Model breakdown
678    ///     for (model, usage) in day.model_breakdown {
679    ///         println!("  {}: {} tokens", model, usage.total_tokens);
680    ///     }
681    /// }
682    /// ```
683    async fn daily_metrics(
684        &self,
685        days: u32,
686        end_date: Option<NaiveDate>,
687    ) -> MetricsResult<Vec<DailyMetrics>>;
688
689    /// Deletes old round records before a cutoff date.
690    ///
691    /// This is used for data retention and cleanup. After deleting rounds,
692    /// it triggers a refresh of affected session aggregates.
693    ///
694    /// # Arguments
695    ///
696    /// * `cutoff` - Delete rounds started before this timestamp
697    ///
698    /// # Returns
699    ///
700    /// Returns the number of rounds deleted.
701    ///
702    /// # Warning
703    ///
704    /// This operation is irreversible. Ensure you have backups if needed.
705    ///
706    /// # Example
707    ///
708    /// ```rust,ignore
709    /// use chrono::{Duration, Utc};
710    ///
711    /// // Delete rounds older than 90 days
712    /// let cutoff = Utc::now() - Duration::days(90);
713    /// let deleted = storage.prune_rounds_before(cutoff).await?;
714    /// println!("Deleted {} old rounds", deleted);
715    /// ```
716    async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64>;
717
718    /// Reconciles stale session / round / forward rows using durable runtime hints.
719    async fn reconcile_stale_executions(
720        &self,
721        active_session_ids: &[String],
722        awaiting_response_session_ids: &[String],
723    ) -> MetricsResult<()>;
724}
725
/// [`MetricsStorage`] backend that persists metrics in a single SQLite file.
///
/// This is the primary storage backend for the metrics system. It relies on
/// SQLite's WAL (Write-Ahead Logging) journal mode for reliable concurrent access.
///
/// # Features
///
/// - **WAL mode**: readers can proceed while a writer is active
/// - **Foreign keys**: child rows reference their parents, with cascading deletes
/// - **Async friendly**: all database work runs under `spawn_blocking`
/// - **Schema bootstrap**: tables are created on initialization
///
/// # Database Schema
///
/// Four tables are maintained:
///
/// ## session_metrics
/// One row per session, holding aggregated values:
/// - Identity: `session_id`, `model`
/// - Timing: `started_at`, `completed_at`, `updated_at`
/// - Aggregates: `total_rounds`, `prompt_tokens`, `completion_tokens`,
///   `total_tokens`, `tool_call_count`
/// - `status` and `message_count`
///
/// ## round_metrics
/// One row per round, linked to its session by foreign key:
/// - Identity: `round_id`, `session_id`, `model`
/// - Timing and status
/// - Per-round token usage
/// - Error text
///
/// ## tool_call_metrics
/// One row per tool invocation, linked to its round and session:
/// - Identity: `tool_call_id`, `round_id`, `session_id`, `tool_name`
/// - Execution timing and success flag
/// - Error text
///
/// ## forward_request_metrics
/// One row per proxied HTTP request:
/// - Identity: `forward_id`, `endpoint`, `model`
/// - Request kind (`is_stream`)
/// - Response details (`status_code`, `status`, token usage)
/// - Error text
///
/// # Example
///
/// ```rust,ignore
/// use bamboo_agent::agent::metrics::storage::SqliteMetricsStorage;
/// use bamboo_agent::agent::metrics::storage::MetricsStorage;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // Point the storage at a database file
///     let storage = SqliteMetricsStorage::new("path/to/metrics.db");
///
///     // Create the schema before first use
///     storage.init().await?;
///
///     // Start recording
///     storage.upsert_session_start("s1", "gpt-4", chrono::Utc::now()).await?;
///
///     Ok(())
/// }
/// ```
///
/// # Thread Safety
///
/// The handle only stores a path, so it is cheap to clone and share across
/// threads. Every operation opens its own connection on the blocking thread
/// pool, so no connection is ever shared between tasks.
#[derive(Debug, Clone)]
pub struct SqliteMetricsStorage {
    /// Location of the SQLite database file on disk.
    db_path: PathBuf,
}
799
800impl SqliteMetricsStorage {
801    /// Creates a new SQLite storage instance.
802    ///
803    /// The database file will be created when [`init`](MetricsStorage::init) is called.
804    /// If the file already exists, it will be used as-is.
805    ///
806    /// # Arguments
807    ///
808    /// * `db_path` - Path to the SQLite database file (will create parent directories if needed)
809    ///
810    /// # Example
811    ///
812    /// ```rust,ignore
813    /// use bamboo_agent::agent::metrics::storage::SqliteMetricsStorage;
814    ///
815    /// let storage = SqliteMetricsStorage::new("metrics.db");
816    /// let storage = SqliteMetricsStorage::new("/var/data/bamboo/metrics.db");
817    /// ```
818    pub fn new(db_path: impl AsRef<Path>) -> Self {
819        Self {
820            db_path: db_path.as_ref().to_path_buf(),
821        }
822    }
823
824    /// Executes a function with a database connection in a blocking context.
825    ///
826    /// This helper method handles:
827    /// 1. Opening a connection to the database
828    /// 2. Running the provided function in `spawn_blocking` to avoid blocking async runtime
829    /// 3. Proper error handling and task joining
830    ///
831    /// # Type Parameters
832    ///
833    /// * `T` - Return type of the function (must be Send + 'static)
834    /// * `F` - Function type (must be Send + 'static)
835    ///
836    /// # Arguments
837    ///
838    /// * `func` - Function to execute with the database connection
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if:
843    /// - The database connection fails to open
844    /// - The function returns an error
845    /// - The blocking task fails to complete
846    async fn with_connection<T, F>(&self, func: F) -> MetricsResult<T>
847    where
848        T: Send + 'static,
849        F: FnOnce(&Connection) -> MetricsResult<T> + Send + 'static,
850    {
851        let db_path = self.db_path.clone();
852        tokio::task::spawn_blocking(move || {
853            let connection = open_connection(&db_path)?;
854            func(&connection)
855        })
856        .await
857        .map_err(|error| MetricsError::Task(error.to_string()))?
858    }
859}
860
861#[async_trait]
862impl MetricsStorage for SqliteMetricsStorage {
863    async fn init(&self) -> MetricsResult<()> {
864        self.with_connection(|connection| {
865            connection.execute_batch(
866                r#"
867                CREATE TABLE IF NOT EXISTS session_metrics (
868                    session_id TEXT PRIMARY KEY,
869                    model TEXT NOT NULL,
870                    started_at TEXT NOT NULL,
871                    completed_at TEXT,
872                    status TEXT NOT NULL DEFAULT 'running',
873                    total_rounds INTEGER NOT NULL DEFAULT 0,
874                    prompt_tokens INTEGER NOT NULL DEFAULT 0,
875                    completion_tokens INTEGER NOT NULL DEFAULT 0,
876                    total_tokens INTEGER NOT NULL DEFAULT 0,
877                    prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
878                    total_compression_events INTEGER NOT NULL DEFAULT 0,
879                    total_tokens_saved INTEGER NOT NULL DEFAULT 0,
880                    tool_call_count INTEGER NOT NULL DEFAULT 0,
881                    message_count INTEGER NOT NULL DEFAULT 0,
882                    updated_at TEXT NOT NULL
883                );
884
885                CREATE TABLE IF NOT EXISTS round_metrics (
886                    round_id TEXT PRIMARY KEY,
887                    session_id TEXT NOT NULL,
888                    model TEXT NOT NULL,
889                    started_at TEXT NOT NULL,
890                    completed_at TEXT,
891                    status TEXT NOT NULL DEFAULT 'running',
892                    prompt_tokens INTEGER NOT NULL DEFAULT 0,
893                    completion_tokens INTEGER NOT NULL DEFAULT 0,
894                    total_tokens INTEGER NOT NULL DEFAULT 0,
895                    prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
896                    compression_count INTEGER NOT NULL DEFAULT 0,
897                    tokens_saved INTEGER NOT NULL DEFAULT 0,
898                    error TEXT,
899                    FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
900                );
901
902                CREATE TABLE IF NOT EXISTS tool_call_metrics (
903                    tool_call_id TEXT PRIMARY KEY,
904                    round_id TEXT NOT NULL,
905                    session_id TEXT NOT NULL,
906                    tool_name TEXT NOT NULL,
907                    started_at TEXT NOT NULL,
908                    completed_at TEXT,
909                    success INTEGER,
910                    error TEXT,
911                    FOREIGN KEY(round_id) REFERENCES round_metrics(round_id) ON DELETE CASCADE,
912                    FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
913                );
914
915                CREATE INDEX IF NOT EXISTS idx_session_started_at ON session_metrics(started_at);
916                CREATE INDEX IF NOT EXISTS idx_session_model ON session_metrics(model);
917                CREATE INDEX IF NOT EXISTS idx_round_session ON round_metrics(session_id);
918                CREATE INDEX IF NOT EXISTS idx_tool_session ON tool_call_metrics(session_id);
919                CREATE INDEX IF NOT EXISTS idx_tool_started_at ON tool_call_metrics(started_at);
920                CREATE INDEX IF NOT EXISTS idx_tool_name ON tool_call_metrics(tool_name);
921
922                CREATE TABLE IF NOT EXISTS forward_request_metrics (
923                    forward_id TEXT PRIMARY KEY,
924                    endpoint TEXT NOT NULL,
925                    model TEXT NOT NULL,
926                    is_stream INTEGER NOT NULL,
927                    started_at TEXT NOT NULL,
928                    completed_at TEXT,
929                    status_code INTEGER,
930                    status TEXT NOT NULL DEFAULT 'pending',
931                    prompt_tokens INTEGER,
932                    completion_tokens INTEGER,
933                    total_tokens INTEGER,
934                    error TEXT,
935                    updated_at TEXT NOT NULL
936                );
937
938                CREATE TABLE IF NOT EXISTS execute_sync_mismatch_metrics (
939                    reason TEXT NOT NULL,
940                    mismatch_date TEXT NOT NULL,
941                    count INTEGER NOT NULL DEFAULT 0,
942                    updated_at TEXT NOT NULL,
943                    PRIMARY KEY (reason, mismatch_date)
944                );
945
946                CREATE INDEX IF NOT EXISTS idx_forward_started_at ON forward_request_metrics(started_at);
947                CREATE INDEX IF NOT EXISTS idx_forward_endpoint ON forward_request_metrics(endpoint);
948                CREATE INDEX IF NOT EXISTS idx_forward_model ON forward_request_metrics(model);
949                CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_date ON execute_sync_mismatch_metrics(mismatch_date);
950                CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_reason ON execute_sync_mismatch_metrics(reason);
951                "#,
952            )?;
953            ensure_integer_column(
954                connection,
955                "session_metrics",
956                "prompt_cached_tool_outputs",
957                0,
958            )?;
959            ensure_integer_column(connection, "session_metrics", "total_compression_events", 0)?;
960            ensure_integer_column(connection, "session_metrics", "total_tokens_saved", 0)?;
961            ensure_integer_column(connection, "round_metrics", "prompt_cached_tool_outputs", 0)?;
962            ensure_integer_column(connection, "round_metrics", "compression_count", 0)?;
963            ensure_integer_column(connection, "round_metrics", "tokens_saved", 0)?;
964            connection.execute(
965                "UPDATE forward_request_metrics SET status = 'pending' WHERE status IS NULL OR trim(status) = ''",
966                [],
967            )?;
968            Ok(())
969        })
970        .await
971    }
972
973    async fn upsert_session_start(
974        &self,
975        session_id: &str,
976        model: &str,
977        started_at: DateTime<Utc>,
978    ) -> MetricsResult<()> {
979        let session_id = session_id.to_string();
980        let model = model.to_string();
981        let started_at = format_timestamp(started_at);
982
983        self.with_connection(move |connection| {
984            connection.execute(
985                r#"
986                INSERT INTO session_metrics (
987                    session_id, model, started_at, status, updated_at
988                ) VALUES (?1, ?2, ?3, 'running', ?3)
989                ON CONFLICT(session_id) DO UPDATE SET
990                    model = excluded.model,
991                    started_at = CASE
992                        WHEN session_metrics.started_at <= excluded.started_at THEN session_metrics.started_at
993                        ELSE excluded.started_at
994                    END,
995                    completed_at = NULL,
996                    status = 'running',
997                    updated_at = excluded.updated_at
998                "#,
999                params![session_id, model, started_at],
1000            )?;
1001            Ok(())
1002        })
1003        .await
1004    }
1005
1006    async fn update_session_message_count(
1007        &self,
1008        session_id: &str,
1009        message_count: u32,
1010        updated_at: DateTime<Utc>,
1011    ) -> MetricsResult<()> {
1012        let session_id = session_id.to_string();
1013        let updated_at = format_timestamp(updated_at);
1014
1015        self.with_connection(move |connection| {
1016            connection.execute(
1017                "UPDATE session_metrics SET message_count = ?1, updated_at = ?2 WHERE session_id = ?3",
1018                params![i64::from(message_count), updated_at, session_id],
1019            )?;
1020            Ok(())
1021        })
1022        .await
1023    }
1024
1025    async fn complete_session(
1026        &self,
1027        session_id: &str,
1028        status: SessionStatus,
1029        completed_at: DateTime<Utc>,
1030    ) -> MetricsResult<()> {
1031        let session_id = session_id.to_string();
1032        let completed_at_str = format_timestamp(completed_at);
1033
1034        self.with_connection(move |connection| {
1035            refresh_session_aggregates(connection, &session_id, completed_at)?;
1036            connection.execute(
1037                "UPDATE session_metrics SET status = ?1, completed_at = ?2, updated_at = ?2 WHERE session_id = ?3",
1038                params![status.as_str(), completed_at_str, session_id],
1039            )?;
1040            Ok(())
1041        })
1042        .await
1043    }
1044
1045    async fn insert_round_start(
1046        &self,
1047        round_id: &str,
1048        session_id: &str,
1049        model: &str,
1050        started_at: DateTime<Utc>,
1051    ) -> MetricsResult<()> {
1052        let round_id = round_id.to_string();
1053        let session_id = session_id.to_string();
1054        let model = model.to_string();
1055        let started_at_str = format_timestamp(started_at);
1056
1057        self.with_connection(move |connection| {
1058            connection.execute(
1059                r#"
1060                INSERT INTO round_metrics (
1061                    round_id, session_id, model, started_at, status
1062                ) VALUES (?1, ?2, ?3, ?4, 'running')
1063                ON CONFLICT(round_id) DO NOTHING
1064                "#,
1065                params![round_id, session_id, model, started_at_str],
1066            )?;
1067            refresh_session_aggregates(connection, &session_id, started_at)?;
1068            Ok(())
1069        })
1070        .await
1071    }
1072
1073    async fn complete_round(
1074        &self,
1075        round_id: &str,
1076        completed_at: DateTime<Utc>,
1077        status: RoundStatus,
1078        usage: TokenUsage,
1079        prompt_cached_tool_outputs: u32,
1080        error: Option<String>,
1081    ) -> MetricsResult<()> {
1082        let round_id = round_id.to_string();
1083        let completed_at_str = format_timestamp(completed_at);
1084
1085        self.with_connection(move |connection| {
1086            let session_id: String = connection.query_row(
1087                "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1088                params![round_id],
1089                |row| row.get(0),
1090            )?;
1091
1092            connection.execute(
1093                r#"
1094                UPDATE round_metrics
1095                SET completed_at = ?1,
1096                    status = ?2,
1097                    prompt_tokens = ?3,
1098                    completion_tokens = ?4,
1099                    total_tokens = ?5,
1100                    prompt_cached_tool_outputs = ?6,
1101                    error = ?7
1102                WHERE round_id = ?8
1103                "#,
1104                params![
1105                    completed_at_str,
1106                    status.as_str(),
1107                    usage.prompt_tokens as i64,
1108                    usage.completion_tokens as i64,
1109                    usage.total_tokens as i64,
1110                    i64::from(prompt_cached_tool_outputs),
1111                    error,
1112                    round_id,
1113                ],
1114            )?;
1115
1116            refresh_session_aggregates(connection, &session_id, completed_at)?;
1117            Ok(())
1118        })
1119        .await
1120    }
1121
1122    async fn record_round_compression(
1123        &self,
1124        round_id: &str,
1125        compressed_at: DateTime<Utc>,
1126        tokens_saved: u32,
1127    ) -> MetricsResult<()> {
1128        let round_id = round_id.to_string();
1129
1130        self.with_connection(move |connection| {
1131            let session_id: String = connection.query_row(
1132                "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1133                params![round_id],
1134                |row| row.get(0),
1135            )?;
1136
1137            connection.execute(
1138                r#"
1139                UPDATE round_metrics
1140                SET compression_count = COALESCE(compression_count, 0) + 1,
1141                    tokens_saved = COALESCE(tokens_saved, 0) + ?1
1142                WHERE round_id = ?2
1143                "#,
1144                params![i64::from(tokens_saved), round_id],
1145            )?;
1146
1147            refresh_session_aggregates(connection, &session_id, compressed_at)?;
1148            Ok(())
1149        })
1150        .await
1151    }
1152
1153    async fn insert_tool_start(
1154        &self,
1155        tool_call_id: &str,
1156        round_id: &str,
1157        session_id: &str,
1158        tool_name: &str,
1159        started_at: DateTime<Utc>,
1160    ) -> MetricsResult<()> {
1161        let tool_call_id = tool_call_id.to_string();
1162        let round_id = round_id.to_string();
1163        let session_id = session_id.to_string();
1164        let tool_name = tool_name.to_string();
1165        let started_at_str = format_timestamp(started_at);
1166
1167        self.with_connection(move |connection| {
1168            connection.execute(
1169                r#"
1170                INSERT INTO tool_call_metrics (
1171                    tool_call_id, round_id, session_id, tool_name, started_at
1172                ) VALUES (?1, ?2, ?3, ?4, ?5)
1173                ON CONFLICT(tool_call_id) DO UPDATE SET
1174                    round_id = excluded.round_id,
1175                    session_id = excluded.session_id,
1176                    tool_name = excluded.tool_name,
1177                    started_at = excluded.started_at
1178                "#,
1179                params![
1180                    tool_call_id,
1181                    round_id,
1182                    session_id,
1183                    tool_name,
1184                    started_at_str
1185                ],
1186            )?;
1187            Ok(())
1188        })
1189        .await
1190    }
1191
1192    async fn complete_tool_call(
1193        &self,
1194        tool_call_id: &str,
1195        completion: ToolCallCompletion,
1196    ) -> MetricsResult<()> {
1197        let tool_call_id = tool_call_id.to_string();
1198        let completed_at = format_timestamp(completion.completed_at);
1199        let success = if completion.success { 1_i64 } else { 0_i64 };
1200        let error = completion.error;
1201
1202        self.with_connection(move |connection| {
1203            let session_id: String = connection.query_row(
1204                "SELECT session_id FROM tool_call_metrics WHERE tool_call_id = ?1",
1205                params![tool_call_id],
1206                |row| row.get(0),
1207            )?;
1208
1209            connection.execute(
1210                "UPDATE tool_call_metrics SET completed_at = ?1, success = ?2, error = ?3 WHERE tool_call_id = ?4",
1211                params![completed_at, success, error, tool_call_id],
1212            )?;
1213
1214            refresh_session_aggregates(connection, &session_id, completion.completed_at)?;
1215            Ok(())
1216        })
1217        .await
1218    }
1219
1220    async fn insert_forward_start(
1221        &self,
1222        forward_id: &str,
1223        endpoint: &str,
1224        model: &str,
1225        is_stream: bool,
1226        started_at: DateTime<Utc>,
1227    ) -> MetricsResult<()> {
1228        let forward_id = forward_id.to_string();
1229        let endpoint = endpoint.to_string();
1230        let model = model.to_string();
1231        let is_stream_int = if is_stream { 1_i64 } else { 0_i64 };
1232        let started_at_str = format_timestamp(started_at);
1233
1234        self.with_connection(move |connection| {
1235            connection.execute(
1236                r#"
1237                INSERT INTO forward_request_metrics (
1238                    forward_id, endpoint, model, is_stream, started_at, status, updated_at
1239                ) VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?5)
1240                ON CONFLICT(forward_id) DO UPDATE SET
1241                    endpoint = excluded.endpoint,
1242                    model = excluded.model,
1243                    is_stream = excluded.is_stream,
1244                    started_at = excluded.started_at,
1245                    completed_at = NULL,
1246                    status_code = NULL,
1247                    status = 'pending',
1248                    prompt_tokens = NULL,
1249                    completion_tokens = NULL,
1250                    total_tokens = NULL,
1251                    error = NULL,
1252                    updated_at = excluded.updated_at
1253                "#,
1254                params![forward_id, endpoint, model, is_stream_int, started_at_str],
1255            )?;
1256            Ok(())
1257        })
1258        .await
1259    }
1260
1261    async fn complete_forward(
1262        &self,
1263        forward_id: &str,
1264        completed_at: DateTime<Utc>,
1265        status_code: Option<u16>,
1266        status: ForwardStatus,
1267        usage: Option<TokenUsage>,
1268        error: Option<String>,
1269    ) -> MetricsResult<()> {
1270        let forward_id = forward_id.to_string();
1271        let completed_at_str = format_timestamp(completed_at);
1272        let status_code_int = status_code.map(|s| s as i64);
1273        let (prompt, completion, total) = match usage {
1274            Some(u) => (
1275                Some(u.prompt_tokens as i64),
1276                Some(u.completion_tokens as i64),
1277                Some(u.total_tokens as i64),
1278            ),
1279            None => (None, None, None),
1280        };
1281
1282        self.with_connection(move |connection| {
1283            connection.execute(
1284                r#"
1285                UPDATE forward_request_metrics
1286                SET completed_at = ?1,
1287                    status_code = ?2,
1288                    status = ?3,
1289                    prompt_tokens = ?4,
1290                    completion_tokens = ?5,
1291                    total_tokens = ?6,
1292                    error = ?7,
1293                    updated_at = ?1
1294                WHERE forward_id = ?8
1295                "#,
1296                params![
1297                    completed_at_str,
1298                    status_code_int,
1299                    status.as_str(),
1300                    prompt,
1301                    completion,
1302                    total,
1303                    error,
1304                    forward_id,
1305                ],
1306            )?;
1307            Ok(())
1308        })
1309        .await
1310    }
1311
1312    async fn forward_summary(
1313        &self,
1314        filter: ForwardMetricsFilter,
1315    ) -> MetricsResult<ForwardMetricsSummary> {
1316        self.with_connection(move |connection| {
1317            let mut params_vec = Vec::new();
1318            let where_clause = build_forward_where_clause(
1319                filter.start_date,
1320                filter.end_date,
1321                filter.endpoint.as_deref(),
1322                filter.model.as_deref(),
1323                &mut params_vec,
1324            );
1325
1326            let sql = format!(
1327                "SELECT COUNT(*), \
1328                 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1329                 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1330                 COALESCE(SUM(prompt_tokens), 0), \
1331                 COALESCE(SUM(completion_tokens), 0), \
1332                 COALESCE(SUM(total_tokens), 0), \
1333                 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1334                     (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1335                 FROM forward_request_metrics {}",
1336                where_clause
1337            );
1338
1339            let mut stmt = connection.prepare(&sql)?;
1340            let summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1341                let avg_duration: Option<f64> = row.get(6)?;
1342                Ok(ForwardMetricsSummary {
1343                    total_requests: row.get::<_, i64>(0)? as u64,
1344                    successful_requests: row.get::<_, i64>(1)? as u64,
1345                    failed_requests: row.get::<_, i64>(2)? as u64,
1346                    total_tokens: TokenUsage {
1347                        prompt_tokens: row.get::<_, i64>(3)? as u64,
1348                        completion_tokens: row.get::<_, i64>(4)? as u64,
1349                        total_tokens: row.get::<_, i64>(5)? as u64,
1350                    },
1351                    avg_duration_ms: avg_duration.map(|d| d as u64),
1352                })
1353            })?;
1354
1355            Ok(summary)
1356        })
1357        .await
1358    }
1359
1360    async fn forward_by_endpoint(
1361        &self,
1362        filter: ForwardMetricsFilter,
1363    ) -> MetricsResult<Vec<ForwardEndpointMetrics>> {
1364        self.with_connection(move |connection| {
1365            let mut params_vec = Vec::new();
1366            let where_clause = build_forward_where_clause(
1367                filter.start_date,
1368                filter.end_date,
1369                None,
1370                filter.model.as_deref(),
1371                &mut params_vec,
1372            );
1373
1374            let sql = format!(
1375                "SELECT endpoint, COUNT(*), \
1376                 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1377                 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1378                 COALESCE(SUM(prompt_tokens), 0), \
1379                 COALESCE(SUM(completion_tokens), 0), \
1380                 COALESCE(SUM(total_tokens), 0), \
1381                 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1382                     (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1383                 FROM forward_request_metrics {} \
1384                 GROUP BY endpoint ORDER BY COUNT(*) DESC",
1385                where_clause
1386            );
1387
1388            let mut stmt = connection.prepare(&sql)?;
1389            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1390            let mut endpoints = Vec::new();
1391
1392            while let Some(row) = rows.next()? {
1393                let avg_duration: Option<f64> = row.get(7)?;
1394                endpoints.push(ForwardEndpointMetrics {
1395                    endpoint: row.get(0)?,
1396                    requests: row.get::<_, i64>(1)? as u64,
1397                    successful: row.get::<_, i64>(2)? as u64,
1398                    failed: row.get::<_, i64>(3)? as u64,
1399                    tokens: TokenUsage {
1400                        prompt_tokens: row.get::<_, i64>(4)? as u64,
1401                        completion_tokens: row.get::<_, i64>(5)? as u64,
1402                        total_tokens: row.get::<_, i64>(6)? as u64,
1403                    },
1404                    avg_duration_ms: avg_duration.map(|d| d as u64),
1405                });
1406            }
1407
1408            Ok(endpoints)
1409        })
1410        .await
1411    }
1412
1413    async fn forward_requests(
1414        &self,
1415        filter: ForwardMetricsFilter,
1416    ) -> MetricsResult<Vec<ForwardRequestMetrics>> {
1417        self.with_connection(move |connection| {
1418            let mut params_vec = Vec::new();
1419            let where_clause = build_forward_where_clause(
1420                filter.start_date,
1421                filter.end_date,
1422                filter.endpoint.as_deref(),
1423                filter.model.as_deref(),
1424                &mut params_vec,
1425            );
1426
1427            let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1428            let sql = format!(
1429                "SELECT forward_id, endpoint, model, is_stream, started_at, completed_at, \
1430                 status_code, status, prompt_tokens, completion_tokens, total_tokens, error \
1431                 FROM forward_request_metrics {} \
1432                 ORDER BY started_at DESC LIMIT {}",
1433                where_clause, limit
1434            );
1435
1436            let mut stmt = connection.prepare(&sql)?;
1437            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1438            let mut requests = Vec::new();
1439
1440            while let Some(row) = rows.next()? {
1441                let started_at = parse_timestamp(row.get::<_, String>(4)?)?;
1442                let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(5)?)?;
1443                let status_raw: Option<String> = row.get(7)?;
1444                let status = status_raw.and_then(|s| ForwardStatus::from_db(&s));
1445
1446                let prompt: Option<i64> = row.get(8)?;
1447                let completion: Option<i64> = row.get(9)?;
1448                let total: Option<i64> = row.get(10)?;
1449                let token_usage = match (prompt, completion, total) {
1450                    (Some(p), Some(c), Some(t)) => Some(TokenUsage {
1451                        prompt_tokens: p as u64,
1452                        completion_tokens: c as u64,
1453                        total_tokens: t as u64,
1454                    }),
1455                    _ => None,
1456                };
1457
1458                requests.push(ForwardRequestMetrics {
1459                    forward_id: row.get(0)?,
1460                    endpoint: row.get(1)?,
1461                    model: row.get(2)?,
1462                    is_stream: row.get::<_, i64>(3)? > 0,
1463                    started_at,
1464                    completed_at,
1465                    status_code: row.get::<_, Option<i64>>(6)?.map(|s| s as u16),
1466                    status,
1467                    token_usage,
1468                    error: row.get(11)?,
1469                    duration_ms: compute_duration_ms(started_at, completed_at),
1470                });
1471            }
1472
1473            Ok(requests)
1474        })
1475        .await
1476    }
1477
1478    async fn forward_daily_metrics(
1479        &self,
1480        days: u32,
1481        end_date: Option<NaiveDate>,
1482    ) -> MetricsResult<Vec<DailyMetrics>> {
1483        let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1484        let span = days.max(1) - 1;
1485        let start_date = end_date - chrono::Duration::days(i64::from(span));
1486
1487        self.with_connection(move |connection| {
1488            let mut stmt = connection.prepare(
1489                r#"
1490                SELECT
1491                    date(started_at) AS date_key,
1492                    COUNT(*) AS total_sessions,
1493                    COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1494                    COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1495                    COALESCE(SUM(total_tokens), 0) AS total_tokens
1496                FROM forward_request_metrics
1497                WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1498                GROUP BY date_key
1499                ORDER BY date_key ASC
1500                "#,
1501            )?;
1502
1503            let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1504            let mut result = Vec::new();
1505
1506            while let Some(row) = rows.next()? {
1507                let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1508
1509                result.push(DailyMetrics {
1510                    date,
1511                    total_sessions: row.get::<_, i64>(1)? as u32,
1512                    total_rounds: 0,
1513                    total_token_usage: TokenUsage {
1514                        prompt_tokens: row.get::<_, i64>(2)? as u64,
1515                        completion_tokens: row.get::<_, i64>(3)? as u64,
1516                        total_tokens: row.get::<_, i64>(4)? as u64,
1517                    },
1518                    total_tool_calls: 0,
1519                    prompt_cached_tool_outputs: 0,
1520                    model_breakdown: HashMap::new(),
1521                    tool_breakdown: HashMap::new(),
1522                });
1523            }
1524
1525            Ok(result)
1526        })
1527        .await
1528    }
1529
1530    async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary> {
1531        self.with_connection(move |connection| {
1532            let mut params_vec = Vec::new();
1533            let where_clause = build_session_where_clause(
1534                filter.start_date,
1535                filter.end_date,
1536                None,
1537                &mut params_vec,
1538            );
1539
1540            let summary_sql = format!(
1541                "SELECT COUNT(*), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0), COALESCE(SUM(total_compression_events), 0), COALESCE(SUM(total_tokens_saved), 0), COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'awaiting_response' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END), 0) FROM session_metrics {}",
1542                where_clause
1543            );
1544
1545            let mut stmt = connection.prepare(&summary_sql)?;
1546            let mut summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1547                Ok(MetricsSummary {
1548                    total_sessions: row.get::<_, i64>(0)? as u64,
1549                    total_tokens: TokenUsage {
1550                        prompt_tokens: row.get::<_, i64>(1)? as u64,
1551                        completion_tokens: row.get::<_, i64>(2)? as u64,
1552                        total_tokens: row.get::<_, i64>(3)? as u64,
1553                    },
1554                    total_tool_calls: row.get::<_, i64>(4)? as u64,
1555                    prompt_cached_tool_outputs: row.get::<_, i64>(5)? as u64,
1556                    total_compression_events: row.get::<_, i64>(6)? as u64,
1557                    total_tokens_saved: row.get::<_, i64>(7)? as u64,
1558                    completed_sessions: row.get::<_, i64>(8)? as u64,
1559                    awaiting_response_sessions: row.get::<_, i64>(9)? as u64,
1560                    error_sessions: row.get::<_, i64>(10)? as u64,
1561                    cancelled_sessions: row.get::<_, i64>(11)? as u64,
1562                    total_sync_mismatches: 0,
1563                    sync_mismatch_breakdown: HashMap::new(),
1564                    active_sessions: 0,
1565                })
1566            })?;
1567
1568            let mut mismatch_params = Vec::new();
1569            let mismatch_clause = build_execute_sync_mismatch_where_clause(
1570                filter.start_date,
1571                filter.end_date,
1572                None,
1573                &mut mismatch_params,
1574            );
1575            let mismatch_sql = format!(
1576                "SELECT COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {}",
1577                mismatch_clause
1578            );
1579            let mut mismatch_stmt = connection.prepare(&mismatch_sql)?;
1580            summary.total_sync_mismatches = mismatch_stmt
1581                .query_row(params_from_iter(mismatch_params.iter()), |row| row.get::<_, i64>(0))?
1582                as u64;
1583            summary.sync_mismatch_breakdown = load_execute_sync_mismatch_breakdown(
1584                connection,
1585                filter.start_date,
1586                filter.end_date,
1587            )?;
1588            let mut active_params = Vec::new();
1589            let active_clause = build_session_where_clause(
1590                filter.start_date,
1591                filter.end_date,
1592                Some("running"),
1593                &mut active_params,
1594            );
1595            let active_sql = format!(
1596                "SELECT COUNT(*) FROM session_metrics {}",
1597                active_clause
1598            );
1599            let mut active_stmt = connection.prepare(&active_sql)?;
1600            let active_sessions = active_stmt.query_row(params_from_iter(active_params.iter()), |row| {
1601                row.get::<_, i64>(0)
1602            })? as u64;
1603
1604            Ok(MetricsSummary {
1605                active_sessions,
1606                ..summary
1607            })
1608        })
1609        .await
1610    }
1611
1612    async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>> {
1613        self.with_connection(move |connection| {
1614            let mut params_vec = Vec::new();
1615            let where_clause = build_session_where_clause(
1616                filter.start_date,
1617                filter.end_date,
1618                None,
1619                &mut params_vec,
1620            );
1621
1622            let sql = format!(
1623                "SELECT model, COUNT(*), COALESCE(SUM(total_rounds), 0), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0) FROM session_metrics {} GROUP BY model ORDER BY SUM(total_tokens) DESC",
1624                where_clause
1625            );
1626
1627            let mut stmt = connection.prepare(&sql)?;
1628            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1629            let mut models = Vec::new();
1630
1631            while let Some(row) = rows.next()? {
1632                models.push(ModelMetrics {
1633                    model: row.get(0)?,
1634                    sessions: row.get::<_, i64>(1)? as u64,
1635                    rounds: row.get::<_, i64>(2)? as u64,
1636                    tokens: TokenUsage {
1637                        prompt_tokens: row.get::<_, i64>(3)? as u64,
1638                        completion_tokens: row.get::<_, i64>(4)? as u64,
1639                        total_tokens: row.get::<_, i64>(5)? as u64,
1640                    },
1641                    tool_calls: row.get::<_, i64>(6)? as u64,
1642                    prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1643                });
1644            }
1645
1646            Ok(models)
1647        })
1648        .await
1649    }
1650
1651    async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>> {
1652        self.with_connection(move |connection| {
1653            let mut params_vec = Vec::new();
1654            let where_clause = build_session_where_clause(
1655                filter.start_date,
1656                filter.end_date,
1657                None,
1658                &mut params_vec,
1659            );
1660            let mut conditions = if where_clause.is_empty() {
1661                Vec::new()
1662            } else {
1663                vec![where_clause.replacen("WHERE ", "", 1)]
1664            };
1665            if let Some(model) = filter.model {
1666                conditions.push("model = ?".to_string());
1667                params_vec.push(model);
1668            }
1669
1670            let where_sql = if conditions.is_empty() {
1671                String::new()
1672            } else {
1673                format!("WHERE {}", conditions.join(" AND "))
1674            };
1675
1676            let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1677            let sql = format!(
1678                "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics {} ORDER BY started_at DESC LIMIT {}",
1679                where_sql, limit
1680            );
1681
1682            let mut stmt = connection.prepare(&sql)?;
1683            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1684            let mut sessions = Vec::new();
1685
1686            while let Some(row) = rows.next()? {
1687                let session_id: String = row.get(0)?;
1688                let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
1689                let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
1690                let status_raw: String = row.get(12)?;
1691                let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1692                    MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1693                })?;
1694                let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1695
1696                sessions.push(SessionMetrics {
1697                    session_id,
1698                    model: row.get(1)?,
1699                    started_at,
1700                    completed_at,
1701                    total_rounds: row.get::<_, i64>(4)? as u32,
1702                    total_token_usage: TokenUsage {
1703                        prompt_tokens: row.get::<_, i64>(5)? as u64,
1704                        completion_tokens: row.get::<_, i64>(6)? as u64,
1705                        total_tokens: row.get::<_, i64>(7)? as u64,
1706                    },
1707                    tool_call_count: row.get::<_, i64>(8)? as u32,
1708                    prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u64,
1709                    total_compression_events: row.get::<_, i64>(10)? as u64,
1710                    total_tokens_saved: row.get::<_, i64>(11)? as u64,
1711                    tool_breakdown,
1712                    status,
1713                    message_count: row.get::<_, i64>(13)? as u32,
1714                    duration_ms: compute_duration_ms(started_at, completed_at),
1715                });
1716            }
1717
1718            Ok(sessions)
1719        })
1720        .await
1721    }
1722
1723    async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>> {
1724        let session_id = session_id.to_string();
1725        self.with_connection(move |connection| {
1726            let session_sql = "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics WHERE session_id = ?1";
1727            let session_row = connection
1728                .query_row(session_sql, params![session_id], |row| {
1729                    Ok((
1730                        row.get::<_, String>(0)?,
1731                        row.get::<_, String>(1)?,
1732                        row.get::<_, String>(2)?,
1733                        row.get::<_, Option<String>>(3)?,
1734                        row.get::<_, i64>(4)?,
1735                        row.get::<_, i64>(5)?,
1736                        row.get::<_, i64>(6)?,
1737                        row.get::<_, i64>(7)?,
1738                        row.get::<_, i64>(8)?,
1739                        row.get::<_, i64>(9)?,
1740                        row.get::<_, i64>(10)?,
1741                        row.get::<_, i64>(11)?,
1742                        row.get::<_, String>(12)?,
1743                        row.get::<_, i64>(13)?,
1744                    ))
1745                })
1746                .optional()?;
1747
1748            let Some((
1749                session_id,
1750                model,
1751                started_at_raw,
1752                completed_at_raw,
1753                total_rounds,
1754                prompt_tokens,
1755                completion_tokens,
1756                total_tokens,
1757                tool_call_count,
1758                prompt_cached_tool_outputs,
1759                total_compression_events,
1760                total_tokens_saved,
1761                status_raw,
1762                message_count,
1763            )) = session_row
1764            else {
1765                return Ok(None);
1766            };
1767
1768            let started_at = parse_timestamp(started_at_raw)?;
1769            let completed_at = parse_optional_timestamp(completed_at_raw)?;
1770            let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1771                MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1772            })?;
1773            let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1774
1775            let session = SessionMetrics {
1776                session_id: session_id.clone(),
1777                model,
1778                started_at,
1779                completed_at,
1780                total_rounds: total_rounds as u32,
1781                total_token_usage: TokenUsage {
1782                    prompt_tokens: prompt_tokens as u64,
1783                    completion_tokens: completion_tokens as u64,
1784                    total_tokens: total_tokens as u64,
1785                },
1786                tool_call_count: tool_call_count as u32,
1787                prompt_cached_tool_outputs: prompt_cached_tool_outputs as u64,
1788                total_compression_events: total_compression_events as u64,
1789                total_tokens_saved: total_tokens_saved as u64,
1790                tool_breakdown,
1791                status,
1792                message_count: message_count as u32,
1793                duration_ms: compute_duration_ms(started_at, completed_at),
1794            };
1795
1796            let rounds = load_rounds(connection, &session_id)?;
1797            Ok(Some(SessionDetail { session, rounds }))
1798        })
1799        .await
1800    }
1801
1802    async fn increment_execute_sync_mismatch(
1803        &self,
1804        reason: &str,
1805        occurred_at: DateTime<Utc>,
1806    ) -> MetricsResult<()> {
1807        let reason = reason.to_string();
1808        let mismatch_date = occurred_at.date_naive().to_string();
1809        let updated_at = format_timestamp(occurred_at);
1810
1811        self.with_connection(move |connection| {
1812            connection.execute(
1813                r#"
1814                INSERT INTO execute_sync_mismatch_metrics (reason, mismatch_date, count, updated_at)
1815                VALUES (?1, ?2, 1, ?3)
1816                ON CONFLICT(reason, mismatch_date) DO UPDATE SET
1817                    count = count + 1,
1818                    updated_at = excluded.updated_at
1819                "#,
1820                params![reason, mismatch_date, updated_at],
1821            )?;
1822            Ok(())
1823        })
1824        .await
1825    }
1826
1827    async fn daily_metrics(
1828        &self,
1829        days: u32,
1830        end_date: Option<NaiveDate>,
1831    ) -> MetricsResult<Vec<DailyMetrics>> {
1832        let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1833        let span = days.max(1) - 1;
1834        let start_date = end_date - chrono::Duration::days(i64::from(span));
1835
1836        self.with_connection(move |connection| {
1837            let mut stmt = connection.prepare(
1838                r#"
1839                SELECT
1840                    date(started_at) AS date_key,
1841                    COUNT(*) AS total_sessions,
1842                    COALESCE(SUM(total_rounds), 0) AS total_rounds,
1843                    COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1844                    COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1845                    COALESCE(SUM(total_tokens), 0) AS total_tokens,
1846                    COALESCE(SUM(tool_call_count), 0) AS total_tool_calls,
1847                    COALESCE(SUM(prompt_cached_tool_outputs), 0) AS prompt_cached_tool_outputs
1848                FROM session_metrics
1849                WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1850                GROUP BY date_key
1851                ORDER BY date_key ASC
1852                "#,
1853            )?;
1854
1855            let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1856            let mut result = Vec::new();
1857
1858            while let Some(row) = rows.next()? {
1859                let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1860                let model_breakdown = load_daily_model_breakdown(connection, date)?;
1861                let tool_breakdown = load_daily_tool_breakdown(connection, date)?;
1862
1863                result.push(DailyMetrics {
1864                    date,
1865                    total_sessions: row.get::<_, i64>(1)? as u32,
1866                    total_rounds: row.get::<_, i64>(2)? as u32,
1867                    total_token_usage: TokenUsage {
1868                        prompt_tokens: row.get::<_, i64>(3)? as u64,
1869                        completion_tokens: row.get::<_, i64>(4)? as u64,
1870                        total_tokens: row.get::<_, i64>(5)? as u64,
1871                    },
1872                    total_tool_calls: row.get::<_, i64>(6)? as u32,
1873                    prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1874                    model_breakdown,
1875                    tool_breakdown,
1876                });
1877            }
1878
1879            Ok(result)
1880        })
1881        .await
1882    }
1883
1884    async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64> {
1885        self.with_connection(move |connection| {
1886            let cutoff_str = format_timestamp(cutoff);
1887            let deleted = connection.execute(
1888                "DELETE FROM round_metrics WHERE started_at < ?1",
1889                params![cutoff_str],
1890            )?;
1891
1892            let mut stmt = connection.prepare("SELECT session_id FROM session_metrics")?;
1893            let session_ids: Vec<String> = stmt
1894                .query_map([], |row| row.get(0))?
1895                .collect::<Result<Vec<String>, _>>()?;
1896            for session_id in session_ids {
1897                refresh_session_aggregates(connection, &session_id, Utc::now())?;
1898            }
1899
1900            Ok(deleted as u64)
1901        })
1902        .await
1903    }
1904
1905    async fn reconcile_stale_executions(
1906        &self,
1907        active_session_ids: &[String],
1908        awaiting_response_session_ids: &[String],
1909    ) -> MetricsResult<()> {
1910        let active_session_ids = active_session_ids.to_vec();
1911        let awaiting_response_session_ids = awaiting_response_session_ids.to_vec();
1912
1913        self.with_connection(move |connection| {
1914            let reconciled_at = Utc::now();
1915            let reconciled_at_str = format_timestamp(reconciled_at);
1916
1917            let mut stmt = connection.prepare(
1918                "SELECT session_id FROM session_metrics WHERE status = 'running'",
1919            )?;
1920            let running_session_ids: Vec<String> = stmt
1921                .query_map([], |row| row.get(0))?
1922                .collect::<Result<Vec<String>, _>>()?;
1923
1924            for session_id in running_session_ids {
1925                if active_session_ids.iter().any(|id| id == &session_id) {
1926                    continue;
1927                }
1928
1929                let status = if awaiting_response_session_ids
1930                    .iter()
1931                    .any(|id| id == &session_id)
1932                {
1933                    SessionStatus::AwaitingResponse
1934                } else {
1935                    SessionStatus::Completed
1936                };
1937
1938                connection.execute(
1939                    "UPDATE session_metrics SET status = ?1, completed_at = COALESCE(completed_at, ?2), updated_at = ?2 WHERE session_id = ?3",
1940                    params![status.as_str(), reconciled_at_str, session_id],
1941                )?;
1942                refresh_session_aggregates(connection, &session_id, reconciled_at)?;
1943            }
1944
1945            connection.execute(
1946                "UPDATE round_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_round') WHERE status = 'running'",
1947                params![reconciled_at_str],
1948            )?;
1949            connection.execute(
1950                "UPDATE forward_request_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_forward'), updated_at = ?1 WHERE status = 'pending' AND completed_at IS NULL",
1951                params![reconciled_at_str],
1952            )?;
1953
1954            Ok(())
1955        })
1956        .await
1957    }
1958}
1959
1960/// Opens a connection to the SQLite database with proper configuration.
1961///
1962/// This function:
1963/// 1. Creates parent directories if they don't exist
1964/// 2. Opens the database file (creates if doesn't exist)
1965/// 3. Configures optimal SQLite settings:
1966///    - WAL mode for concurrent access
1967///    - Foreign key enforcement
1968///    - Normal synchronous mode for performance
1969///
1970/// # Arguments
1971///
1972/// * `path` - Path to the SQLite database file
1973///
1974/// # Errors
1975///
1976/// Returns an error if:
1977/// - Parent directories cannot be created
1978/// - Database file cannot be opened
1979/// - PRAGMA settings fail to apply
1980fn open_connection(path: &Path) -> MetricsResult<Connection> {
1981    if let Some(parent) = path.parent() {
1982        std::fs::create_dir_all(parent)?;
1983    }
1984    let connection = Connection::open(path)?;
1985    connection.execute_batch(
1986        r#"
1987        PRAGMA journal_mode = WAL;
1988        PRAGMA foreign_keys = ON;
1989        PRAGMA synchronous = NORMAL;
1990        "#,
1991    )?;
1992    Ok(connection)
1993}
1994
1995/// Formats a timestamp as RFC3339 string for database storage.
1996///
1997/// # Arguments
1998///
1999/// * `timestamp` - DateTime to format
2000///
2001/// # Returns
2002///
2003/// RFC3339 formatted string (e.g., "2026-02-24T12:34:56.789+00:00")
2004fn format_timestamp(timestamp: DateTime<Utc>) -> String {
2005    timestamp.to_rfc3339()
2006}
2007
2008/// Parses an RFC3339 timestamp string from the database.
2009///
2010/// # Arguments
2011///
2012/// * `raw` - RFC3339 formatted string
2013///
2014/// # Errors
2015///
2016/// Returns an error if the string doesn't conform to RFC3339 format.
2017fn parse_timestamp(raw: String) -> MetricsResult<DateTime<Utc>> {
2018    Ok(DateTime::parse_from_rfc3339(&raw)?.with_timezone(&Utc))
2019}
2020
2021/// Parses an optional RFC3339 timestamp string.
2022///
2023/// # Arguments
2024///
2025/// * `raw` - Optional RFC3339 formatted string
2026///
2027/// # Returns
2028///
2029/// Returns `Ok(None)` if the input is None, otherwise parses the timestamp.
2030fn parse_optional_timestamp(raw: Option<String>) -> MetricsResult<Option<DateTime<Utc>>> {
2031    raw.map(parse_timestamp).transpose()
2032}
2033
2034/// Computes the duration in milliseconds between two timestamps.
2035///
2036/// # Arguments
2037///
2038/// * `started_at` - Start timestamp
2039/// * `completed_at` - Optional end timestamp
2040///
2041/// # Returns
2042///
2043/// Returns `None` if `completed_at` is None, otherwise returns the duration in milliseconds.
2044/// Returns `None` if the duration is negative or too large to fit in u64.
2045fn compute_duration_ms(
2046    started_at: DateTime<Utc>,
2047    completed_at: Option<DateTime<Utc>>,
2048) -> Option<u64> {
2049    completed_at.and_then(|end| {
2050        end.signed_duration_since(started_at)
2051            .num_milliseconds()
2052            .try_into()
2053            .ok()
2054    })
2055}
2056
2057/// Builds a SQL WHERE clause for session metrics queries.
2058///
2059/// Constructs a WHERE clause based on the provided filter criteria,
2060/// appending parameters to the params vector in the correct order.
2061///
2062/// # Arguments
2063///
2064/// * `start_date` - Optional start date filter (inclusive)
2065/// * `end_date` - Optional end date filter (inclusive)
2066/// * `required_status` - Optional status filter (e.g., "running", "completed")
2067/// * `params_vec` - Vector to append SQL parameters to
2068///
2069/// # Returns
2070///
2071/// Returns an empty string if no filters are applied, otherwise returns
2072/// a WHERE clause starting with "WHERE ".
2073fn build_session_where_clause(
2074    start_date: Option<NaiveDate>,
2075    end_date: Option<NaiveDate>,
2076    required_status: Option<&str>,
2077    params_vec: &mut Vec<String>,
2078) -> String {
2079    let mut conditions = Vec::new();
2080
2081    if let Some(start) = start_date {
2082        conditions.push("date(started_at) >= date(?)".to_string());
2083        params_vec.push(start.to_string());
2084    }
2085
2086    if let Some(end) = end_date {
2087        conditions.push("date(started_at) <= date(?)".to_string());
2088        params_vec.push(end.to_string());
2089    }
2090
2091    if let Some(status) = required_status {
2092        conditions.push("status = ?".to_string());
2093        params_vec.push(status.to_string());
2094    }
2095
2096    if conditions.is_empty() {
2097        String::new()
2098    } else {
2099        format!("WHERE {}", conditions.join(" AND "))
2100    }
2101}
2102
2103/// Builds a SQL WHERE clause for forward request metrics queries.
2104///
2105/// Constructs a WHERE clause based on the provided filter criteria,
2106/// appending parameters to the params vector in the correct order.
2107///
2108/// # Arguments
2109///
2110/// * `start_date` - Optional start date filter (inclusive)
2111/// * `end_date` - Optional end date filter (inclusive)
2112/// * `endpoint` - Optional endpoint filter
2113/// * `model` - Optional model filter
2114/// * `params_vec` - Vector to append SQL parameters to
2115///
2116/// # Returns
2117///
2118/// Returns an empty string if no filters are applied, otherwise returns
2119/// a WHERE clause starting with "WHERE ".
2120fn build_forward_where_clause(
2121    start_date: Option<NaiveDate>,
2122    end_date: Option<NaiveDate>,
2123    endpoint: Option<&str>,
2124    model: Option<&str>,
2125    params_vec: &mut Vec<String>,
2126) -> String {
2127    let mut conditions = Vec::new();
2128
2129    if let Some(start) = start_date {
2130        conditions.push("date(started_at) >= date(?)".to_string());
2131        params_vec.push(start.to_string());
2132    }
2133
2134    if let Some(end) = end_date {
2135        conditions.push("date(started_at) <= date(?)".to_string());
2136        params_vec.push(end.to_string());
2137    }
2138
2139    if let Some(ep) = endpoint {
2140        conditions.push("endpoint = ?".to_string());
2141        params_vec.push(ep.to_string());
2142    }
2143
2144    if let Some(m) = model {
2145        conditions.push("model = ?".to_string());
2146        params_vec.push(m.to_string());
2147    }
2148
2149    if conditions.is_empty() {
2150        String::new()
2151    } else {
2152        format!("WHERE {}", conditions.join(" AND "))
2153    }
2154}
2155
2156fn build_execute_sync_mismatch_where_clause(
2157    start_date: Option<NaiveDate>,
2158    end_date: Option<NaiveDate>,
2159    reason: Option<&str>,
2160    params_vec: &mut Vec<String>,
2161) -> String {
2162    let mut conditions = Vec::new();
2163
2164    if let Some(start) = start_date {
2165        conditions.push("date(mismatch_date) >= date(?)".to_string());
2166        params_vec.push(start.to_string());
2167    }
2168
2169    if let Some(end) = end_date {
2170        conditions.push("date(mismatch_date) <= date(?)".to_string());
2171        params_vec.push(end.to_string());
2172    }
2173
2174    if let Some(reason) = reason {
2175        conditions.push("reason = ?".to_string());
2176        params_vec.push(reason.to_string());
2177    }
2178
2179    if conditions.is_empty() {
2180        String::new()
2181    } else {
2182        format!("WHERE {}", conditions.join(" AND "))
2183    }
2184}
2185
2186fn ensure_integer_column(
2187    connection: &Connection,
2188    table: &str,
2189    column: &str,
2190    default_value: i64,
2191) -> MetricsResult<()> {
2192    let pragma = format!("PRAGMA table_info({table})");
2193    let mut stmt = connection.prepare(&pragma)?;
2194    let mut rows = stmt.query([])?;
2195    while let Some(row) = rows.next()? {
2196        let name: String = row.get(1)?;
2197        if name == column {
2198            return Ok(());
2199        }
2200    }
2201
2202    let alter =
2203        format!("ALTER TABLE {table} ADD COLUMN {column} INTEGER NOT NULL DEFAULT {default_value}");
2204    connection.execute(&alter, [])?;
2205    Ok(())
2206}
2207
2208/// Refreshes aggregated metrics for a session by recalculating from child entities.
2209///
2210/// This function updates the session's aggregate columns by summing values
2211/// from all associated rounds and counting tool calls. It should be called
2212/// whenever a round or tool call is added or modified.
2213///
2214/// # Updated Columns
2215///
2216/// - `total_rounds`: Count of rounds in the session
2217/// - `prompt_tokens`: Sum of prompt tokens from all rounds
2218/// - `completion_tokens`: Sum of completion tokens from all rounds
2219/// - `total_tokens`: Sum of total tokens from all rounds
2220/// - `prompt_cached_tool_outputs`: Sum of prompt-side cached tool outputs from all rounds
2221/// - `total_compression_events`: Sum of compression events from all rounds
2222/// - `total_tokens_saved`: Sum of tokens saved by compression from all rounds
2223/// - `tool_call_count`: Count of tool calls in the session
2224/// - `updated_at`: Timestamp of this update
2225///
2226/// # Arguments
2227///
2228/// * `connection` - Database connection to use
2229/// * `session_id` - Session to refresh
2230/// * `updated_at` - Timestamp for the updated_at column
2231///
2232/// # Errors
2233///
2234/// Returns an error if the SQL execution fails.
2235fn refresh_session_aggregates(
2236    connection: &Connection,
2237    session_id: &str,
2238    updated_at: DateTime<Utc>,
2239) -> MetricsResult<()> {
2240    let updated_at = format_timestamp(updated_at);
2241    connection.execute(
2242        r#"
2243        UPDATE session_metrics
2244        SET
2245            total_rounds = COALESCE((SELECT COUNT(*) FROM round_metrics WHERE session_id = ?1), 0),
2246            prompt_tokens = COALESCE((SELECT SUM(prompt_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2247            completion_tokens = COALESCE((SELECT SUM(completion_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2248            total_tokens = COALESCE((SELECT SUM(total_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2249            prompt_cached_tool_outputs = COALESCE((SELECT SUM(prompt_cached_tool_outputs) FROM round_metrics WHERE session_id = ?1), 0),
2250            total_compression_events = COALESCE((SELECT SUM(compression_count) FROM round_metrics WHERE session_id = ?1), 0),
2251            total_tokens_saved = COALESCE((SELECT SUM(tokens_saved) FROM round_metrics WHERE session_id = ?1), 0),
2252            tool_call_count = COALESCE((SELECT COUNT(*) FROM tool_call_metrics WHERE session_id = ?1), 0),
2253            updated_at = ?2
2254        WHERE session_id = ?1
2255        "#,
2256        params![session_id, updated_at],
2257    )?;
2258    Ok(())
2259}
2260
2261/// Loads tool call breakdown (tool_name -> count) for a session.
2262///
2263/// Retrieves the count of tool invocations grouped by tool name
2264/// for the specified session.
2265///
2266/// # Arguments
2267///
2268/// * `connection` - Database connection to use
2269/// * `session_id` - Session to get tool breakdown for
2270///
2271/// # Returns
2272///
2273/// A HashMap mapping tool names to their invocation counts.
2274///
2275/// # Example
2276///
2277/// ```rust,ignore
2278/// let breakdown = load_tool_breakdown(&conn, "session-123")?;
2279/// // breakdown might be: {"read_file": 5, "execute_command": 2}
2280/// ```
2281fn load_tool_breakdown(
2282    connection: &Connection,
2283    session_id: &str,
2284) -> MetricsResult<HashMap<String, u32>> {
2285    let mut stmt = connection.prepare(
2286        "SELECT tool_name, COUNT(*) FROM tool_call_metrics WHERE session_id = ?1 GROUP BY tool_name",
2287    )?;
2288    let mut rows = stmt.query(params![session_id])?;
2289    let mut breakdown = HashMap::new();
2290
2291    while let Some(row) = rows.next()? {
2292        let tool: String = row.get(0)?;
2293        let count: i64 = row.get(1)?;
2294        breakdown.insert(tool, count as u32);
2295    }
2296
2297    Ok(breakdown)
2298}
2299
2300/// Loads all rounds for a session with their associated tool calls.
2301///
2302/// Retrieves complete round metrics including token usage, status,
2303/// and all tool calls made during each round.
2304///
2305/// # Arguments
2306///
2307/// * `connection` - Database connection to use
2308/// * `session_id` - Session to get rounds for
2309///
2310/// # Returns
2311///
2312/// A vector of RoundMetrics ordered by started_at ascending.
2313///
2314/// # Errors
2315///
2316/// Returns an error if:
2317/// - SQL execution fails
2318/// - Timestamp parsing fails
2319/// - Status values are invalid
2320fn load_rounds(connection: &Connection, session_id: &str) -> MetricsResult<Vec<RoundMetrics>> {
2321    let mut stmt = connection.prepare(
2322        "SELECT round_id, session_id, model, started_at, completed_at, status, prompt_tokens, completion_tokens, total_tokens, prompt_cached_tool_outputs, compression_count, tokens_saved, error FROM round_metrics WHERE session_id = ?1 ORDER BY started_at ASC",
2323    )?;
2324    let mut rows = stmt.query(params![session_id])?;
2325    let mut rounds = Vec::new();
2326
2327    while let Some(row) = rows.next()? {
2328        let round_id: String = row.get(0)?;
2329        let started_at = parse_timestamp(row.get::<_, String>(3)?)?;
2330        let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(4)?)?;
2331        let status_raw: String = row.get(5)?;
2332        let status = RoundStatus::from_db(&status_raw).ok_or_else(|| {
2333            MetricsError::InvalidData(format!("unknown round status: {}", status_raw))
2334        })?;
2335
2336        rounds.push(RoundMetrics {
2337            round_id: round_id.clone(),
2338            session_id: row.get(1)?,
2339            model: row.get(2)?,
2340            started_at,
2341            completed_at,
2342            token_usage: TokenUsage {
2343                prompt_tokens: row.get::<_, i64>(6)? as u64,
2344                completion_tokens: row.get::<_, i64>(7)? as u64,
2345                total_tokens: row.get::<_, i64>(8)? as u64,
2346            },
2347            tool_calls: load_tool_calls(connection, &round_id)?,
2348            status,
2349            prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u32,
2350            compression_count: row.get::<_, i64>(10)? as u32,
2351            tokens_saved: row.get::<_, i64>(11)? as u32,
2352            error: row.get(12)?,
2353            duration_ms: compute_duration_ms(started_at, completed_at),
2354        });
2355    }
2356
2357    Ok(rounds)
2358}
2359
2360/// Loads all tool calls for a specific round.
2361///
2362/// Retrieves tool invocation details including timing, success status,
2363/// and error information.
2364///
2365/// # Arguments
2366///
2367/// * `connection` - Database connection to use
2368/// * `round_id` - Round to get tool calls for
2369///
2370/// # Returns
2371///
2372/// A vector of ToolCallMetrics ordered by started_at ascending.
2373fn load_tool_calls(connection: &Connection, round_id: &str) -> MetricsResult<Vec<ToolCallMetrics>> {
2374    let mut stmt = connection.prepare(
2375        "SELECT tool_call_id, tool_name, started_at, completed_at, success, error FROM tool_call_metrics WHERE round_id = ?1 ORDER BY started_at ASC",
2376    )?;
2377    let mut rows = stmt.query(params![round_id])?;
2378    let mut tools = Vec::new();
2379
2380    while let Some(row) = rows.next()? {
2381        let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
2382        let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
2383        let success = row.get::<_, Option<i64>>(4)?.map(|value| value > 0);
2384
2385        tools.push(ToolCallMetrics {
2386            tool_call_id: row.get(0)?,
2387            tool_name: row.get(1)?,
2388            started_at,
2389            completed_at,
2390            success,
2391            error: row.get(5)?,
2392            duration_ms: compute_duration_ms(started_at, completed_at),
2393        });
2394    }
2395
2396    Ok(tools)
2397}
2398
2399/// Loads model-level token usage breakdown for a specific date.
2400///
2401/// Retrieves aggregated token usage grouped by AI model for all
2402/// sessions that started on the specified date.
2403///
2404/// # Arguments
2405///
2406/// * `connection` - Database connection to use
2407/// * `date` - Date to get model breakdown for
2408///
2409/// # Returns
2410///
2411/// A HashMap mapping model names to their total token usage.
2412///
2413/// # Example
2414///
2415/// ```rust,ignore
2416/// let breakdown = load_daily_model_breakdown(&conn, NaiveDate::from_ymd_opt(2026, 2, 24).unwrap())?;
2417/// // breakdown might be: {"gpt-4": TokenUsage{...}, "claude-3": TokenUsage{...}}
2418/// ```
2419fn load_daily_model_breakdown(
2420    connection: &Connection,
2421    date: NaiveDate,
2422) -> MetricsResult<HashMap<String, TokenUsage>> {
2423    let mut stmt = connection.prepare(
2424        r#"
2425        SELECT model,
2426               COALESCE(SUM(prompt_tokens), 0),
2427               COALESCE(SUM(completion_tokens), 0),
2428               COALESCE(SUM(total_tokens), 0)
2429        FROM session_metrics
2430        WHERE date(started_at) = date(?1)
2431        GROUP BY model
2432        "#,
2433    )?;
2434
2435    let mut rows = stmt.query(params![date.to_string()])?;
2436    let mut breakdown = HashMap::new();
2437
2438    while let Some(row) = rows.next()? {
2439        breakdown.insert(
2440            row.get::<_, String>(0)?,
2441            TokenUsage {
2442                prompt_tokens: row.get::<_, i64>(1)? as u64,
2443                completion_tokens: row.get::<_, i64>(2)? as u64,
2444                total_tokens: row.get::<_, i64>(3)? as u64,
2445            },
2446        );
2447    }
2448
2449    Ok(breakdown)
2450}
2451
2452/// Loads tool call count breakdown for a specific date.
2453///
2454/// Retrieves the count of tool invocations grouped by tool name
2455/// for all tool calls that occurred on the specified date.
2456///
2457/// # Arguments
2458///
2459/// * `connection` - Database connection to use
2460/// * `date` - Date to get tool breakdown for
2461///
2462/// # Returns
2463///
2464/// A HashMap mapping tool names to their invocation counts.
2465///
2466/// # Example
2467///
2468/// ```rust,ignore
2469/// let breakdown = load_daily_tool_breakdown(&conn, NaiveDate::from_ymd_opt(2026, 2, 24).unwrap())?;
2470/// // breakdown might be: {"read_file": 10, "write_file": 5, "execute_command": 3}
2471/// ```
2472fn load_daily_tool_breakdown(
2473    connection: &Connection,
2474    date: NaiveDate,
2475) -> MetricsResult<HashMap<String, u32>> {
2476    let mut stmt = connection.prepare(
2477        r#"
2478        SELECT tool_name, COUNT(*)
2479        FROM tool_call_metrics
2480        WHERE date(started_at) = date(?1)
2481        GROUP BY tool_name
2482        "#,
2483    )?;
2484
2485    let mut rows = stmt.query(params![date.to_string()])?;
2486    let mut breakdown = HashMap::new();
2487
2488    while let Some(row) = rows.next()? {
2489        breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u32);
2490    }
2491
2492    Ok(breakdown)
2493}
2494
2495fn load_execute_sync_mismatch_breakdown(
2496    connection: &Connection,
2497    start_date: Option<NaiveDate>,
2498    end_date: Option<NaiveDate>,
2499) -> MetricsResult<HashMap<String, u64>> {
2500    let mut params_vec = Vec::new();
2501    let where_clause =
2502        build_execute_sync_mismatch_where_clause(start_date, end_date, None, &mut params_vec);
2503    let sql = format!(
2504        "SELECT reason, COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {} GROUP BY reason ORDER BY reason ASC",
2505        where_clause
2506    );
2507    let mut stmt = connection.prepare(&sql)?;
2508    let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
2509    let mut breakdown = HashMap::new();
2510
2511    while let Some(row) = rows.next()? {
2512        breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u64);
2513    }
2514
2515    Ok(breakdown)
2516}
2517
2518#[cfg(test)]
2519mod tests {
2520    use std::collections::HashMap;
2521
2522    use chrono::{NaiveDate, TimeZone, Utc};
2523    use tempfile::tempdir;
2524
2525    use super::{MetricsStorage, SqliteMetricsStorage, ToolCallCompletion};
2526    use crate::metrics::types::{
2527        ForwardMetricsFilter, ForwardStatus, MetricsDateFilter, RoundStatus, SessionMetricsFilter,
2528        SessionStatus, TokenUsage,
2529    };
2530
2531    #[tokio::test]
2532    async fn storage_records_session_and_round_data_for_summary_queries() {
2533        let dir = tempdir().expect("temp dir");
2534        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2535
2536        storage.init().await.expect("init storage");
2537
2538        let started_at = Utc
2539            .with_ymd_and_hms(2026, 2, 10, 10, 0, 0)
2540            .single()
2541            .expect("valid datetime");
2542        storage
2543            .upsert_session_start("session-a", "gpt-4", started_at)
2544            .await
2545            .expect("session started");
2546        storage
2547            .update_session_message_count("session-a", 7, started_at)
2548            .await
2549            .expect("message count update");
2550
2551        storage
2552            .insert_round_start("round-a", "session-a", "gpt-4", started_at)
2553            .await
2554            .expect("round start");
2555        storage
2556            .insert_tool_start("tool-1", "round-a", "session-a", "read_file", started_at)
2557            .await
2558            .expect("tool start");
2559        storage
2560            .complete_tool_call(
2561                "tool-1",
2562                ToolCallCompletion {
2563                    completed_at: started_at,
2564                    success: true,
2565                    error: None,
2566                },
2567            )
2568            .await
2569            .expect("tool completion");
2570        storage
2571            .complete_round(
2572                "round-a",
2573                started_at,
2574                RoundStatus::Success,
2575                TokenUsage {
2576                    prompt_tokens: 10,
2577                    completion_tokens: 15,
2578                    total_tokens: 25,
2579                },
2580                3,
2581                None,
2582            )
2583            .await
2584            .expect("round completion");
2585        storage
2586            .complete_session("session-a", SessionStatus::Completed, started_at)
2587            .await
2588            .expect("session completion");
2589
2590        let summary = storage
2591            .summary(MetricsDateFilter::default())
2592            .await
2593            .expect("summary query");
2594
2595        assert_eq!(summary.total_sessions, 1);
2596        assert_eq!(summary.total_tokens.total_tokens, 25);
2597        assert_eq!(summary.total_tool_calls, 1);
2598        assert_eq!(summary.prompt_cached_tool_outputs, 3);
2599
2600        let detail = storage
2601            .session_detail("session-a")
2602            .await
2603            .expect("session detail query")
2604            .expect("session detail should exist");
2605        assert_eq!(detail.session.prompt_cached_tool_outputs, 3);
2606        assert_eq!(detail.rounds.len(), 1);
2607        assert_eq!(detail.rounds[0].prompt_cached_tool_outputs, 3);
2608    }
2609
2610    #[tokio::test]
2611    async fn storage_filters_sessions_and_returns_tool_breakdown() {
2612        let dir = tempdir().expect("temp dir");
2613        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2614
2615        storage.init().await.expect("init storage");
2616
2617        let day_a = Utc
2618            .with_ymd_and_hms(2026, 2, 1, 9, 0, 0)
2619            .single()
2620            .expect("valid datetime");
2621        let day_b = Utc
2622            .with_ymd_and_hms(2026, 2, 5, 9, 0, 0)
2623            .single()
2624            .expect("valid datetime");
2625
2626        storage
2627            .upsert_session_start("s1", "gpt-4", day_a)
2628            .await
2629            .expect("session start");
2630        storage
2631            .insert_round_start("r1", "s1", "gpt-4", day_a)
2632            .await
2633            .expect("round start");
2634        storage
2635            .insert_tool_start("t1", "r1", "s1", "read_file", day_a)
2636            .await
2637            .expect("tool start");
2638        storage
2639            .complete_tool_call(
2640                "t1",
2641                ToolCallCompletion {
2642                    completed_at: day_a,
2643                    success: true,
2644                    error: None,
2645                },
2646            )
2647            .await
2648            .expect("tool complete");
2649        storage
2650            .complete_round(
2651                "r1",
2652                day_a,
2653                RoundStatus::Success,
2654                TokenUsage {
2655                    prompt_tokens: 1,
2656                    completion_tokens: 1,
2657                    total_tokens: 2,
2658                },
2659                0,
2660                None,
2661            )
2662            .await
2663            .expect("round complete");
2664
2665        storage
2666            .upsert_session_start("s2", "claude-3", day_b)
2667            .await
2668            .expect("session start");
2669
2670        let sessions = storage
2671            .sessions(SessionMetricsFilter {
2672                start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 1).expect("valid date")),
2673                end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 3).expect("valid date")),
2674                model: Some("gpt-4".to_string()),
2675                limit: Some(100),
2676            })
2677            .await
2678            .expect("sessions query");
2679
2680        assert_eq!(sessions.len(), 1);
2681        assert_eq!(sessions[0].session_id, "s1");
2682        assert_eq!(sessions[0].tool_breakdown.get("read_file"), Some(&1));
2683    }
2684
2685    #[tokio::test]
2686    async fn storage_produces_daily_rollups_with_model_and_tool_breakdowns() {
2687        let dir = tempdir().expect("temp dir");
2688        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2689
2690        storage.init().await.expect("init storage");
2691
2692        let now = Utc
2693            .with_ymd_and_hms(2026, 2, 10, 12, 0, 0)
2694            .single()
2695            .expect("valid datetime");
2696        storage
2697            .upsert_session_start("daily-1", "gpt-4", now)
2698            .await
2699            .expect("session start");
2700        storage
2701            .insert_round_start("daily-r1", "daily-1", "gpt-4", now)
2702            .await
2703            .expect("round start");
2704        storage
2705            .insert_tool_start("daily-t1", "daily-r1", "daily-1", "write_file", now)
2706            .await
2707            .expect("tool start");
2708        storage
2709            .complete_tool_call(
2710                "daily-t1",
2711                ToolCallCompletion {
2712                    completed_at: now,
2713                    success: true,
2714                    error: None,
2715                },
2716            )
2717            .await
2718            .expect("tool complete");
2719        storage
2720            .complete_round(
2721                "daily-r1",
2722                now,
2723                RoundStatus::Success,
2724                TokenUsage {
2725                    prompt_tokens: 3,
2726                    completion_tokens: 7,
2727                    total_tokens: 10,
2728                },
2729                0,
2730                None,
2731            )
2732            .await
2733            .expect("round completion");
2734
2735        let daily = storage
2736            .daily_metrics(
2737                7,
2738                Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
2739            )
2740            .await
2741            .expect("daily metrics");
2742
2743        assert_eq!(daily.len(), 1);
2744        let row = &daily[0];
2745        assert_eq!(row.total_sessions, 1);
2746        assert_eq!(row.total_rounds, 1);
2747        assert_eq!(row.total_tool_calls, 1);
2748        assert_eq!(
2749            row.model_breakdown
2750                .get("gpt-4")
2751                .map(|usage| usage.total_tokens),
2752            Some(10)
2753        );
2754        assert_eq!(
2755            row.tool_breakdown,
2756            HashMap::from([(String::from("write_file"), 1)])
2757        );
2758    }
2759
2760    #[tokio::test]
2761    async fn storage_reconciles_stale_running_sessions_rounds_and_forwards() {
2762        let dir = tempdir().expect("temp dir");
2763        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2764
2765        storage.init().await.expect("init storage");
2766
2767        let now = Utc
2768            .with_ymd_and_hms(2026, 2, 12, 9, 0, 0)
2769            .single()
2770            .expect("valid datetime");
2771
2772        storage
2773            .upsert_session_start("stale-await", "gpt-4", now)
2774            .await
2775            .expect("await session start");
2776        storage
2777            .insert_round_start("round-await", "stale-await", "gpt-4", now)
2778            .await
2779            .expect("await round start");
2780
2781        storage
2782            .upsert_session_start("stale-complete", "gpt-4", now)
2783            .await
2784            .expect("complete session start");
2785        storage
2786            .insert_round_start("round-complete", "stale-complete", "gpt-4", now)
2787            .await
2788            .expect("complete round start");
2789
2790        storage
2791            .insert_forward_start(
2792                "forward-pending",
2793                "/v1/chat/completions",
2794                "gpt-4",
2795                false,
2796                now,
2797            )
2798            .await
2799            .expect("forward start");
2800
2801        storage
2802            .reconcile_stale_executions(&[], &[String::from("stale-await")])
2803            .await
2804            .expect("reconcile stale executions");
2805
2806        let sessions = storage
2807            .sessions(SessionMetricsFilter::default())
2808            .await
2809            .expect("sessions query");
2810        let stale_await = sessions
2811            .iter()
2812            .find(|session| session.session_id == "stale-await")
2813            .expect("stale-await should exist");
2814        let stale_complete = sessions
2815            .iter()
2816            .find(|session| session.session_id == "stale-complete")
2817            .expect("stale-complete should exist");
2818        assert_eq!(stale_await.status, SessionStatus::AwaitingResponse);
2819        assert_eq!(stale_complete.status, SessionStatus::Completed);
2820        assert!(stale_await.completed_at.is_some());
2821        assert!(stale_complete.completed_at.is_some());
2822
2823        let await_detail = storage
2824            .session_detail("stale-await")
2825            .await
2826            .expect("await detail query")
2827            .expect("await detail exists");
2828        let complete_detail = storage
2829            .session_detail("stale-complete")
2830            .await
2831            .expect("complete detail query")
2832            .expect("complete detail exists");
2833        assert_eq!(await_detail.rounds[0].status, RoundStatus::Error);
2834        assert_eq!(complete_detail.rounds[0].status, RoundStatus::Error);
2835        assert_eq!(
2836            await_detail.rounds[0].error.as_deref(),
2837            Some("reconciled_stale_round")
2838        );
2839        assert_eq!(
2840            complete_detail.rounds[0].error.as_deref(),
2841            Some("reconciled_stale_round")
2842        );
2843
2844        let forward_requests = storage
2845            .forward_requests(ForwardMetricsFilter::default())
2846            .await
2847            .expect("forward requests query");
2848        assert_eq!(forward_requests.len(), 1);
2849        assert_eq!(forward_requests[0].status, Some(ForwardStatus::Error));
2850        assert_eq!(
2851            forward_requests[0].error.as_deref(),
2852            Some("reconciled_stale_forward")
2853        );
2854
2855        let forward_summary = storage
2856            .forward_summary(ForwardMetricsFilter::default())
2857            .await
2858            .expect("forward summary query");
2859        assert_eq!(forward_summary.total_requests, 1);
2860        assert_eq!(forward_summary.successful_requests, 0);
2861        assert_eq!(forward_summary.failed_requests, 1);
2862
2863        let summary = storage
2864            .summary(MetricsDateFilter::default())
2865            .await
2866            .expect("summary query");
2867        assert_eq!(summary.total_sessions, 2);
2868        assert_eq!(summary.active_sessions, 0);
2869        assert_eq!(summary.awaiting_response_sessions, 1);
2870        assert_eq!(summary.completed_sessions, 1);
2871        assert_eq!(summary.error_sessions, 0);
2872        assert_eq!(summary.cancelled_sessions, 0);
2873    }
2874
2875    #[tokio::test]
2876    async fn storage_summarizes_execute_sync_mismatches_by_reason() {
2877        let dir = tempdir().expect("temp dir");
2878        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));
2879
2880        storage.init().await.expect("init storage");
2881
2882        let day_a = Utc
2883            .with_ymd_and_hms(2026, 2, 10, 10, 0, 0)
2884            .single()
2885            .expect("valid datetime");
2886        let day_b = Utc
2887            .with_ymd_and_hms(2026, 2, 11, 10, 0, 0)
2888            .single()
2889            .expect("valid datetime");
2890
2891        storage
2892            .increment_execute_sync_mismatch("message_count", day_a)
2893            .await
2894            .expect("message_count mismatch one");
2895        storage
2896            .increment_execute_sync_mismatch("message_count", day_a)
2897            .await
2898            .expect("message_count mismatch two");
2899        storage
2900            .increment_execute_sync_mismatch("pending_question", day_a)
2901            .await
2902            .expect("pending question mismatch");
2903        storage
2904            .increment_execute_sync_mismatch("last_message_id", day_b)
2905            .await
2906            .expect("last_message_id mismatch");
2907
2908        let day_a_summary = storage
2909            .summary(MetricsDateFilter {
2910                start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
2911                end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
2912            })
2913            .await
2914            .expect("day a summary");
2915
2916        assert_eq!(day_a_summary.total_sync_mismatches, 3);
2917        assert_eq!(
2918            day_a_summary.sync_mismatch_breakdown,
2919            HashMap::from([
2920                (String::from("message_count"), 2_u64),
2921                (String::from("pending_question"), 1_u64),
2922            ])
2923        );
2924
2925        let full_summary = storage
2926            .summary(MetricsDateFilter::default())
2927            .await
2928            .expect("full summary");
2929        assert_eq!(full_summary.total_sync_mismatches, 4);
2930        assert_eq!(
2931            full_summary.sync_mismatch_breakdown.get("last_message_id"),
2932            Some(&1_u64)
2933        );
2934    }
2935}