Skip to main content

kaizen/sync/
outbound.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Typed outbound JSON matching `POST /v1/events` (single-event shape used in outbox rows).
3
4use crate::core::event::{Event, EventKind, EventSource, SessionRecord};
5use blake3::Hasher;
6use serde::{Deserialize, Serialize};
7use std::path::Path;
8
9const BLAKE3_PREFIX: &str = "blake3:";
10
11/// Full batch body for `POST /v1/events`.
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct EventsBatchBody {
14    pub team_id: String,
15    pub workspace_hash: String,
16    pub events: Vec<OutboundEvent>,
17}
18
19/// One event in the ingest API shape (after redaction).
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct OutboundEvent {
22    pub session_id_hash: String,
23    pub event_seq: u64,
24    pub ts_ms: u64,
25    pub agent: String,
26    pub model: String,
27    pub kind: String,
28    pub source: String,
29    #[serde(skip_serializing_if = "Option::is_none")]
30    pub tool: Option<String>,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub tool_call_id: Option<String>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub tokens_in: Option<u32>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub tokens_out: Option<u32>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub reasoning_tokens: Option<u32>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub cost_usd_e6: Option<i64>,
41    pub payload: serde_json::Value,
42}
43
44pub fn hash_with_salt(team_salt: &[u8; 32], material: &[u8]) -> String {
45    let mut h = Hasher::new();
46    h.update(team_salt);
47    h.update(material);
48    format!("{BLAKE3_PREFIX}{}", hex::encode(h.finalize().as_bytes()))
49}
50
51pub fn workspace_hash(team_salt: &[u8; 32], workspace_abs: &Path) -> String {
52    let normalized = workspace_abs.to_string_lossy();
53    hash_with_salt(team_salt, normalized.as_bytes())
54}
55
56pub fn outbound_event_from_row(
57    e: &Event,
58    session: &SessionRecord,
59    team_salt: &[u8; 32],
60) -> OutboundEvent {
61    OutboundEvent {
62        session_id_hash: hash_with_salt(team_salt, e.session_id.as_bytes()),
63        event_seq: e.seq,
64        ts_ms: e.ts_ms,
65        agent: session.agent.clone(),
66        model: session
67            .model
68            .clone()
69            .unwrap_or_else(|| "unknown".to_string()),
70        kind: kind_api(&e.kind),
71        source: source_api(&e.source),
72        tool: e.tool.clone(),
73        tool_call_id: e.tool_call_id.clone(),
74        tokens_in: e.tokens_in,
75        tokens_out: e.tokens_out,
76        reasoning_tokens: e.reasoning_tokens,
77        cost_usd_e6: e.cost_usd_e6,
78        payload: e.payload.clone(),
79    }
80}
81
82fn kind_api(k: &EventKind) -> String {
83    match k {
84        EventKind::ToolCall => "tool_call",
85        EventKind::ToolResult => "tool_result",
86        EventKind::Message => "message",
87        EventKind::Error => "error",
88        EventKind::Cost => "cost",
89        EventKind::Hook => "hook",
90    }
91    .to_string()
92}
93
94fn source_api(s: &EventSource) -> String {
95    match s {
96        EventSource::Tail => "tail",
97        EventSource::Hook => "hook",
98        EventSource::Proxy => "proxy",
99    }
100    .to_string()
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106    use serde_json::json;
107
108    #[test]
109    fn workspace_hash_stable_for_same_salt_and_path() {
110        let salt = [7u8; 32];
111        let p = Path::new("/tmp/ws");
112        let a = workspace_hash(&salt, p);
113        let b = workspace_hash(&salt, p);
114        assert_eq!(a, b);
115        assert!(a.starts_with(BLAKE3_PREFIX));
116    }
117
118    #[test]
119    fn outbound_maps_kind_snake_case() {
120        let salt = [0u8; 32];
121        let session = SessionRecord {
122            id: "sid".into(),
123            agent: "cursor".into(),
124            model: Some("m1".into()),
125            workspace: "/w".into(),
126            started_at_ms: 0,
127            ended_at_ms: None,
128            status: crate::core::event::SessionStatus::Running,
129            trace_path: "".into(),
130            start_commit: None,
131            end_commit: None,
132            branch: None,
133            dirty_start: None,
134            dirty_end: None,
135            repo_binding_source: None,
136        };
137        let ev = Event {
138            session_id: "sid".into(),
139            seq: 3,
140            ts_ms: 99,
141            ts_exact: false,
142            kind: EventKind::ToolCall,
143            source: EventSource::Hook,
144            tool: Some("Edit".into()),
145            tool_call_id: Some("call_1".into()),
146            tokens_in: None,
147            tokens_out: None,
148            reasoning_tokens: None,
149            cost_usd_e6: None,
150            payload: json!({}),
151        };
152        let o = outbound_event_from_row(&ev, &session, &salt);
153        assert_eq!(o.kind, "tool_call");
154        assert_eq!(o.source, "hook");
155        assert_eq!(o.event_seq, 3);
156    }
157}