// kaizen/sync/smart.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
//! Sync shapes for tool spans and repo snapshots.

use crate::core::config::try_team_salt;
use crate::metrics::types::{FileFact, RepoEdge, RepoSnapshotRecord};
use crate::retro::inputs::{scan_rule_files, scan_skill_files};
use crate::store::{Store, sqlite::ToolSpanSyncRow};
use crate::sync::context::SyncIngestContext;
use crate::sync::outbound::{hash_with_salt, workspace_hash};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Maximum number of file facts / repo edges carried per outbound snapshot chunk.
const SNAPSHOT_CHUNK: usize = 100;
15
/// Request body for a batch upload of redacted tool spans for one workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolSpansBatchBody {
    /// Team identifier the batch belongs to.
    pub team_id: String,
    /// Salted hash of the workspace root (see `workspace_hash`).
    pub workspace_hash: String,
    /// Optional human-readable project name; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub project_name: Option<String>,
    /// Redacted tool spans in this batch.
    pub spans: Vec<OutboundToolSpan>,
}
24
/// One tool span prepared for export: identifiers and file paths are replaced
/// by salted hashes; timing, token, and cost metrics are passed through as-is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundToolSpan {
    /// Salted hash of the originating session id.
    pub session_id_hash: String,
    /// Salted hash of the span id.
    pub span_id_hash: String,
    /// Tool name, passed through unhashed; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool: Option<String>,
    /// Span status string, copied verbatim from the stored row.
    pub status: String,
    pub started_at_ms: Option<u64>,
    pub ended_at_ms: Option<u64>,
    pub lead_time_ms: Option<u64>,
    pub tokens_in: Option<u32>,
    pub tokens_out: Option<u32>,
    pub reasoning_tokens: Option<u32>,
    // Presumably cost in micro-dollars (USD * 1e6), per the `_e6` suffix — confirm.
    pub cost_usd_e6: Option<i64>,
    /// Salted hashes of file paths touched by the span (`path:<path>` pre-image).
    pub path_hashes: Vec<String>,
}
41
/// Request body for a batch upload of repo snapshot chunks for one workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoSnapshotsBatchBody {
    /// Team identifier the batch belongs to.
    pub team_id: String,
    /// Salted hash of the workspace root (see `workspace_hash`).
    pub workspace_hash: String,
    /// Optional human-readable project name; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub project_name: Option<String>,
    /// Snapshot chunks (see [`OutboundRepoSnapshotChunk`]) in this batch.
    pub snapshots: Vec<OutboundRepoSnapshotChunk>,
}
50
/// One chunk of a redacted repo snapshot. Large snapshots are split into
/// chunks of at most `SNAPSHOT_CHUNK` file facts / edges each; `chunk_index`
/// and `chunk_total` let the receiver reassemble the whole snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundRepoSnapshotChunk {
    /// Salted hash of the snapshot id (identical across all chunks of one snapshot).
    pub snapshot_id_hash: String,
    /// Salted hash of the head commit (`commit:<id>` pre-image), if known.
    pub commit_hash: Option<String>,
    /// Snapshot index time, copied from the record (milliseconds, presumably Unix epoch — confirm).
    pub indexed_at_ms: u64,
    // Copied from `RepoSnapshotRecord::dirty`; presumably "working tree had
    // uncommitted changes at index time" — confirm against the indexer.
    pub dirty: bool,
    /// Zero-based index of this chunk.
    pub chunk_index: u32,
    /// Total number of chunks for this snapshot (always >= 1).
    pub chunk_total: u32,
    /// File facts carried by this chunk (may be empty on trailing chunks).
    pub file_facts: Vec<OutboundFileFact>,
    /// Graph edges carried by this chunk (may be empty on trailing chunks).
    pub edges: Vec<OutboundRepoEdge>,
}
62
/// Per-file metrics with the path replaced by a salted hash; every numeric
/// metric is copied verbatim from the local `FileFact`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundFileFact {
    /// Salted hash of the file path (`path:<path>` pre-image).
    pub path_hash: String,
    /// Language name, passed through unhashed.
    pub language: String,
    // Size / complexity metrics (semantics as defined by the metrics module).
    pub bytes: u64,
    pub loc: u32,
    pub sloc: u32,
    pub complexity_total: u32,
    pub max_fn_complexity: u32,
    pub symbol_count: u32,
    pub import_count: u32,
    // Dependency-graph degree counts.
    pub fan_in: u32,
    pub fan_out: u32,
    // Activity metrics; field names suggest 30/90-day windows — confirm units upstream.
    pub churn_30d: u32,
    pub churn_90d: u32,
    pub authors_90d: u32,
    /// Last change time in milliseconds, if known.
    pub last_changed_ms: Option<u64>,
}
81
/// A repo-graph edge with both endpoint paths salted-hashed (`graph:<path>`
/// pre-image — a hash space distinct from the `path:<path>` file hashes).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundRepoEdge {
    /// Salted hash of the edge's source path.
    pub from_hash: String,
    /// Salted hash of the edge's destination path.
    pub to_hash: String,
    /// Edge kind label, passed through unhashed.
    pub kind: String,
    /// Edge weight, copied verbatim.
    pub weight: u32,
}
89
/// One outbox row for `workspace_facts` (hashed slugs, no raw paths by default).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutboundWorkspaceFactRow {
    /// Salted hashes of skill slugs (`slug:<slug>` pre-image).
    pub skill_slugs: Vec<String>,
    /// Salted hashes of rule slugs (`rule:<slug>` pre-image).
    pub rule_slugs: Vec<String>,
}
96
/// Request body for a batch upload of workspace-facts rows for one workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceFactsBatchBody {
    /// Team identifier the batch belongs to.
    pub team_id: String,
    /// Salted hash of the workspace root (see `workspace_hash`).
    pub workspace_hash: String,
    /// Optional human-readable project name; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub project_name: Option<String>,
    /// Fact rows in this batch.
    pub facts: Vec<OutboundWorkspaceFactRow>,
}
105
106pub fn enqueue_tool_spans_for_session(
107    store: &Store,
108    session_id: &str,
109    ctx: &SyncIngestContext,
110) -> Result<()> {
111    let Some(salt) = try_team_salt(&ctx.sync) else {
112        return Ok(());
113    };
114    let rows = store.tool_spans_for_session(session_id)?;
115    let payloads = rows
116        .iter()
117        .map(|row| serde_json::to_string(&outbound_tool_span(row, &salt)))
118        .collect::<serde_json::Result<Vec<_>>>()?;
119    store.replace_outbox_rows(session_id, "tool_spans", &payloads)
120}
121
122pub fn enqueue_repo_snapshot(
123    store: &Store,
124    snapshot: &RepoSnapshotRecord,
125    facts: &[FileFact],
126    edges: &[RepoEdge],
127    ctx: &SyncIngestContext,
128) -> Result<()> {
129    let Some(salt) = try_team_salt(&ctx.sync) else {
130        return Ok(());
131    };
132    let chunks = outbound_snapshot_chunks(snapshot, facts, edges, &salt);
133    let payloads = chunks
134        .iter()
135        .map(serde_json::to_string)
136        .collect::<serde_json::Result<Vec<_>>>()?;
137    store.replace_outbox_rows(&snapshot.id, "repo_snapshots", &payloads)
138}
139
140/// Enqueue one workspace-facts row (replaces pending `workspace_facts` for this workspace id).
141/// Slugs are Blake3+hex hashes so exports stay redacted without extra config.
142pub fn enqueue_workspace_fact_snapshot(
143    store: &Store,
144    workspace_root: &Path,
145    ctx: &SyncIngestContext,
146) -> Result<()> {
147    let Some(salt) = try_team_salt(&ctx.sync) else {
148        return Ok(());
149    };
150    let now_ms = std::time::SystemTime::now()
151        .duration_since(std::time::UNIX_EPOCH)
152        .unwrap_or_default()
153        .as_millis() as u64;
154    let skills = scan_skill_files(workspace_root, now_ms)?;
155    let rules = scan_rule_files(workspace_root, now_ms)?;
156    let skill_slugs: Vec<String> = skills
157        .iter()
158        .map(|s| hash_with_salt(&salt, format!("slug:{}", s.slug).as_bytes()))
159        .collect();
160    let rule_slugs: Vec<String> = rules
161        .iter()
162        .map(|r| hash_with_salt(&salt, format!("rule:{}", r.slug).as_bytes()))
163        .collect();
164    let row = OutboundWorkspaceFactRow {
165        skill_slugs,
166        rule_slugs,
167    };
168    let raw = serde_json::to_string(&row)?;
169    let key = match workspace_hash_for(ctx) {
170        Some(h) => format!("w:{h}"),
171        None => "workspace".into(),
172    };
173    store.replace_outbox_rows(&key, "workspace_facts", &[raw])
174}
175
176pub fn outbound_tool_span(row: &ToolSpanSyncRow, salt: &[u8; 32]) -> OutboundToolSpan {
177    OutboundToolSpan {
178        session_id_hash: hash_with_salt(salt, row.session_id.as_bytes()),
179        span_id_hash: hash_with_salt(salt, row.span_id.as_bytes()),
180        tool: row.tool.clone(),
181        status: row.status.clone(),
182        started_at_ms: row.started_at_ms,
183        ended_at_ms: row.ended_at_ms,
184        lead_time_ms: row.lead_time_ms,
185        tokens_in: row.tokens_in,
186        tokens_out: row.tokens_out,
187        reasoning_tokens: row.reasoning_tokens,
188        cost_usd_e6: row.cost_usd_e6,
189        path_hashes: row
190            .paths
191            .iter()
192            .map(|path| hash_with_salt(salt, format!("path:{path}").as_bytes()))
193            .collect(),
194    }
195}
196
197pub fn outbound_snapshot_chunks(
198    snapshot: &RepoSnapshotRecord,
199    facts: &[FileFact],
200    edges: &[RepoEdge],
201    salt: &[u8; 32],
202) -> Vec<OutboundRepoSnapshotChunk> {
203    let fact_chunks = facts.chunks(SNAPSHOT_CHUNK).collect::<Vec<_>>();
204    let edge_chunks = edges.chunks(SNAPSHOT_CHUNK).collect::<Vec<_>>();
205    let total = fact_chunks.len().max(edge_chunks.len()).max(1) as u32;
206    (0..total)
207        .map(|idx| OutboundRepoSnapshotChunk {
208            snapshot_id_hash: hash_with_salt(salt, snapshot.id.as_bytes()),
209            commit_hash: snapshot
210                .head_commit
211                .as_ref()
212                .map(|commit| hash_with_salt(salt, format!("commit:{commit}").as_bytes())),
213            indexed_at_ms: snapshot.indexed_at_ms,
214            dirty: snapshot.dirty,
215            chunk_index: idx,
216            chunk_total: total,
217            file_facts: fact_chunks
218                .get(idx as usize)
219                .map(|chunk| chunk.iter().map(|fact| outbound_fact(fact, salt)).collect())
220                .unwrap_or_default(),
221            edges: edge_chunks
222                .get(idx as usize)
223                .map(|chunk| chunk.iter().map(|edge| outbound_edge(edge, salt)).collect())
224                .unwrap_or_default(),
225        })
226        .collect()
227}
228
229pub fn workspace_hash_for(ctx: &SyncIngestContext) -> Option<String> {
230    let salt = try_team_salt(&ctx.sync)?;
231    Some(workspace_hash(&salt, ctx.workspace_root()))
232}
233
234fn outbound_fact(fact: &FileFact, salt: &[u8; 32]) -> OutboundFileFact {
235    OutboundFileFact {
236        path_hash: hash_with_salt(salt, format!("path:{}", fact.path).as_bytes()),
237        language: fact.language.clone(),
238        bytes: fact.bytes,
239        loc: fact.loc,
240        sloc: fact.sloc,
241        complexity_total: fact.complexity_total,
242        max_fn_complexity: fact.max_fn_complexity,
243        symbol_count: fact.symbol_count,
244        import_count: fact.import_count,
245        fan_in: fact.fan_in,
246        fan_out: fact.fan_out,
247        churn_30d: fact.churn_30d,
248        churn_90d: fact.churn_90d,
249        authors_90d: fact.authors_90d,
250        last_changed_ms: fact.last_changed_ms,
251    }
252}
253
254fn outbound_edge(edge: &RepoEdge, salt: &[u8; 32]) -> OutboundRepoEdge {
255    OutboundRepoEdge {
256        from_hash: hash_with_salt(salt, format!("graph:{}", edge.from_path).as_bytes()),
257        to_hash: hash_with_salt(salt, format!("graph:{}", edge.to_path).as_bytes()),
258        kind: edge.kind.clone(),
259        weight: edge.weight,
260    }
261}