Skip to main content

kaizen/sync/
engine.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
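//! Flush outbox batches: size limits, split on 413, backoff on 429 and 5xx responses
//! (transport errors on the primary POST are returned without retry).
//! Optional `FlushExporters` runs HTTP fan-out in parallel with the primary Kaizen POST; only
//! a successful (or 409) primary result commits the outbox. A fan-out `Err` is only considered
//! for the attempt it ran alongside: it blocks the commit of an accepted batch (which happens
//! when `fail_open` is `false`) and is merely logged for every other primary outcome.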
6
7use crate::core::config::TelemetryConfig;
8use crate::store::Store;
9use crate::sync::IngestExportBatch;
10use crate::sync::client::{PostBatchOutcome, SyncHttpClient};
11use crate::sync::outbound::{EventsBatchBody, OutboundEvent};
12use crate::sync::smart::{
13    OutboundRepoSnapshotChunk, OutboundToolSpan, OutboundWorkspaceFactRow, RepoSnapshotsBatchBody,
14    ToolSpansBatchBody, WorkspaceFactsBatchBody,
15};
16use crate::telemetry::ExporterRegistry;
17use anyhow::Context;
18use anyhow::Result;
19use std::path::Path;
20use std::thread;
21use std::time::Duration;
22use uuid::Uuid;
23
24/// Context for optional pluggable sinks (see [`crate::telemetry`]). Only holds references; copy freely.
25#[derive(Clone, Copy)]
26pub struct FlushExporters<'a> {
27    pub telemetry: &'a TelemetryConfig,
28    pub registry: Option<&'a ExporterRegistry>,
29}
30
31/// Flush pending outbox rows (all that fit batch constraints per iteration).
32pub fn flush_outbox_once(
33    store: &Store,
34    workspace_root: &Path,
35    cfg: &crate::core::config::SyncConfig,
36    team_salt: &[u8; 32],
37    flush: &FlushExporters<'_>,
38) -> Result<FlushStats> {
39    if cfg.endpoint.is_empty() {
40        return Ok(FlushStats::default());
41    }
42    let client = SyncHttpClient::new(&cfg.endpoint, &cfg.team_token)?;
43    let workspace_hash = crate::sync::outbound::workspace_hash(team_salt, workspace_root);
44    let mut stats = FlushStats::default();
45
46    while store.outbox_pending_count()? > 0 {
47        let rows = store.list_outbox_pending(10_000)?;
48        if rows.is_empty() {
49            break;
50        }
51        let Some(kind) = rows.first().map(|(_, k, _)| k.clone()) else {
52            break;
53        };
54        let sent = match build_batch(&rows, cfg, &cfg.team_id, &workspace_hash, &kind)? {
55            Some((ids, batch)) => post_batch_resilient(&client, store, batch, &ids, flush)?,
56            None => break,
57        };
58        stats.batches += sent.batches;
59        stats.events_sent += sent.events;
60    }
61
62    Ok(stats)
63}
64
/// Totals reported by one [`flush_outbox_once`] call.
#[derive(Debug, Default, Clone)]
pub struct FlushStats {
    /// Number of batches the server committed (413 split halves count separately).
    pub batches: u64,
    /// Number of outbox rows those committed batches carried.
    pub events_sent: u64,
}
70
/// Per-call result of [`post_batch_resilient`]; summed into [`FlushStats`].
#[derive(Default)]
struct Sent {
    /// Batches committed by the server.
    batches: u64,
    /// Outbox rows contained in those batches.
    events: u64,
}
76
77fn build_batch(
78    rows: &[(i64, String, String)],
79    cfg: &crate::core::config::SyncConfig,
80    team_id: &str,
81    workspace_hash: &str,
82    kind: &str,
83) -> Result<Option<(Vec<i64>, IngestExportBatch)>> {
84    match kind {
85        "events" => {
86            let (ids, events) = pack_batch_payloads::<OutboundEvent>(rows, cfg, kind)?;
87            if ids.is_empty() {
88                return Ok(None);
89            }
90            Ok(Some((
91                ids,
92                IngestExportBatch::Events(EventsBatchBody {
93                    team_id: team_id.into(),
94                    workspace_hash: workspace_hash.into(),
95                    events,
96                }),
97            )))
98        }
99        "tool_spans" => {
100            let (ids, spans) = pack_batch_payloads::<OutboundToolSpan>(rows, cfg, kind)?;
101            if ids.is_empty() {
102                return Ok(None);
103            }
104            Ok(Some((
105                ids,
106                IngestExportBatch::ToolSpans(ToolSpansBatchBody {
107                    team_id: team_id.into(),
108                    workspace_hash: workspace_hash.into(),
109                    spans,
110                }),
111            )))
112        }
113        "repo_snapshots" => {
114            let (ids, snapshots) =
115                pack_batch_payloads::<OutboundRepoSnapshotChunk>(rows, cfg, kind)?;
116            if ids.is_empty() {
117                return Ok(None);
118            }
119            Ok(Some((
120                ids,
121                IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
122                    team_id: team_id.into(),
123                    workspace_hash: workspace_hash.into(),
124                    snapshots,
125                }),
126            )))
127        }
128        "workspace_facts" => {
129            let (ids, facts) = pack_batch_payloads::<OutboundWorkspaceFactRow>(rows, cfg, kind)?;
130            if ids.is_empty() {
131                return Ok(None);
132            }
133            Ok(Some((
134                ids,
135                IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
136                    team_id: team_id.into(),
137                    workspace_hash: workspace_hash.into(),
138                    facts,
139                }),
140            )))
141        }
142        _ => Ok(None),
143    }
144}
145
146fn pack_batch_payloads<T>(
147    rows: &[(i64, String, String)],
148    cfg: &crate::core::config::SyncConfig,
149    kind: &str,
150) -> Result<(Vec<i64>, Vec<T>)>
151where
152    T: serde::de::DeserializeOwned + serde::Serialize,
153{
154    let mut ids = Vec::new();
155    let mut out = Vec::new();
156    let mut bytes = 0usize;
157    let max_ev = cfg.events_per_batch_max.max(1);
158    for (id, row_kind, raw) in rows {
159        if row_kind != kind {
160            break;
161        }
162        let item: T = serde_json::from_str(raw).context("parse outbox payload")?;
163        let inc = serde_json::to_vec(&item)?.len();
164        if out.len() >= max_ev {
165            break;
166        }
167        if bytes + inc > cfg.max_body_bytes && !out.is_empty() {
168            break;
169        }
170        bytes += inc;
171        ids.push(*id);
172        out.push(item);
173    }
174    Ok((ids, out))
175}
176
177/// Primary POST in parallel with optional exporter fan-out. Returns `(post, fan)`.
178fn post_with_fanout(
179    client: &SyncHttpClient,
180    body: &IngestExportBatch,
181    key: &Uuid,
182    flush: &FlushExporters<'_>,
183) -> Result<(
184    Result<PostBatchOutcome, anyhow::Error>,
185    Result<(), anyhow::Error>,
186)> {
187    let fan_body = body.clone();
188    let reg = flush.registry;
189    let fail_open = flush.telemetry.fail_open;
190    Ok(std::thread::scope(|s| {
191        let handle = s.spawn(move || {
192            if let Some(r) = reg {
193                r.fan_out(fail_open, &fan_body)
194            } else {
195                Ok(())
196            }
197        });
198        let post_res: Result<PostBatchOutcome, anyhow::Error> = (|| {
199            let o = match body {
200                IngestExportBatch::Events(b) => client.post_events_batch(b, key)?,
201                IngestExportBatch::ToolSpans(b) => client.post_tool_spans_batch(b, key)?,
202                IngestExportBatch::RepoSnapshots(b) => client.post_repo_snapshots_batch(b, key)?,
203                IngestExportBatch::WorkspaceFacts(b) => {
204                    client.post_workspace_facts_batch(b, key)?
205                }
206            };
207            Ok(o)
208        })();
209        let fan_res = match handle.join() {
210            Ok(Ok(())) => Ok(()),
211            Ok(Err(e)) => Err(e),
212            Err(p) => Err(anyhow::anyhow!("telemetry fan-out join panicked: {p:?}")),
213        };
214        (post_res, fan_res)
215    }))
216}
217
218fn post_batch_resilient(
219    client: &SyncHttpClient,
220    store: &Store,
221    body: IngestExportBatch,
222    ids: &[i64],
223    flush: &FlushExporters<'_>,
224) -> Result<Sent> {
225    let mut backoff = Duration::from_millis(200);
226    let max_backoff = Duration::from_secs(30);
227    let mut server_failures = 0u32;
228
229    loop {
230        if body.item_count() == 0 {
231            return Ok(Sent::default());
232        }
233
234        let key = Uuid::now_v7();
235        let (post_res, fan_res) = post_with_fanout(client, &body, &key, flush)?;
236        let outcome = post_res;
237
238        let outcome = match outcome {
239            Ok(o) => o,
240            Err(e) => {
241                if fan_res.is_err() {
242                    tracing::trace!(error = %e, "primary post and fan-out both failed");
243                }
244                return Err(e);
245            }
246        };
247
248        match outcome {
249            PostBatchOutcome::Accepted { .. } | PostBatchOutcome::Conflict => {
250                if let Err(e) = fan_res {
251                    return Err(
252                        e.context("telemetry fan-out (before outbox commit; fail_open = false)")
253                    );
254                }
255                store.mark_outbox_sent(ids)?;
256                store.set_sync_state_ok()?;
257                return Ok(Sent {
258                    batches: 1,
259                    events: ids.len() as u64,
260                });
261            }
262            PostBatchOutcome::TooLarge => {
263                if let Err(e) = fan_res {
264                    tracing::warn!(error = %e, "telemetry fan-out failed; continuing 413 split");
265                }
266                if body.item_count() <= 1 {
267                    store.set_sync_state_error("413: single event too large for server")?;
268                    anyhow::bail!(
269                        "413: single event too large; tighten redaction or max_body_bytes"
270                    );
271                }
272                let mid = body.item_count() / 2;
273                let left_ids = ids[..mid].to_vec();
274                let right_ids = ids[mid..].to_vec();
275                let (left_body, right_body) = split_batch(body, mid);
276                let a = post_batch_resilient(client, store, left_body, &left_ids, flush)?;
277                let b = post_batch_resilient(client, store, right_body, &right_ids, flush)?;
278                return Ok(Sent {
279                    batches: a.batches + b.batches,
280                    events: a.events + b.events,
281                });
282            }
283            PostBatchOutcome::RateLimited(d) => {
284                if let Err(e) = fan_res {
285                    tracing::warn!(error = %e, "telemetry fan-out failed during 429; will retry");
286                }
287                thread::sleep(d);
288            }
289            PostBatchOutcome::Unauthorized => {
290                if let Err(e) = fan_res {
291                    tracing::warn!(error = %e, "telemetry fan-out during 401");
292                }
293                let msg = "401 unauthorized (check team_token)";
294                store.set_sync_state_error(msg)?;
295                anyhow::bail!("{msg}");
296            }
297            PostBatchOutcome::ClientError(c) => {
298                if let Err(e) = fan_res {
299                    tracing::warn!(error = %e, "telemetry fan-out during client error {c}");
300                }
301                let msg = format!("HTTP client error {c}");
302                store.set_sync_state_error(&msg)?;
303                anyhow::bail!("{msg}");
304            }
305            PostBatchOutcome::ServerError(c) => {
306                if let Err(e) = fan_res {
307                    tracing::warn!(error = %e, "telemetry fan-out during {c} server error");
308                }
309                server_failures += 1;
310                if server_failures > 12 {
311                    let msg = format!("HTTP server error {c} (exhausted retries)");
312                    store.set_sync_state_error(&msg)?;
313                    anyhow::bail!("{msg}");
314                }
315                thread::sleep(backoff);
316                backoff = (backoff * 2).min(max_backoff);
317            }
318        }
319    }
320}
321
322fn split_batch(body: IngestExportBatch, mid: usize) -> (IngestExportBatch, IngestExportBatch) {
323    match body {
324        IngestExportBatch::Events(body) => (
325            IngestExportBatch::Events(EventsBatchBody {
326                team_id: body.team_id.clone(),
327                workspace_hash: body.workspace_hash.clone(),
328                events: body.events[..mid].to_vec(),
329            }),
330            IngestExportBatch::Events(EventsBatchBody {
331                team_id: body.team_id,
332                workspace_hash: body.workspace_hash,
333                events: body.events[mid..].to_vec(),
334            }),
335        ),
336        IngestExportBatch::ToolSpans(body) => (
337            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
338                team_id: body.team_id.clone(),
339                workspace_hash: body.workspace_hash.clone(),
340                spans: body.spans[..mid].to_vec(),
341            }),
342            IngestExportBatch::ToolSpans(ToolSpansBatchBody {
343                team_id: body.team_id,
344                workspace_hash: body.workspace_hash,
345                spans: body.spans[mid..].to_vec(),
346            }),
347        ),
348        IngestExportBatch::RepoSnapshots(body) => (
349            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
350                team_id: body.team_id.clone(),
351                workspace_hash: body.workspace_hash.clone(),
352                snapshots: body.snapshots[..mid].to_vec(),
353            }),
354            IngestExportBatch::RepoSnapshots(RepoSnapshotsBatchBody {
355                team_id: body.team_id,
356                workspace_hash: body.workspace_hash,
357                snapshots: body.snapshots[mid..].to_vec(),
358            }),
359        ),
360        IngestExportBatch::WorkspaceFacts(body) => (
361            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
362                team_id: body.team_id.clone(),
363                workspace_hash: body.workspace_hash.clone(),
364                facts: body.facts[..mid].to_vec(),
365            }),
366            IngestExportBatch::WorkspaceFacts(WorkspaceFactsBatchBody {
367                team_id: body.team_id,
368                workspace_hash: body.workspace_hash,
369                facts: body.facts[mid..].to_vec(),
370            }),
371        ),
372    }
373}