1use crate::core::config::TelemetryConfig;
8use crate::store::Store;
9use crate::sync::IngestExportBatch;
10use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
11use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
12use crate::sync::smart::{
13 OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
14 ToolSpansBatchBody, WorkspaceFactsBatchBody,
15};
16use crate::telemetry::ExporterRegistry;
17use anyhow::Context;
18use anyhow::Result;
19use std::path::Path;
20use std::thread;
21use std::time::Duration;
22use uuid::Uuid;
23
24#[derive(Clone, Copy)]
26pub struct FlushExporters<'a> {
27 pub telemetry: &'a TelemetryConfig,
28 pub registry: Option<&'a ExporterRegistry>,
29}
30
31pub fn flush_outbox_once(
33 store: &Store,
34 workspace_root: &Path,
35 cfg: &crate::core::config::SyncConfig,
36 team_salt: &[u8; 32],
37 flush: &FlushExporters<'_>,
38) -> Result<FlushStats> {
39 if cfg.endpoint.is_empty() {
40 return Ok(FlushStats::default());
41 }
42 let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
43 let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
44 let mut stats = FlushStats::default();
45
46 while store.outbox_pending_count()? > 0 {
47 let rows = store.list_outbox_pending(10_000)?;
48 if rows.is_empty() {
49 break;
50 }
51 let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
52 break;
53 };
54 let sent = match build_batch(&rows, cfg, &cfg.team_id, &workspace_hash, &kind)? {
55 Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
56 None => break,
57 };
58 stats.batches += sent.batches;
59 stats.events_sent += sent.events;
60 }
61
62 Ok(stats)
63}
64
/// Totals reported by a call to `flush_outbox_once`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FlushStats {
    /// Requests the sync server acknowledged (accepted, or reported as a conflict).
    pub batches: u64,
    /// Outbox rows committed as sent by those requests.
    pub events_sent: u64,
}
70
/// Tally produced by `post_batch_resilient` for one batch; summed across 413 splits.
#[derive(Default)]
struct Sent {
    /// Requests acknowledged by the server (accepted or conflict).
    batches: u64,
    /// Outbox rows carried by those requests.
    events: u64,
}
76
77fn build_batch(
78 rows: &[(i64, String, String)],
79 cfg: &crate::core::config::SyncConfig,
80 team_id: &str,
81 workspace_hash: &str,
82 kind: &str,
83) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
84 match kind {
85 "events" => {
86 let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
87 if ids.is_empty() {
88 return Ok(None);
89 }
90 Ok(Some((
91 ids,
92 IngestExportBatch::Events(EventsBatchBody {
93 team_id: team_id.into(),
94 workspace_hash: workspace_hash.into(),
95 events,
96 }),
97 )))
98 }
99 "tool_spans" => {
100 let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
101 if ids.is_empty() {
102 return Ok(None);
103 }
104 Ok(Some((
105 ids,
106 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
107 team_id: team_id.into(),
108 workspace_hash: workspace_hash.into(),
109 spans,
110 }),
111 )))
112 }
113 "repo_snapshots" => {
114 let (ids, snapshots) =
115 pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
116 if ids.is_empty() {
117 return Ok(None);
118 }
119 Ok(Some((
120 ids,
121 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
122 team_id: team_id.into(),
123 workspace_hash: workspace_hash.into(),
124 snapshots,
125 }),
126 )))
127 }
128 "workspace_facts" => {
129 let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
130 if ids.is_empty() {
131 return Ok(None);
132 }
133 Ok(Some((
134 ids,
135 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
136 team_id: team_id.into(),
137 workspace_hash: workspace_hash.into(),
138 facts,
139 }),
140 )))
141 }
142 _ => Ok(None),
143 }
144}
145
146fn pack_batch_payloads<T>(
147 rows: &[(i64, String, String)],
148 cfg: &crate::core::config::SyncConfig,
149 kind: &str,
150) -> Result<(Vec<i64>, Vec<T>)>
151where
152 T: serde::de::DeserializeOwned + serde::Serialize,
153{
154 let mut ids = Vec::new();
155 let mut out = Vec::new();
156 let mut bytes = 0usize;
157 let max_ev = cfg.events_per_batch_max.max(1);
158 for (id, row_kind, raw) in rows {
159 if row_kind != kind {
160 break;
161 }
162 let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
163 let inc = serde_json::to_vec(&item)?.len();
164 if out.len() >= max_ev {
165 break;
166 }
167 if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
168 break;
169 }
170 bytes += inc;
171 ids.push(*id);
172 out.push(item);
173 }
174 Ok((ids, out))
175}
176
177fn post_with_fanout(
179 client: &SyncHttpClient,
180 body: &IngestExportBatch,
181 key: &Uuid,
182 flush: &FlushExporters<'_>,
183) -> Result<(
184 Result<PostBatchOutcome, anyhow::Error>,
185 Result<(), anyhow::Error>,
186)> {
187 let fan_body = body.clone();
188 let reg = flush.registry;
189 let fail_open = flush.telemetry.fail_open;
190 Ok(std::thread::scope(|s| {
191 let handle = s.spawn(move || {
192 if let Some(r) = reg {
193 r.fan_out(fail_open, &fan_body)
194 } else {
195 Ok(())
196 }
197 });
198 let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
199 let o = match body {
200 IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
201 IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
202 IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
203 IngestExportBatch::WorkspaceFacts(b) => {
204 client.post_workspace_facts_batch(b, key)?
205 }
206 };
207 Ok(o)
208 })();
209 let fan_res = match handle.join() {
210 Ok(Ok(())) => Ok(()),
211 Ok(Err(e)) => Err(e),
212 Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
213 };
214 (post_res, fan_res)
215 }))
216}
217
218fn post_batch_resilient(
219 client: &SyncHttpClient,
220 store: &Store,
221 body: IngestExportBatch,
222 ids: &[i64],
223 flush: &FlushExporters<'_>,
224) -> Result<Sent> {
225 let mut backoff = Duration::from_millis(200);
226 let max_backoff = Duration::from_secs(30);
227 let mut server_failures = 0u32;
228
229 loop {
230 if body.item_count() == 0 {
231 return Ok(Sent::default());
232 }
233
234 let key = Uuid::now_v7();
235 let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush)?;
236 let outcome = post_res;
237
238 let outcome = match outcome {
239 Ok(o) => o,
240 Err(e) => {
241 if fan_res.is_err() {
242 tracing::trace!(error = %e, "primary post and fan-out both failed");
243 }
244 return Err(e);
245 }
246 };
247
248 match outcome {
249 PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
250 if let Err(e) = fan_res {
251 return Err(
252 e.context("telemetry fan-out (before outbox commit; fail_open = false)")
253 );
254 }
255 store.mark_outbox_sent(ids)?;
256 store.set_sync_state_ok()?;
257 return Ok(Sent {
258 batches: 1,
259 events: ids.len() as u64,
260 });
261 }
262 PostBatchOutcome::TooLarge => {
263 if let Err(e) = fan_res {
264 tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
265 }
266 if body.item_count() <= 1 {
267 store.set_sync_state_error("413: single event too large for server")?;
268 anyhow::bail!(
269 "413: single event too large; tighten redaction or max_body_bytes"
270 );
271 }
272 let mid = body.item_count() / 2;
273 let left_ids = ids[..mid].to_vec();
274 let right_ids = ids[mid..].to_vec();
275 let (left_body, right_body) = split_batch(body, mid);
276 let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
277 let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
278 return Ok(Sent {
279 batches: a.batches + b.batches,
280 events: a.events + b.events,
281 });
282 }
283 PostBatchOutcome::RateLimited(d) => {
284 if let Err(e) = fan_res {
285 tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
286 }
287 thread::sleep(d);
288 }
289 PostBatchOutcome::Unauthorized => {
290 if let Err(e) = fan_res {
291 tracing::warn!(error = %e, "telemetry fan-out during 401");
292 }
293 let msg = "401 unauthorized (check team_token)";
294 store.set_sync_state_error(msg)?;
295 anyhow::bail!("{msg}");
296 }
297 PostBatchOutcome::ClientError(c) => {
298 if let Err(e) = fan_res {
299 tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
300 }
301 let msg = format!("HTTP client error {c}");
302 store.set_sync_state_error(&msg)?;
303 anyhow::bail!("{msg}");
304 }
305 PostBatchOutcome::ServerError(c) => {
306 if let Err(e) = fan_res {
307 tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
308 }
309 server_failures += 1;
310 if server_failures > 12 {
311 let msg = format!("HTTP server error {c} (exhausted retries)");
312 store.set_sync_state_error(&msg)?;
313 anyhow::bail!("{msg}");
314 }
315 thread::sleep(backoff);
316 backoff = (backoff * 2).min(max_backoff);
317 }
318 }
319 }
320}
321
322fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
323 match body {
324 IngestExportBatch::Events(body) => (
325 IngestExportBatch::Events(EventsBatchBody {
326 team_id: body.team_id.clone(),
327 workspace_hash: body.workspace_hash.clone(),
328 events: body.events[..mid].to_vec(),
329 }),
330 IngestExportBatch::Events(EventsBatchBody {
331 team_id: body.team_id,
332 workspace_hash: body.workspace_hash,
333 events: body.events[mid..].to_vec(),
334 }),
335 ),
336 IngestExportBatch::ToolSpans(body) => (
337 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
338 team_id: body.team_id.clone(),
339 workspace_hash: body.workspace_hash.clone(),
340 spans: body.spans[..mid].to_vec(),
341 }),
342 IngestExportBatch::ToolSpans(ToolSpansBatchBody {
343 team_id: body.team_id,
344 workspace_hash: body.workspace_hash,
345 spans: body.spans[mid..].to_vec(),
346 }),
347 ),
348 IngestExportBatch::RepoSnapshots(body) => (
349 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
350 team_id: body.team_id.clone(),
351 workspace_hash: body.workspace_hash.clone(),
352 snapshots: body.snapshots[..mid].to_vec(),
353 }),
354 IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
355 team_id: body.team_id,
356 workspace_hash: body.workspace_hash,
357 snapshots: body.snapshots[mid..].to_vec(),
358 }),
359 ),
360 IngestExportBatch::WorkspaceFacts(body) => (
361 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
362 team_id: body.team_id.clone(),
363 workspace_hash: body.workspace_hash.clone(),
364 facts: body.facts[..mid].to_vec(),
365 }),
366 IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
367 team_id: body.team_id,
368 workspace_hash: body.workspace_hash,
369 facts: body.facts[mid..].to_vec(),
370 }),
371 ),
372 }
373}