1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8 FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
16use crate::sync::context::SyncIngestContext;
17use crate::sync::outbound::outbound_event_from_row;
18use crate::sync::redact::redact_payload;
19use crate::sync::smart::enqueue_tool_spans_for_session;
20use anyhow::{Context, Result};
21use rusqlite::types::Value;
22use rusqlite::{
23 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
24};
25use std::cell::RefCell;
26use std::collections::{HashMap, HashSet};
27use std::path::{Path, PathBuf};
28
/// Millisecond threshold (10^12) associated with synthetic timestamps.
/// NOTE(review): presumably timestamps below this ceiling are treated as
/// synthetic rather than real epoch milliseconds — confirm at the use sites.
pub(super) const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map size, in megabytes.
/// NOTE(review): presumably consumed when connection pragmas are applied —
/// confirm against the schema module.
pub(super) const DEFAULT_MMAP_MB: u64 = 256;
/// Base `SELECT` over the `sessions` table listing 21 columns in a fixed order.
///
/// The statement ends right after `FROM sessions`, so it carries no `WHERE`,
/// `ORDER BY` or `LIMIT` of its own.
/// NOTE(review): presumably callers append their own clauses and a row mapper
/// depends on this exact column order — confirm before reordering columns.
pub(super) const SESSION_SELECT: &str =
    "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
 status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
 repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
 repo_file_count, repo_total_loc FROM sessions";
/// Ranks the files of one snapshot by `COUNT(s.id) * complexity_total`.
///
/// Bound parameters:
/// - `?1`: `file_facts.snapshot_id` to rank.
/// - `?2`: workspace a joined session must belong to in order to be counted.
/// - `?3`, `?4`: inclusive time window applied to `tool_spans.started_at_ms`,
///   or to `ended_at_ms` when `started_at_ms` is NULL.
///
/// Result columns: `path`, `value`, `complexity_total`, `churn_30d`.
/// Rows are ordered by `value` descending, then `path` ascending, and capped
/// at 10. Because every join is a `LEFT JOIN`, files with no matching spans
/// still appear with a `value` of 0. `COUNT(s.id)` counts joined rows, not
/// distinct sessions.
pub(super) const PAIN_HOTSPOTS_SQL: &str = "
 SELECT f.path,
 COUNT(s.id) * f.complexity_total AS value,
 f.complexity_total,
 f.churn_30d
 FROM file_facts f
 LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
 LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
 AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
 OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
 LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
 WHERE f.snapshot_id = ?1
 GROUP BY f.path, f.complexity_total, f.churn_30d
 ORDER BY value DESC, f.path ASC
 LIMIT 10";
/// Per-tool call counts, token totals and latency percentiles for a workspace.
///
/// Bound parameters:
/// - `?1`: workspace of the owning session.
/// - `?2`, `?3`: inclusive time window applied to `started_at_ms`, or to
///   `ended_at_ms` when `started_at_ms` is NULL.
///
/// CTEs:
/// - `scoped`: spans in scope; a NULL tool becomes `'unknown'` and NULL token
///   columns count as 0.
/// - `agg`: call count and token sums per tool.
/// - `lat`: spans with a non-NULL `lead_time_ms`, numbered per tool in
///   ascending latency order.
/// - `pct`: picks the row at position `((n - 1) * p) / 100 + 1` for
///   `p` = 50 and 95.
///
/// Result columns: `tool`, `calls`, `p50_ms`, `p95_ms`, `total_tokens`,
/// `total_reasoning_tokens`. The percentile columns are NULL for tools with no
/// measured lead time (`LEFT JOIN`). The query has no `ORDER BY`, so any
/// ranking has to happen outside it.
pub(super) const TOOL_RANK_ROWS_SQL: &str = "
 WITH scoped AS (
 SELECT COALESCE(ts.tool, 'unknown') AS tool,
 ts.lead_time_ms,
 COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
 + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
 COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
 FROM tool_spans ts
 JOIN sessions s ON s.id = ts.session_id
 WHERE s.workspace = ?1
 AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
 OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
 ),
 agg AS (
 SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
 SUM(reasoning_tokens) AS total_reasoning_tokens
 FROM scoped GROUP BY tool
 ),
 lat AS (
 SELECT tool, lead_time_ms,
 ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
 COUNT(*) OVER (PARTITION BY tool) AS n
 FROM scoped WHERE lead_time_ms IS NOT NULL
 ),
 pct AS (
 SELECT tool,
 MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
 MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
 FROM lat GROUP BY tool
 )
 SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
 agg.total_tokens, agg.total_reasoning_tokens
 FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
