1use crate::core::config::TelemetryConfig;
8use crate::store::Store;
9use crate::sync::IngestExportBatch;
10use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
11use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
12use crate::sync::smart::{
13 OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
14 ToolSpansBatchBody, WorkspaceFactsBatchBody,
15};
16use crate::telemetry::ExporterRegistry;
17use anyhow::Context;
18use anyhow::Result;
19use std::path::Path;
20use std::thread;
21use std::time::Duration;
22use uuid::Uuid;
23
24#[derive(Clone, Copy)]
26pub struct FlushExporters<'a> {
27 pub telemetry: &'a TelemetryConfig,
28 pub registry: Option<&'a ExporterRegistry>,
29}
30
31pub fn flush_outbox_once(
33 store: &Store,
34 workspace_root: &Path,
35 cfg: &crate::core::config::SyncConfig,
36 team_salt: &[u8; 32],
37 flush: &FlushExporters<'_>,
38) -> Result<FlushStats> {
39 if cfg.endpoint.is_empty() {
40 return Ok(FlushStats::default());
41 }
42 let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
43 let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
44 let mut stats = FlushStats::default();
45
46 while store.outbox_pending_count()? > 0 {
47 let rows = store.list_outbox_pending(10_000)?;
48 if rows.is_empty() {
49 break;
50 }
51 let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
52 break;
53 };
54 let sent = match build_batch(&rows, cfg, &cfg.team_id, &workspace_hash, &kind)? {
55 Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
56 None => break,
57 };
58 stats.batches += sent.batches;
59 stats.events_sent += sent.events;
60 }
61
62 Ok(stats)
63}
64
/// Totals accumulated over one `flush_outbox_once` call.
#[derive(Debug, Default, Clone)]
pub struct FlushStats {
    /// Batches the primary endpoint took (accepted or conflict), counted after any 413 splits.
    pub batches: u64,
    /// Outbox rows committed as sent — rows of every kind, not only `events`.
    pub events_sent: u64,
}
70
/// Result of posting one (possibly split) batch: how many batches and rows were committed.
#[derive(Default)]
struct Sent {
    /// Batches committed to the outbox as sent.
    batches: u64,
    /// Outbox rows committed as sent.
    events: u64,
}
76
77fn build_batch(
78 rows: &[(i64, String, String)],
79 cfg: &crate::core::config::SyncConfig,
80 team_id: &str,
81 workspace_hash: &str,
82 kind: &str,
83) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
84 match kind {
85 "events" => {
86 let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
87 if ids.is_empty() {
88 return Ok(None);
89 }
90 Ok(Some((
91 ids,
92 IngestExportBatch::Events(EventsBatchBody {
93 team_id: team_id.into(),
94 workspace_hash: workspace_hash.into(),
95 events,
96 }),
97 )))
98 }
99 "tool_spans" => {
100 let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
101 if ids.is_empty() {
102 return Ok(None);
103 }
104 Ok(Some((
105 ids,
106 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
107 team_id: team_id.into(),
108 workspace_hash: workspace_hash.into(),
109 spans,
110 }),
111 )))
112 }
113 "repo_snapshots" => {
114 let (ids, snapshots) =
115 pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
116 if ids.is_empty() {
117 return Ok(None);
118 }
119 Ok(Some((
120 ids,
121 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
122 team_id: team_id.into(),
123 workspace_hash: workspace_hash.into(),
124 snapshots,
125 }),
126 )))
127 }
128 "workspace_facts" => {
129 let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
130 if ids.is_empty() {
131 return Ok(None);
132 }
133 Ok(Some((
134 ids,
135 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
136 team_id: team_id.into(),
137 workspace_hash: workspace_hash.into(),
138 facts,
139 }),
140 )))
141 }
142 "session_evals" => {
143 let (ids, evals) = pack_batch_payloads::<crate::eval::types::EvalRow>(rows, cfg, kind)?;
144 if ids.is_empty() {
145 return Ok(None);
146 }
147 Ok(Some((
148 ids,
149 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
150 evals,
151 }),
152 )))
153 }
154 "session_feedback" => {
155 let (ids, feedback) =
156 pack_batch_payloads::<crate::feedback::types::FeedbackRecord>(rows, cfg, kind)?;
157 if ids.is_empty() {
158 return Ok(None);
159 }
160 Ok(Some((
161 ids,
162 IngestExportBatch::SessionFeedback(
163 crate::sync::export_batch::SessionFeedbackBatchBody { feedback },
164 ),
165 )))
166 }
167 _ => Ok(None),
168 }
169}
170
171fn pack_batch_payloads<T>(
172 rows: &[(i64, String, String)],
173 cfg: &crate::core::config::SyncConfig,
174 kind: &str,
175) -> Result<(Vec<i64>, Vec<T>)>
176where
177 T: serde::de::DeserializeOwned + serde::Serialize,
178{
179 let mut ids = Vec::new();
180 let mut out = Vec::new();
181 let mut bytes = 0usize;
182 let max_ev = cfg.events_per_batch_max.max(1);
183 for (id, row_kind, raw) in rows {
184 if row_kind != kind {
185 break;
186 }
187 let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
188 let inc = serde_json::to_vec(&item)?.len();
189 if out.len() >= max_ev {
190 break;
191 }
192 if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
193 break;
194 }
195 bytes += inc;
196 ids.push(*id);
197 out.push(item);
198 }
199 Ok((ids, out))
200}
201
202fn post_with_fanout(
204 client: &SyncHttpClient,
205 body: &IngestExportBatch,
206 key: &Uuid,
207 flush: &FlushExporters<'_>,
208) -> Result<(
209 Result<PostBatchOutcome, anyhow::Error>,
210 Result<(), anyhow::Error>,
211)> {
212 let fan_body = body.clone();
213 let reg = flush.registry;
214 let fail_open = flush.telemetry.fail_open;
215 Ok(std::thread::scope(|s| {
216 let handle = s.spawn(move || {
217 if let Some(r) = reg {
218 r.fan_out(fail_open, &fan_body)
219 } else {
220 Ok(())
221 }
222 });
223 let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
224 let o = match body {
225 IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
226 IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
227 IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
228 IngestExportBatch::WorkspaceFacts(b) => {
229 client.post_workspace_facts_batch(b, key)?
230 }
231 IngestExportBatch::SessionEvals(b) => client.post_session_evals_batch(b, key)?,
232 IngestExportBatch::SessionFeedback(_) => PostBatchOutcome::Accepted {
233 received: 0,
234 deduped: 0,
235 },
236 };
237 Ok(o)
238 })();
239 let fan_res = match handle.join() {
240 Ok(Ok(())) => Ok(()),
241 Ok(Err(e)) => Err(e),
242 Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
243 };
244 (post_res, fan_res)
245 }))
246}
247
248fn post_batch_resilient(
249 client: &SyncHttpClient,
250 store: &Store,
251 body: IngestExportBatch,
252 ids: &[i64],
253 flush: &FlushExporters<'_>,
254) -> Result<Sent> {
255 let mut backoff = Duration::from_millis(200);
256 let max_backoff = Duration::from_secs(30);
257 let mut server_failures = 0u32;
258
259 loop {
260 if body.item_count() == 0 {
261 return Ok(Sent::default());
262 }
263
264 let key = Uuid::now_v7();
265 let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush)?;
266 let outcome = post_res;
267
268 let outcome = match outcome {
269 Ok(o) => o,
270 Err(e) => {
271 if fan_res.is_err() {
272 tracing::trace!(error = %e, "primary post and fan-out both failed");
273 }
274 return Err(e);
275 }
276 };
277
278 match outcome {
279 PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
280 if let Err(e) = fan_res {
281 return Err(
282 e.context("telemetry fan-out (before outbox commit; fail_open = false)")
283 );
284 }
285 store.mark_outbox_sent(ids)?;
286 store.set_sync_state_ok()?;
287 return Ok(Sent {
288 batches: 1,
289 events: ids.len() as u64,
290 });
291 }
292 PostBatchOutcome::TooLarge => {
293 if let Err(e) = fan_res {
294 tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
295 }
296 if body.item_count() <= 1 {
297 store.set_sync_state_error("413: single event too large for server")?;
298 anyhow::bail!(
299 "413: single event too large; tighten redaction or max_body_bytes"
300 );
301 }
302 let mid = body.item_count() / 2;
303 let left_ids = ids[..mid].to_vec();
304 let right_ids = ids[mid..].to_vec();
305 let (left_body, right_body) = split_batch(body, mid);
306 let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
307 let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
308 return Ok(Sent {
309 batches: a.batches + b.batches,
310 events: a.events + b.events,
311 });
312 }
313 PostBatchOutcome::RateLimited(d) => {
314 if let Err(e) = fan_res {
315 tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
316 }
317 thread::sleep(d);
318 }
319 PostBatchOutcome::Unauthorized => {
320 if let Err(e) = fan_res {
321 tracing::warn!(error = %e, "telemetry fan-out during 401");
322 }
323 let msg = "401 unauthorized (check team_token)";
324 store.set_sync_state_error(msg)?;
325 anyhow::bail!("{msg}");
326 }
327 PostBatchOutcome::ClientError(c) => {
328 if let Err(e) = fan_res {
329 tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
330 }
331 let msg = format!("HTTP client error {c}");
332 store.set_sync_state_error(&msg)?;
333 anyhow::bail!("{msg}");
334 }
335 PostBatchOutcome::ServerError(c) => {
336 if let Err(e) = fan_res {
337 tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
338 }
339 server_failures += 1;
340 if server_failures > 12 {
341 let msg = format!("HTTP server error {c} (exhausted retries)");
342 store.set_sync_state_error(&msg)?;
343 anyhow::bail!("{msg}");
344 }
345 thread::sleep(backoff);
346 backoff = (backoff * 2).min(max_backoff);
347 }
348 }
349 }
350}
351
352fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
353 match body {
354 IngestExportBatch::Events(body) => (
355 IngestExportBatch::Events(EventsBatchBody {
356 team_id: body.team_id.clone(),
357 workspace_hash: body.workspace_hash.clone(),
358 events: body.events[..mid].to_vec(),
359 }),
360 IngestExportBatch::Events(EventsBatchBody {
361 team_id: body.team_id,
362 workspace_hash: body.workspace_hash,
363 events: body.events[mid..].to_vec(),
364 }),
365 ),
366 IngestExportBatch::ToolSpans(body) => (
367 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
368 team_id: body.team_id.clone(),
369 workspace_hash: body.workspace_hash.clone(),
370 spans: body.spans[..mid].to_vec(),
371 }),
372 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
373 team_id: body.team_id,
374 workspace_hash: body.workspace_hash,
375 spans: body.spans[mid..].to_vec(),
376 }),
377 ),
378 IngestExportBatch::RepoSnapshots(body) => (
379 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
380 team_id: body.team_id.clone(),
381 workspace_hash: body.workspace_hash.clone(),
382 snapshots: body.snapshots[..mid].to_vec(),
383 }),
384 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
385 team_id: body.team_id,
386 workspace_hash: body.workspace_hash,
387 snapshots: body.snapshots[mid..].to_vec(),
388 }),
389 ),
390 IngestExportBatch::WorkspaceFacts(body) => (
391 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
392 team_id: body.team_id.clone(),
393 workspace_hash: body.workspace_hash.clone(),
394 facts: body.facts[..mid].to_vec(),
395 }),
396 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
397 team_id: body.team_id,
398 workspace_hash: body.workspace_hash,
399 facts: body.facts[mid..].to_vec(),
400 }),
401 ),
402 IngestExportBatch::SessionEvals(body) => (
403 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
404 evals: body.evals[..mid].to_vec(),
405 }),
406 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
407 evals: body.evals[mid..].to_vec(),
408 }),
409 ),
410 IngestExportBatch::SessionFeedback(body) => (
411 IngestExportBatch::SessionFeedback(
412 crate::sync::export_batch::SessionFeedbackBatchBody {
413 feedback: body.feedback[..mid].to_vec(),
414 },
415 ),
416 IngestExportBatch::SessionFeedback(
417 crate::sync::export_batch::SessionFeedbackBatchBody {
418 feedback: body.feedback[mid..].to_vec(),
419 },
420 ),
421 ),
422 }
423}