// kaizen/sync/smart.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
//! Sync shapes for tool spans and repo snapshots.

use crate::core::config::try_team_salt;
use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord};
use crate::retro::inputs::{scan_rule_files, scan_skill_files};
use crate::store::{Store, sqlite::ToolSpanSyncRow};
use crate::sync::context::SyncIngestContext;
use crate::sync::outbound::{hash_with_salt, workspace_hash};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::Path;

/// Maximum number of file facts (and, independently, edges) carried by one
/// outbound repo-snapshot chunk; see `outbound_snapshot_chunks`.
const SNAPSHOT_CHUNK: usize = 100;
15
/// Wire body for a `tool_spans` sync batch: all spans for one team/workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolSpansBatchBody {
    /// Team the batch belongs to.
    pub team_id: String,
    /// Salted hash of the workspace root (see `workspace_hash_for`).
    pub workspace_hash: String,
    /// Redacted spans (ids and paths hashed; see `outbound_tool_span`).
    pub spans: Vec<OutboundToolSpan>,
}
22
/// One tool span in redacted sync form: identifiers and file paths are salted
/// hashes, metrics pass through unchanged as optional values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundToolSpan {
    /// Salted hash of the originating session id.
    pub session_id_hash: String,
    /// Salted hash of the span id.
    pub span_id_hash: String,
    /// Tool name, passed through unhashed; omitted from JSON when absent.
    /// NOTE(review): the other `Option` fields serialize as explicit nulls —
    /// confirm that asymmetry is intended by the receiving schema.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Span status string, copied verbatim from the store row.
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    /// Cost scaled by 1e6 (micro-USD, per the `_e6` suffix) — confirm against producer.
    pub cost_usd_e6: Option<i64>,
    /// Salted hashes of the paths the span touched ("path:" domain prefix).
    pub path_hashes: Vec<String>,
}
39
/// Wire body for a `repo_snapshots` sync batch for one team/workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoSnapshotsBatchBody {
    /// Team the batch belongs to.
    pub team_id: String,
    /// Salted hash of the workspace root (see `workspace_hash_for`).
    pub workspace_hash: String,
    /// Snapshot chunks (see `outbound_snapshot_chunks`).
    pub snapshots: Vec<OutboundRepoSnapshotChunk>,
}
46
/// One chunk of a repo snapshot. Large snapshots are split into pieces of at
/// most `SNAPSHOT_CHUNK` facts/edges; `chunk_index`/`chunk_total` let the
/// receiver reassemble them under the shared `snapshot_id_hash`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundRepoSnapshotChunk {
    /// Salted hash of the snapshot id (identical across all chunks of a snapshot).
    pub snapshot_id_hash: String,
    /// Salted hash of the head commit ("commit:" domain prefix), when known.
    pub commit_hash: Option<String>,
    /// Snapshot index time, copied from the record (ms since epoch, per `_ms` suffix).
    pub indexed_at_ms: u64,
    /// Dirty flag copied from the snapshot record — presumably "working tree
    /// had uncommitted changes"; confirm against the indexer.
    pub dirty: bool,
    /// Zero-based index of this chunk.
    pub chunk_index: u32,
    /// Total chunk count for the snapshot (always >= 1, even when empty).
    pub chunk_total: u32,
    /// Up to `SNAPSHOT_CHUNK` redacted per-file fact rows.
    pub file_facts: Vec<OutboundFileFact>,
    /// Up to `SNAPSHOT_CHUNK` redacted graph edges.
    pub edges: Vec<OutboundRepoEdge>,
}
58
/// Per-file metrics with the raw path replaced by a salted hash
/// ("path:" domain prefix); all counters copy through from `FileFact`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundFileFact {
    /// Salted hash of the file path.
    pub path_hash: String,
    /// Language name, passed through unhashed.
    pub language: String,
    pub bytes: u64,
    /// Total lines / source lines of code, per field names.
    pub loc: u32,
    pub sloc: u32,
    pub complexity_total: u32,
    pub max_fn_complexity: u32,
    pub symbol_count: u32,
    pub import_count: u32,
    /// Dependency-graph fan-in/fan-out for this file.
    pub fan_in: u32,
    pub fan_out: u32,
    /// Churn over trailing 30/90-day windows (per field names; window
    /// semantics defined by the metrics producer).
    pub churn_30d: u32,
    pub churn_90d: u32,
    /// Distinct authors over the trailing 90 days, per field name.
    pub authors_90d: u32,
    /// Last-change timestamp in ms, when known.
    pub last_changed_ms: Option<u64>,
}
77
/// A repo dependency-graph edge with both endpoint paths hashed under the
/// "graph:" domain prefix.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundRepoEdge {
    /// Salted hash of the source file path.
    pub from_hash: String,
    /// Salted hash of the target file path.
    pub to_hash: String,
    /// Edge kind string, copied verbatim from the source record.
    pub kind: String,
    pub weight: u32,
}
85
/// One outbox row for `workspace_facts` (hashed slugs, no raw paths by default).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundWorkspaceFactRow {
    /// Salted hashes of skill slugs ("slug:" domain prefix).
    pub skill_slugs: Vec<String>,
    /// Salted hashes of rule slugs ("rule:" domain prefix).
    pub rule_slugs: Vec<String>,
}
92
/// Wire body for a `workspace_facts` sync batch for one team/workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceFactsBatchBody {
    /// Team the batch belongs to.
    pub team_id: String,
    /// Salted hash of the workspace root.
    pub workspace_hash: String,
    /// Fact rows in this batch.
    pub facts: Vec<OutboundWorkspaceFactRow>,
}
99
100pub fn enqueue_tool_spans_for_session(
101    store: &Store,
102    session_id: &str,
103    ctx: &SyncIngestContext,
104) -> Result<()> {
105    let Some(salt) = try_team_salt(&ctx.sync) else {
106        return Ok(());
107    };
108    let rows = store.tool_spans_for_session(session_id)?;
109    let payloads = rows
110        .iter()
111        .map(|row| serde_json::to_string(&outbound_tool_span(row, &salt)))
112        .collect::<serde_json::Result<Vec<_>>>()?;
113    store.replace_outbox_rows(session_id, "tool_spans", &payloads)
114}
115
116pub fn enqueue_repo_snapshot(
117    store: &Store,
118    snapshot: &RepoSnapshotRecord,
119    facts: &[FileFact],
120    edges: &[RepoEdge],
121    ctx: &SyncIngestContext,
122) -> Result<()> {
123    let Some(salt) = try_team_salt(&ctx.sync) else {
124        return Ok(());
125    };
126    let chunks = outbound_snapshot_chunks(snapshot, facts, edges, &salt);
127    let payloads = chunks
128        .iter()
129        .map(serde_json::to_string)
130        .collect::<serde_json::Result<Vec<_>>>()?;
131    store.replace_outbox_rows(&snapshot.id, "repo_snapshots", &payloads)
132}
133
134/// Enqueue one workspace-facts row (replaces pending `workspace_facts` for this workspace id).
135/// Slugs are Blake3+hex hashes so exports stay redacted without extra config.
136pub fn enqueue_workspace_fact_snapshot(
137    store: &Store,
138    workspace_root: &Path,
139    ctx: &SyncIngestContext,
140) -> Result<()> {
141    let Some(salt) = try_team_salt(&ctx.sync) else {
142        return Ok(());
143    };
144    let now_ms = std::time::SystemTime::now()
145        .duration_since(std::time::UNIX_EPOCH)
146        .unwrap_or_default()
147        .as_millis() as u64;
148    let skills = scan_skill_files(workspace_root, now_ms)?;
149    let rules = scan_rule_files(workspace_root, now_ms)?;
150    let skill_slugs: Vec<String> = skills
151        .iter()
152        .map(|s| hash_with_salt(&salt, format!("slug:{}", s.slug).as_bytes()))
153        .collect();
154    let rule_slugs: Vec<String> = rules
155        .iter()
156        .map(|r| hash_with_salt(&salt, format!("rule:{}", r.slug).as_bytes()))
157        .collect();
158    let row = OutboundWorkspaceFactRow {
159        skill_slugs,
160        rule_slugs,
161    };
162    let raw = serde_json::to_string(&row)?;
163    let key = match workspace_hash_for(ctx) {
164        Some(h) => format!("w:{h}"),
165        None => "workspace".into(),
166    };
167    store.replace_outbox_rows(&key, "workspace_facts", &[raw])
168}
169
170pub fn outbound_tool_span(row: &ToolSpanSyncRow, salt: &[u8; 32]) -> OutboundToolSpan {
171    OutboundToolSpan {
172        session_id_hash: hash_with_salt(salt, row.session_id.as_bytes()),
173        span_id_hash: hash_with_salt(salt, row.span_id.as_bytes()),
174        tool: row.tool.clone(),
175        status: row.status.clone(),
176        started_at_ms: row.started_at_ms,
177        ended_at_ms: row.ended_at_ms,
178        lead_time_ms: row.lead_time_ms,
179        tokens_in: row.tokens_in,
180        tokens_out: row.tokens_out,
181        reasoning_tokens: row.reasoning_tokens,
182        cost_usd_e6: row.cost_usd_e6,
183        path_hashes: row
184            .paths
185            .iter()
186            .map(|path| hash_with_salt(salt, format!("path:{path}").as_bytes()))
187            .collect(),
188    }
189}
190
191pub fn outbound_snapshot_chunks(
192    snapshot: &RepoSnapshotRecord,
193    facts: &[FileFact],
194    edges: &[RepoEdge],
195    salt: &[u8; 32],
196) -> Vec<OutboundRepoSnapshotChunk> {
197    let fact_chunks = facts.chunks(SNAPSHOT_CHUNK).collect::<Vec<_>>();
198    let edge_chunks = edges.chunks(SNAPSHOT_CHUNK).collect::<Vec<_>>();
199    let total = fact_chunks.len().max(edge_chunks.len()).max(1) as u32;
200    (0..total)
201        .map(|idx| OutboundRepoSnapshotChunk {
202            snapshot_id_hash: hash_with_salt(salt, snapshot.id.as_bytes()),
203            commit_hash: snapshot
204                .head_commit
205                .as_ref()
206                .map(|commit| hash_with_salt(salt, format!("commit:{commit}").as_bytes())),
207            indexed_at_ms: snapshot.indexed_at_ms,
208            dirty: snapshot.dirty,
209            chunk_index: idx,
210            chunk_total: total,
211            file_facts: fact_chunks
212                .get(idx as usize)
213                .map(|chunk| chunk.iter().map(|fact| outbound_fact(fact, salt)).collect())
214                .unwrap_or_default(),
215            edges: edge_chunks
216                .get(idx as usize)
217                .map(|chunk| chunk.iter().map(|edge| outbound_edge(edge, salt)).collect())
218                .unwrap_or_default(),
219        })
220        .collect()
221}
222
223pub fn workspace_hash_for(ctx: &SyncIngestContext) -> Option<String> {
224    let salt = try_team_salt(&ctx.sync)?;
225    Some(workspace_hash(&salt, ctx.workspace_root()))
226}
227
228fn outbound_fact(fact: &FileFact, salt: &[u8; 32]) -> OutboundFileFact {
229    OutboundFileFact {
230        path_hash: hash_with_salt(salt, format!("path:{}", fact.path).as_bytes()),
231        language: fact.language.clone(),
232        bytes: fact.bytes,
233        loc: fact.loc,
234        sloc: fact.sloc,
235        complexity_total: fact.complexity_total,
236        max_fn_complexity: fact.max_fn_complexity,
237        symbol_count: fact.symbol_count,
238        import_count: fact.import_count,
239        fan_in: fact.fan_in,
240        fan_out: fact.fan_out,
241        churn_30d: fact.churn_30d,
242        churn_90d: fact.churn_90d,
243        authors_90d: fact.authors_90d,
244        last_changed_ms: fact.last_changed_ms,
245    }
246}
247
248fn outbound_edge(edge: &RepoEdge, salt: &[u8; 32]) -> OutboundRepoEdge {
249    OutboundRepoEdge {
250        from_hash: hash_with_salt(salt, format!("graph:{}", edge.from_path).as_bytes()),
251        to_hash: hash_with_salt(salt, format!("graph:{}", edge.to_path).as_bytes()),
252        kind: edge.kind.clone(),
253        weight: edge.weight,
254    }
255}