Skip to main content

kaizen/store/
query.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Analytics query facade. DuckDB scans cold Parquet; SQLite remains warm detail store.
3
4use crate::store::sqlite::{Store, SummaryStats};
5use anyhow::Result;
6use std::path::{Path, PathBuf};
7
/// Analytics query facade rooted at a store directory.
///
/// The struct holds only the root path. Cold event data is looked up beneath
/// it at `cold/events/*.parquet` each time a query runs.
#[derive(Debug, Clone)]
pub struct QueryStore {
    /// Directory under which the `cold/events` Parquet folder is expected.
    root: PathBuf,
}
11
12impl QueryStore {
13    pub fn open(root: &Path) -> Result<Self> {
14        Ok(Self {
15            root: root.to_path_buf(),
16        })
17    }
18
19    pub fn summary_stats(&self, sqlite: &Store, workspace: &str) -> Result<SummaryStats> {
20        let mut stats = sqlite.summary_stats(workspace)?;
21        #[cfg(feature = "analytics-duckdb")]
22        {
23            if self.events_glob_exists() {
24                let duck = duckdb::Connection::open_in_memory()?;
25                let glob = sql_string(&self.events_glob());
26                let cost: i64 = duck.query_row(
27                    &format!("SELECT COALESCE(SUM(cost_usd_e6), 0) FROM read_parquet({glob})"),
28                    [],
29                    |r| r.get(0),
30                )?;
31                stats.total_cost_usd_e6 = stats.total_cost_usd_e6.saturating_add(cost);
32                stats.top_tools = merge_top_tools(
33                    stats.top_tools,
34                    cold_top_tools(&duck, &glob).unwrap_or_default(),
35                );
36            }
37        }
38        Ok(stats)
39    }
40
41    pub fn cold_event_count(&self) -> Result<u64> {
42        #[cfg(feature = "analytics-duckdb")]
43        {
44            if !self.events_glob_exists() {
45                return Ok(0);
46            }
47            let duck = duckdb::Connection::open_in_memory()?;
48            let sql = format!(
49                "SELECT COUNT(*) FROM read_parquet({})",
50                sql_string(&self.events_glob())
51            );
52            let n: i64 = duck.query_row(&sql, [], |r| r.get(0))?;
53            Ok(n as u64)
54        }
55        #[cfg(not(feature = "analytics-duckdb"))]
56        {
57            Ok(0)
58        }
59    }
60
61    fn events_glob(&self) -> String {
62        self.root
63            .join("cold/events/*.parquet")
64            .to_string_lossy()
65            .to_string()
66    }
67
68    fn events_glob_exists(&self) -> bool {
69        self.root.join("cold/events").exists()
70    }
71}
72
/// Returns up to ten `(tool, event_count)` pairs from the Parquet files matched by `glob`.
///
/// `glob` must already be a quoted SQL string literal, because it is spliced
/// verbatim into the `read_parquet(...)` call. Rows whose `tool` is NULL are
/// excluded. Results are ordered by count descending, then by tool name, so
/// the `LIMIT 10` cut-off picks the same tools every time when counts tie.
///
/// # Errors
///
/// Fails if the statement cannot be prepared or the query cannot be started.
/// Individual rows that fail to decode are skipped rather than aborting the
/// whole result.
#[cfg(feature = "analytics-duckdb")]
fn cold_top_tools(duck: &duckdb::Connection, glob: &str) -> Result<Vec<(String, u64)>> {
    let sql = format!(
        "SELECT tool, COUNT(*) FROM read_parquet({glob}) \
         WHERE tool IS NOT NULL GROUP BY tool ORDER BY COUNT(*) DESC, tool ASC LIMIT 10"
    );
    let mut stmt = duck.prepare(&sql)?;
    let rows = stmt.query_map([], |row| {
        let tool: String = row.get(0)?;
        let count: i64 = row.get(1)?;
        // COUNT(*) is never negative, so the sign-dropping cast is lossless.
        Ok((tool, count as u64))
    })?;
    // Best effort: keep every row that decoded, drop the ones that did not.
    Ok(rows.filter_map(Result::ok).collect())
}
85
/// Merges two per-tool count lists and returns the ten highest-ranked entries.
///
/// Each `cold` count is added to the first `warm` entry with the same tool
/// name; tools that appear only in `cold` are appended. Addition saturates at
/// `u64::MAX` instead of overflowing. The merged list is ordered by count
/// descending, with ties broken by tool name ascending, and then cut to at
/// most ten entries.
fn merge_top_tools(mut warm: Vec<(String, u64)>, cold: Vec<(String, u64)>) -> Vec<(String, u64)> {
    for (tool, n) in cold {
        match warm.iter_mut().find(|(name, _)| *name == tool) {
            Some((_, total)) => *total = total.saturating_add(n),
            None => warm.push((tool, n)),
        }
    }
    // The comparator is a total order over (count, name), so entries that
    // compare equal are identical and an unstable sort gives the same result.
    warm.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    warm.truncate(10);
    warm
}
98
/// Wraps `s` in single quotes to form a SQL string literal.
///
/// Every single quote inside `s` is doubled so it cannot terminate the literal.
fn sql_string(s: &str) -> String {
    // Two extra bytes for the surrounding quotes; embedded quotes may grow it further.
    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('\'');
    for ch in s.chars() {
        if ch == '\'' {
            quoted.push('\'');
        }
        quoted.push(ch);
    }
    quoted.push('\'');
    quoted
}
101}