1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
/// Tunable settings for an [`IpcHttpClient`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Fallback timeout for requests that do not set their own timeout.
    /// Put-optimized requests use a size-based timeout instead.
    pub default_timeout: Duration,
    /// Settings handed to the connection pool. Its `retry_delay_ms` is also
    /// read when the request retry executor is configured.
    pub pool_config: PoolConfig,
    /// When `true`, the client creates a connection pool at construction time;
    /// otherwise every request opens a direct connection.
    pub enable_pooling: bool,
    /// Number of direct-connection attempts, and (capped at 3) the attempt
    /// count given to the general retry executor.
    pub max_retries: usize,
    /// Pause between consecutive direct-connection attempts.
    pub retry_delay: Duration,
    /// Upper bound on how many PUT requests `put_batch` runs at the same time.
    pub max_concurrent_requests: usize,
    /// NOTE(review): not read anywhere in this file; presumably a rate limit
    /// enforced elsewhere. Confirm before relying on it.
    pub max_requests_per_second: Option<f64>,
}
36
37impl Default for ClientConfig {
38 fn default() -> Self {
39 Self {
40 default_timeout: Duration::from_secs(5), pool_config: PoolConfig::default(),
42 enable_pooling: true,
43 max_retries: 3, retry_delay: Duration::from_millis(25), max_concurrent_requests: 16, max_requests_per_second: Some(50.0), }
48 }
49}
50
/// HTTP client that talks to a server over a local (IPC) socket.
pub struct IpcHttpClient {
    /// Socket name derived from the filesystem path given at construction.
    name: Name<'static>,
    /// Settings this client was built with.
    config: ClientConfig,
    /// Connection pool; `None` when pooling is disabled in `config`.
    pool: Option<ConnectionPool>,
    /// Retry executor used for ordinary (non put-optimized) requests.
    retry_executor: RetryExecutor,
    /// Retry executor used for put-optimized requests.
    put_retry_executor: RetryExecutor,
}
63
/// Fluent builder for a single request issued through an [`IpcHttpClient`].
pub struct HttpRequestBuilder<'a> {
    /// Client that will send the request.
    client: &'a IpcHttpClient,
    /// HTTP method of the request.
    method: Method,
    /// Request path.
    path: String,
    /// Optional JSON body.
    body: Option<Value>,
    /// Caller-supplied timeout; when `None` a timeout is chosen at send time.
    timeout: Option<Duration>,
    /// Extra headers as `(name, value)` pairs, in insertion order.
    headers: Vec<(String, String)>,
    /// Whether the request takes the put-optimized path (dedicated retry
    /// executor and size-based timeout).
    put_optimized: bool,
    /// Expected body size in bytes, used for timeout and connection selection.
    expected_size: Option<usize>,
}
77
/// Response returned by the fluent request API; a thin wrapper around the
/// lower-level [`Response`].
#[derive(Debug)]
pub struct HttpResponse {
    /// The wrapped low-level response.
    inner: Response,
}
83
84impl HttpResponse {
85 fn new(response: Response) -> Self {
86 Self { inner: response }
87 }
88
89 pub fn status(&self) -> u16 {
91 self.inner.status_code()
92 }
93
94 pub fn headers(&self) -> Value {
96 let headers_map: std::collections::HashMap<String, String> = self
97 .inner
98 .headers()
99 .iter()
100 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101 .collect();
102 serde_json::to_value(headers_map).unwrap_or(Value::Null)
103 }
104
105 pub fn body(&self) -> Result<String> {
107 self.inner.text()
108 }
109
110 pub fn is_success(&self) -> bool {
112 self.inner.is_success()
113 }
114
115 pub fn is_client_error(&self) -> bool {
117 self.inner.is_client_error()
118 }
119
120 pub fn is_server_error(&self) -> bool {
122 self.inner.is_server_error()
123 }
124
125 pub fn content_length(&self) -> u64 {
127 self.inner.content_length().unwrap_or(0)
128 }
129
130 pub fn json<T>(&self) -> Result<T>
132 where
133 T: DeserializeOwned,
134 {
135 self.inner.json()
136 }
137
138 pub fn json_value(&self) -> Result<Value> {
140 self.inner.json_value()
141 }
142
143 pub fn into_inner(self) -> Response {
145 self.inner
146 }
147
148 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150 self.inner.to_legacy()
151 }
152}
153
154impl IpcHttpClient {
155 pub fn new<P>(path: P) -> Result<Self>
157 where
158 P: AsRef<Path>,
159 {
160 Self::with_config(path, ClientConfig::default())
161 }
162
163 pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165 where
166 P: AsRef<Path>,
167 {
168 let name = path
169 .as_ref()
170 .to_fs_name::<GenericFilePath>()
171 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172 .into_owned();
173
174 let pool = if config.enable_pooling {
175 Some(ConnectionPool::new(
176 name.clone(),
177 config.pool_config.clone(),
178 ))
179 } else {
180 None
181 };
182
183 let retry_config = RetryConfig::for_network_operations()
185 .max_attempts(config.max_retries.min(3)) .base_delay(Duration::from_millis(
187 config.pool_config.retry_delay_ms.min(25),
188 )); let retry_executor = RetryExecutor::new(retry_config);
191
192 let put_retry_config = RetryConfig::for_put_requests();
194 let put_retry_executor = RetryExecutor::new(put_retry_config);
195
196 Ok(Self {
197 name,
198 config,
199 pool,
200 retry_executor,
201 put_retry_executor,
202 })
203 }
204
205 async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
207 let mut last_error = None;
208
209 for attempt in 0..self.config.max_retries {
210 if attempt > 0 {
211 tokio::time::sleep(self.config.retry_delay).await;
212 }
213
214 match LocalSocketStream::connect(self.name.clone()).await {
215 Ok(stream) => {
216 debug!("Created direct connection on attempt {}", attempt + 1);
217 return Ok(stream);
218 }
219 Err(e) => {
220 trace!("Connection attempt {} failed: {}", attempt + 1, e);
221 last_error = Some(e);
222 }
223 }
224 }
225
226 Err(KodeBridgeError::connection(format!(
227 "Failed to create connection after {} attempts: {}",
228 self.config.max_retries,
229 last_error
230 .map(|e| e.to_string())
231 .unwrap_or_else(|| "Unknown error".to_string())
232 )))
233 }
234
235 async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
237 let metrics = global_metrics();
238
239 if let Some(ref pool) = self.pool {
240 match pool.get_connection().await {
241 Ok(conn) => {
242 metrics.connection_created(true); Ok(Either::Pool(conn))
244 }
245 Err(e) => {
246 metrics.connection_failed();
247 Err(e)
248 }
249 }
250 } else {
251 match self.create_direct_connection().await {
252 Ok(stream) => {
253 metrics.connection_created(false); Ok(Either::Direct(stream))
255 }
256 Err(e) => {
257 metrics.connection_failed();
258 Err(e)
259 }
260 }
261 }
262 }
263
264 pub async fn request(
266 &self,
267 method: &str,
268 path: &str,
269 body: Option<&serde_json::Value>,
270 ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
271 let response = self
272 .send_request_internal(method, path, body, self.config.default_timeout)
273 .await?;
274 Ok(response.to_legacy())
275 }
276
277 async fn send_request_internal(
279 &self,
280 method: &str,
281 path: &str,
282 body: Option<&Value>,
283 timeout: Duration,
284 ) -> Result<Response> {
285 let method = Method::from_str(method)
286 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
287
288 let mut builder = RequestBuilder::new(method.clone(), path.to_string());
289
290 if let Some(json_body) = body {
294 builder = builder.json(json_body)?;
295 }
296
297 let request = builder.build()?;
298
299 self.retry_executor
301 .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
302 let result = tokio::time::timeout(timeout, async {
304 let mut connection = self.get_connection().await?;
305
306 match &mut connection {
307 Either::Pool(conn) => {
308 if let Some(stream) = conn.stream() {
309 send_request(stream, request.clone()).await
310 } else {
311 Err(KodeBridgeError::connection("Pooled connection is invalid"))
312 }
313 }
314 Either::Direct(stream) => send_request(stream, request.clone()).await,
315 }
316 })
317 .await;
318
319 match result {
320 Ok(response) => response,
321 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
322 }
323 })
324 .await
325 }
326
327 async fn send_request_with_optimization(
329 &self,
330 method: &str,
331 path: &str,
332 body: Option<&Value>,
333 headers: &[(String, String)],
334 timeout: Duration,
335 is_put_optimized: bool,
336 expected_size: Option<usize>,
337 ) -> Result<Response> {
338 let method_enum = Method::from_str(method)
339 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
340
341 let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
342
343 for (key, value) in headers {
345 builder = builder.header(key.as_str(), value.as_str());
346 }
347
348 if let Some(json_body) = body {
349 builder = builder.json(json_body)?;
350 }
351
352 let request = builder.build()?;
353
354 let retry_context = if is_put_optimized {
356 format!("PUT_OPTIMIZED {}", path)
357 } else {
358 format!("{} {}", method, path)
359 };
360
361 let retry_executor = if is_put_optimized {
363 &self.put_retry_executor } else {
365 &self.retry_executor };
367
368 retry_executor
369 .execute_with_context(&retry_context, || async {
370 let result = tokio::time::timeout(timeout, async {
372 let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
373 self.get_fresh_connection().await?
375 } else {
376 self.get_connection().await?
377 };
378
379 match &mut connection {
380 Either::Pool(conn) => {
381 if let Some(stream) = conn.stream() {
382 send_request(stream, request.clone()).await
383 } else {
384 Err(KodeBridgeError::connection("Pooled connection is invalid"))
385 }
386 }
387 Either::Direct(stream) => send_request(stream, request.clone()).await,
388 }
389 })
390 .await;
391
392 match result {
393 Ok(response) => response,
394 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
395 }
396 })
397 .await
398 }
399
400 async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
402 use interprocess::local_socket::tokio::prelude::LocalSocketStream;
403
404 if let Some(ref pool) = self.pool {
406 match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await
407 {
408 Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
409 Ok(Err(_)) | Err(_) => {
410 }
412 }
413 }
414
415 match tokio::time::timeout(
417 Duration::from_millis(100),
418 LocalSocketStream::connect(self.name.clone()),
419 )
420 .await
421 {
422 Ok(Ok(stream)) => Ok(Either::Direct(stream)),
423 Ok(Err(_)) | Err(_) => {
424 if let Some(ref pool) = self.pool {
426 let conn = pool.get_connection().await?;
427 Ok(Either::Pool(conn))
428 } else {
429 Err(KodeBridgeError::connection(
430 "Failed to get fresh connection",
431 ))
432 }
433 }
434 }
435 }
436
    /// Starts building a GET request for `path`.
    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
        HttpRequestBuilder::new(self, Method::GET, path)
    }
