1use crate::sync::outbound::EventsBatchBody;
5use crate::sync::smart::{RepoSnapshotsBatchBody, ToolSpansBatchBody, WorkspaceFactsBatchBody};
6use anyhow::{Context, Result};
7use flate2::Compression;
8use flate2::write::GzEncoder;
9use reqwest::blocking::Client;
10use reqwest::header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE, RETRY_AFTER};
11use std::io::Write;
12use std::time::Duration;
13use uuid::Uuid;
14
15pub const CLIENT_HEADER_VALUE: &str = concat!("kaizen/", env!("CARGO_PKG_VERSION"));
16
17pub enum PostBatchOutcome {
18 Accepted { received: u64, deduped: u64 },
19 Conflict,
20 TooLarge,
21 RateLimited(Duration),
22 Unauthorized,
23 ClientError(u16),
24 ServerError(u16),
25}
26
27pub struct SyncHttpClient {
28 http: Client,
29 endpoint: String,
30 team_token: String,
31}
32
33impl SyncHttpClient {
34 pub fn new(endpoint: &str, team_token: &str) -> Result<Self> {
35 let http = Client::builder()
36 .timeout(Duration::from_secs(120))
37 .build()
38 .context("reqwest client")?;
39 Ok(Self {
40 http,
41 endpoint: endpoint.trim_end_matches('/').to_string(),
42 team_token: team_token.to_string(),
43 })
44 }
45
46 pub fn post_events_batch(
48 &self,
49 body: &EventsBatchBody,
50 idempotency_key: &Uuid,
51 ) -> Result<PostBatchOutcome> {
52 self.post_json_gzip("/v1/events", body, idempotency_key)
53 }
54
55 pub fn post_tool_spans_batch(
56 &self,
57 body: &ToolSpansBatchBody,
58 idempotency_key: &Uuid,
59 ) -> Result<PostBatchOutcome> {
60 self.post_json_gzip("/v1/tool-spans", body, idempotency_key)
61 }
62
63 pub fn post_repo_snapshots_batch(
64 &self,
65 body: &RepoSnapshotsBatchBody,
66 idempotency_key: &Uuid,
67 ) -> Result<PostBatchOutcome> {
68 self.post_json_gzip("/v1/repo-snapshots", body, idempotency_key)
69 }
70
71 pub fn post_workspace_facts_batch(
72 &self,
73 body: &WorkspaceFactsBatchBody,
74 idempotency_key: &Uuid,
75 ) -> Result<PostBatchOutcome> {
76 self.post_json_gzip("/v1/workspace-facts", body, idempotency_key)
77 }
78
79 fn post_json_gzip<T: serde::Serialize>(
80 &self,
81 path: &str,
82 body: &T,
83 idempotency_key: &Uuid,
84 ) -> Result<PostBatchOutcome> {
85 let json = serde_json::to_vec(body).context("serialize batch")?;
86 let mut enc = GzEncoder::new(Vec::new(), Compression::default());
87 enc.write_all(&json).context("gzip write")?;
88 let gz = enc.finish().context("gzip finish")?;
89
90 let url = format!("{}{}", self.endpoint, path);
91 let resp = self
92 .http
93 .post(&url)
94 .header(AUTHORIZATION, format!("Bearer {}", self.team_token))
95 .header(CONTENT_TYPE, "application/json")
96 .header(CONTENT_ENCODING, "gzip")
97 .header("X-Kaizen-Idempotency-Key", idempotency_key.to_string())
98 .header("X-Kaizen-Client", CLIENT_HEADER_VALUE)
99 .body(gz)
100 .send()
101 .with_context(|| format!("POST {path}"))?;
102
103 let status = resp.status();
104 if status.as_u16() == 202 {
105 let bytes = resp.bytes().unwrap_or_default();
106 let v: serde_json::Value = if bytes.is_empty() {
107 serde_json::json!({})
108 } else {
109 serde_json::from_slice(&bytes).unwrap_or(serde_json::json!({}))
110 };
111 let received = v.get("received").and_then(|x| x.as_u64()).unwrap_or(0);
112 let deduped = v.get("deduped").and_then(|x| x.as_u64()).unwrap_or(0);
113 return Ok(PostBatchOutcome::Accepted { received, deduped });
114 }
115 if status.as_u16() == 409 {
116 return Ok(PostBatchOutcome::Conflict);
117 }
118 if status.as_u16() == 413 {
119 return Ok(PostBatchOutcome::TooLarge);
120 }
121 if status.as_u16() == 429 {
122 let d = retry_after_duration(resp.headers().get(RETRY_AFTER));
123 return Ok(PostBatchOutcome::RateLimited(d));
124 }
125 if status.as_u16() == 401 {
126 return Ok(PostBatchOutcome::Unauthorized);
127 }
128 let code = status.as_u16();
129 if status.is_client_error() {
130 return Ok(PostBatchOutcome::ClientError(code));
131 }
132 if status.is_server_error() {
133 return Ok(PostBatchOutcome::ServerError(code));
134 }
135 Ok(PostBatchOutcome::ServerError(code))
136 }
137
138 pub fn health(&self) -> Result<bool> {
139 let url = format!("{}/v1/health", self.endpoint);
140 let resp = self.http.get(&url).send().context("GET /v1/health")?;
141 Ok(resp.status().is_success())
142 }
143}
144
145fn retry_after_duration(h: Option<&reqwest::header::HeaderValue>) -> Duration {
146 let Some(h) = h else {
147 return Duration::from_secs(2);
148 };
149 let s = h.to_str().unwrap_or("2");
150 if let Ok(secs) = s.parse::<u64>() {
151 return Duration::from_secs(secs.max(1));
152 }
153 Duration::from_secs(2)
154}