Skip to main content

kaizen/sync/
telemetry_replay.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Chunk local [`OutboundEvent`] vectors into [`IngestExportBatch`] for telemetry replay (exporter
3//! fan-out only). Packing mirrors [`crate::sync::engine`] outbox batch limits.
4
5use crate::core::config::SyncConfig;
6use crate::sync::export_batch::IngestExportBatch;
7use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
8use anyhow::Context;
9use anyhow::Result;
10
11/// Split redacted events into `Events` batches using `events_per_batch_max` and `max_body_bytes`.
12pub fn chunk_events_into_ingest_batches(
13    team_id: String,
14    workspace_hash: String,
15    events: Vec<OutboundEvent>,
16    cfg: &SyncConfig,
17) -> Result<Vec<IngestExportBatch>> {
18    let max_ev = cfg.events_per_batch_max.max(1);
19    let max_bytes = cfg.max_body_bytes;
20    let mut batches = Vec::new();
21    let mut cur: Vec<OutboundEvent> = Vec::new();
22    let mut bytes = 0usize;
23
24    for ev in events {
25        let inc = serde_json::to_vec(&ev).context("serialize outbound event for batch sizing")?;
26        if !cur.is_empty() && (cur.len() >= max_ev || bytes + inc.len() > max_bytes) {
27            batches.push(IngestExportBatch::Events(EventsBatchBody {
28                team_id: team_id.clone(),
29                workspace_hash: workspace_hash.clone(),
30                events: std::mem::take(&mut cur),
31            }));
32            bytes = 0;
33        }
34        cur.push(ev);
35        bytes += inc.len();
36        if cur.len() >= max_ev {
37            batches.push(IngestExportBatch::Events(EventsBatchBody {
38                team_id: team_id.clone(),
39                workspace_hash: workspace_hash.clone(),
40                events: std::mem::take(&mut cur),
41            }));
42            bytes = 0;
43        }
44    }
45    if !cur.is_empty() {
46        batches.push(IngestExportBatch::Events(EventsBatchBody {
47            team_id,
48            workspace_hash,
49            events: cur,
50        }));
51    }
52    Ok(batches)
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::outbound::OutboundEvent;
    use serde_json::json;

    fn dummy_ev(payload: serde_json::Value) -> OutboundEvent {
        OutboundEvent {
            session_id_hash: "h".into(),
            event_seq: 0,
            ts_ms: 0,
            agent: "a".into(),
            model: "m".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload,
        }
    }

    /// Config whose byte budget is large enough that only the event-count cap matters.
    fn cfg_count_only(max: usize) -> SyncConfig {
        SyncConfig {
            events_per_batch_max: max,
            max_body_bytes: 10_000_000,
            ..Default::default()
        }
    }

    /// Borrow the events of a batch, failing the test on any non-`Events` variant.
    fn events_of(batch: &IngestExportBatch) -> &[OutboundEvent] {
        match batch {
            IngestExportBatch::Events(body) => &body.events,
            _ => panic!("expected events batch"),
        }
    }

    /// Number of events in each batch, in batch order.
    fn counts(batches: &[IngestExportBatch]) -> Vec<usize> {
        batches.iter().map(|b| events_of(b).len()).collect()
    }

    #[test]
    fn splits_on_event_count() {
        let cfg = cfg_count_only(2);
        let events: Vec<_> = (0..5)
            .map(|i| {
                let mut e = dummy_ev(json!({"i": i}));
                e.event_seq = i;
                e
            })
            .collect();
        let expected_seqs: Vec<_> = events.iter().map(|e| e.event_seq).collect();
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), events, &cfg).unwrap();
        assert_eq!(batches.len(), 3);
        assert_eq!(counts(&batches), vec![2, 2, 1]);
        // Nothing dropped or reordered across batch boundaries.
        let seqs: Vec<_> = batches
            .iter()
            .flat_map(|b| events_of(b).iter().map(|e| e.event_seq))
            .collect();
        assert_eq!(seqs, expected_seqs);
    }

    #[test]
    fn splits_on_byte_budget() {
        let cfg = SyncConfig {
            events_per_batch_max: 100,
            max_body_bytes: 50,
            ..Default::default()
        };
        let payloads = vec![
            json!({"x": "aa"}),
            json!({"x": "bbbbbbbbbb"}),
            json!({"x": "cc"}),
        ];
        let events: Vec<_> = payloads.iter().cloned().map(dummy_ev).collect();
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), events, &cfg).unwrap();
        assert!(batches.len() >= 2);
        for b in &batches {
            let evs = events_of(b);
            let ser = serde_json::to_vec(evs).unwrap();
            assert!(ser.len() <= cfg.max_body_bytes || evs.len() == 1);
        }
        // Nothing dropped or reordered across batch boundaries.
        let seen: Vec<_> = batches
            .iter()
            .flat_map(|b| events_of(b).iter().map(|e| e.payload.clone()))
            .collect();
        assert_eq!(seen, payloads);
    }

    #[test]
    fn empty_input_yields_no_batches() {
        let batches = chunk_events_into_ingest_batches(
            "t".into(),
            "w".into(),
            Vec::new(),
            &cfg_count_only(10),
        )
        .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn zero_event_cap_is_treated_as_one() {
        let events = vec![dummy_ev(json!(1)), dummy_ev(json!(2))];
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), events, &cfg_count_only(0))
                .unwrap();
        assert_eq!(counts(&batches), vec![1, 1]);
    }

    #[test]
    fn oversized_event_gets_its_own_batch() {
        let cfg = SyncConfig {
            events_per_batch_max: 100,
            max_body_bytes: 1,
            ..Default::default()
        };
        let events = vec![dummy_ev(json!({"x": "aa"})), dummy_ev(json!({"x": "bb"}))];
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), events, &cfg).unwrap();
        assert_eq!(counts(&batches), vec![1, 1]);
    }

    #[test]
    fn every_batch_carries_team_and_workspace() {
        let events: Vec<_> = (0..3).map(|_| dummy_ev(json!(null))).collect();
        let batches = chunk_events_into_ingest_batches(
            "team".into(),
            "ws".into(),
            events,
            &cfg_count_only(1),
        )
        .unwrap();
        assert_eq!(batches.len(), 3);
        for b in &batches {
            let IngestExportBatch::Events(body) = b else {
                panic!("expected events batch");
            };
            assert_eq!(body.team_id, "team");
            assert_eq!(body.workspace_hash, "ws");
        }
    }
}