1use crate::core::config::TelemetryConfig;
8use crate::store::Store;
9use crate::sync::IngestExportBatch;
10use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
11use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
12use crate::sync::smart::{
13 OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
14 ToolSpansBatchBody, WorkspaceFactsBatchBody,
15};
16use crate::telemetry::ExporterRegistry;
17use anyhow::Context;
18use anyhow::Result;
19use std::path::Path;
20use std::thread;
21use std::time::Duration;
22use uuid::Uuid;
23
/// Borrowed handles used to fan batches out to secondary telemetry
/// exporters alongside the primary sync POST.
#[derive(Clone, Copy)]
pub struct FlushExporters<'a> {
    /// Telemetry settings; `fail_open` is forwarded to the exporter fan-out.
    pub telemetry: &'a TelemetryConfig,
    /// Optional exporter registry; `None` makes fan-out a no-op.
    pub registry: Option<&'a ExporterRegistry>,
}
30
/// Drains the local outbox once, posting pending rows to the configured
/// sync endpoint in homogeneous batches (one payload kind per batch).
///
/// Returns the accumulated [`FlushStats`]; a no-op returning default stats
/// when no endpoint is configured.
///
/// # Errors
/// Propagates store errors, HTTP-client construction errors, and terminal
/// batch-posting errors from `post_batch_resilient`.
pub fn flush_outbox_once(
    store: &Store,
    workspace_root: &Path,
    cfg: &crate::core::config::SyncConfig,
    team_salt: &[u8; 32],
    flush: &FlushExporters<'_>,
) -> Result<FlushStats> {
    if cfg.endpoint.is_empty() {
        return Ok(FlushStats::default());
    }
    let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
    let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
    let project_name = crate::core::project_identity::project_name(workspace_root);
    let mut stats = FlushStats::default();

    while store.outbox_pending_count()? > 0 {
        let rows = store.list_outbox_pending(10_000)?;
        if rows.is_empty() {
            break;
        }
        // Batches are homogeneous: take the kind of the head row and let
        // build_batch consume the leading run of rows with that kind.
        let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
            break;
        };
        // NOTE(review): if the head row has an unrecognized kind,
        // build_batch returns None and we stop here — those rows remain
        // pending and will block every subsequent flush until removed.
        // Confirm this is intended.
        let sent = match build_batch(
            &rows,
            cfg,
            &cfg.team_id,
            &workspace_hash,
            &project_name,
            &kind,
        )? {
            Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
            None => break,
        };
        stats.batches += sent.batches;
        stats.events_sent += sent.events;
    }

    Ok(stats)
}
72
/// Counters accumulated over one `flush_outbox_once` run.
#[derive(Debug, Default, Clone)]
pub struct FlushStats {
    /// Batches successfully posted (including halves created by 413 splits).
    pub batches: u64,
    /// Total outbox rows marked as sent.
    pub events_sent: u64,
}
78
/// Per-call result of `post_batch_resilient`.
#[derive(Default)]
struct Sent {
    // Batches posted (can exceed 1 after a 413-driven split).
    batches: u64,
    // Outbox rows covered by those batches.
    events: u64,
}
84
85fn build_batch(
86 rows: &[(i64, String, String)],
87 cfg: &crate::core::config::SyncConfig,
88 team_id: &str,
89 workspace_hash: &str,
90 project_name: &Option<String>,
91 kind: &str,
92) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
93 match kind {
94 "events" => {
95 let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
96 if ids.is_empty() {
97 return Ok(None);
98 }
99 Ok(Some((
100 ids,
101 IngestExportBatch::Events(EventsBatchBody {
102 team_id: team_id.into(),
103 workspace_hash: workspace_hash.into(),
104 project_name: project_name.clone(),
105 events,
106 }),
107 )))
108 }
109 "tool_spans" => {
110 let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
111 if ids.is_empty() {
112 return Ok(None);
113 }
114 Ok(Some((
115 ids,
116 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
117 team_id: team_id.into(),
118 workspace_hash: workspace_hash.into(),
119 project_name: project_name.clone(),
120 spans,
121 }),
122 )))
123 }
124 "repo_snapshots" => {
125 let (ids, snapshots) =
126 pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
127 if ids.is_empty() {
128 return Ok(None);
129 }
130 Ok(Some((
131 ids,
132 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
133 team_id: team_id.into(),
134 workspace_hash: workspace_hash.into(),
135 project_name: project_name.clone(),
136 snapshots,
137 }),
138 )))
139 }
140 "workspace_facts" => {
141 let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
142 if ids.is_empty() {
143 return Ok(None);
144 }
145 Ok(Some((
146 ids,
147 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
148 team_id: team_id.into(),
149 workspace_hash: workspace_hash.into(),
150 project_name: project_name.clone(),
151 facts,
152 }),
153 )))
154 }
155 "session_evals" => {
156 let (ids, evals) = pack_batch_payloads::<crate::eval::types::EvalRow>(rows, cfg, kind)?;
157 if ids.is_empty() {
158 return Ok(None);
159 }
160 Ok(Some((
161 ids,
162 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
163 evals,
164 }),
165 )))
166 }
167 "session_feedback" => {
168 let (ids, feedback) =
169 pack_batch_payloads::<crate::feedback::types::FeedbackRecord>(rows, cfg, kind)?;
170 if ids.is_empty() {
171 return Ok(None);
172 }
173 Ok(Some((
174 ids,
175 IngestExportBatch::SessionFeedback(
176 crate::sync::export_batch::SessionFeedbackBatchBody { feedback },
177 ),
178 )))
179 }
180 _ => Ok(None),
181 }
182}
183
184fn pack_batch_payloads<T>(
185 rows: &[(i64, String, String)],
186 cfg: &crate::core::config::SyncConfig,
187 kind: &str,
188) -> Result<(Vec<i64>, Vec<T>)>
189where
190 T: serde::de::DeserializeOwned + serde::Serialize,
191{
192 let mut ids = Vec::new();
193 let mut out = Vec::new();
194 let mut bytes = 0usize;
195 let max_ev = cfg.events_per_batch_max.max(1);
196 for (id, row_kind, raw) in rows {
197 if row_kind != kind {
198 break;
199 }
200 let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
201 let inc = serde_json::to_vec(&item)?.len();
202 if out.len() >= max_ev {
203 break;
204 }
205 if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
206 break;
207 }
208 bytes += inc;
209 ids.push(*id);
210 out.push(item);
211 }
212 Ok((ids, out))
213}
214
/// Posts `body` to the primary sync endpoint while concurrently fanning
/// the same batch out to any registered telemetry exporters.
///
/// The fan-out runs on a scoped thread (so it can borrow from `flush`
/// without `'static` bounds) while the primary POST executes on the
/// calling thread; both results are returned separately so the caller can
/// decide how a fan-out failure interacts with the outbox commit.
///
/// `key` is the idempotency key forwarded with the POST.
fn post_with_fanout(
    client: &SyncHttpClient,
    body: &IngestExportBatch,
    key: &Uuid,
    flush: &FlushExporters<'_>,
) -> Result<(
    Result<PostBatchOutcome, anyhow::Error>,
    Result<(), anyhow::Error>,
)> {
    // Clone so the spawned thread owns its copy while `body` stays
    // available for the POST below.
    let fan_body = body.clone();
    let reg = flush.registry;
    let fail_open = flush.telemetry.fail_open;
    Ok(std::thread::scope(|s| {
        let handle = s.spawn(move || {
            if let Some(r) = reg {
                r.fan_out(fail_open, &fan_body)
            } else {
                // No registry configured: fan-out is a successful no-op.
                Ok(())
            }
        });
        // Primary POST proceeds on this thread while fan-out runs.
        let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
            let o = match body {
                IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
                IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
                IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
                IngestExportBatch::WorkspaceFacts(b) => {
                    client.post_workspace_facts_batch(b, key)?
                }
                IngestExportBatch::SessionEvals(b) => client.post_session_evals_batch(b, key)?,
                // NOTE(review): feedback batches are never posted to the
                // primary endpoint here — they report as accepted with zero
                // counts. Confirm this fan-out-only behavior is intended.
                IngestExportBatch::SessionFeedback(_) => PostBatchOutcome::Accepted {
                    received: 0,
                    deduped: 0,
                },
            };
            Ok(o)
        })();
        // Join the fan-out thread; a panic there surfaces as an error
        // instead of propagating out of the scope.
        let fan_res = match handle.join() {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => Err(e),
            Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
        };
        (post_res, fan_res)
    }))
}
260
/// Posts `body` with retries and splitting until it is accepted or a
/// terminal error occurs, then marks the outbox rows `ids` as sent.
///
/// Outcome handling:
/// - `Accepted`/`Conflict`: requires fan-out success, then commits the
///   rows and records sync-OK state.
/// - `TooLarge` (413): halves the batch and recursively posts each half;
///   a single item still too large is terminal.
/// - `RateLimited` (429): sleeps for the server-provided duration, retries.
/// - `Unauthorized` (401) and other client errors: terminal; recorded in
///   sync state.
/// - Server errors (5xx): exponential backoff starting at 200ms, doubling,
///   capped at 30s; gives up after 12 consecutive failures.
fn post_batch_resilient(
    client: &SyncHttpClient,
    store: &Store,
    body: IngestExportBatch,
    ids: &[i64],
    flush: &FlushExporters<'_>,
) -> Result<Sent> {
    // Backoff state for 5xx responses.
    let mut backoff = Duration::from_millis(200);
    let max_backoff = Duration::from_secs(30);
    let mut server_failures = 0u32;

    loop {
        if body.item_count() == 0 {
            return Ok(Sent::default());
        }

        // NOTE(review): a fresh idempotency key is generated on every
        // attempt, so a request that reached the server before a transport
        // failure will not be deduped on retry — confirm this is intended.
        let key = Uuid::now_v7();
        let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush)?;
        let outcome = post_res;

        let outcome = match outcome {
            Ok(o) => o,
            Err(e) => {
                // Primary transport failed; the batch stays pending, so the
                // fan-out result is only worth a trace log.
                if fan_res.is_err() {
                    tracing::trace!(error = %e, "primary post and fan-out both failed");
                }
                return Err(e);
            }
        };

        match outcome {
            PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
                // A fan-out failure here blocks the outbox commit, so the
                // rows will be re-posted (and re-fanned-out) later.
                if let Err(e) = fan_res {
                    return Err(
                        e.context("telemetry fan-out (before outbox commit; fail_open = false)")
                    );
                }
                store.mark_outbox_sent(ids)?;
                store.set_sync_state_ok()?;
                return Ok(Sent {
                    batches: 1,
                    events: ids.len() as u64,
                });
            }
            PostBatchOutcome::TooLarge => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
                }
                // Cannot split a single item any further: terminal.
                if body.item_count() <= 1 {
                    store.set_sync_state_error("413: single event too large for server")?;
                    anyhow::bail!(
                        "413: single event too large; tighten redaction or max_body_bytes"
                    );
                }
                // Binary split: ids and payload are divided at the same
                // midpoint so each half keeps its matching row ids.
                let mid = body.item_count() / 2;
                let left_ids = ids[..mid].to_vec();
                let right_ids = ids[mid..].to_vec();
                let (left_body, right_body) = split_batch(body, mid);
                let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
                let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
                return Ok(Sent {
                    batches: a.batches + b.batches,
                    events: a.events + b.events,
                });
            }
            PostBatchOutcome::RateLimited(d) => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
                }
                // Honor the server-provided retry delay, then loop.
                thread::sleep(d);
            }
            PostBatchOutcome::Unauthorized => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out during 401");
                }
                let msg = "401 unauthorized (check team_token)";
                store.set_sync_state_error(msg)?;
                anyhow::bail!("{msg}");
            }
            PostBatchOutcome::ClientError(c) => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
                }
                let msg = format!("HTTP client error {c}");
                store.set_sync_state_error(&msg)?;
                anyhow::bail!("{msg}");
            }
            PostBatchOutcome::ServerError(c) => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
                }
                server_failures += 1;
                if server_failures > 12 {
                    let msg = format!("HTTP server error {c} (exhausted retries)");
                    store.set_sync_state_error(&msg)?;
                    anyhow::bail!("{msg}");
                }
                thread::sleep(backoff);
                backoff = (backoff * 2).min(max_backoff);
            }
        }
    }
}
364
365fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
366 match body {
367 IngestExportBatch::Events(body) => (
368 IngestExportBatch::Events(EventsBatchBody {
369 team_id: body.team_id.clone(),
370 workspace_hash: body.workspace_hash.clone(),
371 project_name: body.project_name.clone(),
372 events: body.events[..mid].to_vec(),
373 }),
374 IngestExportBatch::Events(EventsBatchBody {
375 team_id: body.team_id,
376 workspace_hash: body.workspace_hash,
377 project_name: body.project_name,
378 events: body.events[mid..].to_vec(),
379 }),
380 ),
381 IngestExportBatch::ToolSpans(body) => (
382 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
383 team_id: body.team_id.clone(),
384 workspace_hash: body.workspace_hash.clone(),
385 project_name: body.project_name.clone(),
386 spans: body.spans[..mid].to_vec(),
387 }),
388 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
389 team_id: body.team_id,
390 workspace_hash: body.workspace_hash,
391 project_name: body.project_name,
392 spans: body.spans[mid..].to_vec(),
393 }),
394 ),
395 IngestExportBatch::RepoSnapshots(body) => (
396 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
397 team_id: body.team_id.clone(),
398 workspace_hash: body.workspace_hash.clone(),
399 project_name: body.project_name.clone(),
400 snapshots: body.snapshots[..mid].to_vec(),
401 }),
402 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
403 team_id: body.team_id,
404 workspace_hash: body.workspace_hash,
405 project_name: body.project_name,
406 snapshots: body.snapshots[mid..].to_vec(),
407 }),
408 ),
409 IngestExportBatch::WorkspaceFacts(body) => (
410 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
411 team_id: body.team_id.clone(),
412 workspace_hash: body.workspace_hash.clone(),
413 project_name: body.project_name.clone(),
414 facts: body.facts[..mid].to_vec(),
415 }),
416 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
417 team_id: body.team_id,
418 workspace_hash: body.workspace_hash,
419 project_name: body.project_name,
420 facts: body.facts[mid..].to_vec(),
421 }),
422 ),
423 IngestExportBatch::SessionEvals(body) => (
424 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
425 evals: body.evals[..mid].to_vec(),
426 }),
427 IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
428 evals: body.evals[mid..].to_vec(),
429 }),
430 ),
431 IngestExportBatch::SessionFeedback(body) => (
432 IngestExportBatch::SessionFeedback(
433 crate::sync::export_batch::SessionFeedbackBatchBody {
434 feedback: body.feedback[..mid].to_vec(),
435 },
436 ),
437 IngestExportBatch::SessionFeedback(
438 crate::sync::export_batch::SessionFeedbackBatchBody {
439 feedback: body.feedback[mid..].to_vec(),
440 },
441 ),
442 ),
443 }
444}