1use crate::core::config::TelemetryConfig;
8use crate::store::Store;
9use crate::sync::IngestExportBatch;
10use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
11use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
12use crate::sync::smart::{
13 OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
14 ToolSpansBatchBody, WorkspaceFactsBatchBody,
15};
16use crate::telemetry::ExporterRegistry;
17use anyhow::Context;
18use anyhow::Result;
19use std::path::Path;
20use std::thread;
21use std::time::Duration;
22use uuid::Uuid;
23
24#[derive(Clone, Copy)]
26pub struct FlushExporters<'a> {
27 pub telemetry: &'a TelemetryConfig,
28 pub registry: Option<&'a ExporterRegistry>,
29}
30
31pub fn flush_outbox_once(
33 store: &Store,
34 workspace_root: &Path,
35 cfg: &crate::core::config::SyncConfig,
36 team_salt: &[u8; 32],
37 flush: &FlushExporters<'_>,
38) -> Result<FlushStats> {
39 if cfg.endpoint.is_empty() {
40 return Ok(FlushStats::default());
41 }
42 let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
43 let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
44 let mut stats = FlushStats::default();
45
46 while store.outbox_pending_count()? > 0 {
47 let rows = store.list_outbox_pending(10_000)?;
48 if rows.is_empty() {
49 break;
50 }
51 let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
52 break;
53 };
54 let sent = match build_batch(&rows, cfg, &cfg.team_id, &workspace_hash, &kind)? {
55 Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
56 None => break,
57 };
58 stats.batches += sent.batches;
59 stats.events_sent += sent.events;
60 }
61
62 Ok(stats)
63}
64
/// Totals reported by `flush_outbox_once` for one flush pass.
#[derive(Clone, Debug, Default)]
pub struct FlushStats {
    /// Batches committed to the server (each half of a 413 split counts separately).
    pub batches: u64,
    /// Outbox rows marked as sent.
    pub events_sent: u64,
}
70
/// Result of posting one (possibly split) batch; folded into `FlushStats` by the caller.
#[derive(Default)]
struct Sent {
    /// Batches committed, counting each half of a 413 split.
    batches: u64,
    /// Outbox rows committed as sent.
    events: u64,
}
76
77fn build_batch(
78 rows: &[(i64, String, String)],
79 cfg: &crate::core::config::SyncConfig,
80 team_id: &str,
81 workspace_hash: &str,
82 kind: &str,
83) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
84 match kind {
85 "events" => {
86 let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
87 if ids.is_empty() {
88 return Ok(None);
89 }
90 Ok(Some((
91 ids,
92 IngestExportBatch::Events(EventsBatchBody {
93 team_id: team_id.into(),
94 workspace_hash: workspace_hash.into(),
95 events,
96 }),
97 )))
98 }
99 "tool_spans" => {
100 let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
101 if ids.is_empty() {
102 return Ok(None);
103 }
104 Ok(Some((
105 ids,
106 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
107 team_id: team_id.into(),
108 workspace_hash: workspace_hash.into(),
109 spans,
110 }),
111 )))
112 }
113 "repo_snapshots" => {
114 let (ids, snapshots) =
115 pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
116 if ids.is_empty() {
117 return Ok(None);
118 }
119 Ok(Some((
120 ids,
121 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
122 team_id: team_id.into(),
123 workspace_hash: workspace_hash.into(),
124 snapshots,
125 }),
126 )))
127 }
128 "workspace_facts" => {
129 let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
130 if ids.is_empty() {
131 return Ok(None);
132 }
133 Ok(Some((
134 ids,
135 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
136 team_id: team_id.into(),
137 workspace_hash: workspace_hash.into(),
138 facts,
139 }),
140 )))
141 }
142 "session_evals" => {
143 let (ids, evals) = pack_batch_payloads::<crate::eval::types::EvalRow>(rows, cfg, kind)?;
144 if ids.is_empty() {
145 return Ok(None);
146 }
147 Ok(Some((
148 ids,
149 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
150 evals,
151 }),
152 )))
153 }
154 _ => Ok(None),
155 }
156}
157
158fn pack_batch_payloads<T>(
159 rows: &[(i64, String, String)],
160 cfg: &crate::core::config::SyncConfig,
161 kind: &str,
162) -> Result<(Vec<i64>, Vec<T>)>
163where
164 T: serde::de::DeserializeOwned + serde::Serialize,
165{
166 let mut ids = Vec::new();
167 let mut out = Vec::new();
168 let mut bytes = 0usize;
169 let max_ev = cfg.events_per_batch_max.max(1);
170 for (id, row_kind, raw) in rows {
171 if row_kind != kind {
172 break;
173 }
174 let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
175 let inc = serde_json::to_vec(&item)?.len();
176 if out.len() >= max_ev {
177 break;
178 }
179 if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
180 break;
181 }
182 bytes += inc;
183 ids.push(*id);
184 out.push(item);
185 }
186 Ok((ids, out))
187}
188
189fn post_with_fanout(
191 client: &SyncHttpClient,
192 body: &IngestExportBatch,
193 key: &Uuid,
194 flush: &FlushExporters<'_>,
195) -> Result<(
196 Result<PostBatchOutcome, anyhow::Error>,
197 Result<(), anyhow::Error>,
198)> {
199 let fan_body = body.clone();
200 let reg = flush.registry;
201 let fail_open = flush.telemetry.fail_open;
202 Ok(std::thread::scope(|s| {
203 let handle = s.spawn(move || {
204 if let Some(r) = reg {
205 r.fan_out(fail_open, &fan_body)
206 } else {
207 Ok(())
208 }
209 });
210 let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
211 let o = match body {
212 IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
213 IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
214 IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
215 IngestExportBatch::WorkspaceFacts(b) => {
216 client.post_workspace_facts_batch(b, key)?
217 }
218 IngestExportBatch::SessionEvals(b) => client.post_session_evals_batch(b, key)?,
219 };
220 Ok(o)
221 })();
222 let fan_res = match handle.join() {
223 Ok(Ok(())) => Ok(()),
224 Ok(Err(e)) => Err(e),
225 Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
226 };
227 (post_res, fan_res)
228 }))
229}
230
231fn post_batch_resilient(
232 client: &SyncHttpClient,
233 store: &Store,
234 body: IngestExportBatch,
235 ids: &[i64],
236 flush: &FlushExporters<'_>,
237) -> Result<Sent> {
238 let mut backoff = Duration::from_millis(200);
239 let max_backoff = Duration::from_secs(30);
240 let mut server_failures = 0u32;
241
242 loop {
243 if body.item_count() == 0 {
244 return Ok(Sent::default());
245 }
246
247 let key = Uuid::now_v7();
248 let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush)?;
249 let outcome = post_res;
250
251 let outcome = match outcome {
252 Ok(o) => o,
253 Err(e) => {
254 if fan_res.is_err() {
255 tracing::trace!(error = %e, "primary post and fan-out both failed");
256 }
257 return Err(e);
258 }
259 };
260
261 match outcome {
262 PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
263 if let Err(e) = fan_res {
264 return Err(
265 e.context("telemetry fan-out (before outbox commit; fail_open = false)")
266 );
267 }
268 store.mark_outbox_sent(ids)?;
269 store.set_sync_state_ok()?;
270 return Ok(Sent {
271 batches: 1,
272 events: ids.len() as u64,
273 });
274 }
275 PostBatchOutcome::TooLarge => {
276 if let Err(e) = fan_res {
277 tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
278 }
279 if body.item_count() <= 1 {
280 store.set_sync_state_error("413: single event too large for server")?;
281 anyhow::bail!(
282 "413: single event too large; tighten redaction or max_body_bytes"
283 );
284 }
285 let mid = body.item_count() / 2;
286 let left_ids = ids[..mid].to_vec();
287 let right_ids = ids[mid..].to_vec();
288 let (left_body, right_body) = split_batch(body, mid);
289 let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
290 let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
291 return Ok(Sent {
292 batches: a.batches + b.batches,
293 events: a.events + b.events,
294 });
295 }
296 PostBatchOutcome::RateLimited(d) => {
297 if let Err(e) = fan_res {
298 tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
299 }
300 thread::sleep(d);
301 }
302 PostBatchOutcome::Unauthorized => {
303 if let Err(e) = fan_res {
304 tracing::warn!(error = %e, "telemetry fan-out during 401");
305 }
306 let msg = "401 unauthorized (check team_token)";
307 store.set_sync_state_error(msg)?;
308 anyhow::bail!("{msg}");
309 }
310 PostBatchOutcome::ClientError(c) => {
311 if let Err(e) = fan_res {
312 tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
313 }
314 let msg = format!("HTTP client error {c}");
315 store.set_sync_state_error(&msg)?;
316 anyhow::bail!("{msg}");
317 }
318 PostBatchOutcome::ServerError(c) => {
319 if let Err(e) = fan_res {
320 tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
321 }
322 server_failures += 1;
323 if server_failures > 12 {
324 let msg = format!("HTTP server error {c} (exhausted retries)");
325 store.set_sync_state_error(&msg)?;
326 anyhow::bail!("{msg}");
327 }
328 thread::sleep(backoff);
329 backoff = (backoff * 2).min(max_backoff);
330 }
331 }
332 }
333}
334
335fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
336 match body {
337 IngestExportBatch::Events(body) => (
338 IngestExportBatch::Events(EventsBatchBody {
339 team_id: body.team_id.clone(),
340 workspace_hash: body.workspace_hash.clone(),
341 events: body.events[..mid].to_vec(),
342 }),
343 IngestExportBatch::Events(EventsBatchBody {
344 team_id: body.team_id,
345 workspace_hash: body.workspace_hash,
346 events: body.events[mid..].to_vec(),
347 }),
348 ),
349 IngestExportBatch::ToolSpans(body) => (
350 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
351 team_id: body.team_id.clone(),
352 workspace_hash: body.workspace_hash.clone(),
353 spans: body.spans[..mid].to_vec(),
354 }),
355 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
356 team_id: body.team_id,
357 workspace_hash: body.workspace_hash,
358 spans: body.spans[mid..].to_vec(),
359 }),
360 ),
361 IngestExportBatch::RepoSnapshots(body) => (
362 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
363 team_id: body.team_id.clone(),
364 workspace_hash: body.workspace_hash.clone(),
365 snapshots: body.snapshots[..mid].to_vec(),
366 }),
367 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
368 team_id: body.team_id,
369 workspace_hash: body.workspace_hash,
370 snapshots: body.snapshots[mid..].to_vec(),
371 }),
372 ),
373 IngestExportBatch::WorkspaceFacts(body) => (
374 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
375 team_id: body.team_id.clone(),
376 workspace_hash: body.workspace_hash.clone(),
377 facts: body.facts[..mid].to_vec(),
378 }),
379 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
380 team_id: body.team_id,
381 workspace_hash: body.workspace_hash,
382 facts: body.facts[mid..].to_vec(),
383 }),
384 ),
385 IngestExportBatch::SessionEvals(body) => (
386 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
387 evals: body.evals[..mid].to_vec(),
388 }),
389 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
390 evals: body.evals[mid..].to_vec(),
391 }),
392 ),
393 }
394}