Skip to main content

kaizen/sync/
client.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! HTTP sync client: gzip JSON, retries, batch split on 413.
3
4use crate::sync::outbound::EventsBatchBody;
5use crate::sync::smart::{RepoSnapshotsBatchBody, ToolSpansBatchBody, WorkspaceFactsBatchBody};
6use anyhow::{Context, Result};
7use flate2::Compression;
8use flate2::write::GzEncoder;
9use reqwest::blocking::Client;
10use reqwest::header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE, RETRY_AFTER};
11use std::io::Write;
12use std::time::Duration;
13use uuid::Uuid;
14
15pub const CLIENT_HEADER_VALUE: &str = concat!("kaizen/", env!("CARGO_PKG_VERSION"));
16
17pub enum PostBatchOutcome {
18    Accepted { received: u64, deduped: u64 },
19    Conflict,
20    TooLarge,
21    RateLimited(Duration),
22    Unauthorized,
23    ClientError(u16),
24    ServerError(u16),
25}
26
27pub struct SyncHttpClient {
28    http: Client,
29    endpoint: String,
30    team_token: String,
31}
32
33impl SyncHttpClient {
34    pub fn new(endpoint: &str, team_token: &str) -> Result<Self> {
35        let http = Client::builder()
36            .timeout(Duration::from_secs(120))
37            .build()
38            .context("reqwest client")?;
39        Ok(Self {
40            http,
41            endpoint: endpoint.trim_end_matches('/').to_string(),
42            team_token: team_token.to_string(),
43        })
44    }
45
46    /// POST one batch. `idempotency_key` is a fresh UUIDv7 per attempt when retrying after failure.
47    pub fn post_events_batch(
48        &self,
49        body: &EventsBatchBody,
50        idempotency_key: &Uuid,
51    ) -> Result<PostBatchOutcome> {
52        self.post_json_gzip("/v1/events", body, idempotency_key)
53    }
54
55    pub fn post_tool_spans_batch(
56        &self,
57        body: &ToolSpansBatchBody,
58        idempotency_key: &Uuid,
59    ) -> Result<PostBatchOutcome> {
60        self.post_json_gzip("/v1/tool-spans", body, idempotency_key)
61    }
62
63    pub fn post_repo_snapshots_batch(
64        &self,
65        body: &RepoSnapshotsBatchBody,
66        idempotency_key: &Uuid,
67    ) -> Result<PostBatchOutcome> {
68        self.post_json_gzip("/v1/repo-snapshots", body, idempotency_key)
69    }
70
71    pub fn post_workspace_facts_batch(
72        &self,
73        body: &WorkspaceFactsBatchBody,
74        idempotency_key: &Uuid,
75    ) -> Result<PostBatchOutcome> {
76        self.post_json_gzip("/v1/workspace-facts", body, idempotency_key)
77    }
78
79    fn post_json_gzip<T: serde::Serialize>(
80        &self,
81        path: &str,
82        body: &T,
83        idempotency_key: &Uuid,
84    ) -> Result<PostBatchOutcome> {
85        let json = serde_json::to_vec(body).context("serialize batch")?;
86        let mut enc = GzEncoder::new(Vec::new(), Compression::default());
87        enc.write_all(&json).context("gzip write")?;
88        let gz = enc.finish().context("gzip finish")?;
89
90        let url = format!("{}{}", self.endpoint, path);
91        let resp = self
92            .http
93            .post(&url)
94            .header(AUTHORIZATION, format!("Bearer {}", self.team_token))
95            .header(CONTENT_TYPE, "application/json")
96            .header(CONTENT_ENCODING, "gzip")
97            .header("X-Kaizen-Idempotency-Key", idempotency_key.to_string())
98            .header("X-Kaizen-Client", CLIENT_HEADER_VALUE)
99            .body(gz)
100            .send()
101            .with_context(|| format!("POST {path}"))?;
102
103        let status = resp.status();
104        if status.as_u16() == 202 {
105            let bytes = resp.bytes().unwrap_or_default();
106            let v: serde_json::Value = if bytes.is_empty() {
107                serde_json::json!({})
108            } else {
109                serde_json::from_slice(&bytes).unwrap_or(serde_json::json!({}))
110            };
111            let received = v.get("received").and_then(|x| x.as_u64()).unwrap_or(0);
112            let deduped = v.get("deduped").and_then(|x| x.as_u64()).unwrap_or(0);
113            return Ok(PostBatchOutcome::Accepted { received, deduped });
114        }
115        if status.as_u16() == 409 {
116            return Ok(PostBatchOutcome::Conflict);
117        }
118        if status.as_u16() == 413 {
119            return Ok(PostBatchOutcome::TooLarge);
120        }
121        if status.as_u16() == 429 {
122            let d = retry_after_duration(resp.headers().get(RETRY_AFTER));
123            return Ok(PostBatchOutcome::RateLimited(d));
124        }
125        if status.as_u16() == 401 {
126            return Ok(PostBatchOutcome::Unauthorized);
127        }
128        let code = status.as_u16();
129        if status.is_client_error() {
130            return Ok(PostBatchOutcome::ClientError(code));
131        }
132        if status.is_server_error() {
133            return Ok(PostBatchOutcome::ServerError(code));
134        }
135        Ok(PostBatchOutcome::ServerError(code))
136    }
137
138    pub fn health(&self) -> Result<bool> {
139        let url = format!("{}/v1/health", self.endpoint);
140        let resp = self.http.get(&url).send().context("GET /v1/health")?;
141        Ok(resp.status().is_success())
142    }
143}
144
145fn retry_after_duration(h: Option<&reqwest::header::HeaderValue>) -> Duration {
146    let Some(h) = h else {
147        return Duration::from_secs(2);
148    };
149    let s = h.to_str().unwrap_or("2");
150    if let Ok(secs) = s.parse::<u64>() {
151        return Duration::from_secs(secs.max(1));
152    }
153    Duration::from_secs(2)
154}