Skip to main content

kaizen/store/
query.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Analytics query facade. DuckDB scans cold Parquet; SQLite remains warm detail store.
3
4use crate::store::sqlite::{Store, SummaryStats};
5use anyhow::Result;
6use std::path::Path;
7#[cfg(feature = "analytics-duckdb")]
8use std::path::PathBuf;
9
/// Analytics query facade that layers cold Parquet data on top of the warm SQLite store.
///
/// Without the `analytics-duckdb` feature the struct carries no state and every query
/// falls back to the warm (SQLite) numbers only.
#[derive(Debug)]
pub struct QueryStore {
    /// Directory under which `cold/events/*.parquet` is looked up.
    #[cfg(feature = "analytics-duckdb")]
    root: PathBuf,
}
14
15impl QueryStore {
16    pub fn open(_root: &Path) -> Result<Self> {
17        Ok(Self {
18            #[cfg(feature = "analytics-duckdb")]
19            root: _root.to_path_buf(),
20        })
21    }
22
23    pub fn summary_stats(&self, sqlite: &Store, workspace: &str) -> Result<SummaryStats> {
24        self.merge_cold_stats(sqlite.summary_stats(workspace)?)
25    }
26
27    #[cfg(feature = "analytics-duckdb")]
28    fn merge_cold_stats(&self, mut stats: SummaryStats) -> Result<SummaryStats> {
29        if self.events_glob_exists() {
30            let duck = duckdb::Connection::open_in_memory()?;
31            let glob = sql_string(&self.events_glob());
32            let cost: i64 = duck.query_row(
33                &format!("SELECT COALESCE(SUM(cost_usd_e6), 0) FROM read_parquet({glob})"),
34                [],
35                |r| r.get(0),
36            )?;
37            stats.total_cost_usd_e6 = stats.total_cost_usd_e6.saturating_add(cost);
38            stats.top_tools = merge_top_tools(
39                stats.top_tools,
40                cold_top_tools(&duck, &glob).unwrap_or_default(),
41            );
42        }
43        Ok(stats)
44    }
45
46    #[cfg(not(feature = "analytics-duckdb"))]
47    fn merge_cold_stats(&self, stats: SummaryStats) -> Result<SummaryStats> {
48        Ok(stats)
49    }
50
51    pub fn cold_event_count(&self) -> Result<u64> {
52        #[cfg(feature = "analytics-duckdb")]
53        {
54            if !self.events_glob_exists() {
55                return Ok(0);
56            }
57            let duck = duckdb::Connection::open_in_memory()?;
58            let sql = format!(
59                "SELECT COUNT(*) FROM read_parquet({})",
60                sql_string(&self.events_glob())
61            );
62            let n: i64 = duck.query_row(&sql, [], |r| r.get(0))?;
63            Ok(n as u64)
64        }
65        #[cfg(not(feature = "analytics-duckdb"))]
66        {
67            Ok(0)
68        }
69    }
70
71    #[cfg(feature = "analytics-duckdb")]
72    fn events_glob(&self) -> String {
73        self.root
74            .join("cold/events/*.parquet")
75            .to_string_lossy()
76            .to_string()
77    }
78
79    #[cfg(feature = "analytics-duckdb")]
80    fn events_glob_exists(&self) -> bool {
81        self.root.join("cold/events").exists()
82    }
83}
84
/// Queries the cold Parquet files for the ten most frequently used tools.
///
/// `glob` must already be a quoted SQL string literal; it is spliced verbatim into
/// the `read_parquet(...)` call. Rows with a NULL `tool` are excluded by the query,
/// and rows that fail to decode are skipped rather than reported.
///
/// # Errors
///
/// Returns an error if the statement cannot be prepared or the query cannot start.
#[cfg(feature = "analytics-duckdb")]
fn cold_top_tools(duck: &duckdb::Connection, glob: &str) -> Result<Vec<(String, u64)>> {
    let query = format!(
        "SELECT tool, COUNT(*) FROM read_parquet({glob}) \
         WHERE tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC LIMIT 10"
    );
    let mut statement = duck.prepare(&query)?;
    let decoded = statement.query_map([], |row| {
        let tool: String = row.get(0)?;
        let hits: i64 = row.get(1)?;
        Ok((tool, hits as u64))
    })?;
    // `flatten` on an iterator of `Result`s keeps the `Ok` rows and drops the rest.
    let tools: Vec<(String, u64)> = decoded.flatten().collect();
    Ok(tools)
}
97
98#[cfg(feature = "analytics-duckdb")]
/// Merges cold tool counts into the warm ranking and returns the combined top ten.
///
/// Counts for a tool present in both lists are added together; tools only seen in
/// `cold` are appended. The result is ordered by count descending, with ties broken
/// by tool name ascending, and truncated to ten entries.
///
/// Additions saturate at `u64::MAX` so an extreme count can neither panic (debug
/// builds) nor wrap around (release builds).
fn merge_top_tools(mut warm: Vec<(String, u64)>, cold: Vec<(String, u64)>) -> Vec<(String, u64)> {
    for (tool, n) in cold {
        match warm.iter_mut().find(|(name, _)| *name == tool) {
            Some((_, total)) => *total = total.saturating_add(n),
            None => warm.push((tool, n)),
        }
    }
    // Entries that compare equal have the same name and count, so an unstable sort
    // cannot produce a different visible order than a stable one.
    warm.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    warm.truncate(10);
    warm
}
111
112#[cfg(feature = "analytics-duckdb")]
/// Renders `s` as a single-quoted SQL string literal.
///
/// Every embedded single quote is doubled so the value cannot terminate the literal
/// early; all other characters are copied through unchanged.
fn sql_string(s: &str) -> String {
    let mut literal = String::with_capacity(s.len() + 2);
    literal.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            // SQL escapes a quote inside a literal by writing it twice.
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}