1use crate::core::config::try_team_salt;
5use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
6use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord};
7use crate::metrics::types::{
8 FileFact, RankedFile, RankedTool, RepoEdge, RepoSnapshotRecord, ToolSpanView,
9};
10use crate::store::event_index::index_event_derived;
11use crate::store::projector::{DEFAULT_ORPHAN_TTL_MS, Projector, ProjectorEvent};
12use crate::store::tool_span_index::{
13 clear_session_spans, rebuild_tool_spans_for_session, upsert_tool_span_record,
14};
15use crate::store::{hot_log::HotLog, outbox_redb::Outbox};
16use crate::sync::context::SyncIngestContext;
17use crate::sync::outbound::outbound_event_from_row;
18use crate::sync::redact::redact_payload;
19use crate::sync::smart::enqueue_tool_spans_for_session;
20use anyhow::{Context, Result};
21use rusqlite::types::Value;
22use rusqlite::{
23 Connection, OpenFlags, OptionalExtension, TransactionBehavior, params, params_from_iter,
24};
25use std::cell::RefCell;
26use std::collections::{HashMap, HashSet};
27use std::path::{Path, PathBuf};
28
/// Timestamp threshold of 10^12 milliseconds since the Unix epoch (September 2001).
///
/// NOTE(review): presumably values below this are treated as synthetic/non-wall-clock
/// timestamps by the code that reads it — confirm at the use sites.
pub(super) const SYNTHETIC_TS_CEILING_MS: i64 = 1_000_000_000_000;
/// Default memory-map size in megabytes.
///
/// NOTE(review): presumably feeds SQLite's `mmap_size` pragma in `schema::apply_pragmas`
/// — confirm there.
pub(super) const DEFAULT_MMAP_MB: u64 = 256;
/// Base `SELECT` of every column of the `sessions` table.
///
/// The statement has no `WHERE`/`ORDER BY` of its own, so it is presumably extended by
/// callers. Columns are listed in a fixed order; NOTE(review): keep this list in sync
/// with the row mapper that reads it positionally (not visible in this file).
pub(super) const SESSION_SELECT: &str =
    "SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
 status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
 repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
 repo_file_count, repo_total_loc FROM sessions";
/// Top-10 "pain hotspot" files of one repo snapshot.
///
/// Parameters:
/// * `?1` — `file_facts.snapshot_id` to rank.
/// * `?2` — workspace; only spans whose session belongs to it are counted.
/// * `?3`, `?4` — inclusive time window compared against `tool_spans.started_at_ms`,
///   falling back to `ended_at_ms` when the span has no start time.
///
/// `value` = (number of matching span/session rows touching the path) ×
/// `complexity_total`. Because every join is a `LEFT JOIN`, files with no matching
/// activity still appear with `value = 0` (`COUNT(s.id)` skips NULL session ids).
/// Ties are broken by path, ascending.
pub(super) const PAIN_HOTSPOTS_SQL: &str = "
 SELECT f.path,
 COUNT(s.id) * f.complexity_total AS value,
 f.complexity_total,
 f.churn_30d
 FROM file_facts f
 LEFT JOIN tool_span_paths tsp ON tsp.path = f.path
 LEFT JOIN tool_spans ts ON ts.span_id = tsp.span_id
 AND ((ts.started_at_ms >= ?3 AND ts.started_at_ms <= ?4)
 OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?3 AND ts.ended_at_ms <= ?4))
 LEFT JOIN sessions s ON s.id = ts.session_id AND s.workspace = ?2
 WHERE f.snapshot_id = ?1
 GROUP BY f.path, f.complexity_total, f.churn_30d
 ORDER BY value DESC, f.path ASC
 LIMIT 10";
