1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
19#[derive(Debug, Clone)]
21pub struct ClientConfig {
22 pub default_timeout: Duration,
24 pub pool_config: PoolConfig,
26 pub enable_pooling: bool,
28 pub max_retries: usize,
30 pub retry_delay: Duration,
31 pub max_concurrent_requests: usize,
33 pub max_requests_per_second: Option<f64>,
35}
36
37impl Default for ClientConfig {
38 fn default() -> Self {
39 Self {
40 default_timeout: Duration::from_secs(5), pool_config: PoolConfig::default(),
42 enable_pooling: true,
43 max_retries: 3, retry_delay: Duration::from_millis(25), max_concurrent_requests: 16, max_requests_per_second: Some(50.0), }
48 }
49}
50
51pub struct IpcHttpClient {
56 name: Name<'static>,
57 config: ClientConfig,
58 pool: Option<ConnectionPool>,
59 retry_executor: RetryExecutor,
60 put_retry_executor: RetryExecutor,
62}
63
64pub struct HttpRequestBuilder<'a> {
66 client: &'a IpcHttpClient,
67 method: Method,
68 path: String,
69 body: Option<Value>,
70 timeout: Option<Duration>,
71 headers: Vec<(String, String)>,
72 put_optimized: bool,
74 expected_size: Option<usize>,
76}
77
78#[derive(Debug)]
80pub struct HttpResponse {
81 inner: Response,
82}
83
84impl HttpResponse {
85 fn new(response: Response) -> Self {
86 Self { inner: response }
87 }
88
89 pub fn status(&self) -> u16 {
91 self.inner.status_code()
92 }
93
94 pub fn headers(&self) -> Value {
96 let headers_map: std::collections::HashMap<String, String> = self
97 .inner
98 .headers()
99 .iter()
100 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101 .collect();
102 serde_json::to_value(headers_map).unwrap_or(Value::Null)
103 }
104
105 pub fn body(&self) -> Result<String> {
107 self.inner.text()
108 }
109
110 pub fn is_success(&self) -> bool {
112 self.inner.is_success()
113 }
114
115 pub fn is_client_error(&self) -> bool {
117 self.inner.is_client_error()
118 }
119
120 pub fn is_server_error(&self) -> bool {
122 self.inner.is_server_error()
123 }
124
125 pub fn content_length(&self) -> u64 {
127 self.inner.content_length().unwrap_or(0)
128 }
129
130 pub fn json<T>(&self) -> Result<T>
132 where
133 T: DeserializeOwned,
134 {
135 self.inner.json()
136 }
137
138 pub fn json_value(&self) -> Result<Value> {
140 self.inner.json_value()
141 }
142
143 pub fn into_inner(self) -> Response {
145 self.inner
146 }
147
148 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150 self.inner.to_legacy()
151 }
152}
153
154impl IpcHttpClient {
155 pub fn new<P>(path: P) -> Result<Self>
157 where
158 P: AsRef<Path>,
159 {
160 Self::with_config(path, ClientConfig::default())
161 }
162
163 pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165 where
166 P: AsRef<Path>,
167 {
168 let name = path
169 .as_ref()
170 .to_fs_name::<GenericFilePath>()
171 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172 .into_owned();
173
174 let pool = if config.enable_pooling {
175 Some(ConnectionPool::new(
176 name.clone(),
177 config.pool_config.clone(),
178 ))
179 } else {
180 None
181 };
182
183 let retry_config = RetryConfig::for_network_operations()
185 .max_attempts(config.max_retries.min(3)) .base_delay(Duration::from_millis(
187 config.pool_config.retry_delay_ms.min(25),
188 )); let retry_executor = RetryExecutor::new(retry_config);
191
192 let put_retry_config = RetryConfig::for_put_requests();
194 let put_retry_executor = RetryExecutor::new(put_retry_config);
195
196 Ok(Self {
197 name,
198 config,
199 pool,
200 retry_executor,
201 put_retry_executor,
202 })
203 }
204
205 async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
207 let mut last_error = None;
208
209 for attempt in 0..self.config.max_retries {
210 if attempt > 0 {
211 tokio::time::sleep(self.config.retry_delay).await;
212 }
213
214 match LocalSocketStream::connect(self.name.clone()).await {
215 Ok(stream) => {
216 debug!("Created direct connection on attempt {}", attempt + 1);
217 return Ok(stream);
218 }
219 Err(e) => {
220 trace!("Connection attempt {} failed: {}", attempt + 1, e);
221 last_error = Some(e);
222 }
223 }
224 }
225
226 Err(KodeBridgeError::connection(format!(
227 "Failed to create connection after {} attempts: {}",
228 self.config.max_retries,
229 last_error
230 .map(|e| e.to_string())
231 .unwrap_or_else(|| "Unknown error".to_string())
232 )))
233 }
234
235 async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
237 let metrics = global_metrics();
238
239 if let Some(ref pool) = self.pool {
240 match pool.get_connection().await {
241 Ok(conn) => {
242 metrics.connection_created(true); Ok(Either::Pool(conn))
244 }
245 Err(e) => {
246 metrics.connection_failed();
247 Err(e)
248 }
249 }
250 } else {
251 match self.create_direct_connection().await {
252 Ok(stream) => {
253 metrics.connection_created(false); Ok(Either::Direct(stream))
255 }
256 Err(e) => {
257 metrics.connection_failed();
258 Err(e)
259 }
260 }
261 }
262 }
263
264 pub async fn request(
266 &self,
267 method: &str,
268 path: &str,
269 body: Option<&serde_json::Value>,
270 ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
271 let response = self
272 .send_request_internal(method, path, body, self.config.default_timeout)
273 .await?;
274 Ok(response.to_legacy())
275 }
276
277 async fn send_request_internal(
279 &self,
280 method: &str,
281 path: &str,
282 body: Option<&Value>,
283 timeout: Duration,
284 ) -> Result<Response> {
285 let method = Method::from_str(method)
286 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
287
288 let mut builder = RequestBuilder::new(method.clone(), path.to_string());
289
290 if let Some(json_body) = body {
291 builder = builder.json(json_body)?;
292 }
293
294 let request = builder.build()?;
295
296 self.retry_executor
298 .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
299 let result = tokio::time::timeout(timeout, async {
301 let mut connection = self.get_connection().await?;
302
303 match &mut connection {
304 Either::Pool(conn) => {
305 if let Some(stream) = conn.stream() {
306 send_request(stream, request.clone()).await
307 } else {
308 Err(KodeBridgeError::connection("Pooled connection is invalid"))
309 }
310 }
311 Either::Direct(stream) => send_request(stream, request.clone()).await,
312 }
313 })
314 .await;
315
316 match result {
317 Ok(response) => response,
318 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
319 }
320 })
321 .await
322 }
323
324 async fn send_request_with_optimization(
326 &self,
327 method: &str,
328 path: &str,
329 body: Option<&Value>,
330 timeout: Duration,
331 is_put_optimized: bool,
332 expected_size: Option<usize>,
333 ) -> Result<Response> {
334 let method_enum = Method::from_str(method)
335 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
336
337 let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
338
339 if let Some(json_body) = body {
340 builder = builder.json(json_body)?;
341 }
342
343 let request = builder.build()?;
344
345 let retry_context = if is_put_optimized {
347 format!("PUT_OPTIMIZED {}", path)
348 } else {
349 format!("{} {}", method, path)
350 };
351
352 let retry_executor = if is_put_optimized {
354 &self.put_retry_executor } else {
356 &self.retry_executor };
358
359 retry_executor
360 .execute_with_context(&retry_context, || async {
361 let result = tokio::time::timeout(timeout, async {
363 let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
364 self.get_fresh_connection().await?
366 } else {
367 self.get_connection().await?
368 };
369
370 match &mut connection {
371 Either::Pool(conn) => {
372 if let Some(stream) = conn.stream() {
373 send_request(stream, request.clone()).await
374 } else {
375 Err(KodeBridgeError::connection("Pooled connection is invalid"))
376 }
377 }
378 Either::Direct(stream) => send_request(stream, request.clone()).await,
379 }
380 })
381 .await;
382
383 match result {
384 Ok(response) => response,
385 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
386 }
387 })
388 .await
389 }
390
391 async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
393 use interprocess::local_socket::tokio::prelude::LocalSocketStream;
394
395 if let Some(ref pool) = self.pool {
397 match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await
398 {
399 Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
400 Ok(Err(_)) | Err(_) => {
401 }
403 }
404 }
405
406 match tokio::time::timeout(
408 Duration::from_millis(100),
409 LocalSocketStream::connect(self.name.clone()),
410 )
411 .await
412 {
413 Ok(Ok(stream)) => Ok(Either::Direct(stream)),
414 Ok(Err(_)) | Err(_) => {
415 if let Some(ref pool) = self.pool {
417 let conn = pool.get_connection().await?;
418 Ok(Either::Pool(conn))
419 } else {
420 Err(KodeBridgeError::connection(
421 "Failed to get fresh connection",
422 ))
423 }
424 }
425 }
426 }
427
428 pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
430 HttpRequestBuilder::new(self, Method::GET, path)
431 }
432
433 pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
435 HttpRequestBuilder::new(self, Method::POST, path)
436 }
437
438 pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
440 HttpRequestBuilder::new(self, Method::PUT, path)
441 }
442
443 pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
445 let batch_size = requests.len();
446 if batch_size == 0 {
447 return Ok(Vec::new());
448 }
449
450 let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
452 let mut responses = Vec::with_capacity(batch_size);
453
454 for chunk in requests.chunks(concurrent_limit) {
456 let mut futures = Vec::new();
457
458 for (path, body) in chunk {
459 let path = path.clone();
460 let body = body.clone();
461
462 let future = self.put(&path).json_body(&body).optimize_for_put().send();
463
464 futures.push(future);
465 }
466
467 let chunk_results = futures::future::join_all(futures).await;
469
470 for result in chunk_results {
471 match result {
472 Ok(response) => responses.push(response),
473 Err(e) => return Err(e),
474 }
475 }
476 }
477
478 Ok(responses)
479 }
480
481 pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
483 HttpRequestBuilder::new(self, Method::DELETE, path)
484 }
485
486 pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
488 HttpRequestBuilder::new(self, Method::PATCH, path)
489 }
490
491 pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
493 HttpRequestBuilder::new(self, Method::HEAD, path)
494 }
495
496 pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
498 HttpRequestBuilder::new(self, Method::OPTIONS, path)
499 }
500
501 pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
503 self.pool.as_ref().map(|p| p.stats())
504 }
505
506 pub fn close(&self) {
508 if let Some(ref pool) = self.pool {
509 pool.close();
510 }
511 }
512
513 pub async fn preheat_for_puts(&self, count: usize) {
515 if let Some(ref pool) = self.pool {
516 pool.preheat_for_puts(count).await;
517 }
518 }
519
520 fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
522 match method {
523 "PUT" | "POST" => {
524 match body_size {
525 Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), Some(size) if size > 1024 * 1024 => Duration::from_secs(15), Some(size) if size > 100 * 1024 => Duration::from_secs(8), Some(size) if size > 10 * 1024 => Duration::from_secs(4), _ => Duration::from_secs(2), }
531 }
532 _ => self.config.default_timeout, }
534 }
535}
536
537impl<'a> HttpRequestBuilder<'a> {
538 fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
539 let is_put = method == Method::PUT;
540 Self {
541 client,
542 method,
543 path: path.to_string(),
544 body: None,
545 timeout: None,
546 headers: Vec::new(),
547 put_optimized: is_put, expected_size: None,
549 }
550 }
551
552 pub fn json_body(mut self, body: &Value) -> Self {
554 self.body = Some(body.clone());
555
556 if self.method == Method::PUT {
558 if let Ok(json_bytes) = serde_json::to_vec(body) {
559 self.expected_size = Some(json_bytes.len());
560 }
561 }
562
563 self
564 }
565
566 pub fn timeout(mut self, timeout: Duration) -> Self {
568 self.timeout = Some(timeout);
569 self
570 }
571
572 pub fn expected_size(mut self, size: usize) -> Self {
574 self.expected_size = Some(size);
575 self
576 }
577
578 pub fn optimize_for_put(mut self) -> Self {
580 self.put_optimized = true;
581 self
582 }
583
584 pub fn header<K, V>(mut self, key: K, value: V) -> Self
586 where
587 K: Into<String>,
588 V: Into<String>,
589 {
590 self.headers.push((key.into(), value.into()));
591 self
592 }
593
594 pub async fn send(self) -> Result<HttpResponse> {
596 let metrics = global_metrics();
597 let tracker = metrics.request_start(self.method.as_str());
598
599 let timeout = if self.put_optimized {
601 self.timeout.unwrap_or_else(|| {
602 self.client
603 .calculate_smart_timeout(self.method.as_str(), self.expected_size)
604 })
605 } else {
606 self.timeout.unwrap_or(self.client.config.default_timeout)
607 };
608
609 match self
610 .client
611 .send_request_with_optimization(
612 self.method.as_str(),
613 &self.path,
614 self.body.as_ref(),
615 timeout,
616 self.put_optimized,
617 self.expected_size,
618 )
619 .await
620 {
621 Ok(response) => {
622 tracker.success(response.status_code());
623 Ok(HttpResponse::new(response))
624 }
625 Err(e) => {
626 tracker.failure(&format!("{:?}", e));
627 Err(e)
628 }
629 }
630 }
631}
632
/// Two-way sum type used to carry a connection that came either from the
/// pool or from a direct connect.
enum Either<P, D> {
    /// Value obtained from the connection pool.
    Pool(P),
    /// Value created directly, outside the pool.
    Direct(D),
}
638
639impl Drop for IpcHttpClient {
640 fn drop(&mut self) {
641 self.close();
642 }
643}