441
    /// Starts building a POST request for `path`.
    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
        HttpRequestBuilder::new(self, Method::POST, path)
    }
446
    /// Starts building a PUT request for `path`.
    ///
    /// Builders created for PUT start out with PUT optimization enabled.
    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
        HttpRequestBuilder::new(self, Method::PUT, path)
    }
451
452 pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
454 let batch_size = requests.len();
455 if batch_size == 0 {
456 return Ok(Vec::new());
457 }
458
459 let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
461 let mut responses = Vec::with_capacity(batch_size);
462
463 for chunk in requests.chunks(concurrent_limit) {
465 let mut futures = Vec::new();
466
467 for (path, body) in chunk {
468 let path = path.clone();
469 let body = body.clone();
470
471 let future = self.put(&path).json_body(&body).optimize_for_put().send();
472
473 futures.push(future);
474 }
475
476 let chunk_results = futures::future::join_all(futures).await;
478
479 for result in chunk_results {
480 match result {
481 Ok(response) => responses.push(response),
482 Err(e) => return Err(e),
483 }
484 }
485 }
486
487 Ok(responses)
488 }
489
    /// Starts building a DELETE request for `path`.
    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
        HttpRequestBuilder::new(self, Method::DELETE, path)
    }
494
    /// Starts building a PATCH request for `path`.
    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
        HttpRequestBuilder::new(self, Method::PATCH, path)
    }
