Skip to main content

kaizen/sync/
telemetry_replay.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Chunk local [`OutboundEvent`] vectors into [`IngestExportBatch`] for telemetry replay (exporter
3//! fan-out only). Packing mirrors [`crate::sync::engine`] outbox batch limits.
4
5use crate::core::config::SyncConfig;
6use crate::sync::export_batch::IngestExportBatch;
7use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
8use crate::sync::smart::{OutboundToolSpan, ToolSpansBatchBody};
9use anyhow::Context;
10use anyhow::Result;
11
12/// Split redacted events into `Events` batches using `events_per_batch_max` and `max_body_bytes`.
13pub fn chunk_events_into_ingest_batches(
14    team_id: String,
15    workspace_hash: String,
16    project_name: Option<String>,
17    events: Vec<OutboundEvent>,
18    cfg: &SyncConfig,
19) -> Result<Vec<IngestExportBatch>> {
20    let max_ev = cfg.events_per_batch_max.max(1);
21    let max_bytes = cfg.max_body_bytes;
22    let mut batches = Vec::new();
23    let mut cur: Vec<OutboundEvent> = Vec::new();
24    let mut bytes = 0usize;
25
26    for ev in events {
27        let inc = serde_json::to_vec(&ev).context("serialize outbound event for batch sizing")?;
28        if !cur.is_empty() && (cur.len() >= max_ev || bytes + inc.len() > max_bytes) {
29            batches.push(IngestExportBatch::Events(EventsBatchBody {
30                team_id: team_id.clone(),
31                workspace_hash: workspace_hash.clone(),
32                project_name: project_name.clone(),
33                events: std::mem::take(&mut cur),
34            }));
35            bytes = 0;
36        }
37        cur.push(ev);
38        bytes += inc.len();
39        if cur.len() >= max_ev {
40            batches.push(IngestExportBatch::Events(EventsBatchBody {
41                team_id: team_id.clone(),
42                workspace_hash: workspace_hash.clone(),
43                project_name: project_name.clone(),
44                events: std::mem::take(&mut cur),
45            }));
46            bytes = 0;
47        }
48    }
49    if !cur.is_empty() {
50        batches.push(IngestExportBatch::Events(EventsBatchBody {
51            team_id,
52            workspace_hash,
53            project_name,
54            events: cur,
55        }));
56    }
57    Ok(batches)
58}
59
60/// Same packing rules as [`chunk_events_into_ingest_batches`] but over [`OutboundToolSpan`]
61/// rows. Keeps the per-batch limits identical so exporters see consistent shapes regardless
62/// of payload kind.
63pub fn chunk_tool_spans_into_ingest_batches(
64    team_id: String,
65    workspace_hash: String,
66    project_name: Option<String>,
67    spans: Vec<OutboundToolSpan>,
68    cfg: &SyncConfig,
69) -> Result<Vec<IngestExportBatch>> {
70    let max_ev = cfg.events_per_batch_max.max(1);
71    let max_bytes = cfg.max_body_bytes;
72    let mut batches = Vec::new();
73    let mut cur: Vec<OutboundToolSpan> = Vec::new();
74    let mut bytes = 0usize;
75
76    for span in spans {
77        let inc =
78            serde_json::to_vec(&span).context("serialize outbound tool span for batch sizing")?;
79        if !cur.is_empty() && (cur.len() >= max_ev || bytes + inc.len() > max_bytes) {
80            batches.push(IngestExportBatch::ToolSpans(ToolSpansBatchBody {
81                team_id: team_id.clone(),
82                workspace_hash: workspace_hash.clone(),
83                project_name: project_name.clone(),
84                spans: std::mem::take(&mut cur),
85            }));
86            bytes = 0;
87        }
88        cur.push(span);
89        bytes += inc.len();
90        if cur.len() >= max_ev {
91            batches.push(IngestExportBatch::ToolSpans(ToolSpansBatchBody {
92                team_id: team_id.clone(),
93                workspace_hash: workspace_hash.clone(),
94                project_name: project_name.clone(),
95                spans: std::mem::take(&mut cur),
96            }));
97            bytes = 0;
98        }
99    }
100    if !cur.is_empty() {
101        batches.push(IngestExportBatch::ToolSpans(ToolSpansBatchBody {
102            team_id,
103            workspace_hash,
104            project_name,
105            spans: cur,
106        }));
107    }
108    Ok(batches)
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::outbound::OutboundEvent;
    use serde_json::json;

    /// Build a minimal event whose only varying part is `payload`.
    fn dummy_ev(payload: serde_json::Value) -> OutboundEvent {
        OutboundEvent {
            session_id_hash: "h".into(),
            event_seq: 0,
            ts_ms: 0,
            agent: "a".into(),
            model: "m".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload,
        }
    }

    /// Config whose byte budget is large enough that only the event-count limit can split.
    fn cfg_count_only(max: usize) -> SyncConfig {
        SyncConfig {
            events_per_batch_max: max,
            max_body_bytes: 10_000_000,
            ..Default::default()
        }
    }

    /// Number of events in each batch, in order; panics on a non-`Events` batch.
    fn event_counts(batches: &[IngestExportBatch]) -> Vec<usize> {
        batches
            .iter()
            .map(|b| match b {
                IngestExportBatch::Events(e) => e.events.len(),
                _ => panic!("expected events batch"),
            })
            .collect()
    }

    #[test]
    fn splits_on_event_count() {
        let cfg = cfg_count_only(2);
        let events: Vec<_> = (0..5)
            .map(|i| {
                let mut e = dummy_ev(json!({"i": i}));
                e.event_seq = i;
                e
            })
            .collect();
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), None, events, &cfg).unwrap();
        assert_eq!(batches.len(), 3);
        assert_eq!(event_counts(&batches), vec![2, 2, 1]);

        // Chunking must not reorder events across or within batches.
        let seqs: Vec<_> = batches
            .iter()
            .flat_map(|b| match b {
                IngestExportBatch::Events(body) => body.events.iter().map(|e| e.event_seq),
                _ => panic!("expected events batch"),
            })
            .collect();
        assert_eq!(seqs, (0..5).collect::<Vec<_>>());
    }

    #[test]
    fn splits_on_byte_budget() {
        let cfg = SyncConfig {
            events_per_batch_max: 100,
            max_body_bytes: 50,
            ..Default::default()
        };
        let events = vec![
            dummy_ev(json!({"x": "aa"})),
            dummy_ev(json!({"x": "bbbbbbbbbb"})),
            dummy_ev(json!({"x": "cc"})),
        ];
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), None, events, &cfg).unwrap();
        assert!(batches.len() >= 2);
        for b in &batches {
            let IngestExportBatch::Events(body) = b else {
                panic!("expected events batch");
            };
            // A batch may exceed the budget only when a single event is oversized on its own.
            let ser = serde_json::to_vec(&body.events).unwrap();
            assert!(ser.len() <= cfg.max_body_bytes || body.events.len() == 1);
        }
    }

    #[test]
    fn empty_input_yields_no_batches() {
        let cfg = cfg_count_only(2);
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), None, Vec::new(), &cfg)
                .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn zero_event_limit_is_treated_as_one() {
        // `events_per_batch_max` is clamped with `.max(1)`, so 0 means one event per batch.
        let cfg = cfg_count_only(0);
        let events = vec![
            dummy_ev(json!({})),
            dummy_ev(json!({})),
            dummy_ev(json!({})),
        ];
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), None, events, &cfg).unwrap();
        assert_eq!(event_counts(&batches), vec![1, 1, 1]);
    }
}
190}