1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{RequestBuilder, Response, send_request};
12use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
13use http::Method;
14use std::str::FromStr;
15use tracing::{debug, trace};
16
17#[derive(Debug, Clone)]
19pub struct ClientConfig {
20 pub default_timeout: Duration,
22 pub pool_config: PoolConfig,
24 pub enable_pooling: bool,
26 pub max_retries: usize,
28 pub retry_delay: Duration,
29}
30
31impl Default for ClientConfig {
32 fn default() -> Self {
33 Self {
34 default_timeout: Duration::from_secs(30),
35 pool_config: PoolConfig::default(),
36 enable_pooling: true,
37 max_retries: 3,
38 retry_delay: Duration::from_millis(100),
39 }
40 }
41}
42
43pub struct IpcHttpClient {
48 name: Name<'static>,
49 config: ClientConfig,
50 pool: Option<ConnectionPool>,
51}
52
53pub struct HttpRequestBuilder<'a> {
55 client: &'a IpcHttpClient,
56 method: Method,
57 path: String,
58 body: Option<Value>,
59 timeout: Option<Duration>,
60 headers: Vec<(String, String)>,
61}
62
63#[derive(Debug)]
65pub struct HttpResponse {
66 inner: Response,
67}
68
69impl HttpResponse {
70 fn new(response: Response) -> Self {
71 Self { inner: response }
72 }
73
74 pub fn status(&self) -> u16 {
76 self.inner.status_code()
77 }
78
79 pub fn headers(&self) -> Value {
81 let headers_map: std::collections::HashMap<String, String> = self
82 .inner
83 .headers()
84 .iter()
85 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
86 .collect();
87 serde_json::to_value(headers_map).unwrap_or(Value::Null)
88 }
89
90 pub fn body(&self) -> Result<String> {
92 self.inner.text()
93 }
94
95 pub fn is_success(&self) -> bool {
97 self.inner.is_success()
98 }
99
100 pub fn is_client_error(&self) -> bool {
102 self.inner.is_client_error()
103 }
104
105 pub fn is_server_error(&self) -> bool {
107 self.inner.is_server_error()
108 }
109
110 pub fn content_length(&self) -> u64 {
112 self.inner.content_length().unwrap_or(0)
113 }
114
115 pub fn json<T>(&self) -> Result<T>
117 where
118 T: DeserializeOwned,
119 {
120 self.inner.json()
121 }
122
123 pub fn json_value(&self) -> Result<Value> {
125 self.inner.json_value()
126 }
127
128 pub fn into_inner(self) -> Response {
130 self.inner
131 }
132
133 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
135 self.inner.to_legacy()
136 }
137}
138
139impl IpcHttpClient {
140 pub fn new<P>(path: P) -> Result<Self>
142 where
143 P: AsRef<Path>,
144 {
145 Self::with_config(path, ClientConfig::default())
146 }
147
148 pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
150 where
151 P: AsRef<Path>,
152 {
153 let name = path
154 .as_ref()
155 .to_fs_name::<GenericFilePath>()
156 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
157 .into_owned();
158
159 let pool = if config.enable_pooling {
160 Some(ConnectionPool::new(
161 name.clone(),
162 config.pool_config.clone(),
163 ))
164 } else {
165 None
166 };
167
168 Ok(Self { name, config, pool })
169 }
170
171 async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
173 let mut last_error = None;
174
175 for attempt in 0..self.config.max_retries {
176 if attempt > 0 {
177 tokio::time::sleep(self.config.retry_delay).await;
178 }
179
180 match LocalSocketStream::connect(self.name.clone()).await {
181 Ok(stream) => {
182 debug!("Created direct connection on attempt {}", attempt + 1);
183 return Ok(stream);
184 }
185 Err(e) => {
186 trace!("Connection attempt {} failed: {}", attempt + 1, e);
187 last_error = Some(e);
188 }
189 }
190 }
191
192 Err(KodeBridgeError::connection(format!(
193 "Failed to create connection after {} attempts: {}",
194 self.config.max_retries,
195 last_error.unwrap()
196 )))
197 }
198
199 async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
201 if let Some(ref pool) = self.pool {
202 pool.get_connection().await.map(Either::Pool)
203 } else {
204 self.create_direct_connection().await.map(Either::Direct)
205 }
206 }
207
208 pub async fn request(
210 &self,
211 method: &str,
212 path: &str,
213 body: Option<&serde_json::Value>,
214 ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
215 let response = self
216 .send_request_internal(method, path, body, self.config.default_timeout)
217 .await?;
218 Ok(response.to_legacy())
219 }
220
221 async fn send_request_internal(
223 &self,
224 method: &str,
225 path: &str,
226 body: Option<&Value>,
227 timeout: Duration,
228 ) -> Result<Response> {
229 let method = Method::from_str(method)
230 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
231
232 let mut builder = RequestBuilder::new(method, path.to_string());
233
234 if let Some(json_body) = body {
235 builder = builder.json(json_body)?;
236 }
237
238 let request = builder.build()?;
239
240 let result = tokio::time::timeout(timeout, async {
242 let mut connection = self.get_connection().await?;
243
244 match &mut connection {
245 Either::Pool(conn) => {
246 if let Some(stream) = conn.stream() {
247 send_request(stream, request).await
248 } else {
249 Err(KodeBridgeError::connection("Pooled connection is invalid"))
250 }
251 }
252 Either::Direct(stream) => send_request(stream, request).await,
253 }
254 })
255 .await;
256
257 match result {
258 Ok(response) => response,
259 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
260 }
261 }
262
263 pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
265 HttpRequestBuilder::new(self, Method::GET, path)
266 }
267
268 pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
270 HttpRequestBuilder::new(self, Method::POST, path)
271 }
272
273 pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
275 HttpRequestBuilder::new(self, Method::PUT, path)
276 }
277
278 pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
280 HttpRequestBuilder::new(self, Method::DELETE, path)
281 }
282
283 pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
285 HttpRequestBuilder::new(self, Method::PATCH, path)
286 }
287
288 pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
290 HttpRequestBuilder::new(self, Method::HEAD, path)
291 }
292
293 pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
295 HttpRequestBuilder::new(self, Method::OPTIONS, path)
296 }
297
298 pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
300 self.pool.as_ref().map(|p| p.stats())
301 }
302
303 pub fn close(&self) {
305 if let Some(ref pool) = self.pool {
306 pool.close();
307 }
308 }
309}
310
311impl<'a> HttpRequestBuilder<'a> {
312 fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
313 Self {
314 client,
315 method,
316 path: path.to_string(),
317 body: None,
318 timeout: None,
319 headers: Vec::new(),
320 }
321 }
322
323 pub fn json_body(mut self, body: &Value) -> Self {
325 self.body = Some(body.clone());
326 self
327 }
328
329 pub fn timeout(mut self, timeout: Duration) -> Self {
331 self.timeout = Some(timeout);
332 self
333 }
334
335 pub fn header<K, V>(mut self, key: K, value: V) -> Self
337 where
338 K: Into<String>,
339 V: Into<String>,
340 {
341 self.headers.push((key.into(), value.into()));
342 self
343 }
344
345 pub async fn send(self) -> Result<HttpResponse> {
347 let timeout = self.timeout.unwrap_or(self.client.config.default_timeout);
348 let response = self
349 .client
350 .send_request_internal(
351 self.method.as_str(),
352 &self.path,
353 self.body.as_ref(),
354 timeout,
355 )
356 .await?;
357
358 Ok(HttpResponse::new(response))
359 }
360}
361
/// A connection obtained from one of two sources.
///
/// Lets a single code path handle both pooled and direct connections.
enum Either<A, B> {
    /// Connection handed out by the pool.
    Pool(A),
    /// Connection opened directly, bypassing the pool.
    Direct(B),
}
367
368impl Drop for IpcHttpClient {
369 fn drop(&mut self) {
370 self.close();
371 }
372}