1use crate::sync::export_batch::SessionEvalsBatchBody;
5use crate::sync::outbound::EventsBatchBody;
6use crate::sync::smart::{RepoSnapshotsBatchBody, ToolSpansBatchBody, WorkspaceFactsBatchBody};
7use anyhow::{Context, Result};
8use flate2::Compression;
9use flate2::write::GzEncoder;
10use reqwest::blocking::Client;
11use reqwest::header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE, RETRY_AFTER};
12use std::io::Write;
13use std::time::Duration;
14use uuid::Uuid;
15
16pub const CLIENT_HEADER_VALUE: &str = concat!("kaizen/", env!("CARGO_PKG_VERSION"));
17
/// Result of posting one batch, classified from the HTTP status of the response.
///
/// Transport-level failures are not represented here; they surface as `Err`
/// from the posting methods instead.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PostBatchOutcome {
    /// HTTP 202. Counts are read from the JSON response body and default to 0
    /// when the body is missing, unparseable, or lacks the field.
    Accepted { received: u64, deduped: u64 },
    /// HTTP 409.
    Conflict,
    /// HTTP 413.
    TooLarge,
    /// HTTP 429, carrying the delay derived from the `Retry-After` header.
    RateLimited(Duration),
    /// HTTP 401.
    Unauthorized,
    /// Any other 4xx status; carries the numeric status code.
    ClientError(u16),
    /// Any status not covered by the variants above (5xx, but also unexpected
    /// non-202 statuses); carries the numeric status code.
    ServerError(u16),
}
27
28pub struct SyncHttpClient {
29 http: Client,
30 endpoint: String,
31 team_token: String,
32}
33
34impl SyncHttpClient {
35 pub fn new(endpoint: &str, team_token: &str) -> Result<Self> {
36 let http = Client::builder()
37 .timeout(Duration::from_secs(120))
38 .build()
39 .context("reqwest client")?;
40 Ok(Self {
41 http,
42 endpoint: endpoint.trim_end_matches('/').to_string(),
43 team_token: team_token.to_string(),
44 })
45 }
46
47 pub fn post_events_batch(
49 &self,
50 body: &EventsBatchBody,
51 idempotency_key: &Uuid,
52 ) -> Result<PostBatchOutcome> {
53 self.post_json_gzip("/v1/events", body, idempotency_key)
54 }
55
56 pub fn post_tool_spans_batch(
57 &self,
58 body: &ToolSpansBatchBody,
59 idempotency_key: &Uuid,
60 ) -> Result<PostBatchOutcome> {
61 self.post_json_gzip("/v1/tool-spans", body, idempotency_key)
62 }
63
64 pub fn post_repo_snapshots_batch(
65 &self,
66 body: &RepoSnapshotsBatchBody,
67 idempotency_key: &Uuid,
68 ) -> Result<PostBatchOutcome> {
69 self.post_json_gzip("/v1/repo-snapshots", body, idempotency_key)
70 }
71
72 pub fn post_workspace_facts_batch(
73 &self,
74 body: &WorkspaceFactsBatchBody,
75 idempotency_key: &Uuid,
76 ) -> Result<PostBatchOutcome> {
77 self.post_json_gzip("/v1/workspace-facts", body, idempotency_key)
78 }
79
80 pub fn post_session_evals_batch(
81 &self,
82 body: &SessionEvalsBatchBody,
83 idempotency_key: &Uuid,
84 ) -> Result<PostBatchOutcome> {
85 self.post_json_gzip("/v1/session-evals", body, idempotency_key)
86 }
87
88 fn post_json_gzip<T: serde::Serialize>(
89 &self,
90 path: &str,
91 body: &T,
92 idempotency_key: &Uuid,
93 ) -> Result<PostBatchOutcome> {
94 let json = serde_json::to_vec(body).context("serialize batch")?;
95 let mut enc = GzEncoder::new(Vec::new(), Compression::default());
96 enc.write_all(&json).context("gzip write")?;
97 let gz = enc.finish().context("gzip finish")?;
98
99 let url = format!("{}{}", self.endpoint, path);
100 let resp = self
101 .http
102 .post(&url)
103 .header(AUTHORIZATION, format!("Bearer {}", self.team_token))
104 .header(CONTENT_TYPE, "application/json")
105 .header(CONTENT_ENCODING, "gzip")
106 .header("X-Kaizen-Idempotency-Key", idempotency_key.to_string())
107 .header("X-Kaizen-Client", CLIENT_HEADER_VALUE)
108 .body(gz)
109 .send()
110 .with_context(|| format!("POST {path}"))?;
111
112 let status = resp.status();
113 if status.as_u16() == 202 {
114 let bytes = resp.bytes().unwrap_or_default();
115 let v: serde_json::Value = if bytes.is_empty() {
116 serde_json::json!({})
117 } else {
118 serde_json::from_slice(&bytes).unwrap_or(serde_json::json!({}))
119 };
120 let received = v.get("received").and_then(|x| x.as_u64()).unwrap_or(0);
121 let deduped = v.get("deduped").and_then(|x| x.as_u64()).unwrap_or(0);
122 return Ok(PostBatchOutcome::Accepted { received, deduped });
123 }
124 if status.as_u16() == 409 {
125 return Ok(PostBatchOutcome::Conflict);
126 }
127 if status.as_u16() == 413 {
128 return Ok(PostBatchOutcome::TooLarge);
129 }
130 if status.as_u16() == 429 {
131 let d = retry_after_duration(resp.headers().get(RETRY_AFTER));
132 return Ok(PostBatchOutcome::RateLimited(d));
133 }
134 if status.as_u16() == 401 {
135 return Ok(PostBatchOutcome::Unauthorized);
136 }
137 let code = status.as_u16();
138 if status.is_client_error() {
139 return Ok(PostBatchOutcome::ClientError(code));
140 }
141 if status.is_server_error() {
142 return Ok(PostBatchOutcome::ServerError(code));
143 }
144 Ok(PostBatchOutcome::ServerError(code))
145 }
146
147 pub fn health(&self) -> Result<bool> {
148 let url = format!("{}/v1/health", self.endpoint);
149 let resp = self.http.get(&url).send().context("GET /v1/health")?;
150 Ok(resp.status().is_success())
151 }
152}
153
154fn retry_after_duration(h: Option<&reqwest::header::HeaderValue>) -> Duration {
155 let Some(h) = h else {
156 return Duration::from_secs(2);
157 };
158 let s = h.to_str().unwrap_or("2");
159 if let Ok(secs) = s.parse::<u64>() {
160 return Duration::from_secs(secs.max(1));
161 }
162 Duration::from_secs(2)
163}