Skip to main content

kaizen/sync/
canonical.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Per-item canonical telemetry payloads: expand [`IngestExportBatch`](crate::sync::IngestExportBatch)
//! for exporters, future provider pull, and goldens. Primary POST / outbox stay batch-oriented.
4
5use crate::core::identity::ActorIdentity;
6use crate::sync::outbound::OutboundEvent;
7use crate::sync::smart::{OutboundRepoSnapshotChunk, OutboundToolSpan};
8use serde::{Deserialize, Serialize};
9
/// Schema version stamped into every [`CanonicalEnvelope`] built by this module.
///
/// Acts as a forward-evolution marker on exported or pulled items; payloads carrying an older
/// version are meant to be read only by migration tools.
pub const KAIZEN_SCHEMA_VERSION: u32 = 1;
12
13/// Shared context on every expanded item; identity is `None` until session/workspace wiring fills it.
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct CanonicalEnvelope {
16    pub kaizen_schema_version: u32,
17    pub team_id: String,
18    pub workspace_hash: String,
19    #[serde(skip_serializing_if = "Option::is_none")]
20    pub identity: Option<ActorIdentity>,
21}
22
23/// One logical event name for third-party and docs (`print-schema` in a later phase).
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "snake_case")]
26pub enum CanonicalEventName {
27    /// One outbound event row.
28    Event,
29    /// One tool span.
30    ToolSpan,
31    /// One repo graph snapshot chunk.
32    RepoSnapshotChunk,
33    /// Skills / rules / workspace metadata (Phase 6 producer).
34    WorkspaceFactSnapshot,
35}
36
37/// Fully expanded item for a single `OutboundEvent` row.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct EventItem {
40    pub envelope: CanonicalEnvelope,
41    pub name: CanonicalEventName,
42    pub event: OutboundEvent,
43}
44
45/// One tool span with batch context.
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ToolSpanItem {
48    pub envelope: CanonicalEnvelope,
49    pub name: CanonicalEventName,
50    pub span: OutboundToolSpan,
51}
52
53/// One repo snapshot chunk with batch context.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct RepoSnapshotChunkItem {
56    pub envelope: CanonicalEnvelope,
57    pub name: CanonicalEventName,
58    pub chunk: OutboundRepoSnapshotChunk,
59}
60
61/// Workspace-level facts (hashed skill/rule slugs from `.cursor/skills` and `.cursor/rules` discovery).
62#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
63pub struct WorkspaceFactSnapshotItem {
64    /// Redacted or hashed slugs / labels only by default.
65    pub skill_slugs: Vec<String>,
66    pub rule_slugs: Vec<String>,
67}
68
69/// Union of all canonical item shapes for `expand_ingest_batch` and mappers.
70#[derive(Debug, Clone, Serialize, Deserialize)]
71#[serde(tag = "kind", rename_all = "snake_case")]
72pub enum CanonicalItem {
73    Event(EventItem),
74    ToolSpan(ToolSpanItem),
75    RepoSnapshotChunk(RepoSnapshotChunkItem),
76    /// Populated in Phase 6; expand does not emit this from ingest batches.
77    WorkspaceFactSnapshot {
78        envelope: CanonicalEnvelope,
79        name: CanonicalEventName,
80        payload: WorkspaceFactSnapshotItem,
81    },
82}
83
84/// Expand a redacted ingest batch to one struct per event/span/chunk; never drops rows.
85pub fn expand_ingest_batch(batch: &crate::sync::IngestExportBatch) -> Vec<CanonicalItem> {
86    use crate::sync::IngestExportBatch;
87    let mut out: Vec<CanonicalItem> = Vec::new();
88    match batch {
89        IngestExportBatch::Events(b) => {
90            let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
91            for e in &b.events {
92                out.push(CanonicalItem::Event(EventItem {
93                    envelope: env.clone(),
94                    name: CanonicalEventName::Event,
95                    event: e.clone(),
96                }));
97            }
98        }
99        IngestExportBatch::ToolSpans(b) => {
100            let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
101            for span in &b.spans {
102                out.push(CanonicalItem::ToolSpan(ToolSpanItem {
103                    envelope: env.clone(),
104                    name: CanonicalEventName::ToolSpan,
105                    span: span.clone(),
106                }));
107            }
108        }
109        IngestExportBatch::RepoSnapshots(b) => {
110            let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
111            for chunk in &b.snapshots {
112                out.push(CanonicalItem::RepoSnapshotChunk(RepoSnapshotChunkItem {
113                    envelope: env.clone(),
114                    name: CanonicalEventName::RepoSnapshotChunk,
115                    chunk: chunk.clone(),
116                }));
117            }
118        }
119        IngestExportBatch::WorkspaceFacts(b) => {
120            let env = canonical_envelope(&b.team_id, &b.workspace_hash, b.project_name.as_deref());
121            for row in &b.facts {
122                out.push(CanonicalItem::WorkspaceFactSnapshot {
123                    envelope: env.clone(),
124                    name: CanonicalEventName::WorkspaceFactSnapshot,
125                    payload: WorkspaceFactSnapshotItem {
126                        skill_slugs: row.skill_slugs.clone(),
127                        rule_slugs: row.rule_slugs.clone(),
128                    },
129                });
130            }
131        }
132        IngestExportBatch::SessionEvals(_) => {}
133        IngestExportBatch::SessionFeedback(_) => {}
134    }
135    out
136}
137
138impl CanonicalItem {
139    /// Short name for third-party tags / metrics (`kaizen.event`, `kaizen.tool_span`, …).
140    pub fn telemetry_kind(&self) -> &'static str {
141        match self {
142            CanonicalItem::Event(_) => "kaizen.event",
143            CanonicalItem::ToolSpan(_) => "kaizen.tool_span",
144            CanonicalItem::RepoSnapshotChunk(_) => "kaizen.repo_snapshot_chunk",
145            CanonicalItem::WorkspaceFactSnapshot { .. } => "kaizen.workspace_fact_snapshot",
146        }
147    }
148
149    /// Schema version for assertions and exporters; workspace fact variant included.
150    pub fn envelope_kaizen_schema_version(&self) -> Option<u32> {
151        match self {
152            CanonicalItem::Event(i) => Some(i.envelope.kaizen_schema_version),
153            CanonicalItem::ToolSpan(i) => Some(i.envelope.kaizen_schema_version),
154            CanonicalItem::RepoSnapshotChunk(i) => Some(i.envelope.kaizen_schema_version),
155            CanonicalItem::WorkspaceFactSnapshot { envelope, .. } => {
156                Some(envelope.kaizen_schema_version)
157            }
158        }
159    }
160}
161
162fn canonical_envelope(
163    team_id: &str,
164    workspace_hash: &str,
165    project_name: Option<&str>,
166) -> CanonicalEnvelope {
167    CanonicalEnvelope {
168        kaizen_schema_version: KAIZEN_SCHEMA_VERSION,
169        team_id: team_id.to_string(),
170        workspace_hash: workspace_hash.to_string(),
171        identity: project_name.map(project_identity),
172    }
173}
174
175fn project_identity(project_name: &str) -> ActorIdentity {
176    ActorIdentity {
177        workspace_label: Some(project_name.to_string()),
178        ..Default::default()
179    }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::IngestExportBatch;
    use crate::sync::outbound::EventsBatchBody;
    use crate::sync::smart::{OutboundToolSpan, ToolSpansBatchBody};

    /// An events batch yields one item per row, in order, each carrying the batch envelope.
    #[test]
    fn expand_events_one_per_row() {
        let first = OutboundEvent {
            session_id_hash: "s1".into(),
            event_seq: 0,
            ts_ms: 1,
            agent: "a".into(),
            model: "m".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: serde_json::json!({}),
        };
        // Same row apart from its sequence number and timestamp.
        let second = OutboundEvent {
            event_seq: 1,
            ts_ms: 2,
            ..first.clone()
        };
        let b = IngestExportBatch::Events(EventsBatchBody {
            team_id: "t1".into(),
            workspace_hash: "wh".into(),
            project_name: Some("kaizen".into()),
            events: vec![first, second],
        });
        let v = expand_ingest_batch(&b);
        assert_eq!(v.len(), 2);
        assert_eq!(
            v[0].envelope_kaizen_schema_version().unwrap(),
            KAIZEN_SCHEMA_VERSION
        );
        assert_eq!(v[0].telemetry_kind(), "kaizen.event");
        let CanonicalItem::Event(item) = &v[0] else {
            panic!("expected event");
        };
        assert_eq!(item.name, CanonicalEventName::Event);
        assert_eq!(item.envelope.team_id, "t1");
        assert_eq!(item.envelope.workspace_hash, "wh");
        assert_eq!(
            item.envelope
                .identity
                .as_ref()
                .and_then(|i| i.workspace_label.as_deref()),
            Some("kaizen")
        );
        // Batch order is preserved.
        assert!(matches!(&v[1], CanonicalItem::Event(next) if next.event.event_seq == 1));
    }

    /// A tool-span batch without a project name yields items whose envelope has no identity.
    #[test]
    fn expand_tool_spans_n_items() {
        let b = IngestExportBatch::ToolSpans(ToolSpansBatchBody {
            team_id: "t".into(),
            workspace_hash: "w".into(),
            project_name: None,
            spans: vec![OutboundToolSpan {
                session_id_hash: "sh".into(),
                span_id_hash: "ph".into(),
                tool: None,
                status: "ok".into(),
                started_at_ms: None,
                ended_at_ms: None,
                lead_time_ms: None,
                tokens_in: None,
                tokens_out: None,
                reasoning_tokens: None,
                cost_usd_e6: None,
                path_hashes: vec![],
            }],
        });
        let v = expand_ingest_batch(&b);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].telemetry_kind(), "kaizen.tool_span");
        let CanonicalItem::ToolSpan(item) = &v[0] else {
            panic!("expected tool span");
        };
        assert_eq!(item.name, CanonicalEventName::ToolSpan);
        assert!(item.envelope.identity.is_none());
    }

    /// A workspace-facts batch yields one snapshot item per row with the slugs copied over.
    #[test]
    fn expand_workspace_facts_one_per_row() {
        use crate::sync::smart::{OutboundWorkspaceFactRow, WorkspaceFactsBatchBody};
        let b = IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
            team_id: "t".into(),
            workspace_hash: "w".into(),
            project_name: None,
            facts: vec![OutboundWorkspaceFactRow {
                skill_slugs: vec!["a".into()],
                rule_slugs: vec!["b".into()],
            }],
        });
        let v = expand_ingest_batch(&b);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0].telemetry_kind(), "kaizen.workspace_fact_snapshot");
        assert_eq!(
            v[0].envelope_kaizen_schema_version(),
            Some(KAIZEN_SCHEMA_VERSION)
        );
        let CanonicalItem::WorkspaceFactSnapshot { name, payload, .. } = &v[0] else {
            panic!("expected workspace fact snapshot");
        };
        assert_eq!(*name, CanonicalEventName::WorkspaceFactSnapshot);
        assert_eq!(payload.skill_slugs, vec!["a".to_string()]);
        assert_eq!(payload.rule_slugs, vec!["b".to_string()]);
    }
}