use crate::core::identity::ActorIdentity;
use crate::sync::outbound::OutboundEvent;
use crate::sync::smart::{OutboundRepoSnapshotChunk, OutboundToolSpan};
use serde::{Deserialize, Serialize};
/// Schema version that `canonical_envelope` writes into every
/// [`CanonicalEnvelope`] it builds.
pub const KAIZEN_SCHEMA_VERSION: u32 = 1;
/// Metadata carried by every [`CanonicalItem`] variant.
///
/// `identity` is left out of the serialized form when it is `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CanonicalEnvelope {
/// Envelope schema version; `canonical_envelope` sets it to
/// [`KAIZEN_SCHEMA_VERSION`].
pub kaizen_schema_version: u32,
/// Team identifier, copied from the source batch's `team_id`.
pub team_id: String,
/// Workspace hash, copied from the source batch's `workspace_hash`.
pub workspace_hash: String,
/// Optional actor identity; `canonical_envelope` fills it from the batch's
/// `project_name` when one is present.
#[serde(skip_serializing_if = "Option::is_none")]
pub identity: Option<ActorIdentity>,
}
/// Value stored in the `name` field of each canonical item.
///
/// Variants serialize in snake_case (for example `ToolSpan` becomes
/// `tool_span`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CanonicalEventName {
/// Used by `expand_ingest_batch` for items built from an events batch.
Event,
/// Used by `expand_ingest_batch` for items built from a tool-spans batch.
ToolSpan,
/// Used by `expand_ingest_batch` for items built from a repo-snapshots batch.
RepoSnapshotChunk,
/// Used by `expand_ingest_batch` for items built from a workspace-facts batch.
WorkspaceFactSnapshot,
}
/// Canonical item wrapping a single [`OutboundEvent`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventItem {
/// Envelope shared by all items expanded from the same batch.
pub envelope: CanonicalEnvelope,
/// Item name; `expand_ingest_batch` sets it to [`CanonicalEventName::Event`].
pub name: CanonicalEventName,
/// The event row, cloned from the source batch.
pub event: OutboundEvent,
}
/// Canonical item wrapping a single [`OutboundToolSpan`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolSpanItem {
/// Envelope shared by all items expanded from the same batch.
pub envelope: CanonicalEnvelope,
/// Item name; `expand_ingest_batch` sets it to [`CanonicalEventName::ToolSpan`].
pub name: CanonicalEventName,
/// The tool span row, cloned from the source batch.
pub span: OutboundToolSpan,
}
/// Canonical item wrapping a single [`OutboundRepoSnapshotChunk`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepoSnapshotChunkItem {
/// Envelope shared by all items expanded from the same batch.
pub envelope: CanonicalEnvelope,
/// Item name; `expand_ingest_batch` sets it to
/// [`CanonicalEventName::RepoSnapshotChunk`].
pub name: CanonicalEventName,
/// The snapshot chunk row, cloned from the source batch.
pub chunk: OutboundRepoSnapshotChunk,
}
/// Payload of a [`CanonicalItem::WorkspaceFactSnapshot`]: two lists of slugs.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkspaceFactSnapshotItem {
/// Skill slugs, copied from the source fact row's `skill_slugs`.
pub skill_slugs: Vec<String>,
/// Rule slugs, copied from the source fact row's `rule_slugs`.
pub rule_slugs: Vec<String>,
}
/// One row produced by `expand_ingest_batch`.
///
/// Serialized as an internally tagged enum: a `kind` field holds the
/// snake_case variant name alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum CanonicalItem {
/// A single outbound event with its envelope.
Event(EventItem),
/// A single outbound tool span with its envelope.
ToolSpan(ToolSpanItem),
/// A single outbound repo snapshot chunk with its envelope.
RepoSnapshotChunk(RepoSnapshotChunkItem),
/// A workspace fact row; unlike the other variants its fields are declared
/// inline rather than in a separate item struct.
WorkspaceFactSnapshot {
/// Envelope shared by all items expanded from the same batch.
envelope: CanonicalEnvelope,
/// Item name; `expand_ingest_batch` sets it to
/// [`CanonicalEventName::WorkspaceFactSnapshot`].
name: CanonicalEventName,
/// Skill and rule slugs copied from the source fact row.
payload: WorkspaceFactSnapshotItem,
},
}
/// Flattens one export batch into a list of per-row [`CanonicalItem`]s.
///
/// Each row of an events, tool-spans, repo-snapshots, or workspace-facts batch
/// becomes one item, in the batch's own order. All items from a batch carry a
/// clone of the same envelope, built from the batch's `team_id`,
/// `workspace_hash`, and optional `project_name`.
///
/// `SessionEvals` and `SessionFeedback` batches produce an empty list.
pub fn expand_ingest_batch(batch: &crate::sync::IngestExportBatch) -> Vec<CanonicalItem> {
    use crate::sync::IngestExportBatch;

    match batch {
        IngestExportBatch::Events(body) => {
            let envelope = canonical_envelope(
                &body.team_id,
                &body.workspace_hash,
                body.project_name.as_deref(),
            );
            body.events
                .iter()
                .map(|event| {
                    CanonicalItem::Event(EventItem {
                        envelope: envelope.clone(),
                        name: CanonicalEventName::Event,
                        event: event.clone(),
                    })
                })
                .collect()
        }
        IngestExportBatch::ToolSpans(body) => {
            let envelope = canonical_envelope(
                &body.team_id,
                &body.workspace_hash,
                body.project_name.as_deref(),
            );
            body.spans
                .iter()
                .map(|span| {
                    CanonicalItem::ToolSpan(ToolSpanItem {
                        envelope: envelope.clone(),
                        name: CanonicalEventName::ToolSpan,
                        span: span.clone(),
                    })
                })
                .collect()
        }
        IngestExportBatch::RepoSnapshots(body) => {
            let envelope = canonical_envelope(
                &body.team_id,
                &body.workspace_hash,
                body.project_name.as_deref(),
            );
            body.snapshots
                .iter()
                .map(|chunk| {
                    CanonicalItem::RepoSnapshotChunk(RepoSnapshotChunkItem {
                        envelope: envelope.clone(),
                        name: CanonicalEventName::RepoSnapshotChunk,
                        chunk: chunk.clone(),
                    })
                })
                .collect()
        }
        IngestExportBatch::WorkspaceFacts(body) => {
            let envelope = canonical_envelope(
                &body.team_id,
                &body.workspace_hash,
                body.project_name.as_deref(),
            );
            body.facts
                .iter()
                .map(|fact| CanonicalItem::WorkspaceFactSnapshot {
                    envelope: envelope.clone(),
                    name: CanonicalEventName::WorkspaceFactSnapshot,
                    payload: WorkspaceFactSnapshotItem {
                        skill_slugs: fact.skill_slugs.clone(),
                        rule_slugs: fact.rule_slugs.clone(),
                    },
                })
                .collect()
        }
        // These batch kinds have no canonical item representation here.
        IngestExportBatch::SessionEvals(_) | IngestExportBatch::SessionFeedback(_) => Vec::new(),
    }
}
impl CanonicalItem {
pub fn telemetry_kind(&self) -> &'static str {
match self {
CanonicalItem::Event(_) => "kaizen.event",
CanonicalItem::ToolSpan(_) => "kaizen.tool_span",
CanonicalItem::RepoSnapshotChunk(_) => "kaizen.repo_snapshot_chunk",
CanonicalItem::WorkspaceFactSnapshot { .. } => "kaizen.workspace_fact_snapshot",
}
}
pub fn envelope_kaizen_schema_version(&self) -> Option<u32> {
match self {
CanonicalItem::Event(i) => Some(i.envelope.kaizen_schema_version),
CanonicalItem::ToolSpan(i) => Some(i.envelope.kaizen_schema_version),
CanonicalItem::RepoSnapshotChunk(i) => Some(i.envelope.kaizen_schema_version),
CanonicalItem::WorkspaceFactSnapshot { envelope, .. } => {
Some(envelope.kaizen_schema_version)
}
}
}
}
/// Builds the envelope shared by every item expanded from one batch.
///
/// The version is always [`KAIZEN_SCHEMA_VERSION`]; `identity` is `Some` only
/// when a `project_name` is supplied.
fn canonical_envelope(
    team_id: &str,
    workspace_hash: &str,
    project_name: Option<&str>,
) -> CanonicalEnvelope {
    let team_id = team_id.to_owned();
    let workspace_hash = workspace_hash.to_owned();
    let identity = project_name.map(project_identity);

    CanonicalEnvelope {
        kaizen_schema_version: KAIZEN_SCHEMA_VERSION,
        team_id,
        workspace_hash,
        identity,
    }
}
fn project_identity(project_name: &str) -> ActorIdentity {
ActorIdentity {
workspace_label: Some(project_name.to_string()),
..Default::default()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::IngestExportBatch;
    use crate::sync::outbound::EventsBatchBody;
    use crate::sync::smart::{OutboundToolSpan, ToolSpansBatchBody};

    /// Two events in one batch expand to two items, and the batch's project
    /// name surfaces as the envelope identity's workspace label.
    #[test]
    fn expand_events_one_per_row() {
        let first = OutboundEvent {
            session_id_hash: "s1".into(),
            event_seq: 0,
            ts_ms: 1,
            agent: "a".into(),
            model: "m".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload: serde_json::json!({}),
        };
        // Same as `first` apart from its sequence number and timestamp.
        let second = OutboundEvent {
            event_seq: 1,
            ts_ms: 2,
            ..first.clone()
        };
        let batch = IngestExportBatch::Events(EventsBatchBody {
            team_id: "t1".into(),
            workspace_hash: "wh".into(),
            project_name: Some("kaizen".into()),
            events: vec![first, second],
        });

        let items = expand_ingest_batch(&batch);

        assert_eq!(items.len(), 2);
        assert_eq!(
            items[0].envelope_kaizen_schema_version(),
            Some(KAIZEN_SCHEMA_VERSION)
        );
        let CanonicalItem::Event(item) = &items[0] else {
            panic!("expected event");
        };
        let label = item
            .envelope
            .identity
            .as_ref()
            .and_then(|identity| identity.workspace_label.as_deref());
        assert_eq!(label, Some("kaizen"));
    }

    /// A tool-spans batch with one span expands to one `ToolSpan` item.
    #[test]
    fn expand_tool_spans_n_items() {
        let span = OutboundToolSpan {
            session_id_hash: "sh".into(),
            span_id_hash: "ph".into(),
            tool: None,
            status: "ok".into(),
            started_at_ms: None,
            ended_at_ms: None,
            lead_time_ms: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            path_hashes: vec![],
        };
        let batch = IngestExportBatch::ToolSpans(ToolSpansBatchBody {
            team_id: "t".into(),
            workspace_hash: "w".into(),
            project_name: None,
            spans: vec![span],
        });

        let items = expand_ingest_batch(&batch);

        assert_eq!(items.len(), 1);
        assert!(matches!(items[0], CanonicalItem::ToolSpan(_)));
    }

    /// A workspace-facts batch with one row expands to one
    /// `WorkspaceFactSnapshot` item.
    #[test]
    fn expand_workspace_facts_one_per_row() {
        use crate::sync::smart::{OutboundWorkspaceFactRow, WorkspaceFactsBatchBody};

        let row = OutboundWorkspaceFactRow {
            skill_slugs: vec!["a".into()],
            rule_slugs: vec!["b".into()],
        };
        let batch = IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
            team_id: "t".into(),
            workspace_hash: "w".into(),
            project_name: None,
            facts: vec![row],
        });

        let items = expand_ingest_batch(&batch);

        assert_eq!(items.len(), 1);
        assert!(matches!(
            items[0],
            CanonicalItem::WorkspaceFactSnapshot { .. }
        ));
    }
}