Skip to main content

bamboo_engine/metrics/
storage.rs

1//! Metrics Storage System
2//!
3//! This module provides persistent storage for agent metrics using SQLite as the backend.
4//! It implements a comprehensive metrics collection and query system for monitoring
5//! agent performance, resource usage, and behavior patterns.
6//!
7//! # Architecture
8//!
9//! The storage system is built around the [`MetricsStorage`] trait, which defines
10//! the interface for storing and retrieving metrics data. The primary implementation
11//! is [`SqliteMetricsStorage`], which uses SQLite with WAL mode for reliable,
12//! concurrent access.
13//!
14//! # Data Model
15//!
16//! Metrics are organized into three main categories:
17//!
18//! ## Session Metrics
19//! Track complete conversation sessions from start to finish, including:
20//! - Total rounds and token usage
21//! - Tool call counts and breakdown
22//! - Session duration and status
23//!
24//! ## Round Metrics
25//! Track individual request-response cycles within sessions:
26//! - Per-round token consumption
27//! - Round status and errors
28//! - Associated tool calls
29//!
30//! ## Forward Metrics
31//! Track HTTP proxy operations to upstream APIs:
32//! - Request/response tracking
33//! - Endpoint-specific metrics
34//! - Token usage per provider
35//!
36//! # Storage Schema
37//!
38//! The SQLite database contains the following tables:
39//! - `session_metrics`: Aggregated session-level metrics
40//! - `round_metrics`: Individual round metrics linked to sessions
41//! - `tool_call_metrics`: Tool invocation details linked to rounds
42//! - `forward_request_metrics`: HTTP proxy request tracking
43//!
44//! # Usage
45//!
46//! ```rust,ignore
47//! use bamboo_agent::agent::metrics::storage::{SqliteMetricsStorage, MetricsStorage};
48//!
49//! #[tokio::main]
50//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
51//!     // Initialize storage
52//!     let storage = SqliteMetricsStorage::new("metrics.db");
53//!     storage.init().await?;
54//!
55//!     // Record session start
56//!     storage.upsert_session_start(
57//!         "session-123",
58//!         "gpt-4",
59//!         chrono::Utc::now()
60//!     ).await?;
61//!
62//!     // Query metrics
63//!     let summary = storage.summary(Default::default()).await?;
64//!     println!("Total sessions: {}", summary.total_sessions);
65//!
66//!     Ok(())
67//! }
68//! ```
69//!
70//! # Performance
71//!
72//! The storage system is optimized for:
73//! - **Concurrent writes**: Uses WAL mode and spawn_blocking for async compatibility
74//! - **Efficient queries**: Indexed by timestamps, models, and endpoints
75//! - **Aggregate caching**: Session metrics are pre-aggregated for fast queries
76//!
77//! # Thread Safety
78//!
//! All storage operations are thread-safe and can be called from multiple
//! async tasks concurrently. Each operation opens its own SQLite connection
//! inside a `tokio::task::spawn_blocking` task, so database work runs on the
//! blocking thread pool instead of stalling the async runtime.
82
83use std::collections::HashMap;
84use std::path::{Path, PathBuf};
85
86use async_trait::async_trait;
87use chrono::{DateTime, NaiveDate, Utc};
88use rusqlite::{params, params_from_iter, Connection, OptionalExtension};
89use thiserror::Error;
90
91use crate::metrics::types::{
92    DailyMetrics, ForwardEndpointMetrics, ForwardMetricsFilter, ForwardMetricsSummary,
93    ForwardRequestMetrics, ForwardStatus, MetricsDateFilter, MetricsSummary, ModelMetrics,
94    RoundMetrics, RoundStatus, SessionDetail, SessionMetrics, SessionMetricsFilter, SessionStatus,
95    TokenUsage, ToolCallMetrics,
96};
97
98/// Result type for metrics storage operations.
99///
100/// This is a specialized Result type that uses [`MetricsError`] as the error type,
101/// providing a consistent return type across all storage operations.
102pub type MetricsResult<T> = Result<T, MetricsError>;
103
104/// Errors that can occur during metrics storage operations.
105///
106/// This enum covers all the error cases that can arise when working with
107/// the metrics storage system, from database errors to data validation issues.
108#[derive(Debug, Error)]
109pub enum MetricsError {
110    /// SQLite database operation failed.
111    ///
112    /// This can occur due to SQL syntax errors, constraint violations,
113    /// database corruption, or connection issues.
114    #[error("sqlite error: {0}")]
115    Sqlite(#[from] rusqlite::Error),
116
117    /// Timestamp parsing failed.
118    ///
119    /// This occurs when reading timestamps from the database that don't
120    /// conform to the expected RFC3339 format.
121    #[error("time parse error: {0}")]
122    Chrono(#[from] chrono::ParseError),
123
124    /// I/O operation failed.
125    ///
126    /// This can occur when creating the database file, directory, or
127    /// during other file system operations.
128    #[error("io error: {0}")]
129    Io(#[from] std::io::Error),
130
131    /// Async task failed to complete.
132    ///
133    /// This occurs when a spawned blocking task panics or is cancelled,
134    /// typically indicating a serious system issue.
135    #[error("storage task join error: {0}")]
136    Task(String),
137
138    /// Data validation failed.
139    ///
140    /// This occurs when retrieved data doesn't match expected constraints,
141    /// such as invalid enum values or malformed data.
142    #[error("invalid metrics data: {0}")]
143    InvalidData(String),
144}
145
146/// Information about a completed tool call.
147///
148/// This structure contains the completion details for a tool invocation,
149/// including when it finished, whether it succeeded, and any error information.
150///
151/// # Fields
152///
153/// - `completed_at`: Timestamp when the tool finished execution
154/// - `success`: Whether the tool executed successfully
155/// - `error`: Error message if the tool failed, None on success
156///
157/// # Example
158///
159/// ```rust,ignore
160/// use bamboo_agent::agent::metrics::storage::ToolCallCompletion;
161/// use chrono::Utc;
162///
163/// let completion = ToolCallCompletion {
164///     completed_at: Utc::now(),
165///     success: true,
166///     error: None,
167/// };
168/// ```
169#[derive(Debug, Clone)]
170pub struct ToolCallCompletion {
171    /// Timestamp when the tool call completed
172    pub completed_at: DateTime<Utc>,
173    /// Whether the tool execution succeeded
174    pub success: bool,
175    /// Error message if execution failed, None on success
176    pub error: Option<String>,
177}
178
179/// Trait defining the interface for metrics storage backends.
180///
181/// This trait provides an abstract interface for storing and querying metrics data.
182/// The primary implementation is [`SqliteMetricsStorage`], but this trait allows
183/// for alternative backends (e.g., PostgreSQL, TimescaleDB) to be implemented.
184///
185/// # Async Operations
186///
187/// All methods are async to support non-blocking I/O operations. The SQLite
188/// implementation uses `spawn_blocking` to avoid blocking the async runtime
189/// with database operations.
190///
191/// # Thread Safety
192///
193/// Implementations must be `Send + Sync` to allow sharing across async tasks.
194///
195/// # Data Consistency
196///
197/// The storage system maintains referential integrity between:
198/// - Sessions → Rounds → Tool Calls
199/// - Sessions aggregate data from child entities
200/// - Deletions cascade appropriately
201///
202/// # Example
203///
204/// ```rust,ignore
205/// use bamboo_agent::agent::metrics::storage::{MetricsStorage, SqliteMetricsStorage};
206/// use bamboo_agent::agent::metrics::types::{SessionStatus, TokenUsage};
207/// use chrono::Utc;
208///
209/// async fn example(storage: &dyn MetricsStorage) -> Result<(), Box<dyn std::error::Error>> {
210///     // Start a session
211///     storage.upsert_session_start("s1", "gpt-4", Utc::now()).await?;
212///
213///     // Add a round
214///     storage.insert_round_start("r1", "s1", "gpt-4", Utc::now()).await?;
215///
216///     // Complete the round
217///     storage.complete_round(
218///         "r1",
219///         Utc::now(),
220///         bamboo::agent::metrics::types::RoundStatus::Success,
221///         TokenUsage { prompt_tokens: 10, completion_tokens: 20, total_tokens: 30 },
222///         None
223///     ).await?;
224///
225///     // Complete the session
226///     storage.complete_session("s1", SessionStatus::Completed, Utc::now()).await?;
227///
228///     Ok(())
229/// }
230/// ```
231#[async_trait]
232pub trait MetricsStorage: Send + Sync {
233    /// Initializes the storage backend.
234    ///
235    /// This must be called before any other storage operations.
236    /// For SQLite, this creates the database schema if it doesn't exist.
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the database cannot be created or initialized.
241    async fn init(&self) -> MetricsResult<()>;
242
243    /// Records the start of a new chat session.
244    ///
245    /// If a session with the same ID already exists, it will be reset to
246    /// running status (useful for session recovery scenarios).
247    ///
248    /// # Arguments
249    ///
250    /// * `session_id` - Unique identifier for the session
251    /// * `model` - AI model being used (e.g., "gpt-4", "claude-3")
252    /// * `started_at` - Timestamp when the session started
253    ///
254    /// # Example
255    ///
256    /// ```rust,ignore
257    /// storage.upsert_session_start("session-123", "gpt-4", Utc::now()).await?;
258    /// ```
259    async fn upsert_session_start(
260        &self,
261        session_id: &str,
262        model: &str,
263        started_at: DateTime<Utc>,
264    ) -> MetricsResult<()>;
265
266    /// Updates the message count for a session.
267    ///
268    /// This should be called whenever messages are added to the conversation.
269    ///
270    /// # Arguments
271    ///
272    /// * `session_id` - Session to update
273    /// * `message_count` - New total message count
274    /// * `updated_at` - Timestamp of the update
275    async fn update_session_message_count(
276        &self,
277        session_id: &str,
278        message_count: u32,
279        updated_at: DateTime<Utc>,
280    ) -> MetricsResult<()>;
281
282    /// Marks a session as completed with a final status.
283    ///
284    /// This triggers a final aggregation of all session metrics before closing.
285    ///
286    /// # Arguments
287    ///
288    /// * `session_id` - Session to complete
289    /// * `status` - Final session status (completed, failed, or cancelled)
290    /// * `completed_at` - Timestamp when the session ended
291    async fn complete_session(
292        &self,
293        session_id: &str,
294        status: SessionStatus,
295        completed_at: DateTime<Utc>,
296    ) -> MetricsResult<()>;
297
298    /// Records the start of a new round within a session.
299    ///
300    /// A round represents a single request-response cycle. This also
301    /// triggers an update to the parent session's aggregate counters.
302    ///
303    /// # Arguments
304    ///
305    /// * `round_id` - Unique identifier for this round
306    /// * `session_id` - Parent session this round belongs to
307    /// * `model` - AI model being used for this round
308    /// * `started_at` - Timestamp when the round started
309    async fn insert_round_start(
310        &self,
311        round_id: &str,
312        session_id: &str,
313        model: &str,
314        started_at: DateTime<Utc>,
315    ) -> MetricsResult<()>;
316
317    /// Completes a round with final metrics and status.
318    ///
319    /// This records the round's completion and triggers an update to
320    /// the parent session's aggregated metrics.
321    ///
322    /// # Arguments
323    ///
324    /// * `round_id` - Round to complete
325    /// * `completed_at` - Timestamp when the round finished
326    /// * `status` - Final round status (success or failed)
327    /// * `usage` - Token consumption during this round
328    /// * `error` - Error message if the round failed, None on success
329    async fn complete_round(
330        &self,
331        round_id: &str,
332        completed_at: DateTime<Utc>,
333        status: RoundStatus,
334        usage: TokenUsage,
335        prompt_cached_tool_outputs: u32,
336        prompt_cached_tool_tokens_saved: u32,
337        error: Option<String>,
338    ) -> MetricsResult<()>;
339
340    /// Records a context-compression event against a round and refreshes the parent session aggregates.
341    async fn record_round_compression(
342        &self,
343        round_id: &str,
344        compressed_at: DateTime<Utc>,
345        tokens_saved: u32,
346    ) -> MetricsResult<()>;
347
348    /// Records the start of a tool invocation.
349    ///
350    /// Tools are called during rounds to perform specific actions
351    /// (e.g., reading files, executing commands).
352    ///
353    /// # Arguments
354    ///
355    /// * `tool_call_id` - Unique identifier for this tool call
356    /// * `round_id` - Round this tool call belongs to
357    /// * `session_id` - Session this tool call belongs to
358    /// * `tool_name` - Name of the tool being invoked
359    /// * `started_at` - Timestamp when the tool was invoked
360    async fn insert_tool_start(
361        &self,
362        tool_call_id: &str,
363        round_id: &str,
364        session_id: &str,
365        tool_name: &str,
366        started_at: DateTime<Utc>,
367    ) -> MetricsResult<()>;
368
369    /// Records the completion of a tool call.
370    ///
371    /// This updates the tool call record with completion details and
372    /// triggers an update to the parent session's tool call count.
373    ///
374    /// # Arguments
375    ///
376    /// * `tool_call_id` - Tool call to complete
377    /// * `completion` - Completion details including success status and timing
378    async fn complete_tool_call(
379        &self,
380        tool_call_id: &str,
381        completion: ToolCallCompletion,
382    ) -> MetricsResult<()>;
383
384    // Forward request metrics methods
385
386    /// Records the start of a forwarded HTTP request to an upstream API.
387    ///
388    /// This tracks requests proxied to external API providers like OpenAI or Anthropic.
389    ///
390    /// # Arguments
391    ///
392    /// * `forward_id` - Unique identifier for this forwarded request
393    /// * `endpoint` - API endpoint identifier (e.g., "openai.chat_completions")
394    /// * `model` - AI model being requested
395    /// * `is_stream` - Whether this is a streaming (SSE) request
396    /// * `started_at` - Timestamp when the request was initiated
397    async fn insert_forward_start(
398        &self,
399        forward_id: &str,
400        endpoint: &str,
401        model: &str,
402        is_stream: bool,
403        started_at: DateTime<Utc>,
404    ) -> MetricsResult<()>;
405
406    /// Completes a forwarded request with response details.
407    ///
408    /// This records the response from the upstream API, including status,
409    /// token usage, and any errors.
410    ///
411    /// # Arguments
412    ///
413    /// * `forward_id` - Forwarded request to complete
414    /// * `completed_at` - Timestamp when the response was received
415    /// * `status_code` - HTTP status code from the upstream API
416    /// * `status` - Classified status (success, error, or timeout)
417    /// * `usage` - Token usage if provided in the response
418    /// * `error` - Error message if the request failed
419    async fn complete_forward(
420        &self,
421        forward_id: &str,
422        completed_at: DateTime<Utc>,
423        status_code: Option<u16>,
424        status: ForwardStatus,
425        usage: Option<TokenUsage>,
426        error: Option<String>,
427    ) -> MetricsResult<()>;
428
429    /// Retrieves aggregated summary statistics for forwarded requests.
430    ///
431    /// Returns counts of total/successful/failed requests, token usage,
432    /// and average latency for requests matching the filter criteria.
433    ///
434    /// # Arguments
435    ///
436    /// * `filter` - Filter criteria for date range, endpoint, and model
437    ///
438    /// # Example
439    ///
440    /// ```rust,ignore
441    /// use bamboo_agent::agent::metrics::types::ForwardMetricsFilter;
442    ///
443    /// let filter = ForwardMetricsFilter {
444    ///     start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 1).unwrap()),
445    ///     end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 28).unwrap()),
446    ///     endpoint: Some("openai.chat_completions".to_string()),
447    ///     model: None,
448    ///     limit: None,
449    /// };
450    ///
451    /// let summary = storage.forward_summary(filter).await?;
452    /// println!("Total requests: {}", summary.total_requests);
453    /// println!("Success rate: {:.2}%",
454    ///     (summary.successful_requests as f64 / summary.total_requests as f64) * 100.0);
455    /// ```
456    async fn forward_summary(
457        &self,
458        filter: ForwardMetricsFilter,
459    ) -> MetricsResult<ForwardMetricsSummary>;
460
461    /// Retrieves metrics grouped by endpoint.
462    ///
463    /// Returns per-endpoint statistics including request counts,
464    /// success rates, token usage, and average latency.
465    ///
466    /// # Arguments
467    ///
468    /// * `filter` - Filter criteria (endpoint filter is ignored, grouped by all endpoints)
469    ///
470    /// # Example
471    ///
472    /// ```rust,ignore
473    /// let endpoints = storage.forward_by_endpoint(filter).await?;
474    /// for endpoint in endpoints {
475    ///     println!("{}: {} requests, {:.2}ms avg",
476    ///         endpoint.endpoint,
477    ///         endpoint.requests,
478    ///         endpoint.avg_duration_ms.unwrap_or(0) as f64
479    ///     );
480    /// }
481    /// ```
482    async fn forward_by_endpoint(
483        &self,
484        filter: ForwardMetricsFilter,
485    ) -> MetricsResult<Vec<ForwardEndpointMetrics>>;
486
487    /// Retrieves individual forward request records.
488    ///
489    /// Returns detailed information about each forwarded request,
490    /// including timing, status, and token usage.
491    ///
492    /// # Arguments
493    ///
494    /// * `filter` - Filter criteria including pagination via `limit`
495    ///
496    /// # Example
497    ///
498    /// ```rust,ignore
499    /// let filter = ForwardMetricsFilter {
500    ///     limit: Some(50),
501    ///     ..Default::default()
502    /// };
503    ///
504    /// let requests = storage.forward_requests(filter).await?;
505    /// for req in requests {
506    ///     println!("{}: {} - {:?}", req.forward_id, req.endpoint, req.status);
507    /// }
508    /// ```
509    async fn forward_requests(
510        &self,
511        filter: ForwardMetricsFilter,
512    ) -> MetricsResult<Vec<ForwardRequestMetrics>>;
513
514    /// Retrieves daily aggregated metrics for forwarded requests.
515    ///
516    /// Returns per-day statistics for the specified date range,
517    /// useful for trend analysis and reporting.
518    ///
519    /// # Arguments
520    ///
521    /// * `days` - Number of days to include
522    /// * `end_date` - End date for the range (defaults to today)
523    ///
524    /// # Example
525    ///
526    /// ```rust,ignore
527    /// let daily = storage.forward_daily_metrics(7, None).await?;
528    /// for day in daily {
529    ///     println!("{}: {} requests, {} tokens",
530    ///         day.date,
531    ///         day.total_sessions,
532    ///         day.total_token_usage.total_tokens
533    ///     );
534    /// }
535    /// ```
536    async fn forward_daily_metrics(
537        &self,
538        days: u32,
539        end_date: Option<NaiveDate>,
540    ) -> MetricsResult<Vec<DailyMetrics>>;
541
542    /// Retrieves aggregated summary statistics for chat sessions.
543    ///
544    /// Returns total sessions, token usage, tool call counts, and
545    /// active session count for sessions matching the filter.
546    ///
547    /// # Arguments
548    ///
549    /// * `filter` - Date range filter criteria
550    ///
551    /// # Example
552    ///
553    /// ```rust,ignore
554    /// use bamboo_agent::agent::metrics::types::MetricsDateFilter;
555    ///
556    /// let filter = MetricsDateFilter {
557    ///     start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 1).unwrap()),
558    ///     end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 28).unwrap()),
559    /// };
560    ///
561    /// let summary = storage.summary(filter).await?;
562    /// println!("Active sessions: {}", summary.active_sessions);
563    /// println!("Total tokens: {}", summary.total_tokens.total_tokens);
564    /// ```
565    async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary>;
566
567    /// Retrieves metrics grouped by AI model.
568    ///
569    /// Returns per-model statistics including session counts,
570    /// rounds, token usage, and tool calls.
571    ///
572    /// # Arguments
573    ///
574    /// * `filter` - Date range filter criteria
575    ///
576    /// # Example
577    ///
578    /// ```rust,ignore
579    /// let models = storage.by_model(filter).await?;
580    /// for model in models {
581    ///     println!("{}: {} sessions, {} tokens",
582    ///         model.model,
583    ///         model.sessions,
584    ///         model.tokens.total_tokens
585    ///     );
586    /// }
587    /// ```
588    async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>>;
589
590    /// Retrieves session metrics with filtering and pagination.
591    ///
592    /// Returns detailed information about sessions matching the filter criteria,
593    /// including token usage, tool breakdown, and status.
594    ///
595    /// # Arguments
596    ///
597    /// * `filter` - Filter criteria including date range, model, and pagination
598    ///
599    /// # Example
600    ///
601    /// ```rust,ignore
602    /// use bamboo_agent::agent::metrics::types::SessionMetricsFilter;
603    ///
604    /// let filter = SessionMetricsFilter {
605    ///     model: Some("gpt-4".to_string()),
606    ///     limit: Some(100),
607    ///     ..Default::default()
608    /// };
609    ///
610    /// let sessions = storage.sessions(filter).await?;
611    /// for session in sessions {
612    ///     println!("{}: {} rounds, {} tools",
613    ///         session.session_id,
614    ///         session.total_rounds,
615    ///         session.tool_call_count
616    ///     );
617    /// }
618    /// ```
619    async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>>;
620
621    /// Retrieves complete details for a specific session.
622    ///
623    /// Returns the session metrics along with all associated rounds
624    /// and their tool calls for detailed analysis.
625    ///
626    /// # Arguments
627    ///
628    /// * `session_id` - Session to retrieve
629    ///
630    /// # Returns
631    ///
632    /// Returns `Ok(None)` if the session doesn't exist.
633    ///
634    /// # Example
635    ///
636    /// ```rust,ignore
637    /// if let Some(detail) = storage.session_detail("session-123").await? {
638    ///     println!("Session: {}", detail.session.session_id);
639    ///     for round in detail.rounds {
640    ///         println!("  Round {}: {} tokens, {} tools",
641    ///             round.round_id,
642    ///             round.token_usage.total_tokens,
643    ///             round.tool_calls.len()
644    ///         );
645    ///     }
646    /// }
647    /// ```
648    async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>>;
649
650    /// Increments the execute sync mismatch counter for a specific stable reason label.
651    async fn increment_execute_sync_mismatch(
652        &self,
653        reason: &str,
654        occurred_at: DateTime<Utc>,
655    ) -> MetricsResult<()>;
656
657    /// Retrieves daily aggregated metrics for chat sessions.
658    ///
659    /// Returns per-day statistics including session counts, token usage,
660    /// model breakdown, and tool breakdown for trend analysis.
661    ///
662    /// # Arguments
663    ///
664    /// * `days` - Number of days to include
665    /// * `end_date` - End date for the range (defaults to today)
666    ///
667    /// # Example
668    ///
669    /// ```rust,ignore
670    /// let daily = storage.daily_metrics(30, None).await?;
671    /// for day in daily {
672    ///     println!("{}: {} sessions, {} tokens",
673    ///         day.date,
674    ///         day.total_sessions,
675    ///         day.total_token_usage.total_tokens
676    ///     );
677    ///
678    ///     // Model breakdown
679    ///     for (model, usage) in day.model_breakdown {
680    ///         println!("  {}: {} tokens", model, usage.total_tokens);
681    ///     }
682    /// }
683    /// ```
684    async fn daily_metrics(
685        &self,
686        days: u32,
687        end_date: Option<NaiveDate>,
688    ) -> MetricsResult<Vec<DailyMetrics>>;
689
690    /// Deletes old round records before a cutoff date.
691    ///
692    /// This is used for data retention and cleanup. After deleting rounds,
693    /// it triggers a refresh of affected session aggregates.
694    ///
695    /// # Arguments
696    ///
697    /// * `cutoff` - Delete rounds started before this timestamp
698    ///
699    /// # Returns
700    ///
701    /// Returns the number of rounds deleted.
702    ///
703    /// # Warning
704    ///
705    /// This operation is irreversible. Ensure you have backups if needed.
706    ///
707    /// # Example
708    ///
709    /// ```rust,ignore
710    /// use chrono::{Duration, Utc};
711    ///
712    /// // Delete rounds older than 90 days
713    /// let cutoff = Utc::now() - Duration::days(90);
714    /// let deleted = storage.prune_rounds_before(cutoff).await?;
715    /// println!("Deleted {} old rounds", deleted);
716    /// ```
717    async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64>;
718
719    /// Reconciles stale session / round / forward rows using durable runtime hints.
720    async fn reconcile_stale_executions(
721        &self,
722        active_session_ids: &[String],
723        awaiting_response_session_ids: &[String],
724    ) -> MetricsResult<()>;
725}
726
/// [`MetricsStorage`] backed by a single SQLite database file.
///
/// This is the default metrics backend. [`init`](MetricsStorage::init) creates
/// the schema, and the database runs in WAL (write-ahead logging) mode so that
/// readers can proceed while a writer is active.
///
/// # Features
///
/// - **WAL mode**: concurrent readers alongside a writer
/// - **Foreign keys**: child rows reference their parent session / round
/// - **Async friendly**: database work runs on Tokio's blocking pool via `spawn_blocking`
/// - **Schema bootstrap**: [`init`](MetricsStorage::init) creates any missing tables
///
/// # Tables
///
/// - `session_metrics`: one row per session.
///   - Identity: `session_id`, `model`.
///   - Timing: `started_at`, `completed_at`, `updated_at`.
///   - Aggregates: `total_rounds`, token counters, `tool_call_count`.
///   - Also `status` and `message_count`.
/// - `round_metrics`: one row per round, linked to its session.
///   - Identity: `round_id`, `session_id`, `model`.
///   - Timing, status, per-round token usage and `error`.
/// - `tool_call_metrics`: one row per tool invocation, linked to its round and session.
///   - Identity: `tool_call_id`, `round_id`, `session_id`, `tool_name`.
///   - Timing, success flag and error details.
/// - `forward_request_metrics`: one row per proxied HTTP request.
///   - Identity: `forward_id`, `endpoint`, `model`; plus the `is_stream` flag.
///   - Response details: `status_code`, `status`, token usage.
///   - Error information.
///
/// # Example
///
/// ```rust,ignore
/// use bamboo_agent::agent::metrics::storage::{MetricsStorage, SqliteMetricsStorage};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // Point the handle at a database file.
///     let storage = SqliteMetricsStorage::new("path/to/metrics.db");
///
///     // Create the schema before doing anything else.
///     storage.init().await?;
///
///     // The storage is now ready for writes and queries.
///     storage.upsert_session_start("s1", "gpt-4", chrono::Utc::now()).await?;
///
///     Ok(())
/// }
/// ```
///
/// # Thread Safety
///
/// The handle holds nothing but the database path, so it is cheap to clone and
/// can be shared across threads. Each operation opens its own connection.
#[derive(Clone, Debug)]
pub struct SqliteMetricsStorage {
    /// Location of the SQLite database file.
    db_path: PathBuf,
}
800
801impl SqliteMetricsStorage {
802    /// Creates a new SQLite storage instance.
803    ///
804    /// The database file will be created when [`init`](MetricsStorage::init) is called.
805    /// If the file already exists, it will be used as-is.
806    ///
807    /// # Arguments
808    ///
809    /// * `db_path` - Path to the SQLite database file (will create parent directories if needed)
810    ///
811    /// # Example
812    ///
813    /// ```rust,ignore
814    /// use bamboo_agent::agent::metrics::storage::SqliteMetricsStorage;
815    ///
816    /// let storage = SqliteMetricsStorage::new("metrics.db");
817    /// let storage = SqliteMetricsStorage::new("/var/data/bamboo/metrics.db");
818    /// ```
819    pub fn new(db_path: impl AsRef<Path>) -> Self {
820        Self {
821            db_path: db_path.as_ref().to_path_buf(),
822        }
823    }
824
825    /// Executes a function with a database connection in a blocking context.
826    ///
827    /// This helper method handles:
828    /// 1. Opening a connection to the database
829    /// 2. Running the provided function in `spawn_blocking` to avoid blocking async runtime
830    /// 3. Proper error handling and task joining
831    ///
832    /// # Type Parameters
833    ///
834    /// * `T` - Return type of the function (must be Send + 'static)
835    /// * `F` - Function type (must be Send + 'static)
836    ///
837    /// # Arguments
838    ///
839    /// * `func` - Function to execute with the database connection
840    ///
841    /// # Errors
842    ///
843    /// Returns an error if:
844    /// - The database connection fails to open
845    /// - The function returns an error
846    /// - The blocking task fails to complete
847    async fn with_connection<T, F>(&self, func: F) -> MetricsResult<T>
848    where
849        T: Send + 'static,
850        F: FnOnce(&Connection) -> MetricsResult<T> + Send + 'static,
851    {
852        let db_path = self.db_path.clone();
853        tokio::task::spawn_blocking(move || {
854            let connection = open_connection(&db_path)?;
855            func(&connection)
856        })
857        .await
858        .map_err(|error| MetricsError::Task(error.to_string()))?
859    }
860}
861
862#[async_trait]
863impl MetricsStorage for SqliteMetricsStorage {
864    async fn init(&self) -> MetricsResult<()> {
865        self.with_connection(|connection| {
866            connection.execute_batch(
867                r#"
868                CREATE TABLE IF NOT EXISTS session_metrics (
869                    session_id TEXT PRIMARY KEY,
870                    model TEXT NOT NULL,
871                    started_at TEXT NOT NULL,
872                    completed_at TEXT,
873                    status TEXT NOT NULL DEFAULT 'running',
874                    total_rounds INTEGER NOT NULL DEFAULT 0,
875                    prompt_tokens INTEGER NOT NULL DEFAULT 0,
876                    completion_tokens INTEGER NOT NULL DEFAULT 0,
877                    total_tokens INTEGER NOT NULL DEFAULT 0,
878                    prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
879                    prompt_cached_tool_tokens_saved INTEGER NOT NULL DEFAULT 0,
880                    total_compression_events INTEGER NOT NULL DEFAULT 0,
881                    total_tokens_saved INTEGER NOT NULL DEFAULT 0,
882                    tool_call_count INTEGER NOT NULL DEFAULT 0,
883                    message_count INTEGER NOT NULL DEFAULT 0,
884                    updated_at TEXT NOT NULL
885                );
886
887                CREATE TABLE IF NOT EXISTS round_metrics (
888                    round_id TEXT PRIMARY KEY,
889                    session_id TEXT NOT NULL,
890                    model TEXT NOT NULL,
891                    started_at TEXT NOT NULL,
892                    completed_at TEXT,
893                    status TEXT NOT NULL DEFAULT 'running',
894                    prompt_tokens INTEGER NOT NULL DEFAULT 0,
895                    completion_tokens INTEGER NOT NULL DEFAULT 0,
896                    total_tokens INTEGER NOT NULL DEFAULT 0,
897                    prompt_cached_tool_outputs INTEGER NOT NULL DEFAULT 0,
898                    prompt_cached_tool_tokens_saved INTEGER NOT NULL DEFAULT 0,
899                    compression_count INTEGER NOT NULL DEFAULT 0,
900                    tokens_saved INTEGER NOT NULL DEFAULT 0,
901                    error TEXT,
902                    FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
903                );
904
905                CREATE TABLE IF NOT EXISTS tool_call_metrics (
906                    tool_call_id TEXT PRIMARY KEY,
907                    round_id TEXT NOT NULL,
908                    session_id TEXT NOT NULL,
909                    tool_name TEXT NOT NULL,
910                    started_at TEXT NOT NULL,
911                    completed_at TEXT,
912                    success INTEGER,
913                    error TEXT,
914                    FOREIGN KEY(round_id) REFERENCES round_metrics(round_id) ON DELETE CASCADE,
915                    FOREIGN KEY(session_id) REFERENCES session_metrics(session_id) ON DELETE CASCADE
916                );
917
918                CREATE INDEX IF NOT EXISTS idx_session_started_at ON session_metrics(started_at);
919                CREATE INDEX IF NOT EXISTS idx_session_model ON session_metrics(model);
920                CREATE INDEX IF NOT EXISTS idx_round_session ON round_metrics(session_id);
921                CREATE INDEX IF NOT EXISTS idx_tool_session ON tool_call_metrics(session_id);
922                CREATE INDEX IF NOT EXISTS idx_tool_started_at ON tool_call_metrics(started_at);
923                CREATE INDEX IF NOT EXISTS idx_tool_name ON tool_call_metrics(tool_name);
924
925                CREATE TABLE IF NOT EXISTS forward_request_metrics (
926                    forward_id TEXT PRIMARY KEY,
927                    endpoint TEXT NOT NULL,
928                    model TEXT NOT NULL,
929                    is_stream INTEGER NOT NULL,
930                    started_at TEXT NOT NULL,
931                    completed_at TEXT,
932                    status_code INTEGER,
933                    status TEXT NOT NULL DEFAULT 'pending',
934                    prompt_tokens INTEGER,
935                    completion_tokens INTEGER,
936                    total_tokens INTEGER,
937                    error TEXT,
938                    updated_at TEXT NOT NULL
939                );
940
941                CREATE TABLE IF NOT EXISTS execute_sync_mismatch_metrics (
942                    reason TEXT NOT NULL,
943                    mismatch_date TEXT NOT NULL,
944                    count INTEGER NOT NULL DEFAULT 0,
945                    updated_at TEXT NOT NULL,
946                    PRIMARY KEY (reason, mismatch_date)
947                );
948
949                CREATE INDEX IF NOT EXISTS idx_forward_started_at ON forward_request_metrics(started_at);
950                CREATE INDEX IF NOT EXISTS idx_forward_endpoint ON forward_request_metrics(endpoint);
951                CREATE INDEX IF NOT EXISTS idx_forward_model ON forward_request_metrics(model);
952                CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_date ON execute_sync_mismatch_metrics(mismatch_date);
953                CREATE INDEX IF NOT EXISTS idx_execute_sync_mismatch_reason ON execute_sync_mismatch_metrics(reason);
954                "#,
955            )?;
956            ensure_integer_column(
957                connection,
958                "session_metrics",
959                "prompt_cached_tool_outputs",
960                0,
961            )?;
962            ensure_integer_column(connection, "session_metrics", "prompt_cached_tool_tokens_saved", 0)?;
963            ensure_integer_column(connection, "session_metrics", "total_compression_events", 0)?;
964            ensure_integer_column(connection, "session_metrics", "total_tokens_saved", 0)?;
965            ensure_integer_column(connection, "round_metrics", "prompt_cached_tool_outputs", 0)?;
966            ensure_integer_column(connection, "round_metrics", "prompt_cached_tool_tokens_saved", 0)?;
967            ensure_integer_column(connection, "round_metrics", "compression_count", 0)?;
968            ensure_integer_column(connection, "round_metrics", "tokens_saved", 0)?;
969            connection.execute(
970                "UPDATE forward_request_metrics SET status = 'pending' WHERE status IS NULL OR trim(status) = ''",
971                [],
972            )?;
973            Ok(())
974        })
975        .await
976    }
977
978    async fn upsert_session_start(
979        &self,
980        session_id: &str,
981        model: &str,
982        started_at: DateTime<Utc>,
983    ) -> MetricsResult<()> {
984        let session_id = session_id.to_string();
985        let model = model.to_string();
986        let started_at = format_timestamp(started_at);
987
988        self.with_connection(move |connection| {
989            connection.execute(
990                r#"
991                INSERT INTO session_metrics (
992                    session_id, model, started_at, status, updated_at
993                ) VALUES (?1, ?2, ?3, 'running', ?3)
994                ON CONFLICT(session_id) DO UPDATE SET
995                    model = excluded.model,
996                    started_at = CASE
997                        WHEN session_metrics.started_at <= excluded.started_at THEN session_metrics.started_at
998                        ELSE excluded.started_at
999                    END,
1000                    completed_at = NULL,
1001                    status = 'running',
1002                    updated_at = excluded.updated_at
1003                "#,
1004                params![session_id, model, started_at],
1005            )?;
1006            Ok(())
1007        })
1008        .await
1009    }
1010
1011    async fn update_session_message_count(
1012        &self,
1013        session_id: &str,
1014        message_count: u32,
1015        updated_at: DateTime<Utc>,
1016    ) -> MetricsResult<()> {
1017        let session_id = session_id.to_string();
1018        let updated_at = format_timestamp(updated_at);
1019
1020        self.with_connection(move |connection| {
1021            connection.execute(
1022                "UPDATE session_metrics SET message_count = ?1, updated_at = ?2 WHERE session_id = ?3",
1023                params![i64::from(message_count), updated_at, session_id],
1024            )?;
1025            Ok(())
1026        })
1027        .await
1028    }
1029
1030    async fn complete_session(
1031        &self,
1032        session_id: &str,
1033        status: SessionStatus,
1034        completed_at: DateTime<Utc>,
1035    ) -> MetricsResult<()> {
1036        let session_id = session_id.to_string();
1037        let completed_at_str = format_timestamp(completed_at);
1038
1039        self.with_connection(move |connection| {
1040            refresh_session_aggregates(connection, &session_id, completed_at)?;
1041            connection.execute(
1042                "UPDATE session_metrics SET status = ?1, completed_at = ?2, updated_at = ?2 WHERE session_id = ?3",
1043                params![status.as_str(), completed_at_str, session_id],
1044            )?;
1045            Ok(())
1046        })
1047        .await
1048    }
1049
1050    async fn insert_round_start(
1051        &self,
1052        round_id: &str,
1053        session_id: &str,
1054        model: &str,
1055        started_at: DateTime<Utc>,
1056    ) -> MetricsResult<()> {
1057        let round_id = round_id.to_string();
1058        let session_id = session_id.to_string();
1059        let model = model.to_string();
1060        let started_at_str = format_timestamp(started_at);
1061
1062        self.with_connection(move |connection| {
1063            connection.execute(
1064                r#"
1065                INSERT INTO round_metrics (
1066                    round_id, session_id, model, started_at, status
1067                ) VALUES (?1, ?2, ?3, ?4, 'running')
1068                ON CONFLICT(round_id) DO NOTHING
1069                "#,
1070                params![round_id, session_id, model, started_at_str],
1071            )?;
1072            refresh_session_aggregates(connection, &session_id, started_at)?;
1073            Ok(())
1074        })
1075        .await
1076    }
1077
1078    async fn complete_round(
1079        &self,
1080        round_id: &str,
1081        completed_at: DateTime<Utc>,
1082        status: RoundStatus,
1083        usage: TokenUsage,
1084        prompt_cached_tool_outputs: u32,
1085        prompt_cached_tool_tokens_saved: u32,
1086        error: Option<String>,
1087    ) -> MetricsResult<()> {
1088        let round_id = round_id.to_string();
1089        let completed_at_str = format_timestamp(completed_at);
1090
1091        self.with_connection(move |connection| {
1092            let session_id: String = connection.query_row(
1093                "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1094                params![round_id],
1095                |row| row.get(0),
1096            )?;
1097
1098            connection.execute(
1099                r#"
1100                UPDATE round_metrics
1101                SET completed_at = ?1,
1102                    status = ?2,
1103                    prompt_tokens = ?3,
1104                    completion_tokens = ?4,
1105                    total_tokens = ?5,
1106                    prompt_cached_tool_outputs = ?6,
1107                    prompt_cached_tool_tokens_saved = COALESCE(prompt_cached_tool_tokens_saved, 0) + ?7,
1108                    tokens_saved = COALESCE(tokens_saved, 0) + ?8,
1109                    error = ?9
1110                WHERE round_id = ?10
1111                "#,
1112                params![
1113                    completed_at_str,
1114                    status.as_str(),
1115                    usage.prompt_tokens as i64,
1116                    usage.completion_tokens as i64,
1117                    usage.total_tokens as i64,
1118                    i64::from(prompt_cached_tool_outputs),
1119                    i64::from(prompt_cached_tool_tokens_saved),
1120                    i64::from(prompt_cached_tool_tokens_saved),
1121                    error,
1122                    round_id,
1123                ],
1124            )?;
1125
1126            refresh_session_aggregates(connection, &session_id, completed_at)?;
1127            Ok(())
1128        })
1129        .await
1130    }
1131
1132    async fn record_round_compression(
1133        &self,
1134        round_id: &str,
1135        compressed_at: DateTime<Utc>,
1136        tokens_saved: u32,
1137    ) -> MetricsResult<()> {
1138        let round_id = round_id.to_string();
1139
1140        self.with_connection(move |connection| {
1141            let session_id: String = connection.query_row(
1142                "SELECT session_id FROM round_metrics WHERE round_id = ?1",
1143                params![round_id],
1144                |row| row.get(0),
1145            )?;
1146
1147            connection.execute(
1148                r#"
1149                UPDATE round_metrics
1150                SET compression_count = COALESCE(compression_count, 0) + 1,
1151                    tokens_saved = COALESCE(tokens_saved, 0) + ?1
1152                WHERE round_id = ?2
1153                "#,
1154                params![i64::from(tokens_saved), round_id],
1155            )?;
1156
1157            refresh_session_aggregates(connection, &session_id, compressed_at)?;
1158            Ok(())
1159        })
1160        .await
1161    }
1162
1163    async fn insert_tool_start(
1164        &self,
1165        tool_call_id: &str,
1166        round_id: &str,
1167        session_id: &str,
1168        tool_name: &str,
1169        started_at: DateTime<Utc>,
1170    ) -> MetricsResult<()> {
1171        let tool_call_id = tool_call_id.to_string();
1172        let round_id = round_id.to_string();
1173        let session_id = session_id.to_string();
1174        let tool_name = tool_name.to_string();
1175        let started_at_str = format_timestamp(started_at);
1176
1177        self.with_connection(move |connection| {
1178            connection.execute(
1179                r#"
1180                INSERT INTO tool_call_metrics (
1181                    tool_call_id, round_id, session_id, tool_name, started_at
1182                ) VALUES (?1, ?2, ?3, ?4, ?5)
1183                ON CONFLICT(tool_call_id) DO UPDATE SET
1184                    round_id = excluded.round_id,
1185                    session_id = excluded.session_id,
1186                    tool_name = excluded.tool_name,
1187                    started_at = excluded.started_at
1188                "#,
1189                params![
1190                    tool_call_id,
1191                    round_id,
1192                    session_id,
1193                    tool_name,
1194                    started_at_str
1195                ],
1196            )?;
1197            Ok(())
1198        })
1199        .await
1200    }
1201
1202    async fn complete_tool_call(
1203        &self,
1204        tool_call_id: &str,
1205        completion: ToolCallCompletion,
1206    ) -> MetricsResult<()> {
1207        let tool_call_id = tool_call_id.to_string();
1208        let completed_at = format_timestamp(completion.completed_at);
1209        let success = if completion.success { 1_i64 } else { 0_i64 };
1210        let error = completion.error;
1211
1212        self.with_connection(move |connection| {
1213            let session_id: String = connection.query_row(
1214                "SELECT session_id FROM tool_call_metrics WHERE tool_call_id = ?1",
1215                params![tool_call_id],
1216                |row| row.get(0),
1217            )?;
1218
1219            connection.execute(
1220                "UPDATE tool_call_metrics SET completed_at = ?1, success = ?2, error = ?3 WHERE tool_call_id = ?4",
1221                params![completed_at, success, error, tool_call_id],
1222            )?;
1223
1224            refresh_session_aggregates(connection, &session_id, completion.completed_at)?;
1225            Ok(())
1226        })
1227        .await
1228    }
1229
1230    async fn insert_forward_start(
1231        &self,
1232        forward_id: &str,
1233        endpoint: &str,
1234        model: &str,
1235        is_stream: bool,
1236        started_at: DateTime<Utc>,
1237    ) -> MetricsResult<()> {
1238        let forward_id = forward_id.to_string();
1239        let endpoint = endpoint.to_string();
1240        let model = model.to_string();
1241        let is_stream_int = if is_stream { 1_i64 } else { 0_i64 };
1242        let started_at_str = format_timestamp(started_at);
1243
1244        self.with_connection(move |connection| {
1245            connection.execute(
1246                r#"
1247                INSERT INTO forward_request_metrics (
1248                    forward_id, endpoint, model, is_stream, started_at, status, updated_at
1249                ) VALUES (?1, ?2, ?3, ?4, ?5, 'pending', ?5)
1250                ON CONFLICT(forward_id) DO UPDATE SET
1251                    endpoint = excluded.endpoint,
1252                    model = excluded.model,
1253                    is_stream = excluded.is_stream,
1254                    started_at = excluded.started_at,
1255                    completed_at = NULL,
1256                    status_code = NULL,
1257                    status = 'pending',
1258                    prompt_tokens = NULL,
1259                    completion_tokens = NULL,
1260                    total_tokens = NULL,
1261                    error = NULL,
1262                    updated_at = excluded.updated_at
1263                "#,
1264                params![forward_id, endpoint, model, is_stream_int, started_at_str],
1265            )?;
1266            Ok(())
1267        })
1268        .await
1269    }
1270
1271    async fn complete_forward(
1272        &self,
1273        forward_id: &str,
1274        completed_at: DateTime<Utc>,
1275        status_code: Option<u16>,
1276        status: ForwardStatus,
1277        usage: Option<TokenUsage>,
1278        error: Option<String>,
1279    ) -> MetricsResult<()> {
1280        let forward_id = forward_id.to_string();
1281        let completed_at_str = format_timestamp(completed_at);
1282        let status_code_int = status_code.map(|s| s as i64);
1283        let (prompt, completion, total) = match usage {
1284            Some(u) => (
1285                Some(u.prompt_tokens as i64),
1286                Some(u.completion_tokens as i64),
1287                Some(u.total_tokens as i64),
1288            ),
1289            None => (None, None, None),
1290        };
1291
1292        self.with_connection(move |connection| {
1293            connection.execute(
1294                r#"
1295                UPDATE forward_request_metrics
1296                SET completed_at = ?1,
1297                    status_code = ?2,
1298                    status = ?3,
1299                    prompt_tokens = ?4,
1300                    completion_tokens = ?5,
1301                    total_tokens = ?6,
1302                    error = ?7,
1303                    updated_at = ?1
1304                WHERE forward_id = ?8
1305                "#,
1306                params![
1307                    completed_at_str,
1308                    status_code_int,
1309                    status.as_str(),
1310                    prompt,
1311                    completion,
1312                    total,
1313                    error,
1314                    forward_id,
1315                ],
1316            )?;
1317            Ok(())
1318        })
1319        .await
1320    }
1321
1322    async fn forward_summary(
1323        &self,
1324        filter: ForwardMetricsFilter,
1325    ) -> MetricsResult<ForwardMetricsSummary> {
1326        self.with_connection(move |connection| {
1327            let mut params_vec = Vec::new();
1328            let where_clause = build_forward_where_clause(
1329                filter.start_date,
1330                filter.end_date,
1331                filter.endpoint.as_deref(),
1332                filter.model.as_deref(),
1333                &mut params_vec,
1334            );
1335
1336            let sql = format!(
1337                "SELECT COUNT(*), \
1338                 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1339                 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1340                 COALESCE(SUM(prompt_tokens), 0), \
1341                 COALESCE(SUM(completion_tokens), 0), \
1342                 COALESCE(SUM(total_tokens), 0), \
1343                 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1344                     (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1345                 FROM forward_request_metrics {}",
1346                where_clause
1347            );
1348
1349            let mut stmt = connection.prepare(&sql)?;
1350            let summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1351                let avg_duration: Option<f64> = row.get(6)?;
1352                Ok(ForwardMetricsSummary {
1353                    total_requests: row.get::<_, i64>(0)? as u64,
1354                    successful_requests: row.get::<_, i64>(1)? as u64,
1355                    failed_requests: row.get::<_, i64>(2)? as u64,
1356                    total_tokens: TokenUsage {
1357                        prompt_tokens: row.get::<_, i64>(3)? as u64,
1358                        completion_tokens: row.get::<_, i64>(4)? as u64,
1359                        total_tokens: row.get::<_, i64>(5)? as u64,
1360                    },
1361                    avg_duration_ms: avg_duration.map(|d| d as u64),
1362                })
1363            })?;
1364
1365            Ok(summary)
1366        })
1367        .await
1368    }
1369
1370    async fn forward_by_endpoint(
1371        &self,
1372        filter: ForwardMetricsFilter,
1373    ) -> MetricsResult<Vec<ForwardEndpointMetrics>> {
1374        self.with_connection(move |connection| {
1375            let mut params_vec = Vec::new();
1376            let where_clause = build_forward_where_clause(
1377                filter.start_date,
1378                filter.end_date,
1379                None,
1380                filter.model.as_deref(),
1381                &mut params_vec,
1382            );
1383
1384            let sql = format!(
1385                "SELECT endpoint, COUNT(*), \
1386                 COALESCE(SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END), 0), \
1387                 COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), \
1388                 COALESCE(SUM(prompt_tokens), 0), \
1389                 COALESCE(SUM(completion_tokens), 0), \
1390                 COALESCE(SUM(total_tokens), 0), \
1391                 AVG(CASE WHEN completed_at IS NOT NULL THEN \
1392                     (julianday(completed_at) - julianday(started_at)) * 86400000 END) \
1393                 FROM forward_request_metrics {} \
1394                 GROUP BY endpoint ORDER BY COUNT(*) DESC",
1395                where_clause
1396            );
1397
1398            let mut stmt = connection.prepare(&sql)?;
1399            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1400            let mut endpoints = Vec::new();
1401
1402            while let Some(row) = rows.next()? {
1403                let avg_duration: Option<f64> = row.get(7)?;
1404                endpoints.push(ForwardEndpointMetrics {
1405                    endpoint: row.get(0)?,
1406                    requests: row.get::<_, i64>(1)? as u64,
1407                    successful: row.get::<_, i64>(2)? as u64,
1408                    failed: row.get::<_, i64>(3)? as u64,
1409                    tokens: TokenUsage {
1410                        prompt_tokens: row.get::<_, i64>(4)? as u64,
1411                        completion_tokens: row.get::<_, i64>(5)? as u64,
1412                        total_tokens: row.get::<_, i64>(6)? as u64,
1413                    },
1414                    avg_duration_ms: avg_duration.map(|d| d as u64),
1415                });
1416            }
1417
1418            Ok(endpoints)
1419        })
1420        .await
1421    }
1422
1423    async fn forward_requests(
1424        &self,
1425        filter: ForwardMetricsFilter,
1426    ) -> MetricsResult<Vec<ForwardRequestMetrics>> {
1427        self.with_connection(move |connection| {
1428            let mut params_vec = Vec::new();
1429            let where_clause = build_forward_where_clause(
1430                filter.start_date,
1431                filter.end_date,
1432                filter.endpoint.as_deref(),
1433                filter.model.as_deref(),
1434                &mut params_vec,
1435            );
1436
1437            let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1438            let sql = format!(
1439                "SELECT forward_id, endpoint, model, is_stream, started_at, completed_at, \
1440                 status_code, status, prompt_tokens, completion_tokens, total_tokens, error \
1441                 FROM forward_request_metrics {} \
1442                 ORDER BY started_at DESC LIMIT {}",
1443                where_clause, limit
1444            );
1445
1446            let mut stmt = connection.prepare(&sql)?;
1447            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1448            let mut requests = Vec::new();
1449
1450            while let Some(row) = rows.next()? {
1451                let started_at = parse_timestamp(row.get::<_, String>(4)?)?;
1452                let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(5)?)?;
1453                let status_raw: Option<String> = row.get(7)?;
1454                let status = status_raw.and_then(|s| ForwardStatus::from_db(&s));
1455
1456                let prompt: Option<i64> = row.get(8)?;
1457                let completion: Option<i64> = row.get(9)?;
1458                let total: Option<i64> = row.get(10)?;
1459                let token_usage = match (prompt, completion, total) {
1460                    (Some(p), Some(c), Some(t)) => Some(TokenUsage {
1461                        prompt_tokens: p as u64,
1462                        completion_tokens: c as u64,
1463                        total_tokens: t as u64,
1464                    }),
1465                    _ => None,
1466                };
1467
1468                requests.push(ForwardRequestMetrics {
1469                    forward_id: row.get(0)?,
1470                    endpoint: row.get(1)?,
1471                    model: row.get(2)?,
1472                    is_stream: row.get::<_, i64>(3)? > 0,
1473                    started_at,
1474                    completed_at,
1475                    status_code: row.get::<_, Option<i64>>(6)?.map(|s| s as u16),
1476                    status,
1477                    token_usage,
1478                    error: row.get(11)?,
1479                    duration_ms: compute_duration_ms(started_at, completed_at),
1480                });
1481            }
1482
1483            Ok(requests)
1484        })
1485        .await
1486    }
1487
1488    async fn forward_daily_metrics(
1489        &self,
1490        days: u32,
1491        end_date: Option<NaiveDate>,
1492    ) -> MetricsResult<Vec<DailyMetrics>> {
1493        let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1494        let span = days.max(1) - 1;
1495        let start_date = end_date - chrono::Duration::days(i64::from(span));
1496
1497        self.with_connection(move |connection| {
1498            let mut stmt = connection.prepare(
1499                r#"
1500                SELECT
1501                    date(started_at) AS date_key,
1502                    COUNT(*) AS total_sessions,
1503                    COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1504                    COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1505                    COALESCE(SUM(total_tokens), 0) AS total_tokens
1506                FROM forward_request_metrics
1507                WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1508                GROUP BY date_key
1509                ORDER BY date_key ASC
1510                "#,
1511            )?;
1512
1513            let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1514            let mut result = Vec::new();
1515
1516            while let Some(row) = rows.next()? {
1517                let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1518
1519                result.push(DailyMetrics {
1520                    date,
1521                    total_sessions: row.get::<_, i64>(1)? as u32,
1522                    total_rounds: 0,
1523                    total_token_usage: TokenUsage {
1524                        prompt_tokens: row.get::<_, i64>(2)? as u64,
1525                        completion_tokens: row.get::<_, i64>(3)? as u64,
1526                        total_tokens: row.get::<_, i64>(4)? as u64,
1527                    },
1528                    total_tool_calls: 0,
1529                    prompt_cached_tool_outputs: 0,
1530                    model_breakdown: HashMap::new(),
1531                    tool_breakdown: HashMap::new(),
1532                });
1533            }
1534
1535            Ok(result)
1536        })
1537        .await
1538    }
1539
1540    async fn summary(&self, filter: MetricsDateFilter) -> MetricsResult<MetricsSummary> {
1541        self.with_connection(move |connection| {
1542            let mut params_vec = Vec::new();
1543            let where_clause = build_session_where_clause(
1544                filter.start_date,
1545                filter.end_date,
1546                None,
1547                &mut params_vec,
1548            );
1549
1550            let summary_sql = format!(
1551                "SELECT COUNT(*), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0), COALESCE(SUM(prompt_cached_tool_tokens_saved), 0), COALESCE(SUM(total_compression_events), 0), COALESCE(SUM(total_tokens_saved), 0), COALESCE(SUM(CASE WHEN status = 'completed' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'awaiting_response' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END), 0), COALESCE(SUM(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END), 0) FROM session_metrics {}",
1552                where_clause
1553            );
1554
1555            let mut stmt = connection.prepare(&summary_sql)?;
1556            let mut summary = stmt.query_row(params_from_iter(params_vec.iter()), |row| {
1557                Ok(MetricsSummary {
1558                    total_sessions: row.get::<_, i64>(0)? as u64,
1559                    total_tokens: TokenUsage {
1560                        prompt_tokens: row.get::<_, i64>(1)? as u64,
1561                        completion_tokens: row.get::<_, i64>(2)? as u64,
1562                        total_tokens: row.get::<_, i64>(3)? as u64,
1563                    },
1564                    total_tool_calls: row.get::<_, i64>(4)? as u64,
1565                    prompt_cached_tool_outputs: row.get::<_, i64>(5)? as u64,
1566                    tool_context_tokens_saved: row.get::<_, i64>(6)? as u64,
1567                    total_compression_events: row.get::<_, i64>(7)? as u64,
1568                    total_tokens_saved: row.get::<_, i64>(8)? as u64,
1569                    non_tool_compression_tokens_saved: (row.get::<_, i64>(8)? - row.get::<_, i64>(6)?).max(0) as u64,
1570                    completed_sessions: row.get::<_, i64>(9)? as u64,
1571                    awaiting_response_sessions: row.get::<_, i64>(10)? as u64,
1572                    error_sessions: row.get::<_, i64>(11)? as u64,
1573                    cancelled_sessions: row.get::<_, i64>(12)? as u64,
1574                    total_sync_mismatches: 0,
1575                    sync_mismatch_breakdown: HashMap::new(),
1576                    active_sessions: 0,
1577                })
1578            })?;
1579
1580            let mut mismatch_params = Vec::new();
1581            let mismatch_clause = build_execute_sync_mismatch_where_clause(
1582                filter.start_date,
1583                filter.end_date,
1584                None,
1585                &mut mismatch_params,
1586            );
1587            let mismatch_sql = format!(
1588                "SELECT COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {}",
1589                mismatch_clause
1590            );
1591            let mut mismatch_stmt = connection.prepare(&mismatch_sql)?;
1592            summary.total_sync_mismatches = mismatch_stmt
1593                .query_row(params_from_iter(mismatch_params.iter()), |row| row.get::<_, i64>(0))?
1594                as u64;
1595            summary.sync_mismatch_breakdown = load_execute_sync_mismatch_breakdown(
1596                connection,
1597                filter.start_date,
1598                filter.end_date,
1599            )?;
1600            let mut active_params = Vec::new();
1601            let active_clause = build_session_where_clause(
1602                filter.start_date,
1603                filter.end_date,
1604                Some("running"),
1605                &mut active_params,
1606            );
1607            let active_sql = format!(
1608                "SELECT COUNT(*) FROM session_metrics {}",
1609                active_clause
1610            );
1611            let mut active_stmt = connection.prepare(&active_sql)?;
1612            let active_sessions = active_stmt.query_row(params_from_iter(active_params.iter()), |row| {
1613                row.get::<_, i64>(0)
1614            })? as u64;
1615
1616            Ok(MetricsSummary {
1617                active_sessions,
1618                ..summary
1619            })
1620        })
1621        .await
1622    }
1623
1624    async fn by_model(&self, filter: MetricsDateFilter) -> MetricsResult<Vec<ModelMetrics>> {
1625        self.with_connection(move |connection| {
1626            let mut params_vec = Vec::new();
1627            let where_clause = build_session_where_clause(
1628                filter.start_date,
1629                filter.end_date,
1630                None,
1631                &mut params_vec,
1632            );
1633
1634            let sql = format!(
1635                "SELECT model, COUNT(*), COALESCE(SUM(total_rounds), 0), COALESCE(SUM(prompt_tokens), 0), COALESCE(SUM(completion_tokens), 0), COALESCE(SUM(total_tokens), 0), COALESCE(SUM(tool_call_count), 0), COALESCE(SUM(prompt_cached_tool_outputs), 0) FROM session_metrics {} GROUP BY model ORDER BY SUM(total_tokens) DESC",
1636                where_clause
1637            );
1638
1639            let mut stmt = connection.prepare(&sql)?;
1640            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1641            let mut models = Vec::new();
1642
1643            while let Some(row) = rows.next()? {
1644                models.push(ModelMetrics {
1645                    model: row.get(0)?,
1646                    sessions: row.get::<_, i64>(1)? as u64,
1647                    rounds: row.get::<_, i64>(2)? as u64,
1648                    tokens: TokenUsage {
1649                        prompt_tokens: row.get::<_, i64>(3)? as u64,
1650                        completion_tokens: row.get::<_, i64>(4)? as u64,
1651                        total_tokens: row.get::<_, i64>(5)? as u64,
1652                    },
1653                    tool_calls: row.get::<_, i64>(6)? as u64,
1654                    prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1655                });
1656            }
1657
1658            Ok(models)
1659        })
1660        .await
1661    }
1662
1663    async fn sessions(&self, filter: SessionMetricsFilter) -> MetricsResult<Vec<SessionMetrics>> {
1664        self.with_connection(move |connection| {
1665            let mut params_vec = Vec::new();
1666            let where_clause = build_session_where_clause(
1667                filter.start_date,
1668                filter.end_date,
1669                None,
1670                &mut params_vec,
1671            );
1672            let mut conditions = if where_clause.is_empty() {
1673                Vec::new()
1674            } else {
1675                vec![where_clause.replacen("WHERE ", "", 1)]
1676            };
1677            if let Some(model) = filter.model {
1678                conditions.push("model = ?".to_string());
1679                params_vec.push(model);
1680            }
1681
1682            let where_sql = if conditions.is_empty() {
1683                String::new()
1684            } else {
1685                format!("WHERE {}", conditions.join(" AND "))
1686            };
1687
1688            let limit = i64::from(filter.limit.unwrap_or(100).min(1_000));
1689            let sql = format!(
1690                "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, prompt_cached_tool_tokens_saved, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics {} ORDER BY started_at DESC LIMIT {}",
1691                where_sql, limit
1692            );
1693
1694            let mut stmt = connection.prepare(&sql)?;
1695            let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
1696            let mut sessions = Vec::new();
1697
1698            while let Some(row) = rows.next()? {
1699                let session_id: String = row.get(0)?;
1700                let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
1701                let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
1702                let status_raw: String = row.get(13)?;
1703                let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1704                    MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1705                })?;
1706                let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1707
1708                sessions.push(SessionMetrics {
1709                    session_id,
1710                    model: row.get(1)?,
1711                    started_at,
1712                    completed_at,
1713                    total_rounds: row.get::<_, i64>(4)? as u32,
1714                    total_token_usage: TokenUsage {
1715                        prompt_tokens: row.get::<_, i64>(5)? as u64,
1716                        completion_tokens: row.get::<_, i64>(6)? as u64,
1717                        total_tokens: row.get::<_, i64>(7)? as u64,
1718                    },
1719                    tool_call_count: row.get::<_, i64>(8)? as u32,
1720                    prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u64,
1721                    prompt_cached_tool_tokens_saved: row.get::<_, i64>(10)? as u64,
1722                    total_compression_events: row.get::<_, i64>(11)? as u64,
1723                    total_tokens_saved: row.get::<_, i64>(12)? as u64,
1724                    tool_breakdown,
1725                    status,
1726                    message_count: row.get::<_, i64>(14)? as u32,
1727                    duration_ms: compute_duration_ms(started_at, completed_at),
1728                });
1729            }
1730
1731            Ok(sessions)
1732        })
1733        .await
1734    }
1735
1736    async fn session_detail(&self, session_id: &str) -> MetricsResult<Option<SessionDetail>> {
1737        let session_id = session_id.to_string();
1738        self.with_connection(move |connection| {
1739            let session_sql = "SELECT session_id, model, started_at, completed_at, total_rounds, prompt_tokens, completion_tokens, total_tokens, tool_call_count, prompt_cached_tool_outputs, prompt_cached_tool_tokens_saved, total_compression_events, total_tokens_saved, status, message_count FROM session_metrics WHERE session_id = ?1";
1740            let session_row = connection
1741                .query_row(session_sql, params![session_id], |row| {
1742                    Ok((
1743                        row.get::<_, String>(0)?,
1744                        row.get::<_, String>(1)?,
1745                        row.get::<_, String>(2)?,
1746                        row.get::<_, Option<String>>(3)?,
1747                        row.get::<_, i64>(4)?,
1748                        row.get::<_, i64>(5)?,
1749                        row.get::<_, i64>(6)?,
1750                        row.get::<_, i64>(7)?,
1751                        row.get::<_, i64>(8)?,
1752                        row.get::<_, i64>(9)?,
1753                        row.get::<_, i64>(10)?,
1754                        row.get::<_, i64>(11)?,
1755                        row.get::<_, i64>(12)?,
1756                        row.get::<_, String>(13)?,
1757                        row.get::<_, i64>(14)?,
1758                    ))
1759                })
1760                .optional()?;
1761
1762            let Some((
1763                session_id,
1764                model,
1765                started_at_raw,
1766                completed_at_raw,
1767                total_rounds,
1768                prompt_tokens,
1769                completion_tokens,
1770                total_tokens,
1771                tool_call_count,
1772                prompt_cached_tool_outputs,
1773                prompt_cached_tool_tokens_saved,
1774                total_compression_events,
1775                total_tokens_saved,
1776                status_raw,
1777                message_count,
1778            )) = session_row
1779            else {
1780                return Ok(None);
1781            };
1782
1783            let started_at = parse_timestamp(started_at_raw)?;
1784            let completed_at = parse_optional_timestamp(completed_at_raw)?;
1785            let status = SessionStatus::from_db(&status_raw).ok_or_else(|| {
1786                MetricsError::InvalidData(format!("unknown session status: {}", status_raw))
1787            })?;
1788            let tool_breakdown = load_tool_breakdown(connection, &session_id)?;
1789
1790            let session = SessionMetrics {
1791                session_id: session_id.clone(),
1792                model,
1793                started_at,
1794                completed_at,
1795                total_rounds: total_rounds as u32,
1796                total_token_usage: TokenUsage {
1797                    prompt_tokens: prompt_tokens as u64,
1798                    completion_tokens: completion_tokens as u64,
1799                    total_tokens: total_tokens as u64,
1800                },
1801                tool_call_count: tool_call_count as u32,
1802                prompt_cached_tool_outputs: prompt_cached_tool_outputs as u64,
1803                prompt_cached_tool_tokens_saved: prompt_cached_tool_tokens_saved as u64,
1804                total_compression_events: total_compression_events as u64,
1805                total_tokens_saved: total_tokens_saved as u64,
1806                tool_breakdown,
1807                status,
1808                message_count: message_count as u32,
1809                duration_ms: compute_duration_ms(started_at, completed_at),
1810            };
1811
1812            let rounds = load_rounds(connection, &session_id)?;
1813            Ok(Some(SessionDetail { session, rounds }))
1814        })
1815        .await
1816    }
1817
1818    async fn increment_execute_sync_mismatch(
1819        &self,
1820        reason: &str,
1821        occurred_at: DateTime<Utc>,
1822    ) -> MetricsResult<()> {
1823        let reason = reason.to_string();
1824        let mismatch_date = occurred_at.date_naive().to_string();
1825        let updated_at = format_timestamp(occurred_at);
1826
1827        self.with_connection(move |connection| {
1828            connection.execute(
1829                r#"
1830                INSERT INTO execute_sync_mismatch_metrics (reason, mismatch_date, count, updated_at)
1831                VALUES (?1, ?2, 1, ?3)
1832                ON CONFLICT(reason, mismatch_date) DO UPDATE SET
1833                    count = count + 1,
1834                    updated_at = excluded.updated_at
1835                "#,
1836                params![reason, mismatch_date, updated_at],
1837            )?;
1838            Ok(())
1839        })
1840        .await
1841    }
1842
1843    async fn daily_metrics(
1844        &self,
1845        days: u32,
1846        end_date: Option<NaiveDate>,
1847    ) -> MetricsResult<Vec<DailyMetrics>> {
1848        let end_date = end_date.unwrap_or_else(|| Utc::now().date_naive());
1849        let span = days.max(1) - 1;
1850        let start_date = end_date - chrono::Duration::days(i64::from(span));
1851
1852        self.with_connection(move |connection| {
1853            let mut stmt = connection.prepare(
1854                r#"
1855                SELECT
1856                    date(started_at) AS date_key,
1857                    COUNT(*) AS total_sessions,
1858                    COALESCE(SUM(total_rounds), 0) AS total_rounds,
1859                    COALESCE(SUM(prompt_tokens), 0) AS prompt_tokens,
1860                    COALESCE(SUM(completion_tokens), 0) AS completion_tokens,
1861                    COALESCE(SUM(total_tokens), 0) AS total_tokens,
1862                    COALESCE(SUM(tool_call_count), 0) AS total_tool_calls,
1863                    COALESCE(SUM(prompt_cached_tool_outputs), 0) AS prompt_cached_tool_outputs
1864                FROM session_metrics
1865                WHERE date(started_at) BETWEEN date(?1) AND date(?2)
1866                GROUP BY date_key
1867                ORDER BY date_key ASC
1868                "#,
1869            )?;
1870
1871            let mut rows = stmt.query(params![start_date.to_string(), end_date.to_string()])?;
1872            let mut result = Vec::new();
1873
1874            while let Some(row) = rows.next()? {
1875                let date = NaiveDate::parse_from_str(&row.get::<_, String>(0)?, "%Y-%m-%d")?;
1876                let model_breakdown = load_daily_model_breakdown(connection, date)?;
1877                let tool_breakdown = load_daily_tool_breakdown(connection, date)?;
1878
1879                result.push(DailyMetrics {
1880                    date,
1881                    total_sessions: row.get::<_, i64>(1)? as u32,
1882                    total_rounds: row.get::<_, i64>(2)? as u32,
1883                    total_token_usage: TokenUsage {
1884                        prompt_tokens: row.get::<_, i64>(3)? as u64,
1885                        completion_tokens: row.get::<_, i64>(4)? as u64,
1886                        total_tokens: row.get::<_, i64>(5)? as u64,
1887                    },
1888                    total_tool_calls: row.get::<_, i64>(6)? as u32,
1889                    prompt_cached_tool_outputs: row.get::<_, i64>(7)? as u64,
1890                    model_breakdown,
1891                    tool_breakdown,
1892                });
1893            }
1894
1895            Ok(result)
1896        })
1897        .await
1898    }
1899
1900    async fn prune_rounds_before(&self, cutoff: DateTime<Utc>) -> MetricsResult<u64> {
1901        self.with_connection(move |connection| {
1902            let cutoff_str = format_timestamp(cutoff);
1903            let deleted = connection.execute(
1904                "DELETE FROM round_metrics WHERE started_at < ?1",
1905                params![cutoff_str],
1906            )?;
1907
1908            let mut stmt = connection.prepare("SELECT session_id FROM session_metrics")?;
1909            let session_ids: Vec<String> = stmt
1910                .query_map([], |row| row.get(0))?
1911                .collect::<Result<Vec<String>, _>>()?;
1912            for session_id in session_ids {
1913                refresh_session_aggregates(connection, &session_id, Utc::now())?;
1914            }
1915
1916            Ok(deleted as u64)
1917        })
1918        .await
1919    }
1920
1921    async fn reconcile_stale_executions(
1922        &self,
1923        active_session_ids: &[String],
1924        awaiting_response_session_ids: &[String],
1925    ) -> MetricsResult<()> {
1926        let active_session_ids = active_session_ids.to_vec();
1927        let awaiting_response_session_ids = awaiting_response_session_ids.to_vec();
1928
1929        self.with_connection(move |connection| {
1930            let reconciled_at = Utc::now();
1931            let reconciled_at_str = format_timestamp(reconciled_at);
1932
1933            let mut stmt = connection.prepare(
1934                "SELECT session_id FROM session_metrics WHERE status = 'running'",
1935            )?;
1936            let running_session_ids: Vec<String> = stmt
1937                .query_map([], |row| row.get(0))?
1938                .collect::<Result<Vec<String>, _>>()?;
1939
1940            for session_id in running_session_ids {
1941                if active_session_ids.iter().any(|id| id == &session_id) {
1942                    continue;
1943                }
1944
1945                let status = if awaiting_response_session_ids
1946                    .iter()
1947                    .any(|id| id == &session_id)
1948                {
1949                    SessionStatus::AwaitingResponse
1950                } else {
1951                    SessionStatus::Completed
1952                };
1953
1954                connection.execute(
1955                    "UPDATE session_metrics SET status = ?1, completed_at = COALESCE(completed_at, ?2), updated_at = ?2 WHERE session_id = ?3",
1956                    params![status.as_str(), reconciled_at_str, session_id],
1957                )?;
1958                refresh_session_aggregates(connection, &session_id, reconciled_at)?;
1959            }
1960
1961            connection.execute(
1962                "UPDATE round_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_round') WHERE status = 'running'",
1963                params![reconciled_at_str],
1964            )?;
1965            connection.execute(
1966                "UPDATE forward_request_metrics SET status = 'error', completed_at = COALESCE(completed_at, ?1), error = COALESCE(error, 'reconciled_stale_forward'), updated_at = ?1 WHERE status = 'pending' AND completed_at IS NULL",
1967                params![reconciled_at_str],
1968            )?;
1969
1970            Ok(())
1971        })
1972        .await
1973    }
1974}
1975
1976/// Opens a connection to the SQLite database with proper configuration.
1977///
1978/// This function:
1979/// 1. Creates parent directories if they don't exist
1980/// 2. Opens the database file (creates if doesn't exist)
1981/// 3. Configures optimal SQLite settings:
1982///    - WAL mode for concurrent access
1983///    - Foreign key enforcement
1984///    - Normal synchronous mode for performance
1985///
1986/// # Arguments
1987///
1988/// * `path` - Path to the SQLite database file
1989///
1990/// # Errors
1991///
1992/// Returns an error if:
1993/// - Parent directories cannot be created
1994/// - Database file cannot be opened
1995/// - PRAGMA settings fail to apply
1996fn open_connection(path: &Path) -> MetricsResult<Connection> {
1997    if let Some(parent) = path.parent() {
1998        std::fs::create_dir_all(parent)?;
1999    }
2000    let connection = Connection::open(path)?;
2001    connection.execute_batch(
2002        r#"
2003        PRAGMA journal_mode = WAL;
2004        PRAGMA foreign_keys = ON;
2005        PRAGMA synchronous = NORMAL;
2006        "#,
2007    )?;
2008    Ok(connection)
2009}
2010
2011/// Formats a timestamp as RFC3339 string for database storage.
2012///
2013/// # Arguments
2014///
2015/// * `timestamp` - DateTime to format
2016///
2017/// # Returns
2018///
2019/// RFC3339 formatted string (e.g., "2026-02-24T12:34:56.789+00:00")
2020fn format_timestamp(timestamp: DateTime<Utc>) -> String {
2021    timestamp.to_rfc3339()
2022}
2023
2024/// Parses an RFC3339 timestamp string from the database.
2025///
2026/// # Arguments
2027///
2028/// * `raw` - RFC3339 formatted string
2029///
2030/// # Errors
2031///
2032/// Returns an error if the string doesn't conform to RFC3339 format.
2033fn parse_timestamp(raw: String) -> MetricsResult<DateTime<Utc>> {
2034    Ok(DateTime::parse_from_rfc3339(&raw)?.with_timezone(&Utc))
2035}
2036
2037/// Parses an optional RFC3339 timestamp string.
2038///
2039/// # Arguments
2040///
2041/// * `raw` - Optional RFC3339 formatted string
2042///
2043/// # Returns
2044///
2045/// Returns `Ok(None)` if the input is None, otherwise parses the timestamp.
2046fn parse_optional_timestamp(raw: Option<String>) -> MetricsResult<Option<DateTime<Utc>>> {
2047    raw.map(parse_timestamp).transpose()
2048}
2049
2050/// Computes the duration in milliseconds between two timestamps.
2051///
2052/// # Arguments
2053///
2054/// * `started_at` - Start timestamp
2055/// * `completed_at` - Optional end timestamp
2056///
2057/// # Returns
2058///
2059/// Returns `None` if `completed_at` is None, otherwise returns the duration in milliseconds.
2060/// Returns `None` if the duration is negative or too large to fit in u64.
2061fn compute_duration_ms(
2062    started_at: DateTime<Utc>,
2063    completed_at: Option<DateTime<Utc>>,
2064) -> Option<u64> {
2065    completed_at.and_then(|end| {
2066        end.signed_duration_since(started_at)
2067            .num_milliseconds()
2068            .try_into()
2069            .ok()
2070    })
2071}
2072
2073/// Builds a SQL WHERE clause for session metrics queries.
2074///
2075/// Constructs a WHERE clause based on the provided filter criteria,
2076/// appending parameters to the params vector in the correct order.
2077///
2078/// # Arguments
2079///
2080/// * `start_date` - Optional start date filter (inclusive)
2081/// * `end_date` - Optional end date filter (inclusive)
2082/// * `required_status` - Optional status filter (e.g., "running", "completed")
2083/// * `params_vec` - Vector to append SQL parameters to
2084///
2085/// # Returns
2086///
2087/// Returns an empty string if no filters are applied, otherwise returns
2088/// a WHERE clause starting with "WHERE ".
2089fn build_session_where_clause(
2090    start_date: Option<NaiveDate>,
2091    end_date: Option<NaiveDate>,
2092    required_status: Option<&str>,
2093    params_vec: &mut Vec<String>,
2094) -> String {
2095    let mut conditions = Vec::new();
2096
2097    if let Some(start) = start_date {
2098        conditions.push("date(started_at) >= date(?)".to_string());
2099        params_vec.push(start.to_string());
2100    }
2101
2102    if let Some(end) = end_date {
2103        conditions.push("date(started_at) <= date(?)".to_string());
2104        params_vec.push(end.to_string());
2105    }
2106
2107    if let Some(status) = required_status {
2108        conditions.push("status = ?".to_string());
2109        params_vec.push(status.to_string());
2110    }
2111
2112    if conditions.is_empty() {
2113        String::new()
2114    } else {
2115        format!("WHERE {}", conditions.join(" AND "))
2116    }
2117}
2118
2119/// Builds a SQL WHERE clause for forward request metrics queries.
2120///
2121/// Constructs a WHERE clause based on the provided filter criteria,
2122/// appending parameters to the params vector in the correct order.
2123///
2124/// # Arguments
2125///
2126/// * `start_date` - Optional start date filter (inclusive)
2127/// * `end_date` - Optional end date filter (inclusive)
2128/// * `endpoint` - Optional endpoint filter
2129/// * `model` - Optional model filter
2130/// * `params_vec` - Vector to append SQL parameters to
2131///
2132/// # Returns
2133///
2134/// Returns an empty string if no filters are applied, otherwise returns
2135/// a WHERE clause starting with "WHERE ".
2136fn build_forward_where_clause(
2137    start_date: Option<NaiveDate>,
2138    end_date: Option<NaiveDate>,
2139    endpoint: Option<&str>,
2140    model: Option<&str>,
2141    params_vec: &mut Vec<String>,
2142) -> String {
2143    let mut conditions = Vec::new();
2144
2145    if let Some(start) = start_date {
2146        conditions.push("date(started_at) >= date(?)".to_string());
2147        params_vec.push(start.to_string());
2148    }
2149
2150    if let Some(end) = end_date {
2151        conditions.push("date(started_at) <= date(?)".to_string());
2152        params_vec.push(end.to_string());
2153    }
2154
2155    if let Some(ep) = endpoint {
2156        conditions.push("endpoint = ?".to_string());
2157        params_vec.push(ep.to_string());
2158    }
2159
2160    if let Some(m) = model {
2161        conditions.push("model = ?".to_string());
2162        params_vec.push(m.to_string());
2163    }
2164
2165    if conditions.is_empty() {
2166        String::new()
2167    } else {
2168        format!("WHERE {}", conditions.join(" AND "))
2169    }
2170}
2171
2172fn build_execute_sync_mismatch_where_clause(
2173    start_date: Option<NaiveDate>,
2174    end_date: Option<NaiveDate>,
2175    reason: Option<&str>,
2176    params_vec: &mut Vec<String>,
2177) -> String {
2178    let mut conditions = Vec::new();
2179
2180    if let Some(start) = start_date {
2181        conditions.push("date(mismatch_date) >= date(?)".to_string());
2182        params_vec.push(start.to_string());
2183    }
2184
2185    if let Some(end) = end_date {
2186        conditions.push("date(mismatch_date) <= date(?)".to_string());
2187        params_vec.push(end.to_string());
2188    }
2189
2190    if let Some(reason) = reason {
2191        conditions.push("reason = ?".to_string());
2192        params_vec.push(reason.to_string());
2193    }
2194
2195    if conditions.is_empty() {
2196        String::new()
2197    } else {
2198        format!("WHERE {}", conditions.join(" AND "))
2199    }
2200}
2201
2202fn ensure_integer_column(
2203    connection: &Connection,
2204    table: &str,
2205    column: &str,
2206    default_value: i64,
2207) -> MetricsResult<()> {
2208    let pragma = format!("PRAGMA table_info({table})");
2209    let mut stmt = connection.prepare(&pragma)?;
2210    let mut rows = stmt.query([])?;
2211    while let Some(row) = rows.next()? {
2212        let name: String = row.get(1)?;
2213        if name == column {
2214            return Ok(());
2215        }
2216    }
2217
2218    let alter =
2219        format!("ALTER TABLE {table} ADD COLUMN {column} INTEGER NOT NULL DEFAULT {default_value}");
2220    connection.execute(&alter, [])?;
2221    Ok(())
2222}
2223
2224/// Refreshes aggregated metrics for a session by recalculating from child entities.
2225///
2226/// This function updates the session's aggregate columns by summing values
2227/// from all associated rounds and counting tool calls. It should be called
2228/// whenever a round or tool call is added or modified.
2229///
2230/// # Updated Columns
2231///
2232/// - `total_rounds`: Count of rounds in the session
2233/// - `prompt_tokens`: Sum of prompt tokens from all rounds
2234/// - `completion_tokens`: Sum of completion tokens from all rounds
2235/// - `total_tokens`: Sum of total tokens from all rounds
2236/// - `prompt_cached_tool_outputs`: Sum of prompt-side cached tool outputs from all rounds
2237/// - `total_compression_events`: Sum of compression events from all rounds
2238/// - `total_tokens_saved`: Sum of tokens saved by compression from all rounds
2239/// - `tool_call_count`: Count of tool calls in the session
2240/// - `updated_at`: Timestamp of this update
2241///
2242/// # Arguments
2243///
2244/// * `connection` - Database connection to use
2245/// * `session_id` - Session to refresh
2246/// * `updated_at` - Timestamp for the updated_at column
2247///
2248/// # Errors
2249///
2250/// Returns an error if the SQL execution fails.
2251fn refresh_session_aggregates(
2252    connection: &Connection,
2253    session_id: &str,
2254    updated_at: DateTime<Utc>,
2255) -> MetricsResult<()> {
2256    let updated_at = format_timestamp(updated_at);
2257    connection.execute(
2258        r#"
2259        UPDATE session_metrics
2260        SET
2261            total_rounds = COALESCE((SELECT COUNT(*) FROM round_metrics WHERE session_id = ?1), 0),
2262            prompt_tokens = COALESCE((SELECT SUM(prompt_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2263            completion_tokens = COALESCE((SELECT SUM(completion_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2264            total_tokens = COALESCE((SELECT SUM(total_tokens) FROM round_metrics WHERE session_id = ?1), 0),
2265            prompt_cached_tool_outputs = COALESCE((SELECT SUM(prompt_cached_tool_outputs) FROM round_metrics WHERE session_id = ?1), 0),
2266            prompt_cached_tool_tokens_saved = COALESCE((SELECT SUM(prompt_cached_tool_tokens_saved) FROM round_metrics WHERE session_id = ?1), 0),
2267            total_compression_events = COALESCE((SELECT SUM(compression_count) FROM round_metrics WHERE session_id = ?1), 0),
2268            total_tokens_saved = COALESCE((SELECT SUM(tokens_saved) FROM round_metrics WHERE session_id = ?1), 0),
2269            tool_call_count = COALESCE((SELECT COUNT(*) FROM tool_call_metrics WHERE session_id = ?1), 0),
2270            updated_at = ?2
2271        WHERE session_id = ?1
2272        "#,
2273        params![session_id, updated_at],
2274    )?;
2275    Ok(())
2276}
2277
2278/// Loads tool call breakdown (tool_name -> count) for a session.
2279///
2280/// Retrieves the count of tool invocations grouped by tool name
2281/// for the specified session.
2282///
2283/// # Arguments
2284///
2285/// * `connection` - Database connection to use
2286/// * `session_id` - Session to get tool breakdown for
2287///
2288/// # Returns
2289///
2290/// A HashMap mapping tool names to their invocation counts.
2291///
2292/// # Example
2293///
2294/// ```rust,ignore
2295/// let breakdown = load_tool_breakdown(&conn, "session-123")?;
2296/// // breakdown might be: {"read_file": 5, "execute_command": 2}
2297/// ```
2298fn load_tool_breakdown(
2299    connection: &Connection,
2300    session_id: &str,
2301) -> MetricsResult<HashMap<String, u32>> {
2302    let mut stmt = connection.prepare(
2303        "SELECT tool_name, COUNT(*) FROM tool_call_metrics WHERE session_id = ?1 GROUP BY tool_name",
2304    )?;
2305    let mut rows = stmt.query(params![session_id])?;
2306    let mut breakdown = HashMap::new();
2307
2308    while let Some(row) = rows.next()? {
2309        let tool: String = row.get(0)?;
2310        let count: i64 = row.get(1)?;
2311        breakdown.insert(tool, count as u32);
2312    }
2313
2314    Ok(breakdown)
2315}
2316
2317/// Loads all rounds for a session with their associated tool calls.
2318///
2319/// Retrieves complete round metrics including token usage, status,
2320/// and all tool calls made during each round.
2321///
2322/// # Arguments
2323///
2324/// * `connection` - Database connection to use
2325/// * `session_id` - Session to get rounds for
2326///
2327/// # Returns
2328///
2329/// A vector of RoundMetrics ordered by started_at ascending.
2330///
2331/// # Errors
2332///
2333/// Returns an error if:
2334/// - SQL execution fails
2335/// - Timestamp parsing fails
2336/// - Status values are invalid
2337fn load_rounds(connection: &Connection, session_id: &str) -> MetricsResult<Vec<RoundMetrics>> {
2338    let mut stmt = connection.prepare(
2339        "SELECT round_id, session_id, model, started_at, completed_at, status, prompt_tokens, completion_tokens, total_tokens, prompt_cached_tool_outputs, prompt_cached_tool_tokens_saved, compression_count, tokens_saved, error FROM round_metrics WHERE session_id = ?1 ORDER BY started_at ASC",
2340    )?;
2341    let mut rows = stmt.query(params![session_id])?;
2342    let mut rounds = Vec::new();
2343
2344    while let Some(row) = rows.next()? {
2345        let round_id: String = row.get(0)?;
2346        let started_at = parse_timestamp(row.get::<_, String>(3)?)?;
2347        let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(4)?)?;
2348        let status_raw: String = row.get(5)?;
2349        let status = RoundStatus::from_db(&status_raw).ok_or_else(|| {
2350            MetricsError::InvalidData(format!("unknown round status: {}", status_raw))
2351        })?;
2352
2353        rounds.push(RoundMetrics {
2354            round_id: round_id.clone(),
2355            session_id: row.get(1)?,
2356            model: row.get(2)?,
2357            started_at,
2358            completed_at,
2359            token_usage: TokenUsage {
2360                prompt_tokens: row.get::<_, i64>(6)? as u64,
2361                completion_tokens: row.get::<_, i64>(7)? as u64,
2362                total_tokens: row.get::<_, i64>(8)? as u64,
2363            },
2364            tool_calls: load_tool_calls(connection, &round_id)?,
2365            status,
2366            prompt_cached_tool_outputs: row.get::<_, i64>(9)? as u32,
2367            prompt_cached_tool_tokens_saved: row.get::<_, i64>(10)? as u32,
2368            compression_count: row.get::<_, i64>(11)? as u32,
2369            tokens_saved: row.get::<_, i64>(12)? as u32,
2370            error: row.get(13)?,
2371            duration_ms: compute_duration_ms(started_at, completed_at),
2372        });
2373    }
2374
2375    Ok(rounds)
2376}
2377
2378/// Loads all tool calls for a specific round.
2379///
2380/// Retrieves tool invocation details including timing, success status,
2381/// and error information.
2382///
2383/// # Arguments
2384///
2385/// * `connection` - Database connection to use
2386/// * `round_id` - Round to get tool calls for
2387///
2388/// # Returns
2389///
2390/// A vector of ToolCallMetrics ordered by started_at ascending.
2391fn load_tool_calls(connection: &Connection, round_id: &str) -> MetricsResult<Vec<ToolCallMetrics>> {
2392    let mut stmt = connection.prepare(
2393        "SELECT tool_call_id, tool_name, started_at, completed_at, success, error FROM tool_call_metrics WHERE round_id = ?1 ORDER BY started_at ASC",
2394    )?;
2395    let mut rows = stmt.query(params![round_id])?;
2396    let mut tools = Vec::new();
2397
2398    while let Some(row) = rows.next()? {
2399        let started_at = parse_timestamp(row.get::<_, String>(2)?)?;
2400        let completed_at = parse_optional_timestamp(row.get::<_, Option<String>>(3)?)?;
2401        let success = row.get::<_, Option<i64>>(4)?.map(|value| value > 0);
2402
2403        tools.push(ToolCallMetrics {
2404            tool_call_id: row.get(0)?,
2405            tool_name: row.get(1)?,
2406            started_at,
2407            completed_at,
2408            success,
2409            error: row.get(5)?,
2410            duration_ms: compute_duration_ms(started_at, completed_at),
2411        });
2412    }
2413
2414    Ok(tools)
2415}
2416
2417/// Loads model-level token usage breakdown for a specific date.
2418///
2419/// Retrieves aggregated token usage grouped by AI model for all
2420/// sessions that started on the specified date.
2421///
2422/// # Arguments
2423///
2424/// * `connection` - Database connection to use
2425/// * `date` - Date to get model breakdown for
2426///
2427/// # Returns
2428///
2429/// A HashMap mapping model names to their total token usage.
2430///
2431/// # Example
2432///
2433/// ```rust,ignore
2434/// let breakdown = load_daily_model_breakdown(&conn, NaiveDate::from_ymd_opt(2026, 2, 24).unwrap())?;
2435/// // breakdown might be: {"gpt-4": TokenUsage{...}, "claude-3": TokenUsage{...}}
2436/// ```
2437fn load_daily_model_breakdown(
2438    connection: &Connection,
2439    date: NaiveDate,
2440) -> MetricsResult<HashMap<String, TokenUsage>> {
2441    let mut stmt = connection.prepare(
2442        r#"
2443        SELECT model,
2444               COALESCE(SUM(prompt_tokens), 0),
2445               COALESCE(SUM(completion_tokens), 0),
2446               COALESCE(SUM(total_tokens), 0)
2447        FROM session_metrics
2448        WHERE date(started_at) = date(?1)
2449        GROUP BY model
2450        "#,
2451    )?;
2452
2453    let mut rows = stmt.query(params![date.to_string()])?;
2454    let mut breakdown = HashMap::new();
2455
2456    while let Some(row) = rows.next()? {
2457        breakdown.insert(
2458            row.get::<_, String>(0)?,
2459            TokenUsage {
2460                prompt_tokens: row.get::<_, i64>(1)? as u64,
2461                completion_tokens: row.get::<_, i64>(2)? as u64,
2462                total_tokens: row.get::<_, i64>(3)? as u64,
2463            },
2464        );
2465    }
2466
2467    Ok(breakdown)
2468}
2469
2470/// Loads tool call count breakdown for a specific date.
2471///
2472/// Retrieves the count of tool invocations grouped by tool name
2473/// for all tool calls that occurred on the specified date.
2474///
2475/// # Arguments
2476///
2477/// * `connection` - Database connection to use
2478/// * `date` - Date to get tool breakdown for
2479///
2480/// # Returns
2481///
2482/// A HashMap mapping tool names to their invocation counts.
2483///
2484/// # Example
2485///
2486/// ```rust,ignore
2487/// let breakdown = load_daily_tool_breakdown(&conn, NaiveDate::from_ymd_opt(2026, 2, 24).unwrap())?;
2488/// // breakdown might be: {"read_file": 10, "write_file": 5, "execute_command": 3}
2489/// ```
2490fn load_daily_tool_breakdown(
2491    connection: &Connection,
2492    date: NaiveDate,
2493) -> MetricsResult<HashMap<String, u32>> {
2494    let mut stmt = connection.prepare(
2495        r#"
2496        SELECT tool_name, COUNT(*)
2497        FROM tool_call_metrics
2498        WHERE date(started_at) = date(?1)
2499        GROUP BY tool_name
2500        "#,
2501    )?;
2502
2503    let mut rows = stmt.query(params![date.to_string()])?;
2504    let mut breakdown = HashMap::new();
2505
2506    while let Some(row) = rows.next()? {
2507        breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u32);
2508    }
2509
2510    Ok(breakdown)
2511}
2512
2513fn load_execute_sync_mismatch_breakdown(
2514    connection: &Connection,
2515    start_date: Option<NaiveDate>,
2516    end_date: Option<NaiveDate>,
2517) -> MetricsResult<HashMap<String, u64>> {
2518    let mut params_vec = Vec::new();
2519    let where_clause =
2520        build_execute_sync_mismatch_where_clause(start_date, end_date, None, &mut params_vec);
2521    let sql = format!(
2522        "SELECT reason, COALESCE(SUM(count), 0) FROM execute_sync_mismatch_metrics {} GROUP BY reason ORDER BY reason ASC",
2523        where_clause
2524    );
2525    let mut stmt = connection.prepare(&sql)?;
2526    let mut rows = stmt.query(params_from_iter(params_vec.iter()))?;
2527    let mut breakdown = HashMap::new();
2528
2529    while let Some(row) = rows.next()? {
2530        breakdown.insert(row.get::<_, String>(0)?, row.get::<_, i64>(1)? as u64);
2531    }
2532
2533    Ok(breakdown)
2534}
2535
#[cfg(test)]
mod tests {
    // Each test creates a fresh `metrics.db` inside a temporary directory, calls
    // `init()`, and then exercises `SqliteMetricsStorage` through its storage
    // methods. The `dir` guard is held for the whole test because the temporary
    // directory is removed when it is dropped.
    use std::collections::HashMap;

    use chrono::{NaiveDate, TimeZone, Utc};
    use tempfile::tempdir;

    use super::{MetricsStorage, SqliteMetricsStorage, ToolCallCompletion};
    use crate::metrics::types::{
        ForwardMetricsFilter, ForwardStatus, MetricsDateFilter, RoundStatus, SessionMetricsFilter,
        SessionStatus, TokenUsage,
    };

    /// Records one completed session with a single round and a single tool call.
    ///
    /// Checks that `summary` totals (sessions, tokens, tool calls, cached tool
    /// outputs) and `session_detail` (session- and round-level cached tool
    /// outputs, round count) reflect what was written.
    #[tokio::test]
    async fn storage_records_session_and_round_data_for_summary_queries() {
        let dir = tempdir().expect("temp dir");
        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));

        storage.init().await.expect("init storage");

        let started_at = Utc
            .with_ymd_and_hms(2026, 2, 10, 10, 0, 0)
            .single()
            .expect("valid datetime");
        storage
            .upsert_session_start("session-a", "gpt-4", started_at)
            .await
            .expect("session started");
        storage
            .update_session_message_count("session-a", 7, started_at)
            .await
            .expect("message count update");

        storage
            .insert_round_start("round-a", "session-a", "gpt-4", started_at)
            .await
            .expect("round start");
        storage
            .insert_tool_start("tool-1", "round-a", "session-a", "read_file", started_at)
            .await
            .expect("tool start");
        storage
            .complete_tool_call(
                "tool-1",
                ToolCallCompletion {
                    completed_at: started_at,
                    success: true,
                    error: None,
                },
            )
            .await
            .expect("tool completion");
        storage
            .complete_round(
                "round-a",
                started_at,
                RoundStatus::Success,
                TokenUsage {
                    prompt_tokens: 10,
                    completion_tokens: 15,
                    total_tokens: 25,
                },
                // NOTE(review): positional arguments. The assertions below expect
                // `prompt_cached_tool_outputs == 3`, so the `3` presumably feeds
                // that field — confirm against `complete_round`'s signature.
                3,
                0,
                None,
            )
            .await
            .expect("round completion");
        storage
            .complete_session("session-a", SessionStatus::Completed, started_at)
            .await
            .expect("session completion");

        let summary = storage
            .summary(MetricsDateFilter::default())
            .await
            .expect("summary query");

        assert_eq!(summary.total_sessions, 1);
        assert_eq!(summary.total_tokens.total_tokens, 25);
        assert_eq!(summary.total_tool_calls, 1);
        assert_eq!(summary.prompt_cached_tool_outputs, 3);

        let detail = storage
            .session_detail("session-a")
            .await
            .expect("session detail query")
            .expect("session detail should exist");
        assert_eq!(detail.session.prompt_cached_tool_outputs, 3);
        assert_eq!(detail.rounds.len(), 1);
        assert_eq!(detail.rounds[0].prompt_cached_tool_outputs, 3);
    }

    /// Creates two sessions that differ in both start day and model.
    ///
    /// Checks that a `sessions` query filtered by start date 2026-02-01, end
    /// date 2026-02-03 and model `gpt-4` returns only `s1`, including its
    /// per-tool breakdown.
    #[tokio::test]
    async fn storage_filters_sessions_and_returns_tool_breakdown() {
        let dir = tempdir().expect("temp dir");
        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));

        storage.init().await.expect("init storage");

        let day_a = Utc
            .with_ymd_and_hms(2026, 2, 1, 9, 0, 0)
            .single()
            .expect("valid datetime");
        let day_b = Utc
            .with_ymd_and_hms(2026, 2, 5, 9, 0, 0)
            .single()
            .expect("valid datetime");

        // Session `s1` on day A (gpt-4): one round with one `read_file` call.
        storage
            .upsert_session_start("s1", "gpt-4", day_a)
            .await
            .expect("session start");
        storage
            .insert_round_start("r1", "s1", "gpt-4", day_a)
            .await
            .expect("round start");
        storage
            .insert_tool_start("t1", "r1", "s1", "read_file", day_a)
            .await
            .expect("tool start");
        storage
            .complete_tool_call(
                "t1",
                ToolCallCompletion {
                    completed_at: day_a,
                    success: true,
                    error: None,
                },
            )
            .await
            .expect("tool complete");
        storage
            .complete_round(
                "r1",
                day_a,
                RoundStatus::Success,
                TokenUsage {
                    prompt_tokens: 1,
                    completion_tokens: 1,
                    total_tokens: 2,
                },
                0,
                0,
                None,
            )
            .await
            .expect("round complete");

        // Session `s2` is on day B (2026-02-05) with model claude-3, so it falls
        // outside both the date filter and the model filter below.
        storage
            .upsert_session_start("s2", "claude-3", day_b)
            .await
            .expect("session start");

        let sessions = storage
            .sessions(SessionMetricsFilter {
                start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 1).expect("valid date")),
                end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 3).expect("valid date")),
                model: Some("gpt-4".to_string()),
                limit: Some(100),
            })
            .await
            .expect("sessions query");

        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].session_id, "s1");
        assert_eq!(sessions[0].tool_breakdown.get("read_file"), Some(&1));
    }

    /// Records one session, round and `write_file` tool call on 2026-02-10.
    ///
    /// Checks that `daily_metrics` returns a single day row whose session,
    /// round and tool-call counts and model/tool breakdowns match.
    #[tokio::test]
    async fn storage_produces_daily_rollups_with_model_and_tool_breakdowns() {
        let dir = tempdir().expect("temp dir");
        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));

        storage.init().await.expect("init storage");

        let now = Utc
            .with_ymd_and_hms(2026, 2, 10, 12, 0, 0)
            .single()
            .expect("valid datetime");
        storage
            .upsert_session_start("daily-1", "gpt-4", now)
            .await
            .expect("session start");
        storage
            .insert_round_start("daily-r1", "daily-1", "gpt-4", now)
            .await
            .expect("round start");
        storage
            .insert_tool_start("daily-t1", "daily-r1", "daily-1", "write_file", now)
            .await
            .expect("tool start");
        storage
            .complete_tool_call(
                "daily-t1",
                ToolCallCompletion {
                    completed_at: now,
                    success: true,
                    error: None,
                },
            )
            .await
            .expect("tool complete");
        storage
            .complete_round(
                "daily-r1",
                now,
                RoundStatus::Success,
                TokenUsage {
                    prompt_tokens: 3,
                    completion_tokens: 7,
                    total_tokens: 10,
                },
                0,
                0,
                None,
            )
            .await
            .expect("round completion");

        // NOTE(review): `7` presumably is the number of days and the date the
        // window's end day — confirm against `daily_metrics`'s signature.
        let daily = storage
            .daily_metrics(
                7,
                Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
            )
            .await
            .expect("daily metrics");

        assert_eq!(daily.len(), 1);
        let row = &daily[0];
        assert_eq!(row.total_sessions, 1);
        assert_eq!(row.total_rounds, 1);
        assert_eq!(row.total_tool_calls, 1);
        assert_eq!(
            row.model_breakdown
                .get("gpt-4")
                .map(|usage| usage.total_tokens),
            Some(10)
        );
        assert_eq!(
            row.tool_breakdown,
            HashMap::from([(String::from("write_file"), 1)])
        );
    }

    /// Leaves two sessions with running rounds and one forward request
    /// unfinished, then calls `reconcile_stale_executions`.
    ///
    /// Expected outcome:
    /// - the session named in the second argument (`stale-await`) ends as
    ///   `AwaitingResponse`; the other session ends as `Completed`;
    /// - both sessions get a `completed_at`;
    /// - both rounds become `Error` with `reconciled_stale_round`;
    /// - the forward request becomes `Error` with `reconciled_stale_forward`
    ///   and is counted as failed in the forward summary.
    #[tokio::test]
    async fn storage_reconciles_stale_running_sessions_rounds_and_forwards() {
        let dir = tempdir().expect("temp dir");
        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));

        storage.init().await.expect("init storage");

        let now = Utc
            .with_ymd_and_hms(2026, 2, 12, 9, 0, 0)
            .single()
            .expect("valid datetime");

        // Nothing below is completed before reconciliation: both sessions and
        // rounds are only started, and the forward request is only started.
        storage
            .upsert_session_start("stale-await", "gpt-4", now)
            .await
            .expect("await session start");
        storage
            .insert_round_start("round-await", "stale-await", "gpt-4", now)
            .await
            .expect("await round start");

        storage
            .upsert_session_start("stale-complete", "gpt-4", now)
            .await
            .expect("complete session start");
        storage
            .insert_round_start("round-complete", "stale-complete", "gpt-4", now)
            .await
            .expect("complete round start");

        storage
            .insert_forward_start(
                "forward-pending",
                "/v1/chat/completions",
                "gpt-4",
                false,
                now,
            )
            .await
            .expect("forward start");

        // NOTE(review): the first list is left empty, and `stale-await` is passed
        // in the second list. Given the assertions, the second list presumably
        // names sessions awaiting a response — confirm against the method.
        storage
            .reconcile_stale_executions(&[], &[String::from("stale-await")])
            .await
            .expect("reconcile stale executions");

        let sessions = storage
            .sessions(SessionMetricsFilter::default())
            .await
            .expect("sessions query");
        let stale_await = sessions
            .iter()
            .find(|session| session.session_id == "stale-await")
            .expect("stale-await should exist");
        let stale_complete = sessions
            .iter()
            .find(|session| session.session_id == "stale-complete")
            .expect("stale-complete should exist");
        assert_eq!(stale_await.status, SessionStatus::AwaitingResponse);
        assert_eq!(stale_complete.status, SessionStatus::Completed);
        assert!(stale_await.completed_at.is_some());
        assert!(stale_complete.completed_at.is_some());

        let await_detail = storage
            .session_detail("stale-await")
            .await
            .expect("await detail query")
            .expect("await detail exists");
        let complete_detail = storage
            .session_detail("stale-complete")
            .await
            .expect("complete detail query")
            .expect("complete detail exists");
        assert_eq!(await_detail.rounds[0].status, RoundStatus::Error);
        assert_eq!(complete_detail.rounds[0].status, RoundStatus::Error);
        assert_eq!(
            await_detail.rounds[0].error.as_deref(),
            Some("reconciled_stale_round")
        );
        assert_eq!(
            complete_detail.rounds[0].error.as_deref(),
            Some("reconciled_stale_round")
        );

        let forward_requests = storage
            .forward_requests(ForwardMetricsFilter::default())
            .await
            .expect("forward requests query");
        assert_eq!(forward_requests.len(), 1);
        assert_eq!(forward_requests[0].status, Some(ForwardStatus::Error));
        assert_eq!(
            forward_requests[0].error.as_deref(),
            Some("reconciled_stale_forward")
        );

        let forward_summary = storage
            .forward_summary(ForwardMetricsFilter::default())
            .await
            .expect("forward summary query");
        assert_eq!(forward_summary.total_requests, 1);
        assert_eq!(forward_summary.successful_requests, 0);
        assert_eq!(forward_summary.failed_requests, 1);

        let summary = storage
            .summary(MetricsDateFilter::default())
            .await
            .expect("summary query");
        assert_eq!(summary.total_sessions, 2);
        assert_eq!(summary.active_sessions, 0);
        assert_eq!(summary.awaiting_response_sessions, 1);
        assert_eq!(summary.completed_sessions, 1);
        assert_eq!(summary.error_sessions, 0);
        assert_eq!(summary.cancelled_sessions, 0);
    }

    /// Increments execute-sync mismatch counters for three reasons across two
    /// days.
    ///
    /// Checks that `summary` reports per-reason totals both for a single-day
    /// date filter and with no filter.
    #[tokio::test]
    async fn storage_summarizes_execute_sync_mismatches_by_reason() {
        let dir = tempdir().expect("temp dir");
        let storage = SqliteMetricsStorage::new(dir.path().join("metrics.db"));

        storage.init().await.expect("init storage");

        let day_a = Utc
            .with_ymd_and_hms(2026, 2, 10, 10, 0, 0)
            .single()
            .expect("valid datetime");
        let day_b = Utc
            .with_ymd_and_hms(2026, 2, 11, 10, 0, 0)
            .single()
            .expect("valid datetime");

        // Day A: two `message_count` and one `pending_question` mismatch.
        // Day B: one `last_message_id` mismatch.
        storage
            .increment_execute_sync_mismatch("message_count", day_a)
            .await
            .expect("message_count mismatch one");
        storage
            .increment_execute_sync_mismatch("message_count", day_a)
            .await
            .expect("message_count mismatch two");
        storage
            .increment_execute_sync_mismatch("pending_question", day_a)
            .await
            .expect("pending question mismatch");
        storage
            .increment_execute_sync_mismatch("last_message_id", day_b)
            .await
            .expect("last_message_id mismatch");

        // Filter with start == end == day A: the day-B reason must not appear.
        let day_a_summary = storage
            .summary(MetricsDateFilter {
                start_date: Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
                end_date: Some(NaiveDate::from_ymd_opt(2026, 2, 10).expect("valid date")),
            })
            .await
            .expect("day a summary");

        assert_eq!(day_a_summary.total_sync_mismatches, 3);
        assert_eq!(
            day_a_summary.sync_mismatch_breakdown,
            HashMap::from([
                (String::from("message_count"), 2_u64),
                (String::from("pending_question"), 1_u64),
            ])
        );

        let full_summary = storage
            .summary(MetricsDateFilter::default())
            .await
            .expect("full summary");
        assert_eq!(full_summary.total_sync_mismatches, 4);
        assert_eq!(
            full_summary.sync_mismatch_breakdown.get("last_message_id"),
            Some(&1_u64)
        );
    }
}