86
/// Aggregate counters and small rankings describing stored sessions.
/// NOTE(review): the queries that populate this live outside this chunk, so
/// the field notes below are read from names and types only.
#[derive(Clone)]
pub struct InsightsStats {
    /// Number of sessions counted.
    pub total_sessions: u64,
    /// Number of sessions counted as running.
    pub running_sessions: u64,
    /// Number of events counted.
    pub total_events: u64,
    /// `(day, count)` pairs; the day string format is not visible here.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Session records paired with a count.
    /// NOTE(review): presumably the per-session event count — confirm.
    pub recent: Vec<(SessionRecord, u64)>,
    /// `(tool, count)` pairs.
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost. NOTE(review): the `_usd_e6` suffix suggests USD scaled by
    /// 10^6 — confirm.
    pub total_cost_usd_e6: i64,
    /// Number of sessions that have a cost.
    pub sessions_with_cost: u64,
}
102
/// Snapshot of sync health counters.
pub struct SyncStatusSnapshot {
    /// Number of entries still pending in the outbox.
    pub pending_outbox: u64,
    /// Time of the last successful sync in milliseconds, if any.
    /// NOTE(review): presumably Unix epoch milliseconds — confirm.
    pub last_success_ms: Option<u64>,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
    /// Count of consecutive failures.
    pub consecutive_failures: u32,
}
110
/// Serializable summary of sessions, broken down by agent, model and tool.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions summarized.
    pub session_count: u64,
    /// Summed cost. NOTE(review): the `_usd_e6` suffix suggests USD scaled by
    /// 10^6 — confirm.
    pub total_cost_usd_e6: i64,
    /// `(agent, count)` pairs.
    pub by_agent: Vec<(String, u64)>,
    /// `(model, count)` pairs.
    pub by_model: Vec<(String, u64)>,
    /// `(tool, count)` pairs.
    pub top_tools: Vec<(String, u64)>,
}
120
/// Category of a guidance item.
///
/// Serialized in lowercase (`"skill"`, `"rule"`) via `rename_all`. The derived
/// ordering follows declaration order, so `Skill` sorts before `Rule`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    /// A skill.
    Skill,
    /// A rule.
    Rule,
}
128
/// One row of a guidance performance report: usage and cost for one item.
/// NOTE(review): the exact computation of each figure happens outside this
/// chunk; the notes below follow field names and types.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether the item is a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the guidance item.
    pub id: String,
    /// Number of sessions associated with the item.
    pub sessions: u64,
    /// Share of sessions associated with the item.
    /// NOTE(review): scale (0–1 vs 0–100) is not visible here — confirm.
    pub sessions_pct: f64,
    /// Summed cost. NOTE(review): presumably USD scaled by 10^6 — confirm.
    pub total_cost_usd_e6: i64,
    /// Average cost per session in USD, when available.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison against the workspace average cost per session, when
    /// available. NOTE(review): difference vs ratio is not visible — confirm.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the item is present on disk.
    pub on_disk: bool,
}
141
/// Guidance performance report for one workspace and time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Start of the window in milliseconds.
    pub window_start_ms: u64,
    /// End of the window in milliseconds.
    pub window_end_ms: u64,
    /// Number of sessions inside the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD, when available.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// Per-item rows.
    pub rows: Vec<GuidancePerfRow>,
}
152
/// Counts of rows removed by a prune; `Default` yields zero for both.
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of sessions removed.
    pub sessions_removed: u64,
    /// Number of events removed.
    pub events_removed: u64,
}
159
/// Measured outcome signals for one session; every signal is optional.
/// NOTE(review): how each signal is measured is not visible in this chunk.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Session the outcome belongs to.
    pub session_id: String,
    /// Number of passed tests, if measured.
    pub test_passed: Option<i64>,
    /// Number of failed tests, if measured.
    pub test_failed: Option<i64>,
    /// Number of skipped tests, if measured.
    pub test_skipped: Option<i64>,
    /// Whether the build succeeded, if measured.
    pub build_ok: Option<bool>,
    /// Number of lint errors, if measured.
    pub lint_errors: Option<i64>,
    /// Reverted line count. NOTE(review): the `_14d` suffix suggests a 14-day
    /// window — confirm.
    pub revert_lines_14d: Option<i64>,
    /// Pull-request signal. NOTE(review): count vs flag is not visible — confirm.
    pub pr_open: Option<i64>,
    /// Whether CI succeeded, if measured.
    pub ci_ok: Option<bool>,
    /// Time the measurement was taken, in milliseconds.
    pub measured_at_ms: u64,
    /// Error recorded while measuring, if any.
    pub measure_error: Option<String>,
}
175
/// Aggregated resource samples for one session.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Maximum CPU percentage observed across the samples.
    pub max_cpu_percent: f64,
    /// Maximum resident set size observed, in bytes.
    pub max_rss_bytes: u64,
}
184
// Key names for persisted state values.
// NOTE(review): presumably keys into a sync-state key/value table whose values
// are millisecond timestamps — confirm against the sync module.