499
    /// Starts building a HEAD request for `path`.
    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
        HttpRequestBuilder::new(self, Method::HEAD, path)
    }
504
    /// Starts building an OPTIONS request for `path`.
    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
        HttpRequestBuilder::new(self, Method::OPTIONS, path)
    }
509
510 pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
512 self.pool.as_ref().map(|p| p.stats())
513 }
514
515 pub fn close(&self) {
517 if let Some(ref pool) = self.pool {
518 pool.close();
519 }
520 }
521
522 pub async fn preheat_for_puts(&self, count: usize) {
524 if let Some(ref pool) = self.pool {
525 pool.preheat_for_puts(count).await;
526 }
527 }
528
529 fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
531 match method {
532 "PUT" | "POST" => {
533 match body_size {
534 Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), Some(size) if size > 1024 * 1024 => Duration::from_secs(15), Some(size) if size > 100 * 1024 => Duration::from_secs(8), Some(size) if size > 10 * 1024 => Duration::from_secs(4), _ => Duration::from_secs(2), }
540 }
541 _ => self.config.default_timeout, }
543 }
544}
545
546impl<'a> HttpRequestBuilder<'a> {
547 fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
548 let is_put = method == Method::PUT;
549 Self {
550 client,
551 method,
552 path: path.to_string(),
553 body: None,
554 timeout: None,
555 headers: Vec::new(),
556 put_optimized: is_put, expected_size: None,
558 }
559 }
560
561 pub fn json_body(mut self, body: &Value) -> Self {
563 self.body = Some(body.clone());
564
565 if self.method == Method::PUT {
567 if let Ok(json_bytes) = serde_json::to_vec(body) {
568 self.expected_size = Some(json_bytes.len());
569 }
570 }
571
572 self
573 }
574
575 pub fn timeout(mut self, timeout: Duration) -> Self {
577 self.timeout = Some(timeout);
578 self
579 }
580
581 pub fn expected_size(mut self, size: usize) -> Self {
583 self.expected_size = Some(size);
584 self
585 }
586
587 pub fn optimize_for_put(mut self) -> Self {
589 self.put_optimized = true;
590 self
591 }
592
593 pub fn header<K, V>(mut self, key: K, value: V) -> Self
595 where
596 K: Into<String>,
597 V: Into<String>,
598 {
599 self.headers.push((key.into(), value.into()));
600 self
601 }
602
603 pub async fn send(self) -> Result<HttpResponse> {
605 let metrics = global_metrics();
606 let tracker = metrics.request_start(self.method.as_str());
607
608 let timeout = if self.put_optimized {
610 self.timeout.unwrap_or_else(|| {
611 self.client
612 .calculate_smart_timeout(self.method.as_str(), self.expected_size)
613 })
614 } else {
615 self.timeout.unwrap_or(self.client.config.default_timeout)
616 };
617
618 match self
619 .client
620 .send_request_with_optimization(
621 self.method.as_str(),
622 &self.path,
623 self.body.as_ref(),
624 &self.headers,
625 timeout,
626 self.put_optimized,
627 self.expected_size,
628 )
629 .await
630 {
631 Ok(response) => {
632 tracker.success(response.status_code());
633 Ok(HttpResponse::new(response))
634 }
635 Err(e) => {
636 tracker.failure(&format!("{:?}", e));
637 Err(e)
638 }
639 }
640 }
641}
642
/// A connection that came from one of two sources.
enum Either<A, B> {
    /// Connection obtained from the connection pool.
    Pool(A),
    /// Connection opened directly, outside the pool.
    Direct(B),
}
648
649impl Drop for IpcHttpClient {
650 fn drop(&mut self) {
651 self.close();
652 }
653}