Skip to main content

kaizen/sync/
outbound.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Typed outbound JSON matching `POST /v1/events` (single-event shape used in outbox rows).
3
4use crate::core::event::{Event, EventKind, EventSource, SessionRecord};
5use blake3::Hasher;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8
/// Scheme tag placed in front of every hex digest produced by [`hash_with_salt`].
const BLAKE3_PREFIX: &str = "blake3:";
10
11/// Full batch body for `POST /v1/events`.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct EventsBatchBody {
14    pub team_id: String,
15    pub workspace_hash: String,
16    #[serde(default, skip_serializing_if = "Option::is_none")]
17    pub project_name: Option<String>,
18    pub events: Vec<OutboundEvent>,
19}
20
21/// One event in the ingest API shape (after redaction).
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct OutboundEvent {
24    pub session_id_hash: String,
25    pub event_seq: u64,
26    pub ts_ms: u64,
27    pub agent: String,
28    pub model: String,
29    pub kind: String,
30    pub source: String,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub tool: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub tool_call_id: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub tokens_in: Option<u32>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub tokens_out: Option<u32>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub reasoning_tokens: Option<u32>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub cost_usd_e6: Option<i64>,
43    pub payload: serde_json::Value,
44}
45
46pub fn hash_with_salt(team_salt: &[u8; 32], material: &[u8]) -> String {
47    let mut h = Hasher::new();
48    h.update(team_salt);
49    h.update(material);
50    format!("{BLAKE3_PREFIX}{}", hex::encode(h.finalize().as_bytes()))
51}
52
53pub fn workspace_hash(team_salt: &[u8; 32], workspace_abs: &Path) -> String {
54    let normalized = workspace_abs.to_string_lossy();
55    hash_with_salt(team_salt, normalized.as_bytes())
56}
57
58pub fn outbound_event_from_row(
59    e: &Event,
60    session: &SessionRecord,
61    team_salt: &[u8; 32],
62) -> OutboundEvent {
63    OutboundEvent {
64        session_id_hash: hash_with_salt(team_salt, e.session_id.as_bytes()),
65        event_seq: e.seq,
66        ts_ms: e.ts_ms,
67        agent: session.agent.clone(),
68        model: session
69            .model
70            .clone()
71            .unwrap_or_else(|| "unknown".to_string()),
72        kind: kind_api(&e.kind),
73        source: source_api(&e.source),
74        tool: e.tool.clone(),
75        tool_call_id: e.tool_call_id.clone(),
76        tokens_in: e.tokens_in,
77        tokens_out: e.tokens_out,
78        reasoning_tokens: e.reasoning_tokens,
79        cost_usd_e6: e.cost_usd_e6,
80        payload: e.payload.clone(),
81    }
82}
83
84fn kind_api(k: &EventKind) -> String {
85    match k {
86        EventKind::ToolCall => "tool_call",
87        EventKind::ToolResult => "tool_result",
88        EventKind::Message => "message",
89        EventKind::Error => "error",
90        EventKind::Cost => "cost",
91        EventKind::Hook => "hook",
92        EventKind::Lifecycle => "lifecycle",
93    }
94    .to_string()
95}
96
97fn source_api(s: &EventSource) -> String {
98    match s {
99        EventSource::Tail => "tail",
100        EventSource::Hook => "hook",
101        EventSource::Proxy => "proxy",
102    }
103    .to_string()
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Session fixture. Only `agent` and `model` are read by
    /// `outbound_event_from_row`; the remaining fields are filler.
    fn sample_session() -> SessionRecord {
        SessionRecord {
            id: "sid".into(),
            agent: "cursor".into(),
            model: Some("m1".into()),
            workspace: "/w".into(),
            started_at_ms: 0,
            ended_at_ms: None,
            status: crate::core::event::SessionStatus::Running,
            trace_path: "".into(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        }
    }

    /// Event fixture: a hook-sourced tool call with sequence number 3.
    fn sample_tool_call_event() -> Event {
        Event {
            session_id: "sid".into(),
            seq: 3,
            ts_ms: 99,
            ts_exact: false,
            kind: EventKind::ToolCall,
            source: EventSource::Hook,
            tool: Some("Edit".into()),
            tool_call_id: Some("call_1".into()),
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            context_used_tokens: None,
            context_max_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            system_prompt_tokens: None,
            payload: json!({}),
        }
    }

    /// Hashing the same path with the same salt twice gives the same
    /// prefixed string.
    #[test]
    fn workspace_hash_stable_for_same_salt_and_path() {
        let salt = [7u8; 32];
        let path = Path::new("/tmp/ws");
        let first = workspace_hash(&salt, path);
        let second = workspace_hash(&salt, path);
        assert_eq!(first, second);
        assert!(first.starts_with(BLAKE3_PREFIX));
    }

    /// Kind and source are rendered as their API strings and the sequence
    /// number is carried over unchanged.
    #[test]
    fn outbound_maps_kind_snake_case() {
        let salt = [0u8; 32];
        let session = sample_session();
        let event = sample_tool_call_event();
        let outbound = outbound_event_from_row(&event, &session, &salt);
        assert_eq!(outbound.kind, "tool_call");
        assert_eq!(outbound.source, "hook");
        assert_eq!(outbound.event_seq, 3);
    }
}