/// Key for the time of the last agent scan.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// Key for the time of the last automatic prune.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// Key for the time the search index was marked dirty.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
189
/// One tool span plus its associated paths.
/// NOTE(review): the name suggests this is the shape handed to sync — confirm
/// against the tool-span sync module.
pub struct ToolSpanSyncRow {
    /// Identifier of the span.
    pub span_id: String,
    /// Session the span belongs to.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Identifier of the tool call, if known.
    pub tool_call_id: Option<String>,
    /// Span status as a string; the set of values is not visible here.
    pub status: String,
    /// Start time in milliseconds, if known.
    pub started_at_ms: Option<u64>,
    /// End time in milliseconds, if known.
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, if known.
    pub lead_time_ms: Option<u64>,
    /// Input token count, if known.
    pub tokens_in: Option<u32>,
    /// Output token count, if known.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, if known.
    pub reasoning_tokens: Option<u32>,
    /// Cost, if known. NOTE(review): presumably USD scaled by 10^6 — confirm.
    pub cost_usd_e6: Option<i64>,
    /// Paths associated with the span.
    pub paths: Vec<String>,
}
205
/// Per-row capture-quality flags for a given source.
pub(crate) struct CaptureQualityRow {
    /// Source label the row was captured from.
    pub source: String,
    /// Whether token data is present.
    pub has_tokens: bool,
    /// Whether latency data is present.
    pub has_latency: bool,
    /// Whether context data is present.
    pub has_context: bool,
}
212
/// Quality flags for one trace span.
pub(crate) struct TraceSpanQualityRow {
    /// Span kind as a string.
    pub kind: String,
    /// Whether the span is an orphan.
    pub is_orphan: bool,
}
217
/// How `Store::open_with_mode` opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Opened with `Connection::open`; migrations and schema-column checks
    /// run, and the projector is warmed afterwards.
    ReadWrite,
    /// Opened with the `SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX` flags;
    /// migrations and projector warming are skipped.
    ReadOnlyQuery,
}
223
/// Status view of one session: id, status and optional end time.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session identifier.
    pub id: String,
    /// Current status of the session.
    pub status: SessionStatus,
    /// End time in milliseconds, if set.
    pub ended_at_ms: Option<u64>,
}
230
/// Optional criteria for narrowing a session listing.
///
/// `Default` leaves every criterion as `None`.
/// NOTE(review): presumably a `None` criterion means "do not filter on this" —
/// confirm against the query that consumes the filter.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Prefix the session's agent must start with.
    pub agent_prefix: Option<String>,
    /// Status the session must have.
    pub status: Option<SessionStatus>,
    /// Lower time bound in milliseconds.
    /// NOTE(review): the compared column and inclusivity are not visible here.
    pub since_ms: Option<u64>,
}
237
/// One page of session records.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Total count. NOTE(review): presumably the number of sessions matching
    /// the filter across all pages — confirm.
    pub total: usize,
    /// Offset for the next page. NOTE(review): presumably `None` once the
    /// last page has been reached — confirm.
    pub next_offset: Option<usize>,
}
244
/// Cached span tree for one session. `Store` keeps at most one such entry.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session the cached tree belongs to.
    session_id: String,
    /// Event sequence number recorded with the cached tree.
    /// NOTE(review): presumably used to detect a stale cache — confirm.
    last_event_seq: Option<u64>,
    /// The cached span nodes.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
