kaizen/sync/
telemetry_replay.rs1use crate::core::config::SyncConfig;
6use crate::sync::export_batch::IngestExportBatch;
7use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
8use crate::sync::smart::{OutboundToolSpan, ToolSpansBatchBody};
9use anyhow::Context;
10use anyhow::Result;
11
12pub fn chunk_events_into_ingest_batches(
14 team_id: String,
15 workspace_hash: String,
16 project_name: Option<String>,
17 events: Vec<OutboundEvent>,
18 cfg: &SyncConfig,
19) -> Result<Vec<IngestExportBatch>> {
20 let max_ev = cfg.events_per_batch_max.max(1);
21 let max_bytes = cfg.max_body_bytes;
22 let mut batches = Vec::new();
23 let mut cur: Vec<OutboundEvent> = Vec::new();
24 let mut bytes = 0usize;
25
26 for ev in events {
27 let inc = serde_json::to_vec(&ev).context("serialize outbound event for batch sizing")?;
28 if !cur.is_empty() && (cur.len() >= max_ev || bytes + inc.len() > max_bytes) {
29 batches.push(IngestExportBatch::Events(EventsBatchBody {
30 team_id: team_id.clone(),
31 workspace_hash: workspace_hash.clone(),
32 project_name: project_name.clone(),
33 events: std::mem::take(&mut cur),
34 }));
35 bytes = 0;
36 }
37 cur.push(ev);
38 bytes += inc.len();
39 if cur.len() >= max_ev {
40 batches.push(IngestExportBatch::Events(EventsBatchBody {
41 team_id: team_id.clone(),
42 workspace_hash: workspace_hash.clone(),
43 project_name: project_name.clone(),
44 events: std::mem::take(&mut cur),
45 }));
46 bytes = 0;
47 }
48 }
49 if !cur.is_empty() {
50 batches.push(IngestExportBatch::Events(EventsBatchBody {
51 team_id,
52 workspace_hash,
53 project_name,
54 events: cur,
55 }));
56 }
57 Ok(batches)
58}
59
60pub fn chunk_tool_spans_into_ingest_batches(
64 team_id: String,
65 workspace_hash: String,
66 project_name: Option<String>,
67 spans: Vec<OutboundToolSpan>,
68 cfg: &SyncConfig,
69) -> Result<Vec<IngestExportBatch>> {
70 let max_ev = cfg.events_per_batch_max.max(1);
71 let max_bytes = cfg.max_body_bytes;
72 let mut batches = Vec::new();
73 let mut cur: Vec<OutboundToolSpan> = Vec::new();
74 let mut bytes = 0usize;
75
76 for span in spans {
77 let inc =
78 serde_json::to_vec(&span).context("serialize outbound tool span for batch sizing")?;
79 if !cur.is_empty() && (cur.len() >= max_ev || bytes + inc.len() > max_bytes) {
80 batches.push(IngestExportBatch::ToolSpans(ToolSpansBatchBody {
81 team_id: team_id.clone(),
82 workspace_hash: workspace_hash.clone(),
83 project_name: project_name.clone(),
84 spans: std::mem::take(&mut cur),
85 }));
86 bytes = 0;
87 }
88 cur.push(span);
89 bytes += inc.len();
90 if cur.len() >= max_ev {
91 batches.push(IngestExportBatch::ToolSpans(ToolSpansBatchBody {
92 team_id: team_id.clone(),
93 workspace_hash: workspace_hash.clone(),
94 project_name: project_name.clone(),
95 spans: std::mem::take(&mut cur),
96 }));
97 bytes = 0;
98 }
99 }
100 if !cur.is_empty() {
101 batches.push(IngestExportBatch::ToolSpans(ToolSpansBatchBody {
102 team_id,
103 workspace_hash,
104 project_name,
105 spans: cur,
106 }));
107 }
108 Ok(batches)
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a minimal outbound event whose only caller-controlled part is `payload`.
    fn dummy_ev(payload: serde_json::Value) -> OutboundEvent {
        OutboundEvent {
            session_id_hash: "h".into(),
            event_seq: 0,
            ts_ms: 0,
            agent: "a".into(),
            model: "m".into(),
            kind: "message".into(),
            source: "hook".into(),
            tool: None,
            tool_call_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cost_usd_e6: None,
            payload,
        }
    }

    /// Config with a byte budget large enough that only the count limit splits batches.
    fn cfg_count_only(max: usize) -> SyncConfig {
        SyncConfig {
            events_per_batch_max: max,
            max_body_bytes: 10_000_000,
            ..Default::default()
        }
    }

    /// Number of events in each batch; panics if a batch is not an events batch.
    fn event_counts(batches: &[IngestExportBatch]) -> Vec<usize> {
        batches
            .iter()
            .map(|b| match b {
                IngestExportBatch::Events(e) => e.events.len(),
                _ => panic!("expected events batch"),
            })
            .collect()
    }

    #[test]
    fn empty_input_yields_no_batches() {
        let batches = chunk_events_into_ingest_batches(
            "t".into(),
            "w".into(),
            None,
            Vec::new(),
            &cfg_count_only(2),
        )
        .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn splits_on_event_count() {
        let cfg = cfg_count_only(2);
        let events: Vec<_> = (0..5)
            .map(|i| {
                let mut e = dummy_ev(json!({"i": i}));
                e.event_seq = i;
                e
            })
            .collect();
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), None, events, &cfg).unwrap();
        assert_eq!(batches.len(), 3);
        assert_eq!(event_counts(&batches), vec![2, 2, 1]);
    }

    #[test]
    fn zero_count_limit_is_treated_as_one() {
        let events: Vec<_> = (0..3).map(|_| dummy_ev(json!({}))).collect();
        let batches = chunk_events_into_ingest_batches(
            "t".into(),
            "w".into(),
            None,
            events,
            &cfg_count_only(0),
        )
        .unwrap();
        assert_eq!(event_counts(&batches), vec![1, 1, 1]);
    }

    /// With a 50-byte budget each serialized dummy event presumably exceeds the limit on its
    /// own, so this mainly checks that oversized events are still emitted, one per batch.
    #[test]
    fn splits_on_byte_budget() {
        let cfg = SyncConfig {
            events_per_batch_max: 100,
            max_body_bytes: 50,
            ..Default::default()
        };
        let events = vec![
            dummy_ev(json!({"x": "aa"})),
            dummy_ev(json!({"x": "bbbbbbbbbb"})),
            dummy_ev(json!({"x": "cc"})),
        ];
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), None, events, &cfg).unwrap();
        assert!(batches.len() >= 2);
        for b in &batches {
            let IngestExportBatch::Events(body) = b else {
                panic!("expected events batch");
            };
            let ser = serde_json::to_vec(&body.events).unwrap();
            assert!(ser.len() <= cfg.max_body_bytes || body.events.len() == 1);
        }
    }

    /// Budget derived from the real encoded size so several events must share a batch.
    #[test]
    fn packs_several_events_per_batch_within_byte_budget() {
        let one = serde_json::to_vec(&dummy_ev(json!({"x": "same"})))
            .unwrap()
            .len();
        let cfg = SyncConfig {
            events_per_batch_max: 100,
            // Room for three events plus JSON array punctuation, but not for a fourth event.
            max_body_bytes: 3 * one + 10,
            ..Default::default()
        };
        let events: Vec<_> = (0..7).map(|_| dummy_ev(json!({"x": "same"}))).collect();
        let batches =
            chunk_events_into_ingest_batches("t".into(), "w".into(), None, events, &cfg).unwrap();
        assert_eq!(event_counts(&batches), vec![3, 3, 1]);
        for b in &batches {
            let IngestExportBatch::Events(body) = b else {
                panic!("expected events batch");
            };
            let ser = serde_json::to_vec(&body.events).unwrap();
            assert!(ser.len() <= cfg.max_body_bytes);
        }
    }
}