1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
19#[derive(Debug, Clone)]
21pub struct ClientConfig {
22 pub default_timeout: Duration,
24 pub pool_config: PoolConfig,
26 pub enable_pooling: bool,
28 pub max_retries: usize,
30 pub retry_delay: Duration,
31 pub max_concurrent_requests: usize,
33 pub max_requests_per_second: Option<f64>,
35}
36
37impl Default for ClientConfig {
38 fn default() -> Self {
39 Self {
40 default_timeout: Duration::from_secs(10), pool_config: PoolConfig::default(),
42 enable_pooling: true,
43 max_retries: 5, retry_delay: Duration::from_millis(50), max_concurrent_requests: 8, max_requests_per_second: Some(10.0), }
48 }
49}
50
51pub struct IpcHttpClient {
56 name: Name<'static>,
57 config: ClientConfig,
58 pool: Option<ConnectionPool>,
59 retry_executor: RetryExecutor,
60}
61
62pub struct HttpRequestBuilder<'a> {
64 client: &'a IpcHttpClient,
65 method: Method,
66 path: String,
67 body: Option<Value>,
68 timeout: Option<Duration>,
69 headers: Vec<(String, String)>,
70}
71
72#[derive(Debug)]
74pub struct HttpResponse {
75 inner: Response,
76}
77
78impl HttpResponse {
79 fn new(response: Response) -> Self {
80 Self { inner: response }
81 }
82
83 pub fn status(&self) -> u16 {
85 self.inner.status_code()
86 }
87
88 pub fn headers(&self) -> Value {
90 let headers_map: std::collections::HashMap<String, String> = self
91 .inner
92 .headers()
93 .iter()
94 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
95 .collect();
96 serde_json::to_value(headers_map).unwrap_or(Value::Null)
97 }
98
99 pub fn body(&self) -> Result<String> {
101 self.inner.text()
102 }
103
104 pub fn is_success(&self) -> bool {
106 self.inner.is_success()
107 }
108
109 pub fn is_client_error(&self) -> bool {
111 self.inner.is_client_error()
112 }
113
114 pub fn is_server_error(&self) -> bool {
116 self.inner.is_server_error()
117 }
118
119 pub fn content_length(&self) -> u64 {
121 self.inner.content_length().unwrap_or(0)
122 }
123
124 pub fn json<T>(&self) -> Result<T>
126 where
127 T: DeserializeOwned,
128 {
129 self.inner.json()
130 }
131
132 pub fn json_value(&self) -> Result<Value> {
134 self.inner.json_value()
135 }
136
137 pub fn into_inner(self) -> Response {
139 self.inner
140 }
141
142 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
144 self.inner.to_legacy()
145 }
146}
147
148impl IpcHttpClient {
149 pub fn new<P>(path: P) -> Result<Self>
151 where
152 P: AsRef<Path>,
153 {
154 Self::with_config(path, ClientConfig::default())
155 }
156
157 pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
159 where
160 P: AsRef<Path>,
161 {
162 let name = path
163 .as_ref()
164 .to_fs_name::<GenericFilePath>()
165 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
166 .into_owned();
167
168 let pool = if config.enable_pooling {
169 Some(ConnectionPool::new(
170 name.clone(),
171 config.pool_config.clone(),
172 ))
173 } else {
174 None
175 };
176
177 let retry_config = RetryConfig::for_network_operations()
179 .max_attempts(config.max_retries)
180 .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
181
182 let retry_executor = RetryExecutor::new(retry_config);
183
184 Ok(Self {
185 name,
186 config,
187 pool,
188 retry_executor,
189 })
190 }
191
192 async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
194 let mut last_error = None;
195
196 for attempt in 0..self.config.max_retries {
197 if attempt > 0 {
198 tokio::time::sleep(self.config.retry_delay).await;
199 }
200
201 match LocalSocketStream::connect(self.name.clone()).await {
202 Ok(stream) => {
203 debug!("Created direct connection on attempt {}", attempt + 1);
204 return Ok(stream);
205 }
206 Err(e) => {
207 trace!("Connection attempt {} failed: {}", attempt + 1, e);
208 last_error = Some(e);
209 }
210 }
211 }
212
213 Err(KodeBridgeError::connection(format!(
214 "Failed to create connection after {} attempts: {}",
215 self.config.max_retries,
216 last_error
217 .map(|e| e.to_string())
218 .unwrap_or_else(|| "Unknown error".to_string())
219 )))
220 }
221
222 async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
224 let metrics = global_metrics();
225
226 if let Some(ref pool) = self.pool {
227 match pool.get_connection().await {
228 Ok(conn) => {
229 metrics.connection_created(true); Ok(Either::Pool(conn))
231 }
232 Err(e) => {
233 metrics.connection_failed();
234 Err(e)
235 }
236 }
237 } else {
238 match self.create_direct_connection().await {
239 Ok(stream) => {
240 metrics.connection_created(false); Ok(Either::Direct(stream))
242 }
243 Err(e) => {
244 metrics.connection_failed();
245 Err(e)
246 }
247 }
248 }
249 }
250
251 pub async fn request(
253 &self,
254 method: &str,
255 path: &str,
256 body: Option<&serde_json::Value>,
257 ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
258 let response = self
259 .send_request_internal(method, path, body, self.config.default_timeout)
260 .await?;
261 Ok(response.to_legacy())
262 }
263
264 async fn send_request_internal(
266 &self,
267 method: &str,
268 path: &str,
269 body: Option<&Value>,
270 timeout: Duration,
271 ) -> Result<Response> {
272 let method = Method::from_str(method)
273 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
274
275 let mut builder = RequestBuilder::new(method.clone(), path.to_string());
276
277 if let Some(json_body) = body {
278 builder = builder.json(json_body)?;
279 }
280
281 let request = builder.build()?;
282
283 self.retry_executor
285 .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
286 let result = tokio::time::timeout(timeout, async {
288 let mut connection = self.get_connection().await?;
289
290 match &mut connection {
291 Either::Pool(conn) => {
292 if let Some(stream) = conn.stream() {
293 send_request(stream, request.clone()).await
294 } else {
295 Err(KodeBridgeError::connection("Pooled connection is invalid"))
296 }
297 }
298 Either::Direct(stream) => send_request(stream, request.clone()).await,
299 }
300 })
301 .await;
302
303 match result {
304 Ok(response) => response,
305 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
306 }
307 })
308 .await
309 }
310
311 pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
313 HttpRequestBuilder::new(self, Method::GET, path)
314 }
315
316 pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
318 HttpRequestBuilder::new(self, Method::POST, path)
319 }
320
321 pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
323 HttpRequestBuilder::new(self, Method::PUT, path)
324 }
325
326 pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
328 HttpRequestBuilder::new(self, Method::DELETE, path)
329 }
330
331 pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
333 HttpRequestBuilder::new(self, Method::PATCH, path)
334 }
335
336 pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
338 HttpRequestBuilder::new(self, Method::HEAD, path)
339 }
340
341 pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
343 HttpRequestBuilder::new(self, Method::OPTIONS, path)
344 }
345
346 pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
348 self.pool.as_ref().map(|p| p.stats())
349 }
350
351 pub fn close(&self) {
353 if let Some(ref pool) = self.pool {
354 pool.close();
355 }
356 }
357}
358
359impl<'a> HttpRequestBuilder<'a> {
360 fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
361 Self {
362 client,
363 method,
364 path: path.to_string(),
365 body: None,
366 timeout: None,
367 headers: Vec::new(),
368 }
369 }
370
371 pub fn json_body(mut self, body: &Value) -> Self {
373 self.body = Some(body.clone());
374 self
375 }
376
377 pub fn timeout(mut self, timeout: Duration) -> Self {
379 self.timeout = Some(timeout);
380 self
381 }
382
383 pub fn header<K, V>(mut self, key: K, value: V) -> Self
385 where
386 K: Into<String>,
387 V: Into<String>,
388 {
389 self.headers.push((key.into(), value.into()));
390 self
391 }
392
393 pub async fn send(self) -> Result<HttpResponse> {
395 let metrics = global_metrics();
396 let tracker = metrics.request_start(self.method.as_str());
397
398 let timeout = self.timeout.unwrap_or(self.client.config.default_timeout);
399
400 match self
401 .client
402 .send_request_internal(
403 self.method.as_str(),
404 &self.path,
405 self.body.as_ref(),
406 timeout,
407 )
408 .await
409 {
410 Ok(response) => {
411 tracker.success(response.status_code());
412 Ok(HttpResponse::new(response))
413 }
414 Err(e) => {
415 tracker.failure(&format!("{:?}", e));
416 Err(e)
417 }
418 }
419 }
420}
421
422enum Either<A, B> {
424 Pool(A),
425 Direct(B),
426}
427
428impl Drop for IpcHttpClient {
429 fn drop(&mut self) {
430 self.close();
431 }
432}