/// Per-tool usage rows for one workspace and time window.
///
/// Parameters: `?1` workspace; `?2`, `?3` inclusive window compared against
/// `started_at_ms`, falling back to `ended_at_ms` when the span has no start time.
///
/// Output columns: `tool` (NULL tool names grouped as `'unknown'`), `calls`, `p50_ms`,
/// `p95_ms`, `total_tokens` (sum of `tokens_in + tokens_out + reasoning_tokens`, NULLs as
/// 0) and `total_reasoning_tokens`.
///
/// Percentiles use a lower nearest-rank rule over spans with a non-NULL `lead_time_ms`:
/// the value at row number `floor((n - 1) * p / 100) + 1` in ascending order. Tools
/// with no such spans get NULL percentiles through the `LEFT JOIN`. The query has no
/// `ORDER BY`; any ranking happens in the caller.
pub(super) const TOOL_RANK_ROWS_SQL: &str = "
 WITH scoped AS (
 SELECT COALESCE(ts.tool, 'unknown') AS tool,
 ts.lead_time_ms,
 COALESCE(ts.tokens_in, 0) + COALESCE(ts.tokens_out, 0)
 + COALESCE(ts.reasoning_tokens, 0) AS total_tokens,
 COALESCE(ts.reasoning_tokens, 0) AS reasoning_tokens
 FROM tool_spans ts
 JOIN sessions s ON s.id = ts.session_id
 WHERE s.workspace = ?1
 AND ((ts.started_at_ms >= ?2 AND ts.started_at_ms <= ?3)
 OR (ts.started_at_ms IS NULL AND ts.ended_at_ms >= ?2 AND ts.ended_at_ms <= ?3))
 ),
 agg AS (
 SELECT tool, COUNT(*) AS calls, SUM(total_tokens) AS total_tokens,
 SUM(reasoning_tokens) AS total_reasoning_tokens
 FROM scoped GROUP BY tool
 ),
 lat AS (
 SELECT tool, lead_time_ms,
 ROW_NUMBER() OVER (PARTITION BY tool ORDER BY lead_time_ms) AS rn,
 COUNT(*) OVER (PARTITION BY tool) AS n
 FROM scoped WHERE lead_time_ms IS NOT NULL
 ),
 pct AS (
 SELECT tool,
 MAX(CASE WHEN rn = CAST(((n - 1) * 50) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p50_ms,
 MAX(CASE WHEN rn = CAST(((n - 1) * 95) / 100 AS INTEGER) + 1 THEN lead_time_ms END) AS p95_ms
 FROM lat GROUP BY tool
 )
 SELECT agg.tool, agg.calls, pct.p50_ms, pct.p95_ms,
 agg.total_tokens, agg.total_reasoning_tokens
 FROM agg LEFT JOIN pct ON pct.tool = agg.tool";
86
/// Aggregate counters for an insights/overview view of the store.
///
/// NOTE(review): filled by a query outside this file; the field notes below are read
/// from names and types — confirm against the code that populates them.
#[derive(Clone)]
pub struct InsightsStats {
    /// Total number of sessions counted.
    pub total_sessions: u64,
    /// Sessions counted as still running.
    pub running_sessions: u64,
    /// Total number of events counted.
    pub total_events: u64,
    /// `(day label, session count)` pairs; the label format is not visible here.
    pub sessions_by_day: Vec<(String, u64)>,
    /// Recent sessions, each paired with a count (presumably its event count — confirm).
    pub recent: Vec<(SessionRecord, u64)>,
    /// `(tool name, count)` pairs, presumably the most frequently used tools.
    pub top_tools: Vec<(String, u64)>,
    /// Summed cost, presumably in millionths of a US dollar (`_usd_e6`).
    pub total_cost_usd_e6: i64,
    /// Number of sessions that contributed a cost value.
    pub sessions_with_cost: u64,
}
102
/// Point-in-time view of the sync pipeline's health.
///
/// Derives `Debug` and `Clone` so snapshots can be logged and handed to callers
/// freely; every field is a plain standard-library value.
#[derive(Debug, Clone)]
pub struct SyncStatusSnapshot {
    /// Number of items still waiting in the outbox.
    pub pending_outbox: u64,
    /// Time of the last successful sync, presumably epoch milliseconds (`_ms`);
    /// `None` if no success has been recorded.
    pub last_success_ms: Option<u64>,
    /// Message of the most recent sync error, if any.
    pub last_error: Option<String>,
    /// Number of sync failures in a row.
    pub consecutive_failures: u32,
}
110
/// Serializable summary of sessions, cost and usage breakdowns.
///
/// Field order is the serialized order.
#[derive(serde::Serialize)]
pub struct SummaryStats {
    /// Number of sessions summarized.
    pub session_count: u64,
    /// Summed cost, presumably in millionths of a US dollar (`_usd_e6`).
    pub total_cost_usd_e6: i64,
    /// `(agent, count)` pairs.
    pub by_agent: Vec<(String, u64)>,
    /// `(model, count)` pairs.
    pub by_model: Vec<(String, u64)>,
    /// `(tool name, count)` pairs.
    pub top_tools: Vec<(String, u64)>,
}
120
/// Kind of guidance item a [`GuidancePerfRow`] refers to.
///
/// Serialized in lowercase (`"skill"` / `"rule"`). The derived `Ord` follows
/// declaration order, so `Skill` sorts before `Rule`; reordering the variants changes
/// sort results.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum GuidanceKind {
    Skill,
    Rule,
}
128
/// Usage and cost figures for one guidance item (skill or rule) within a report window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidancePerfRow {
    /// Whether this row describes a skill or a rule.
    pub kind: GuidanceKind,
    /// Identifier of the skill/rule.
    pub id: String,
    /// Number of sessions associated with this item.
    pub sessions: u64,
    /// Share of the window's sessions; NOTE(review): confirm whether 0–1 or 0–100.
    pub sessions_pct: f64,
    /// Summed cost, presumably in millionths of a US dollar (`_usd_e6`).
    pub total_cost_usd_e6: i64,
    /// Average cost per session in USD, when computable.
    pub avg_cost_per_session_usd: Option<f64>,
    /// Comparison against the workspace average; NOTE(review): confirm whether this is a
    /// difference or a ratio.
    pub vs_workspace_avg_cost_per_session_usd: Option<f64>,
    /// Whether the item was found on disk.
    pub on_disk: bool,
}
141
/// Guidance performance report for one workspace over a time window.
#[derive(Clone, Debug, serde::Serialize)]
pub struct GuidanceReport {
    /// Workspace the report covers.
    pub workspace: String,
    /// Window start, presumably epoch milliseconds (`_ms`).
    pub window_start_ms: u64,
    /// Window end, presumably epoch milliseconds (`_ms`).
    pub window_end_ms: u64,
    /// Number of sessions in the window.
    pub sessions_in_window: u64,
    /// Workspace-wide average cost per session in USD, when computable.
    pub workspace_avg_cost_per_session_usd: Option<f64>,
    /// One row per guidance item.
    pub rows: Vec<GuidancePerfRow>,
}
152
/// Counts of rows removed by a prune pass; `Default` is "nothing removed".
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub struct PruneStats {
    /// Number of sessions deleted.
    pub sessions_removed: u64,
    /// Number of events deleted.
    pub events_removed: u64,
}
159
/// Measured outcome signals for one session.
///
/// Optional fields are presumably `None` when the signal was not measured — NOTE(review):
/// confirm against the writer.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SessionOutcomeRow {
    /// Session the measurements belong to.
    pub session_id: String,
    /// Number of passing tests.
    pub test_passed: Option<i64>,
    /// Number of failing tests.
    pub test_failed: Option<i64>,
    /// Number of skipped tests.
    pub test_skipped: Option<i64>,
    /// Whether the build succeeded.
    pub build_ok: Option<bool>,
    /// Number of lint errors.
    pub lint_errors: Option<i64>,
    /// Reverted lines within a 14-day horizon (per the field name).
    pub revert_lines_14d: Option<i64>,
    /// Pull-request open indicator/count; NOTE(review): semantics not visible here.
    pub pr_open: Option<i64>,
    /// Whether CI succeeded.
    pub ci_ok: Option<bool>,
    /// When the measurement was taken, presumably epoch milliseconds (`_ms`).
    pub measured_at_ms: u64,
    /// Error message if measuring failed.
    pub measure_error: Option<String>,
}
175
/// Aggregate of resource samples recorded for one session.
#[derive(Debug, Clone)]
pub struct SessionSampleAgg {
    /// Session the samples belong to.
    pub session_id: String,
    /// Number of samples aggregated.
    pub sample_count: u64,
    /// Highest CPU percentage observed.
    pub max_cpu_percent: f64,
    /// Highest resident-set size observed, in bytes.
    pub max_rss_bytes: u64,
}
184
/// Key under which the time of the last agent scan is stored.
///
/// NOTE(review): these keys presumably index a sync-state key/value table and their
/// values are epoch milliseconds (`_ms`) — confirm in `sync.rs`.
pub const SYNC_STATE_LAST_AGENT_SCAN_MS: &str = "last_agent_scan_ms";
/// Key under which the time of the last automatic prune is stored.
pub const SYNC_STATE_LAST_AUTO_PRUNE_MS: &str = "last_auto_prune_ms";
/// Key under which the time the search index was marked dirty is stored.
pub const SYNC_STATE_SEARCH_DIRTY_MS: &str = "search_dirty_ms";
189
/// A tool span flattened for sync, together with the file paths it touched.
///
/// Derives `Debug` and `Clone` (all fields are standard-library types) so rows can be
/// logged and duplicated without hand-written impls.
#[derive(Debug, Clone)]
pub struct ToolSpanSyncRow {
    /// Unique span identifier.
    pub span_id: String,
    /// Session that owns the span.
    pub session_id: String,
    /// Tool name, if known.
    pub tool: Option<String>,
    /// Provider-assigned tool call id, if known.
    pub tool_call_id: Option<String>,
    /// Span status as stored.
    pub status: String,
    /// Start time, presumably epoch milliseconds (`_ms`).
    pub started_at_ms: Option<u64>,
    /// End time, presumably epoch milliseconds (`_ms`).
    pub ended_at_ms: Option<u64>,
    /// Lead time in milliseconds, if recorded.
    pub lead_time_ms: Option<u64>,
    /// Input tokens, if recorded.
    pub tokens_in: Option<u32>,
    /// Output tokens, if recorded.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens, if recorded.
    pub reasoning_tokens: Option<u32>,
    /// Cost, presumably in millionths of a US dollar (`_usd_e6`).
    pub cost_usd_e6: Option<i64>,
    /// File paths associated with the span.
    pub paths: Vec<String>,
}
205
/// Capture-quality flags for one record from a given source.
///
/// NOTE(review): the `has_*` semantics are read from field names; confirm against the
/// query that fills them. Derives `Debug`/`Clone` so rows can be inspected in logs/tests.
#[derive(Debug, Clone)]
pub(crate) struct CaptureQualityRow {
    /// Source the record came from.
    pub source: String,
    /// Whether token counts were captured.
    pub has_tokens: bool,
    /// Whether a cost was captured.
    pub has_cost: bool,
    /// Whether latency was captured.
    pub has_latency: bool,
    /// Whether context information was captured.
    pub has_context: bool,
    /// Cache-read token count.
    pub cache_read_tokens: u64,
    /// Cache-creation token count.
    pub cache_creation_tokens: u64,
}
215
/// Minimal per-trace-span facts used for quality reporting.
///
/// Derives `Debug`/`Clone` so rows can be inspected in logs and tests.
#[derive(Debug, Clone)]
pub(crate) struct TraceSpanQualityRow {
    /// Span kind as stored (presumably the text form of `TraceSpanKind` — confirm).
    pub kind: String,
    /// Whether the span was classified as an orphan.
    pub is_orphan: bool,
}
220
/// How [`Store::open_with_mode`] opens the database.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum StoreOpenMode {
    /// Writable open: runs schema migrations and warms the projector.
    ReadWrite,
    /// Read-only open (`SQLITE_OPEN_READ_ONLY | SQLITE_OPEN_NO_MUTEX`); skips migrations
    /// and projector warm-up.
    ReadOnlyQuery,
}
226
/// Lightweight view of a session's lifecycle state.
#[derive(Debug, Clone)]
pub struct SessionStatusRow {
    /// Session identifier.
    pub id: String,
    /// Current status.
    pub status: SessionStatus,
    /// End time, presumably epoch milliseconds (`_ms`); `None` if not ended/recorded.
    pub ended_at_ms: Option<u64>,
}
233
/// Optional filters for session listings; `Default` applies no filtering.
///
/// NOTE(review): matching rules are implemented elsewhere — field notes follow the names.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct SessionFilter {
    /// Restrict to agents whose name starts with this prefix.
    pub agent_prefix: Option<String>,
    /// Restrict to sessions with this status.
    pub status: Option<SessionStatus>,
    /// Restrict to sessions starting at or after this time (presumably epoch ms).
    pub since_ms: Option<u64>,
}
240
/// One page of a paginated session listing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionPage {
    /// Sessions on this page.
    pub rows: Vec<SessionRecord>,
    /// Total number of matching sessions across all pages.
    pub total: usize,
    /// Offset for the next page; presumably `None` once the last page is reached.
    pub next_offset: Option<usize>,
}
247
/// The single cached span tree held by `Store::span_tree_cache`.
///
/// NOTE(review): `last_event_seq` is presumably compared with the session's newest event
/// sequence to decide whether the cached `nodes` are stale — the check lives elsewhere.
#[derive(Clone)]
struct SpanTreeCacheEntry {
    /// Session whose tree is cached.
    session_id: String,
    /// Event sequence the tree was built from, if any.
    last_event_seq: Option<u64>,
    /// Cached tree nodes.
    nodes: Vec<crate::store::span_tree::SpanNode>,
}
254
/// SQLite-backed store plus auxiliary in-process state.
///
/// Interior state is held in `RefCell`s, so a `Store` is not `Sync`.
pub struct Store {
    /// Connection opened by `Store::open_with_mode`.
    conn: Connection,
    /// Directory containing the database file; the outbox is opened relative to it.
    root: PathBuf,
    /// Hot log handle; `None` right after opening.
    hot_log: RefCell<Option<HotLog>>,
    /// Pending search writer; `None` right after opening, committed on drop if present.
    search_writer: RefCell<Option<crate::search::PendingWriter>>,
    /// Single-entry cache of the most recently built span tree.
    span_tree_cache: RefCell<Option<SpanTreeCacheEntry>>,
    /// In-memory projector; replayed from running sessions on read-write open.
    projector: RefCell<Projector>,
}
263
264mod artifact_windows;
265mod evals;
266mod event_projector;
267mod event_read;
268mod event_write;
269mod events;
270mod experiment_windows;
271mod feedback;
272mod guidance;
273mod guidance_candidates;
274mod maintenance;
275mod metrics;
276mod outcomes;
277mod prompts;
278mod report_windows;
279mod reports;
280mod rows;
281mod samples;
282mod schema;
283mod session_read;
284mod session_window;
285mod sessions;
286mod sync;
287#[cfg(test)]
288mod tests;
289mod tool_span_sync;
290mod tool_spans;
291mod trace_spans;
292mod visualization;
293
294pub(super) fn now_ms() -> u64 {
295 std::time::SystemTime::now()
296 .duration_since(std::time::UNIX_EPOCH)
297 .unwrap_or_default()
298 .as_millis() as u64
299}
300
301impl Store {
302 pub(crate) fn conn(&self) -> &Connection {
303 &self.conn
304 }
305
306 pub fn open(path: &Path) -> Result<Self> {
307 Self::open_with_mode(path, StoreOpenMode::ReadWrite)
308 }
309
310 pub fn open_read_only(path: &Path) -> Result<Self> {
311 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
312 }
313
314 pub fn open_query(path: &Path) -> Result<Self> {
315 Self::open_with_mode(path, StoreOpenMode::ReadOnlyQuery)
316 }
317
318 pub fn open_with_mode(path: &Path, mode: StoreOpenMode) -> Result<Self> {
319 if let Some(parent) = path.parent() {
320 std::fs::create_dir_all(parent)?;
321 }
322 let conn = match mode {
323 StoreOpenMode::ReadWrite => Connection::open(path),
324 StoreOpenMode::ReadOnlyQuery => Connection::open_with_flags(
325 path,
326 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
327 ),
328 }
329 .with_context(|| format!("open db: {}", path.display()))?;
330 schema::apply_pragmas(&conn, mode)?;
331 if mode == StoreOpenMode::ReadWrite {
332 for sql in schema::MIGRATIONS {
333 conn.execute_batch(sql)?;
334 }
335 schema::ensure_schema_columns(&conn)?;
336 }
337 let store = Self {
338 conn,
339 root: path
340 .parent()
341 .unwrap_or_else(|| Path::new("."))
342 .to_path_buf(),
343 hot_log: RefCell::new(None),
344 search_writer: RefCell::new(None),
345 span_tree_cache: RefCell::new(None),
346 projector: RefCell::new(Projector::default()),
347 };
348 if mode == StoreOpenMode::ReadWrite {
349 store.warm_projector()?;
350 }
351 Ok(store)
352 }
353
354 pub(super) fn invalidate_span_tree_cache(&self) {
355 *self.span_tree_cache.borrow_mut() = None;
356 }
357
358 pub(super) fn warm_projector(&self) -> Result<()> {
359 let ids = self.running_session_ids()?;
360 let mut projector = self.projector.borrow_mut();
361 for id in ids {
362 for event in self.list_events_for_session(&id)? {
363 let _ = projector.apply(&event);
364 }
365 }
366 Ok(())
367 }
368
369 pub(super) fn outbox(&self) -> Result<Outbox> {
370 Outbox::open(&self.root)
371 }
372}
373
374impl Drop for Store {
375 fn drop(&mut self) {
376 if let Some(writer) = self.search_writer.get_mut().as_mut() {
377 let _ = writer.commit();
378 }
379 }
380}