Skip to main content

kaizen/sync/
engine.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Flush outbox batches: size limits, split on 413, backoff on 429 / transient errors.
3//! Optional `FlushExporters` runs HTTP fan-out in parallel with the primary Kaizen POST; only
4//! a successful (or 409) primary result commits outbox. Secondary `Err` is observed only in that
5//! same step (and blocks commit when `fail_open` is `false`).
6
7use crate::core::config::TelemetryConfig;
8use crate::store::Store;
9use crate::sync::IngestExportBatch;
10use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
11use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
12use crate::sync::smart::{
13    OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
14    ToolSpansBatchBody, WorkspaceFactsBatchBody,
15};
16use crate::telemetry::ExporterRegistry;
17use anyhow::Context;
18use anyhow::Result;
19use std::path::Path;
20use std::thread;
21use std::time::Duration;
22use uuid::Uuid;
23
24/// Context for optional pluggable sinks (see [`crate::telemetry`]). Only holds references; copy freely.
25#[derive(Clone, Copy)]
26pub struct FlushExporters<'a> {
27    pub telemetry: &'a TelemetryConfig,
28    pub registry: Option<&'a ExporterRegistry>,
29}
30
31/// Flush pending outbox rows (all that fit batch constraints per iteration).
32pub fn flush_outbox_once(
33    store: &Store,
34    workspace_root: &Path,
35    cfg: &crate::core::config::SyncConfig,
36    team_salt: &[u8; 32],
37    flush: &FlushExporters<'_>,
38) -> Result<FlushStats> {
39    if cfg.endpoint.is_empty() {
40        return Ok(FlushStats::default());
41    }
42    let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
43    let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
44    let mut stats = FlushStats::default();
45
46    while store.outbox_pending_count()? > 0 {
47        let rows = store.list_outbox_pending(10_000)?;
48        if rows.is_empty() {
49            break;
50        }
51        let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
52            break;
53        };
54        let sent = match build_batch(&rows, cfg, &cfg.team_id, &workspace_hash, &kind)? {
55            Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
56            None => break,
57        };
58        stats.batches += sent.batches;
59        stats.events_sent += sent.events;
60    }
61
62    Ok(stats)
63}
64
/// Totals reported by one `flush_outbox_once` call.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FlushStats {
    /// Batches committed after the primary endpoint accepted them (or answered 409). Each
    /// successfully posted half of a 413 split counts separately.
    pub batches: u64,
    /// Outbox rows marked as sent.
    pub events_sent: u64,
}
70
/// Tally for a single posted batch (including any 413 sub-batches), summed into the flush totals.
#[derive(Default)]
struct Sent {
    /// Batches committed after an accepted or 409 response.
    batches: u64,
    /// Outbox rows marked as sent.
    events: u64,
}
76
77fn build_batch(
78    rows: &[(i64, String, String)],
79    cfg: &crate::core::config::SyncConfig,
80    team_id: &str,
81    workspace_hash: &str,
82    kind: &str,
83) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
84    match kind {
85        "events" => {
86            let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
87            if ids.is_empty() {
88                return Ok(None);
89            }
90            Ok(Some((
91                ids,
92                IngestExportBatch::Events(EventsBatchBody {
93                    team_id: team_id.into(),
94                    workspace_hash: workspace_hash.into(),
95                    events,
96                }),
97            )))
98        }
99        "tool_spans" => {
100            let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
101            if ids.is_empty() {
102                return Ok(None);
103            }
104            Ok(Some((
105                ids,
106                IngestExportBatch::ToolSpans(ToolSpansBatchBody {
107                    team_id: team_id.into(),
108                    workspace_hash: workspace_hash.into(),
109                    spans,
110                }),
111            )))
112        }
113        "repo_snapshots" => {
114            let (ids, snapshots) =
115                pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
116            if ids.is_empty() {
117                return Ok(None);
118            }
119            Ok(Some((
120                ids,
121                IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
122                    team_id: team_id.into(),
123                    workspace_hash: workspace_hash.into(),
124                    snapshots,
125                }),
126            )))
127        }
128        "workspace_facts" => {
129            let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
130            if ids.is_empty() {
131                return Ok(None);
132            }
133            Ok(Some((
134                ids,
135                IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
136                    team_id: team_id.into(),
137                    workspace_hash: workspace_hash.into(),
138                    facts,
139                }),
140            )))
141        }
142        "session_evals" => {
143            let (ids, evals) = pack_batch_payloads::<crate::eval::types::EvalRow>(rows, cfg, kind)?;
144            if ids.is_empty() {
145                return Ok(None);
146            }
147            Ok(Some((
148                ids,
149                IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
150                    evals,
151                }),
152            )))
153        }
154        _ => Ok(None),
155    }
156}
157
158fn pack_batch_payloads<T>(
159    rows: &[(i64, String, String)],
160    cfg: &crate::core::config::SyncConfig,
161    kind: &str,
162) -> Result<(Vec<i64>, Vec<T>)>
163where
164    T: serde::de::DeserializeOwned + serde::Serialize,
165{
166    let mut ids = Vec::new();
167    let mut out = Vec::new();
168    let mut bytes = 0usize;
169    let max_ev = cfg.events_per_batch_max.max(1);
170    for (id, row_kind, raw) in rows {
171        if row_kind != kind {
172            break;
173        }
174        let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
175        let inc = serde_json::to_vec(&item)?.len();
176        if out.len() >= max_ev {
177            break;
178        }
179        if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
180            break;
181        }
182        bytes += inc;
183        ids.push(*id);
184        out.push(item);
185    }
186    Ok((ids, out))
187}
188
189/// Primary POST in parallel with optional exporter fan-out. Returns `(post, fan)`.
190fn post_with_fanout(
191    client: &SyncHttpClient,
192    body: &IngestExportBatch,
193    key: &Uuid,
194    flush: &FlushExporters<'_>,
195) -> Result<(
196    Result<PostBatchOutcome, anyhow::Error>,
197    Result<(), anyhow::Error>,
198)> {
199    let fan_body = body.clone();
200    let reg = flush.registry;
201    let fail_open = flush.telemetry.fail_open;
202    Ok(std::thread::scope(|s| {
203        let handle = s.spawn(move || {
204            if let Some(r) = reg {
205                r.fan_out(fail_open, &fan_body)
206            } else {
207                Ok(())
208            }
209        });
210        let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
211            let o = match body {
212                IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
213                IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
214                IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
215                IngestExportBatch::WorkspaceFacts(b) => {
216                    client.post_workspace_facts_batch(b, key)?
217                }
218                IngestExportBatch::SessionEvals(b) => client.post_session_evals_batch(b, key)?,
219            };
220            Ok(o)
221        })();
222        let fan_res = match handle.join() {
223            Ok(Ok(())) => Ok(()),
224            Ok(Err(e)) => Err(e),
225            Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
226        };
227        (post_res, fan_res)
228    }))
229}
230
231fn post_batch_resilient(
232    client: &SyncHttpClient,
233    store: &Store,
234    body: IngestExportBatch,
235    ids: &[i64],
236    flush: &FlushExporters<'_>,
237) -> Result<Sent> {
238    let mut backoff = Duration::from_millis(200);
239    let max_backoff = Duration::from_secs(30);
240    let mut server_failures = 0u32;
241
242    loop {
243        if body.item_count() == 0 {
244            return Ok(Sent::default());
245        }
246
247        let key = Uuid::now_v7();
248        let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush)?;
249        let outcome = post_res;
250
251        let outcome = match outcome {
252            Ok(o) => o,
253            Err(e) => {
254                if fan_res.is_err() {
255                    tracing::trace!(error = %e, "primary post and fan-out both failed");
256                }
257                return Err(e);
258            }
259        };
260
261        match outcome {
262            PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
263                if let Err(e) = fan_res {
264                    return Err(
265                        e.context("telemetry fan-out (before outbox commit; fail_open = false)")
266                    );
267                }
268                store.mark_outbox_sent(ids)?;
269                store.set_sync_state_ok()?;
270                return Ok(Sent {
271                    batches: 1,
272                    events: ids.len() as u64,
273                });
274            }
275            PostBatchOutcome::TooLarge => {
276                if let Err(e) = fan_res {
277                    tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
278                }
279                if body.item_count() <= 1 {
280                    store.set_sync_state_error("413: single event too large for server")?;
281                    anyhow::bail!(
282                        "413: single event too large; tighten redaction or max_body_bytes"
283                    );
284                }
285                let mid = body.item_count() / 2;
286                let left_ids = ids[..mid].to_vec();
287                let right_ids = ids[mid..].to_vec();
288                let (left_body, right_body) = split_batch(body, mid);
289                let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
290                let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
291                return Ok(Sent {
292                    batches: a.batches + b.batches,
293                    events: a.events + b.events,
294                });
295            }
296            PostBatchOutcome::RateLimited(d) => {
297                if let Err(e) = fan_res {
298                    tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
299                }
300                thread::sleep(d);
301            }
302            PostBatchOutcome::Unauthorized => {
303                if let Err(e) = fan_res {
304                    tracing::warn!(error = %e, "telemetry fan-out during 401");
305                }
306                let msg = "401 unauthorized (check team_token)";
307                store.set_sync_state_error(msg)?;
308                anyhow::bail!("{msg}");
309            }
310            PostBatchOutcome::ClientError(c) => {
311                if let Err(e) = fan_res {
312                    tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
313                }
314                let msg = format!("HTTP client error {c}");
315                store.set_sync_state_error(&msg)?;
316                anyhow::bail!("{msg}");
317            }
318            PostBatchOutcome::ServerError(c) => {
319                if let Err(e) = fan_res {
320                    tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
321                }
322                server_failures += 1;
323                if server_failures > 12 {
324                    let msg = format!("HTTP server error {c} (exhausted retries)");
325                    store.set_sync_state_error(&msg)?;
326                    anyhow::bail!("{msg}");
327                }
328                thread::sleep(backoff);
329                backoff = (backoff * 2).min(max_backoff);
330            }
331        }
332    }
333}
334
335fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
336    match body {
337        IngestExportBatch::Events(body) => (
338            IngestExportBatch::Events(EventsBatchBody {
339                team_id: body.team_id.clone(),
340                workspace_hash: body.workspace_hash.clone(),
341                events: body.events[..mid].to_vec(),
342            }),
343            IngestExportBatch::Events(EventsBatchBody {
344                team_id: body.team_id,
345                workspace_hash: body.workspace_hash,
346                events: body.events[mid..].to_vec(),
347            }),
348        ),
349        IngestExportBatch::ToolSpans(body) => (
350            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
351                team_id: body.team_id.clone(),
352                workspace_hash: body.workspace_hash.clone(),
353                spans: body.spans[..mid].to_vec(),
354            }),
355            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
356                team_id: body.team_id,
357                workspace_hash: body.workspace_hash,
358                spans: body.spans[mid..].to_vec(),
359            }),
360        ),
361        IngestExportBatch::RepoSnapshots(body) => (
362            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
363                team_id: body.team_id.clone(),
364                workspace_hash: body.workspace_hash.clone(),
365                snapshots: body.snapshots[..mid].to_vec(),
366            }),
367            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
368                team_id: body.team_id,
369                workspace_hash: body.workspace_hash,
370                snapshots: body.snapshots[mid..].to_vec(),
371            }),
372        ),
373        IngestExportBatch::WorkspaceFacts(body) => (
374            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
375                team_id: body.team_id.clone(),
376                workspace_hash: body.workspace_hash.clone(),
377                facts: body.facts[..mid].to_vec(),
378            }),
379            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
380                team_id: body.team_id,
381                workspace_hash: body.workspace_hash,
382                facts: body.facts[mid..].to_vec(),
383            }),
384        ),
385        IngestExportBatch::SessionEvals(body) => (
386            IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
387                evals: body.evals[..mid].to_vec(),
388            }),
389            IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
390                evals: body.evals[mid..].to_vec(),
391            }),
392        ),
393    }
394}