kaizen/sync/
telemetry_replay.rs1use crate::core::config::SyncConfig;
6use crate::sync::export_batch::IngestExportBatch;
7use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
8use anyhow::Context;
9use anyhow::Result;
10
11pub fn chunk_events_into_ingest_batches(
13 team_id: String,
14 workspace_hash: String,
15 events: Vec<OutboundEvent>,
16 cfg: &SyncConfig,
17) -> Result<Vec<IngestExportBatch>> {
18 let max_ev = cfg.events_per_batch_max.max(1);
19 let max_bytes = cfg.max_body_bytes;
20 let mut batches = Vec::new();
21 let mut cur: Vec<OutboundEvent> = Vec::new();
22 let mut bytes = 0usize;
23
24 for ev in events {
25 let inc = serde_json::to_vec(&ev).context("serialize outbound event for batch sizing")?;
26 if !cur.is_empty() && (cur.len() >= max_ev || bytes + inc.len() > max_bytes) {
27 batches.push(IngestExportBatch::Events(EventsBatchBody {
28 team_id: team_id.clone(),
29 workspace_hash: workspace_hash.clone(),
30 events: std::mem::take(&mut cur),
31 }));
32 bytes = 0;
33 }
34 cur.push(ev);
35 bytes += inc.len();
36 if cur.len() >= max_ev {
37 batches.push(IngestExportBatch::Events(EventsBatchBody {
38 team_id: team_id.clone(),
39 workspace_hash: workspace_hash.clone(),
40 events: std::mem::take(&mut cur),
41 }));
42 bytes = 0;
43 }
44 }
45 if !cur.is_empty() {
46 batches.push(IngestExportBatch::Events(EventsBatchBody {
47 team_id,
48 workspace_hash,
49 events: cur,
50 }));
51 }
52 Ok(batches)
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::outbound::OutboundEvent;
    use serde_json::json;

    fn dummy_ev(payload: serde_json::Value) -> OutboundEvent {
        OutboundEvent {
            session_id_hash: "h".into(),
            event_seq: 0,
            ts_ms: 0,
            agent: "a".into(),
            model: "m".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload,
        }
    }

    fn cfg_count_only(max: usize) -> SyncConfig {
        SyncConfig {
            events_per_batch_max: max,
            max_body_bytes: 10_000_000,
            ..Default::default()
        }
    }

    /// Builds `n` events with identical payloads and sequence numbers `0..n`.
    /// Single-digit sequence numbers keep every event the same serialized length.
    fn seq_events(n: usize) -> Vec<OutboundEvent> {
        (0..n)
            .map(|i| {
                let mut e = dummy_ev(json!({"x": "aa"}));
                e.event_seq = i.try_into().expect("small test index fits event_seq");
                e
            })
            .collect()
    }

    /// Unwraps an events batch, panicking on any other variant.
    fn events_body(batch: &IngestExportBatch) -> &EventsBatchBody {
        let IngestExportBatch::Events(body) = batch else {
            panic!("expected events batch");
        };
        body
    }

    fn batch_sizes(batches: &[IngestExportBatch]) -> Vec<usize> {
        batches.iter().map(|b| events_body(b).events.len()).collect()
    }

    #[test]
    fn splits_on_event_count() {
        let cfg = cfg_count_only(2);
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), seq_events(5), &cfg)
                .unwrap();
        assert_eq!(batch_sizes(&batches), vec![2, 2, 1]);
    }

    #[test]
    fn preserves_order_and_envelope() {
        let cfg = cfg_count_only(2);
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), seq_events(5), &cfg)
                .unwrap();
        let seqs: Vec<_> = batches
            .iter()
            .flat_map(|b| events_body(b).events.iter().map(|e| e.event_seq))
            .collect();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4]);
        for b in &batches {
            let body = events_body(b);
            assert_eq!(body.team_id, "t");
            assert_eq!(body.workspace_hash, "w");
        }
    }

    #[test]
    fn splits_on_byte_budget() {
        let events = seq_events(5);
        let sizes: Vec<usize> = events
            .iter()
            .map(|e| serde_json::to_vec(e).unwrap().len())
            .collect();
        let one = sizes[0];
        assert!(sizes.iter().all(|&s| s == one), "test events must be equal-sized");
        // Exactly two events fit: `[` + ev + `,` + ev + `]`.
        let cfg = SyncConfig {
            events_per_batch_max: 100,
            max_body_bytes: 2 * one + 3,
            ..Default::default()
        };
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), events, &cfg).unwrap();
        assert_eq!(batch_sizes(&batches), vec![2, 2, 1]);
        for b in &batches {
            let ser = serde_json::to_vec(&events_body(b).events).unwrap();
            assert!(ser.len() <= cfg.max_body_bytes);
        }
    }

    #[test]
    fn oversized_event_gets_own_batch() {
        let cfg = SyncConfig {
            events_per_batch_max: 100,
            max_body_bytes: 1,
            ..Default::default()
        };
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), seq_events(3), &cfg)
                .unwrap();
        assert_eq!(batch_sizes(&batches), vec![1, 1, 1]);
    }

    #[test]
    fn zero_event_cap_is_treated_as_one() {
        let cfg = cfg_count_only(0);
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), seq_events(3), &cfg)
                .unwrap();
        assert_eq!(batch_sizes(&batches), vec![1, 1, 1]);
    }

    #[test]
    fn empty_input_yields_no_batches() {
        let cfg = cfg_count_only(2);
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), Vec::new(), &cfg).unwrap();
        assert!(batches.is_empty());
    }
}