251
/// Handle bundling the SQLite connection with per-store helpers and caches.
///
/// The `RefCell` fields provide interior mutability behind `&self`.
pub struct Store {
    /// The SQLite connection.
    conn: Connection,
    /// Parent directory of the database path (`.` when the path has no
    /// parent); the outbox is opened from here.
    root: PathBuf,
    /// Hot log handle; `None` when the store is first opened.
    hot_log: RefCell<Option<HotLog>>,
    /// Pending search-index writer; `None` when the store is first opened and
    /// committed on drop if present.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// Single-entry span-tree cache; cleared by `invalidate_span_tree_cache`.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory projector; starts as `Projector::default()` and is warmed
    /// from running sessions on read-write opens.
    projector: RefCell<Projector>,
}
260
// Private submodules of this module.
// NOTE(review): presumably each adds further `impl Store` blocks for its area —
// confirm in the individual files.
mod artifact_windows;
mod evals;
mod event_projector;
mod event_read;
mod event_write;
mod events;
mod experiment_windows;
mod feedback;
mod guidance;
mod maintenance;
mod metrics;
mod outcomes;
mod prompts;
mod report_windows;
mod reports;
mod rows;
mod samples;
mod schema;
mod session_read;
mod session_window;
mod sessions;
mod sync;
// Compiled only for test builds.
#[cfg(test)]
mod tests;
mod tool_span_sync;
mod tool_spans;
mod trace_spans;
288
289pub(super) fn now_ms() -> u64 {
290 std::time::SystemTime::now()
291 .duration_since(std::time::UNIX_EPOCH)
292 .unwrap_or_default()
293 .as_millis() as u64
294}
295
296impl Store {
297 pub(crate) fn conn(&self) -> &Connection {
298 &self.conn
299 }
300
301 pub fn open(path: &Path) -> Result<Self> {
302 Self::open_with_mode(path, StoreOpenMode::ReadWrite)
303 }
304
305 pub fn open_read_only(path: &Path) -> Result<Self> {
306 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
307 }
308
309 pub fn open_query(path: &Path) -> Result<Self> {
310 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
311 }
312
313 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
314 if let Some(parent) = path.parent() {
315 std::fs::create_dir_all(parent)?;
316 }
317 let conn = match mode {
318 StoreOpenMode::ReadWrite => Connection::open(path),
319 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
320 path,
321 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
322 ),
323 }
324 .with_context(|| format!("open db: {}", path.display()))?;
325 schema::apply_pragmas(&conn, mode)?;
326 if mode == StoreOpenMode::ReadWrite {
327 for sql in schema::MIGRATIONS {
328 conn.execute_batch(sql)?;
329 }
330 schema::ensure_schema_columns(&conn)?;
331 }
332 let store = Self {
333 conn,
334 root: path
335 .parent()
336 .unwrap_or_else(|| Path::new("."))
337 .to_path_buf(),
338 hot_log: RefCell::new(None),
339 search_writer: RefCell::new(None),
340 span_tree_cache: RefCell::new(None),
341 projector: RefCell::new(Projector::default()),
342 };
343 if mode == StoreOpenMode::ReadWrite {
344 store.warm_projector()?;
345 }
346 Ok(store)
347 }
348
349 pub(super) fn invalidate_span_tree_cache(&self) {
350 *self.span_tree_cache.borrow_mut() = None;
351 }
352
353 pub(super) fn warm_projector(&self) -> Result<()> {
354 let ids = self.running_session_ids()?;
355 let mut projector = self.projector.borrow_mut();
356 for id in ids {
357 for event in self.list_events_for_session(&id)? {
358 let _ = projector.apply(&event);
359 }
360 }
361 Ok(())
362 }
363
364 pub(super) fn outbox(&self) -> Result<Outbox> {
365 Outbox::open(&self.root)
366 }
367}
368
369impl Drop for Store {
370 fn drop(&mut self) {
371 if let Some(writer) = self.search_writer.get_mut().as_mut() {
372 let _ = writer.commit();
373 }
374 }
375}