1use crate::core::identity::ActorIdentity;
6use crate::sync::outbound::OutboundEvent;
7use crate::sync::smart::{OutboundRepoSnapshotChunk, OutboundToolSpan};
8use serde::{Deserialize, Serialize};
9
/// Schema version stamped into every [`CanonicalEnvelope`] built by this module.
///
/// Consumers can read it back from any expanded item via
/// `CanonicalItem::envelope_kaizen_schema_version`.
pub const KAIZEN_SCHEMA_VERSION: u32 = 1;
12
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct CanonicalEnvelope {
16 pub kaizen_schema_version: u32,
17 pub team_id: String,
18 pub workspace_hash: String,
19 #[serde(skip_serializing_if = "Option::is_none")]
20 pub identity: Option<ActorIdentity>,
21}
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum CanonicalEventName {
27 Event,
29 ToolSpan,
31 RepoSnapshotChunk,
33 WorkspaceFactSnapshot,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct EventItem {
40 pub envelope: CanonicalEnvelope,
41 pub name: CanonicalEventName,
42 pub event: OutboundEvent,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ToolSpanItem {
48 pub envelope: CanonicalEnvelope,
49 pub name: CanonicalEventName,
50 pub span: OutboundToolSpan,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RepoSnapshotChunkItem {
56 pub envelope: CanonicalEnvelope,
57 pub name: CanonicalEventName,
58 pub chunk: OutboundRepoSnapshotChunk,
59}
60
61#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
63pub struct WorkspaceFactSnapshotItem {
64 pub skill_slugs: Vec<String>,
66 pub rule_slugs: Vec<String>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71#[serde(tag = "kind", rename_all = "snake_case")]
72pub enum CanonicalItem {
73 Event(EventItem),
74 ToolSpan(ToolSpanItem),
75 RepoSnapshotChunk(RepoSnapshotChunkItem),
76 WorkspaceFactSnapshot {
78 envelope: CanonicalEnvelope,
79 name: CanonicalEventName,
80 payload: WorkspaceFactSnapshotItem,
81 },
82}
83
84pub fn expand_ingest_batch(batch: &crate::sync::IngestExportBatch) -> Vec<CanonicalItem> {
86 use crate::sync::IngestExportBatch;
87 let mut out: Vec<CanonicalItem> = Vec::new();
88 match batch {
89 IngestExportBatch::Events(b) => {
90 let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
91 for e in &b.events {
92 out.push(CanonicalItem::Event(EventItem {
93 envelope: env.clone(),
94 name: CanonicalEventName::Event,
95 event: e.clone(),
96 }));
97 }
98 }
99 IngestExportBatch::ToolSpans(b) => {
100 let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
101 for span in &b.spans {
102 out.push(CanonicalItem::ToolSpan(ToolSpanItem {
103 envelope: env.clone(),
104 name: CanonicalEventName::ToolSpan,
105 span: span.clone(),
106 }));
107 }
108 }
109 IngestExportBatch::RepoSnapshots(b) => {
110 let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
111 for chunk in &b.snapshots {
112 out.push(CanonicalItem::RepoSnapshotChunk(RepoSnapshotChunkItem {
113 envelope: env.clone(),
114 name: CanonicalEventName::RepoSnapshotChunk,
115 chunk: chunk.clone(),
116 }));
117 }
118 }
119 IngestExportBatch::WorkspaceFacts(b) => {
120 let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
121 for row in &b.facts {
122 out.push(CanonicalItem::WorkspaceFactSnapshot {
123 envelope: env.clone(),
124 name: CanonicalEventName::WorkspaceFactSnapshot,
125 payload: WorkspaceFactSnapshotItem {
126 skill_slugs: row.skill_slugs.clone(),
127 rule_slugs: row.rule_slugs.clone(),
128 },
129 });
130 }
131 }
132 IngestExportBatch::SessionEvals(_) => {}
133 IngestExportBatch::SessionFeedback(_) => {}
134 }
135 out
136}
137
138impl CanonicalItem {
139 pub fn telemetry_kind(&self) -> &'static str {
141 match self {
142 CanonicalItem::Event(_) => "kaizen.event",
143 CanonicalItem::ToolSpan(_) => "kaizen.tool_span",
144 CanonicalItem::RepoSnapshotChunk(_) => "kaizen.repo_snapshot_chunk",
145 CanonicalItem::WorkspaceFactSnapshot { .. } => "kaizen.workspace_fact_snapshot",
146 }
147 }
148
149 pub fn envelope_kaizen_schema_version(&self) -> Option<u32> {
151 match self {
152 CanonicalItem::Event(i) => Some(i.envelope.kaizen_schema_version),
153 CanonicalItem::ToolSpan(i) => Some(i.envelope.kaizen_schema_version),
154 CanonicalItem::RepoSnapshotChunk(i) => Some(i.envelope.kaizen_schema_version),
155 CanonicalItem::WorkspaceFactSnapshot { envelope, .. } => {
156 Some(envelope.kaizen_schema_version)
157 }
158 }
159 }
160}
161
162fn canonical_envelope(
163 team_id: &str,
164 workspace_hash: &str,
165 project_name: Option<&str>,
166) -> CanonicalEnvelope {
167 CanonicalEnvelope {
168 kaizen_schema_version: KAIZEN_SCHEMA_VERSION,
169 team_id: team_id.to_string(),
170 workspace_hash: workspace_hash.to_string(),
171 identity: project_name.map(project_identity),
172 }
173}
174
175fn project_identity(project_name: &str) -> ActorIdentity {
176 ActorIdentity {
177 workspace_label: Some(project_name.to_string()),
178 ..Default::default()
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::IngestExportBatch;
    use crate::sync::outbound::EventsBatchBody;
    use crate::sync::smart::{OutboundToolSpan, ToolSpansBatchBody};

    /// A hook-sourced `message` event in session `s1` with `event_seq` 0 and `ts_ms` 1;
    /// individual tests override fields with struct-update syntax.
    fn message_event() -> OutboundEvent {
        OutboundEvent {
            session_id_hash: "s1".into(),
            event_seq: 0,
            ts_ms: 1,
            agent: "a".into(),
            model: "m".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: serde_json::json!({}),
        }
    }

    #[test]
    fn expand_events_one_per_row() {
        let b = IngestExportBatch::Events(EventsBatchBody {
            team_id: "t1".into(),
            workspace_hash: "wh".into(),
            project_name: Some("kaizen".into()),
            events: vec![
                message_event(),
                OutboundEvent {
                    event_seq: 1,
                    ts_ms: 2,
                    ..message_event()
                },
            ],
        });
        let v = expand_ingest_batch(&b);
        assert_eq!(v.len(), 2);
        assert_eq!(
            v[0].envelope_kaizen_schema_version().unwrap(),
            KAIZEN_SCHEMA_VERSION
        );
        assert_eq!(v[0].telemetry_kind(), "kaizen.event");

        let CanonicalItem::Event(item) = &v[0] else {
            panic!("expected event");
        };
        assert_eq!(item.name, CanonicalEventName::Event);
        assert_eq!(item.envelope.team_id, "t1");
        assert_eq!(item.envelope.workspace_hash, "wh");
        assert_eq!(
            item.envelope
                .identity
                .as_ref()
                .and_then(|i| i.workspace_label.as_deref()),
            Some("kaizen")
        );

        // Rows keep their batch order and share the same envelope.
        let CanonicalItem::Event(second) = &v[1] else {
            panic!("expected event");
        };
        assert_eq!(second.event.event_seq, 1);
        assert_eq!(second.envelope, item.envelope);
    }

    #[test]
    fn expand_tool_spans_n_items() {
        let b = IngestExportBatch::ToolSpans(ToolSpansBatchBody {
            team_id: "t".into(),
            workspace_hash: "w".into(),
            project_name: None,
            spans: vec![OutboundToolSpan {
                session_id_hash: "sh".into(),
                span_id_hash: "ph".into(),
                tool: None,
                status: "ok".into(),
                started_at_ms: None,
                ended_at_ms: None,
                lead_time_ms: None,
                tokens_in: None,
                tokens_out: None,
                reasoning_tokens: None,
                cost_usd_e6: None,
                path_hashes: vec![],
            }],
        });
        let v = expand_ingest_batch(&b);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].telemetry_kind(), "kaizen.tool_span");

        let CanonicalItem::ToolSpan(item) = &v[0] else {
            panic!("expected tool span");
        };
        assert_eq!(item.name, CanonicalEventName::ToolSpan);
        // Without a project name the envelope carries no identity.
        assert!(item.envelope.identity.is_none());
    }

    #[test]
    fn expand_workspace_facts_one_per_row() {
        use crate::sync::smart::{OutboundWorkspaceFactRow, WorkspaceFactsBatchBody};
        let b = IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
            team_id: "t".into(),
            workspace_hash: "w".into(),
            project_name: None,
            facts: vec![OutboundWorkspaceFactRow {
                skill_slugs: vec!["a".into()],
                rule_slugs: vec!["b".into()],
            }],
        });
        let v = expand_ingest_batch(&b);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].telemetry_kind(), "kaizen.workspace_fact_snapshot");

        let CanonicalItem::WorkspaceFactSnapshot {
            envelope,
            name,
            payload,
        } = &v[0]
        else {
            panic!("expected workspace fact snapshot");
        };
        assert_eq!(*name, CanonicalEventName::WorkspaceFactSnapshot);
        assert_eq!(
            *payload,
            WorkspaceFactSnapshotItem {
                skill_slugs: vec!["a".into()],
                rule_slugs: vec!["b".into()],
            }
        );
        assert!(envelope.identity.is_none());
    }
}