Skip to main content

kaizen/sync/
engine.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Flush outbox batches: size limits, split on 413, backoff on 429 / transient errors.
3//! Optional `FlushExporters` runs HTTP fan-out in parallel with the primary Kaizen POST; only
4//! a successful (or 409) primary result commits outbox. Secondary `Err` is observed only in that
5//! same step (and blocks commit when `fail_open` is `false`).
6
7use crate::core::config::TelemetryConfig;
8use crate::store::Store;
9use crate::sync::IngestExportBatch;
10use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
11use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
12use crate::sync::smart::{
13    OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
14    ToolSpansBatchBody, WorkspaceFactsBatchBody,
15};
16use crate::telemetry::ExporterRegistry;
17use anyhow::Context;
18use anyhow::Result;
19use std::path::Path;
20use std::thread;
21use std::time::Duration;
22use uuid::Uuid;
23
24/// Context for optional pluggable sinks (see [`crate::telemetry`]). Only holds references; copy freely.
25#[derive(Clone, Copy)]
26pub struct FlushExporters<'a> {
27    pub telemetry: &'a TelemetryConfig,
28    pub registry: Option<&'a ExporterRegistry>,
29}
30
31/// Flush pending outbox rows (all that fit batch constraints per iteration).
32pub fn flush_outbox_once(
33    store: &Store,
34    workspace_root: &Path,
35    cfg: &crate::core::config::SyncConfig,
36    team_salt: &[u8; 32],
37    flush: &FlushExporters<'_>,
38) -> Result<FlushStats> {
39    if cfg.endpoint.is_empty() {
40        return Ok(FlushStats::default());
41    }
42    let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
43    let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
44    let mut stats = FlushStats::default();
45
46    while store.outbox_pending_count()? > 0 {
47        let rows = store.list_outbox_pending(10_000)?;
48        if rows.is_empty() {
49            break;
50        }
51        let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
52            break;
53        };
54        let sent = match build_batch(&rows, cfg, &cfg.team_id, &workspace_hash, &kind)? {
55            Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
56            None => break,
57        };
58        stats.batches += sent.batches;
59        stats.events_sent += sent.events;
60    }
61
62    Ok(stats)
63}
64
/// Totals reported by [`flush_outbox_once`] for a single flush.
#[derive(Clone, Debug, Default)]
pub struct FlushStats {
    /// Number of batches that were committed to the outbox as sent.
    pub batches: u64,
    /// Number of outbox rows covered by those batches (counted for every batch kind).
    pub events_sent: u64,
}
70
/// Internal tally returned by the posting helpers; folded into [`FlushStats`] by the caller.
#[derive(Default)]
struct Sent {
    /// Batches committed as sent.
    batches: u64,
    /// Outbox rows covered by those batches.
    events: u64,
}
76
77fn build_batch(
78    rows: &[(i64, String, String)],
79    cfg: &crate::core::config::SyncConfig,
80    team_id: &str,
81    workspace_hash: &str,
82    kind: &str,
83) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
84    match kind {
85        "events" => {
86            let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
87            if ids.is_empty() {
88                return Ok(None);
89            }
90            Ok(Some((
91                ids,
92                IngestExportBatch::Events(EventsBatchBody {
93                    team_id: team_id.into(),
94                    workspace_hash: workspace_hash.into(),
95                    events,
96                }),
97            )))
98        }
99        "tool_spans" => {
100            let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
101            if ids.is_empty() {
102                return Ok(None);
103            }
104            Ok(Some((
105                ids,
106                IngestExportBatch::ToolSpans(ToolSpansBatchBody {
107                    team_id: team_id.into(),
108                    workspace_hash: workspace_hash.into(),
109                    spans,
110                }),
111            )))
112        }
113        "repo_snapshots" => {
114            let (ids, snapshots) =
115                pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
116            if ids.is_empty() {
117                return Ok(None);
118            }
119            Ok(Some((
120                ids,
121                IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
122                    team_id: team_id.into(),
123                    workspace_hash: workspace_hash.into(),
124                    snapshots,
125                }),
126            )))
127        }
128        "workspace_facts" => {
129            let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
130            if ids.is_empty() {
131                return Ok(None);
132            }
133            Ok(Some((
134                ids,
135                IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
136                    team_id: team_id.into(),
137                    workspace_hash: workspace_hash.into(),
138                    facts,
139                }),
140            )))
141        }
142        "session_evals" => {
143            let (ids, evals) = pack_batch_payloads::<crate::eval::types::EvalRow>(rows, cfg, kind)?;
144            if ids.is_empty() {
145                return Ok(None);
146            }
147            Ok(Some((
148                ids,
149                IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
150                    evals,
151                }),
152            )))
153        }
154        "session_feedback" => {
155            let (ids, feedback) =
156                pack_batch_payloads::<crate::feedback::types::FeedbackRecord>(rows, cfg, kind)?;
157            if ids.is_empty() {
158                return Ok(None);
159            }
160            Ok(Some((
161                ids,
162                IngestExportBatch::SessionFeedback(
163                    crate::sync::export_batch::SessionFeedbackBatchBody { feedback },
164                ),
165            )))
166        }
167        _ => Ok(None),
168    }
169}
170
171fn pack_batch_payloads<T>(
172    rows: &[(i64, String, String)],
173    cfg: &crate::core::config::SyncConfig,
174    kind: &str,
175) -> Result<(Vec<i64>, Vec<T>)>
176where
177    T: serde::de::DeserializeOwned + serde::Serialize,
178{
179    let mut ids = Vec::new();
180    let mut out = Vec::new();
181    let mut bytes = 0usize;
182    let max_ev = cfg.events_per_batch_max.max(1);
183    for (id, row_kind, raw) in rows {
184        if row_kind != kind {
185            break;
186        }
187        let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
188        let inc = serde_json::to_vec(&item)?.len();
189        if out.len() >= max_ev {
190            break;
191        }
192        if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
193            break;
194        }
195        bytes += inc;
196        ids.push(*id);
197        out.push(item);
198    }
199    Ok((ids, out))
200}
201
202/// Primary POST in parallel with optional exporter fan-out. Returns `(post, fan)`.
203fn post_with_fanout(
204    client: &SyncHttpClient,
205    body: &IngestExportBatch,
206    key: &Uuid,
207    flush: &FlushExporters<'_>,
208) -> Result<(
209    Result<PostBatchOutcome, anyhow::Error>,
210    Result<(), anyhow::Error>,
211)> {
212    let fan_body = body.clone();
213    let reg = flush.registry;
214    let fail_open = flush.telemetry.fail_open;
215    Ok(std::thread::scope(|s| {
216        let handle = s.spawn(move || {
217            if let Some(r) = reg {
218                r.fan_out(fail_open, &fan_body)
219            } else {
220                Ok(())
221            }
222        });
223        let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
224            let o = match body {
225                IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
226                IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
227                IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
228                IngestExportBatch::WorkspaceFacts(b) => {
229                    client.post_workspace_facts_batch(b, key)?
230                }
231                IngestExportBatch::SessionEvals(b) => client.post_session_evals_batch(b, key)?,
232                IngestExportBatch::SessionFeedback(_) => PostBatchOutcome::Accepted {
233                    received: 0,
234                    deduped: 0,
235                },
236            };
237            Ok(o)
238        })();
239        let fan_res = match handle.join() {
240            Ok(Ok(())) => Ok(()),
241            Ok(Err(e)) => Err(e),
242            Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
243        };
244        (post_res, fan_res)
245    }))
246}
247
248fn post_batch_resilient(
249    client: &SyncHttpClient,
250    store: &Store,
251    body: IngestExportBatch,
252    ids: &[i64],
253    flush: &FlushExporters<'_>,
254) -> Result<Sent> {
255    let mut backoff = Duration::from_millis(200);
256    let max_backoff = Duration::from_secs(30);
257    let mut server_failures = 0u32;
258
259    loop {
260        if body.item_count() == 0 {
261            return Ok(Sent::default());
262        }
263
264        let key = Uuid::now_v7();
265        let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush)?;
266        let outcome = post_res;
267
268        let outcome = match outcome {
269            Ok(o) => o,
270            Err(e) => {
271                if fan_res.is_err() {
272                    tracing::trace!(error = %e, "primary post and fan-out both failed");
273                }
274                return Err(e);
275            }
276        };
277
278        match outcome {
279            PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
280                if let Err(e) = fan_res {
281                    return Err(
282                        e.context("telemetry fan-out (before outbox commit; fail_open = false)")
283                    );
284                }
285                store.mark_outbox_sent(ids)?;
286                store.set_sync_state_ok()?;
287                return Ok(Sent {
288                    batches: 1,
289                    events: ids.len() as u64,
290                });
291            }
292            PostBatchOutcome::TooLarge => {
293                if let Err(e) = fan_res {
294                    tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
295                }
296                if body.item_count() <= 1 {
297                    store.set_sync_state_error("413: single event too large for server")?;
298                    anyhow::bail!(
299                        "413: single event too large; tighten redaction or max_body_bytes"
300                    );
301                }
302                let mid = body.item_count() / 2;
303                let left_ids = ids[..mid].to_vec();
304                let right_ids = ids[mid..].to_vec();
305                let (left_body, right_body) = split_batch(body, mid);
306                let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
307                let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
308                return Ok(Sent {
309                    batches: a.batches + b.batches,
310                    events: a.events + b.events,
311                });
312            }
313            PostBatchOutcome::RateLimited(d) => {
314                if let Err(e) = fan_res {
315                    tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
316                }
317                thread::sleep(d);
318            }
319            PostBatchOutcome::Unauthorized => {
320                if let Err(e) = fan_res {
321                    tracing::warn!(error = %e, "telemetry fan-out during 401");
322                }
323                let msg = "401 unauthorized (check team_token)";
324                store.set_sync_state_error(msg)?;
325                anyhow::bail!("{msg}");
326            }
327            PostBatchOutcome::ClientError(c) => {
328                if let Err(e) = fan_res {
329                    tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
330                }
331                let msg = format!("HTTP client error {c}");
332                store.set_sync_state_error(&msg)?;
333                anyhow::bail!("{msg}");
334            }
335            PostBatchOutcome::ServerError(c) => {
336                if let Err(e) = fan_res {
337                    tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
338                }
339                server_failures += 1;
340                if server_failures > 12 {
341                    let msg = format!("HTTP server error {c} (exhausted retries)");
342                    store.set_sync_state_error(&msg)?;
343                    anyhow::bail!("{msg}");
344                }
345                thread::sleep(backoff);
346                backoff = (backoff * 2).min(max_backoff);
347            }
348        }
349    }
350}
351
352fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
353    match body {
354        IngestExportBatch::Events(body) => (
355            IngestExportBatch::Events(EventsBatchBody {
356                team_id: body.team_id.clone(),
357                workspace_hash: body.workspace_hash.clone(),
358                events: body.events[..mid].to_vec(),
359            }),
360            IngestExportBatch::Events(EventsBatchBody {
361                team_id: body.team_id,
362                workspace_hash: body.workspace_hash,
363                events: body.events[mid..].to_vec(),
364            }),
365        ),
366        IngestExportBatch::ToolSpans(body) => (
367            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
368                team_id: body.team_id.clone(),
369                workspace_hash: body.workspace_hash.clone(),
370                spans: body.spans[..mid].to_vec(),
371            }),
372            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
373                team_id: body.team_id,
374                workspace_hash: body.workspace_hash,
375                spans: body.spans[mid..].to_vec(),
376            }),
377        ),
378        IngestExportBatch::RepoSnapshots(body) => (
379            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
380                team_id: body.team_id.clone(),
381                workspace_hash: body.workspace_hash.clone(),
382                snapshots: body.snapshots[..mid].to_vec(),
383            }),
384            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
385                team_id: body.team_id,
386                workspace_hash: body.workspace_hash,
387                snapshots: body.snapshots[mid..].to_vec(),
388            }),
389        ),
390        IngestExportBatch::WorkspaceFacts(body) => (
391            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
392                team_id: body.team_id.clone(),
393                workspace_hash: body.workspace_hash.clone(),
394                facts: body.facts[..mid].to_vec(),
395            }),
396            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
397                team_id: body.team_id,
398                workspace_hash: body.workspace_hash,
399                facts: body.facts[mid..].to_vec(),
400            }),
401        ),
402        IngestExportBatch::SessionEvals(body) => (
403            IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
404                evals: body.evals[..mid].to_vec(),
405            }),
406            IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
407                evals: body.evals[mid..].to_vec(),
408            }),
409        ),
410        IngestExportBatch::SessionFeedback(body) => (
411            IngestExportBatch::SessionFeedback(
412                crate::sync::export_batch::SessionFeedbackBatchBody {
413                    feedback: body.feedback[..mid].to_vec(),
414                },
415            ),
416            IngestExportBatch::SessionFeedback(
417                crate::sync::export_batch::SessionFeedbackBatchBody {
418                    feedback: body.feedback[mid..].to_vec(),
419                },
420            ),
421        ),
422    }
423}