Skip to main content

kaizen/sync/
client.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! HTTP sync client: gzip JSON, retries, batch split on 413.
3
4use crate::sync::export_batch::SessionEvalsBatchBody;
5use crate::sync::outbound::EventsBatchBody;
6use crate::sync::smart::{RepoSnapshotsBatchBody, ToolSpansBatchBody, WorkspaceFactsBatchBody};
7use anyhow::{Context, Result};
8use flate2::Compression;
9use flate2::write::GzEncoder;
10use reqwest::blocking::Client;
11use reqwest::header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE, RETRY_AFTER};
12use std::io::Write;
13use std::time::Duration;
14use uuid::Uuid;
15
16pub const CLIENT_HEADER_VALUE: &str = concat!("kaizen/", env!("CARGO_PKG_VERSION"));
17
18pub enum PostBatchOutcome {
19    Accepted { received: u64, deduped: u64 },
20    Conflict,
21    TooLarge,
22    RateLimited(Duration),
23    Unauthorized,
24    ClientError(u16),
25    ServerError(u16),
26}
27
28pub struct SyncHttpClient {
29    http: Client,
30    endpoint: String,
31    team_token: String,
32}
33
34impl SyncHttpClient {
35    pub fn new(endpoint: &str, team_token: &str) -> Result<Self> {
36        let http = Client::builder()
37            .timeout(Duration::from_secs(120))
38            .build()
39            .context("reqwest client")?;
40        Ok(Self {
41            http,
42            endpoint: endpoint.trim_end_matches('/').to_string(),
43            team_token: team_token.to_string(),
44        })
45    }
46
47    /// POST one batch. `idempotency_key` is a fresh UUIDv7 per attempt when retrying after failure.
48    pub fn post_events_batch(
49        &self,
50        body: &EventsBatchBody,
51        idempotency_key: &Uuid,
52    ) -> Result<PostBatchOutcome> {
53        self.post_json_gzip("/v1/events", body, idempotency_key)
54    }
55
56    pub fn post_tool_spans_batch(
57        &self,
58        body: &ToolSpansBatchBody,
59        idempotency_key: &Uuid,
60    ) -> Result<PostBatchOutcome> {
61        self.post_json_gzip("/v1/tool-spans", body, idempotency_key)
62    }
63
64    pub fn post_repo_snapshots_batch(
65        &self,
66        body: &RepoSnapshotsBatchBody,
67        idempotency_key: &Uuid,
68    ) -> Result<PostBatchOutcome> {
69        self.post_json_gzip("/v1/repo-snapshots", body, idempotency_key)
70    }
71
72    pub fn post_workspace_facts_batch(
73        &self,
74        body: &WorkspaceFactsBatchBody,
75        idempotency_key: &Uuid,
76    ) -> Result<PostBatchOutcome> {
77        self.post_json_gzip("/v1/workspace-facts", body, idempotency_key)
78    }
79
80    pub fn post_session_evals_batch(
81        &self,
82        body: &SessionEvalsBatchBody,
83        idempotency_key: &Uuid,
84    ) -> Result<PostBatchOutcome> {
85        self.post_json_gzip("/v1/session-evals", body, idempotency_key)
86    }
87
88    fn post_json_gzip<T: serde::Serialize>(
89        &self,
90        path: &str,
91        body: &T,
92        idempotency_key: &Uuid,
93    ) -> Result<PostBatchOutcome> {
94        let json = serde_json::to_vec(body).context("serialize batch")?;
95        let mut enc = GzEncoder::new(Vec::new(), Compression::default());
96        enc.write_all(&json).context("gzip write")?;
97        let gz = enc.finish().context("gzip finish")?;
98
99        let url = format!("{}{}", self.endpoint, path);
100        let resp = self
101            .http
102            .post(&url)
103            .header(AUTHORIZATION, format!("Bearer {}", self.team_token))
104            .header(CONTENT_TYPE, "application/json")
105            .header(CONTENT_ENCODING, "gzip")
106            .header("X-Kaizen-Idempotency-Key", idempotency_key.to_string())
107            .header("X-Kaizen-Client", CLIENT_HEADER_VALUE)
108            .body(gz)
109            .send()
110            .with_context(|| format!("POST {path}"))?;
111
112        let status = resp.status();
113        if status.as_u16() == 202 {
114            let bytes = resp.bytes().unwrap_or_default();
115            let v: serde_json::Value = if bytes.is_empty() {
116                serde_json::json!({})
117            } else {
118                serde_json::from_slice(&bytes).unwrap_or(serde_json::json!({}))
119            };
120            let received = v.get("received").and_then(|x| x.as_u64()).unwrap_or(0);
121            let deduped = v.get("deduped").and_then(|x| x.as_u64()).unwrap_or(0);
122            return Ok(PostBatchOutcome::Accepted { received, deduped });
123        }
124        if status.as_u16() == 409 {
125            return Ok(PostBatchOutcome::Conflict);
126        }
127        if status.as_u16() == 413 {
128            return Ok(PostBatchOutcome::TooLarge);
129        }
130        if status.as_u16() == 429 {
131            let d = retry_after_duration(resp.headers().get(RETRY_AFTER));
132            return Ok(PostBatchOutcome::RateLimited(d));
133        }
134        if status.as_u16() == 401 {
135            return Ok(PostBatchOutcome::Unauthorized);
136        }
137        let code = status.as_u16();
138        if status.is_client_error() {
139            return Ok(PostBatchOutcome::ClientError(code));
140        }
141        if status.is_server_error() {
142            return Ok(PostBatchOutcome::ServerError(code));
143        }
144        Ok(PostBatchOutcome::ServerError(code))
145    }
146
147    pub fn health(&self) -> Result<bool> {
148        let url = format!("{}/v1/health", self.endpoint);
149        let resp = self.http.get(&url).send().context("GET /v1/health")?;
150        Ok(resp.status().is_success())
151    }
152}
153
154fn retry_after_duration(h: Option<&reqwest::header::HeaderValue>) -> Duration {
155    let Some(h) = h else {
156        return Duration::from_secs(2);
157    };
158    let s = h.to_str().unwrap_or("2");
159    if let Ok(secs) = s.parse::<u64>() {
160        return Duration::from_secs(secs.max(1));
161    }
162    Duration::from_secs(2)
163}