kaizen/sync/engine.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! Flush outbox batches: size limits, splitting on 413, backoff on 429 and transient errors.
//! An optional `FlushExporters` runs HTTP fan-out in parallel with the primary Kaizen POST;
//! only a successful (or 409) primary result commits the outbox. A secondary `Err` is observed
//! only in that same step (and blocks the commit when `fail_open` is `false`).
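//!
//! A minimal driving sketch (the store/config bindings here are hypothetical wiring; only
//! `flush_outbox_once` and `FlushExporters` are defined in this module):
//!
//! ```ignore
//! let flush = FlushExporters { telemetry: &telemetry_cfg, registry: None };
//! let stats = flush_outbox_once(&store, &workspace_root, &sync_cfg, &team_salt, &flush)?;
//! tracing::info!(batches = stats.batches, events = stats.events_sent, "outbox flushed");
//! ```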

use crate::core::config::TelemetryConfig;
use crate::store::Store;
use crate::sync::IngestExportBatch;
use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
use crate::sync::smart::{
    OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
    ToolSpansBatchBody, WorkspaceFactsBatchBody,
};
use crate::telemetry::ExporterRegistry;
use anyhow::{Context, Result};
use std::path::Path;
use std::thread;
use std::time::Duration;
use uuid::Uuid;

/// Context for optional pluggable sinks (see [`crate::telemetry`]). Only holds references; copy freely.
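///
/// Construction sketch (the `cfg` and `registry` bindings are hypothetical):
///
/// ```ignore
/// let flush = FlushExporters { telemetry: &cfg.telemetry, registry: Some(&registry) };
/// let copy = flush; // `Copy`: hand it around without cloning
/// ```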
#[derive(Clone, Copy)]
pub struct FlushExporters<'a> {
    pub telemetry: &'a TelemetryConfig,
    pub registry: Option<&'a ExporterRegistry>,
}

/// Flush pending outbox rows, draining as many per-kind batches as fit the batch constraints.
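///
/// Returns immediately with default stats when `cfg.endpoint` is empty. Each iteration
/// batches rows of a single kind (that of the oldest pending row), so mixed kinds drain
/// over successive iterations; errors from the resilient POST path bubble up and leave
/// unsent rows pending in the outbox.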
pub fn flush_outbox_once(
    store: &Store,
    workspace_root: &Path,
    cfg: &crate::core::config::SyncConfig,
    team_salt: &[u8; 32],
    flush: &FlushExporters<'_>,
) -> Result<FlushStats> {
    if cfg.endpoint.is_empty() {
        return Ok(FlushStats::default());
    }
    let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
    let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
    let project_name = crate::core::project_identity::project_name(workspace_root);
    let mut stats = FlushStats::default();

    while store.outbox_pending_count()? > 0 {
        let rows = store.list_outbox_pending(10_000)?;
        // Batch one kind at a time: the kind of the oldest pending row.
        let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
            break;
        };
        let sent = match build_batch(
            &rows,
            cfg,
            &cfg.team_id,
            &workspace_hash,
            &project_name,
            &kind,
        )? {
            Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
            None => break,
        };
        stats.batches += sent.batches;
        stats.events_sent += sent.events;
    }

    Ok(stats)
}

#[derive(Debug, Default, Clone)]
pub struct FlushStats {
    pub batches: u64,
    pub events_sent: u64,
}

#[derive(Default)]
struct Sent {
    batches: u64,
    events: u64,
}

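/// Build one batch of the given `kind` from the head of `rows`, returning the packed outbox
/// ids alongside the batch body. Returns `None` for an unknown kind or when nothing packs.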
fn build_batch(
    rows: &[(i64, String, String)],
    cfg: &crate::core::config::SyncConfig,
    team_id: &str,
    workspace_hash: &str,
    project_name: &Option<String>,
    kind: &str,
) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
    match kind {
        "events" => {
            let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
            if ids.is_empty() {
                return Ok(None);
            }
            Ok(Some((
                ids,
                IngestExportBatch::Events(EventsBatchBody {
                    team_id: team_id.into(),
                    workspace_hash: workspace_hash.into(),
                    project_name: project_name.clone(),
                    events,
                }),
            )))
        }
        "tool_spans" => {
            let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
            if ids.is_empty() {
                return Ok(None);
            }
            Ok(Some((
                ids,
                IngestExportBatch::ToolSpans(ToolSpansBatchBody {
                    team_id: team_id.into(),
                    workspace_hash: workspace_hash.into(),
                    project_name: project_name.clone(),
                    spans,
                }),
            )))
        }
        "repo_snapshots" => {
            let (ids, snapshots) =
                pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
            if ids.is_empty() {
                return Ok(None);
            }
            Ok(Some((
                ids,
                IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
                    team_id: team_id.into(),
                    workspace_hash: workspace_hash.into(),
                    project_name: project_name.clone(),
                    snapshots,
                }),
            )))
        }
        "workspace_facts" => {
            let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
            if ids.is_empty() {
                return Ok(None);
            }
            Ok(Some((
                ids,
                IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
                    team_id: team_id.into(),
                    workspace_hash: workspace_hash.into(),
                    project_name: project_name.clone(),
                    facts,
                }),
            )))
        }
        "session_evals" => {
            let (ids, evals) = pack_batch_payloads::<crate::eval::types::EvalRow>(rows, cfg, kind)?;
            if ids.is_empty() {
                return Ok(None);
            }
            Ok(Some((
                ids,
                IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
                    evals,
                }),
            )))
        }
        "session_feedback" => {
            let (ids, feedback) =
                pack_batch_payloads::<crate::feedback::types::FeedbackRecord>(rows, cfg, kind)?;
            if ids.is_empty() {
                return Ok(None);
            }
            Ok(Some((
                ids,
                IngestExportBatch::SessionFeedback(
                    crate::sync::export_batch::SessionFeedbackBatchBody { feedback },
                ),
            )))
        }
        // Unknown kind: return None, which stops the caller's flush loop.
        _ => Ok(None),
    }
}
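/// Greedily pack the longest prefix of same-kind rows that satisfies both batch caps:
/// at most `events_per_batch_max` items and at most `max_body_bytes` of serialized payload
/// (the first item is always admitted, even if oversized, so progress is guaranteed).
///
/// A worked packing, assuming `max_body_bytes = 100` and items of the sizes shown:
///
/// ```text
/// rows: [60 B, 60 B, 30 B]
/// pass 1: packs [60 B]        (60 + 60 > 100, stop after the first item)
/// pass 2: packs [60 B, 30 B]  (60 + 30 <= 100)
/// ```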
fn pack_batch_payloads<T>(
    rows: &[(i64, String, String)],
    cfg: &crate::core::config::SyncConfig,
    kind: &str,
) -> Result<(Vec<i64>, Vec<T>)>
where
    T: serde::de::DeserializeOwned + serde::Serialize,
{
    let mut ids = Vec::new();
    let mut out = Vec::new();
    let mut bytes = 0usize;
    let max_ev = cfg.events_per_batch_max.max(1);
    for (id, row_kind, raw) in rows {
        // Only pack a contiguous run of rows of the requested kind.
        if row_kind != kind {
            break;
        }
        if out.len() >= max_ev {
            break;
        }
        let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
        let inc = serde_json::to_vec(&item)?.len();
        // Respect the body-size cap, but always admit the first item.
        if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
            break;
        }
        bytes += inc;
        ids.push(*id);
        out.push(item);
    }
    Ok((ids, out))
}

/// Run the primary POST in parallel with optional exporter fan-out; returns `(post, fan)` results.
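///
/// The fan-out runs on a scoped thread so it can borrow the exporter registry; a panic in
/// that thread is surfaced as an `Err` rather than propagated. Session feedback has no
/// primary POST here and is reported as accepted with zero counts.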
fn post_with_fanout(
    client: &SyncHttpClient,
    body: &IngestExportBatch,
    key: &Uuid,
    flush: &FlushExporters<'_>,
) -> (
    Result<PostBatchOutcome, anyhow::Error>,
    Result<(), anyhow::Error>,
) {
    let fan_body = body.clone();
    let reg = flush.registry;
    let fail_open = flush.telemetry.fail_open;
    thread::scope(|s| {
        // Fan-out runs concurrently with the primary POST below.
        let handle = s.spawn(move || {
            if let Some(r) = reg {
                r.fan_out(fail_open, &fan_body)
            } else {
                Ok(())
            }
        });
        let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
            let o = match body {
                IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
                IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
                IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
                IngestExportBatch::WorkspaceFacts(b) => {
                    client.post_workspace_facts_batch(b, key)?
                }
                IngestExportBatch::SessionEvals(b) => client.post_session_evals_batch(b, key)?,
                // No primary endpoint for feedback; report it as accepted with zero counts.
                IngestExportBatch::SessionFeedback(_) => PostBatchOutcome::Accepted {
                    received: 0,
                    deduped: 0,
                },
            };
            Ok(o)
        })();
        let fan_res = match handle.join() {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => Err(e),
            Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
        };
        (post_res, fan_res)
    })
}
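/// POST one batch, retrying until it is accepted, permanently rejected, or retries are
/// exhausted. On 413 the batch is split in half and each half is retried recursively.
///
/// The 5xx backoff doubles from 200 ms and is capped at 30 s; the 13th consecutive server
/// error gives up, so the schedule sleeps roughly 171 s in total:
///
/// ```text
/// 0.2 + 0.4 + 0.8 + 1.6 + 3.2 + 6.4 + 12.8 + 25.6 + 30 + 30 + 30 + 30  ~= 171 s
/// ```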
fn post_batch_resilient(
    client: &SyncHttpClient,
    store: &Store,
    body: IngestExportBatch,
    ids: &[i64],
    flush: &FlushExporters<'_>,
) -> Result<Sent> {
    let mut backoff = Duration::from_millis(200);
    let max_backoff = Duration::from_secs(30);
    let mut server_failures = 0u32;

    loop {
        if body.item_count() == 0 {
            return Ok(Sent::default());
        }

        let key = Uuid::now_v7();
        let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush);

        let outcome = match post_res {
            Ok(o) => o,
            Err(e) => {
                if fan_res.is_err() {
                    tracing::trace!(error = %e, "primary post and fan-out both failed");
                }
                return Err(e);
            }
        };

        match outcome {
            // Success or idempotent replay: only now does the outbox commit.
            PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
                if let Err(e) = fan_res {
                    return Err(
                        e.context("telemetry fan-out (before outbox commit; fail_open = false)")
                    );
                }
                store.mark_outbox_sent(ids)?;
                store.set_sync_state_ok()?;
                return Ok(Sent {
                    batches: 1,
                    events: ids.len() as u64,
                });
            }
            PostBatchOutcome::TooLarge => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
                }
                if body.item_count() <= 1 {
                    store.set_sync_state_error("413: single event too large for server")?;
                    anyhow::bail!(
                        "413: single event too large; tighten redaction or max_body_bytes"
                    );
                }
                // Split in half and retry each side recursively.
                let mid = body.item_count() / 2;
                let left_ids = ids[..mid].to_vec();
                let right_ids = ids[mid..].to_vec();
                let (left_body, right_body) = split_batch(body, mid);
                let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
                let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
                return Ok(Sent {
                    batches: a.batches + b.batches,
                    events: a.events + b.events,
                });
            }
            PostBatchOutcome::RateLimited(d) => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
                }
                // Honor the server-provided retry delay.
                thread::sleep(d);
            }
            PostBatchOutcome::Unauthorized => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out during 401");
                }
                let msg = "401 unauthorized (check team_token)";
                store.set_sync_state_error(msg)?;
                anyhow::bail!("{msg}");
            }
            PostBatchOutcome::ClientError(c) => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
                }
                let msg = format!("HTTP client error {c}");
                store.set_sync_state_error(&msg)?;
                anyhow::bail!("{msg}");
            }
            PostBatchOutcome::ServerError(c) => {
                if let Err(e) = fan_res {
                    tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
                }
                server_failures += 1;
                if server_failures > 12 {
                    let msg = format!("HTTP server error {c} (exhausted retries)");
                    store.set_sync_state_error(&msg)?;
                    anyhow::bail!("{msg}");
                }
                thread::sleep(backoff);
                backoff = (backoff * 2).min(max_backoff);
            }
        }
    }
}
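/// Split `body` at `mid` into `[..mid]` and `[mid..]` halves of the same kind, cloning the
/// header fields (team, workspace hash, project name) into both sides where present.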
fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
    match body {
        IngestExportBatch::Events(body) => (
            IngestExportBatch::Events(EventsBatchBody {
                team_id: body.team_id.clone(),
                workspace_hash: body.workspace_hash.clone(),
                project_name: body.project_name.clone(),
                events: body.events[..mid].to_vec(),
            }),
            IngestExportBatch::Events(EventsBatchBody {
                team_id: body.team_id,
                workspace_hash: body.workspace_hash,
                project_name: body.project_name,
                events: body.events[mid..].to_vec(),
            }),
        ),
        IngestExportBatch::ToolSpans(body) => (
            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
                team_id: body.team_id.clone(),
                workspace_hash: body.workspace_hash.clone(),
                project_name: body.project_name.clone(),
                spans: body.spans[..mid].to_vec(),
            }),
            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
                team_id: body.team_id,
                workspace_hash: body.workspace_hash,
                project_name: body.project_name,
                spans: body.spans[mid..].to_vec(),
            }),
        ),
        IngestExportBatch::RepoSnapshots(body) => (
            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
                team_id: body.team_id.clone(),
                workspace_hash: body.workspace_hash.clone(),
                project_name: body.project_name.clone(),
                snapshots: body.snapshots[..mid].to_vec(),
            }),
            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
                team_id: body.team_id,
                workspace_hash: body.workspace_hash,
                project_name: body.project_name,
                snapshots: body.snapshots[mid..].to_vec(),
            }),
        ),
        IngestExportBatch::WorkspaceFacts(body) => (
            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
                team_id: body.team_id.clone(),
                workspace_hash: body.workspace_hash.clone(),
                project_name: body.project_name.clone(),
                facts: body.facts[..mid].to_vec(),
            }),
            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
                team_id: body.team_id,
                workspace_hash: body.workspace_hash,
                project_name: body.project_name,
                facts: body.facts[mid..].to_vec(),
            }),
        ),
        IngestExportBatch::SessionEvals(body) => (
            IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
                evals: body.evals[..mid].to_vec(),
            }),
            IngestExportBatch::SessionEvals(crate::sync::export_batch::SessionEvalsBatchBody {
                evals: body.evals[mid..].to_vec(),
            }),
        ),
        IngestExportBatch::SessionFeedback(body) => (
            IngestExportBatch::SessionFeedback(
                crate::sync::export_batch::SessionFeedbackBatchBody {
                    feedback: body.feedback[..mid].to_vec(),
                },
            ),
            IngestExportBatch::SessionFeedback(
                crate::sync::export_batch::SessionFeedbackBatchBody {
                    feedback: body.feedback[mid..].to_vec(),
                },
            ),
        ),
    }
}