1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream as _;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName as _};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr as _;
17use tracing::{debug, trace};
18
/// Tunable settings for an [`IpcHttpClient`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Timeout used by `IpcHttpClient::request`, and by builder requests that
    /// set no explicit timeout and do not get a size-based timeout.
    pub default_timeout: Duration,
    /// Settings handed to the connection pool. Its `retry_delay_ms` is also
    /// used as the base delay of the client's general retry executor.
    pub pool_config: PoolConfig,
    /// When `true`, the client creates a `ConnectionPool`; otherwise requests
    /// open direct socket connections.
    pub enable_pooling: bool,
    /// Number of direct-connection attempts, and the `max_attempts` value of
    /// the general retry executor.
    pub max_retries: usize,
    /// Pause between consecutive direct-connection attempts.
    pub retry_delay: Duration,
    /// Upper bound on how many requests `put_batch` sends concurrently.
    pub max_concurrent_requests: usize,
    /// Requests-per-second limit.
    /// NOTE(review): not read anywhere in this file — confirm where (or
    /// whether) it is enforced.
    pub max_requests_per_second: Option<f64>,
}
36
37impl Default for ClientConfig {
38 fn default() -> Self {
39 Self {
40 default_timeout: Duration::from_secs(5), pool_config: PoolConfig::default(),
42 enable_pooling: true,
43 max_retries: 3, retry_delay: Duration::from_millis(25), max_concurrent_requests: 16, max_requests_per_second: Some(50.0), }
48 }
49}
50
/// HTTP client that talks to a server over a local (IPC) socket.
pub struct IpcHttpClient {
    /// Socket name derived from the filesystem path given at construction.
    name: Name<'static>,
    /// Settings this client was built with.
    config: ClientConfig,
    /// Connection pool; `None` when `config.enable_pooling` is `false`.
    pool: Option<ConnectionPool>,
    /// Retry executor used for requests that are not PUT-optimized.
    retry_executor: RetryExecutor,
    /// Retry executor built from `RetryConfig::for_put_requests()`, used for
    /// PUT-optimized requests.
    put_retry_executor: RetryExecutor,
}
63
/// Fluent builder for a single request issued through an [`IpcHttpClient`].
pub struct HttpRequestBuilder<'a> {
    /// Client that will send the request.
    client: &'a IpcHttpClient,
    /// HTTP method of the request.
    method: Method,
    /// Request path.
    path: String,
    /// Optional JSON body.
    body: Option<Value>,
    /// Caller-supplied timeout; when `None`, `send` picks one.
    timeout: Option<Duration>,
    /// Extra headers as `(name, value)` pairs, in insertion order.
    headers: Vec<(String, String)>,
    /// Whether the PUT-optimized send path is used (set by default for PUT).
    put_optimized: bool,
    /// Body size hint in bytes, used to choose the timeout and connection
    /// strategy on the PUT-optimized path.
    expected_size: Option<usize>,
}
77
/// Public wrapper around the lower-level `Response` returned by the HTTP layer.
#[derive(Debug)]
pub struct HttpResponse {
    /// The wrapped response that every accessor delegates to.
    inner: Response,
}
83
84impl HttpResponse {
85 const fn new(response: Response) -> Self {
86 Self { inner: response }
87 }
88
89 pub const fn status(&self) -> u16 {
91 self.inner.status_code()
92 }
93
94 pub fn headers(&self) -> Value {
96 let headers_map: std::collections::HashMap<String, String> = self
97 .inner
98 .headers()
99 .iter()
100 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101 .collect();
102 serde_json::to_value(headers_map).unwrap_or(Value::Null)
103 }
104
105 pub fn body(&self) -> Result<String> {
107 self.inner.text()
108 }
109
110 pub fn is_success(&self) -> bool {
112 self.inner.is_success()
113 }
114
115 pub fn is_client_error(&self) -> bool {
117 self.inner.is_client_error()
118 }
119
120 pub fn is_server_error(&self) -> bool {
122 self.inner.is_server_error()
123 }
124
125 pub fn content_length(&self) -> u64 {
127 self.inner.content_length().unwrap_or(0)
128 }
129
130 pub fn json<T>(&self) -> Result<T>
132 where
133 T: DeserializeOwned,
134 {
135 self.inner.json()
136 }
137
138 pub fn json_value(&self) -> Result<Value> {
140 self.inner.json_value()
141 }
142
143 pub fn into_inner(self) -> Response {
145 self.inner
146 }
147
148 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150 self.inner.to_legacy()
151 }
152}
153
154impl IpcHttpClient {
155 pub fn new<P>(path: P) -> Result<Self>
157 where
158 P: AsRef<Path>,
159 {
160 Self::with_config(path, ClientConfig::default())
161 }
162
163 pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165 where
166 P: AsRef<Path>,
167 {
168 let name = path
169 .as_ref()
170 .to_fs_name::<GenericFilePath>()
171 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172 .into_owned();
173
174 let pool = if config.enable_pooling {
175 Some(ConnectionPool::new(name.clone(), config.pool_config.clone()))
176 } else {
177 None
178 };
179
180 let retry_config = RetryConfig::for_network_operations()
182 .max_attempts(config.max_retries)
183 .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
184
185 let retry_executor = RetryExecutor::new(retry_config);
186
187 let put_retry_config = RetryConfig::for_put_requests();
189 let put_retry_executor = RetryExecutor::new(put_retry_config);
190
191 Ok(Self {
192 name,
193 config,
194 pool,
195 retry_executor,
196 put_retry_executor,
197 })
198 }
199
200 async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
202 let mut last_error = None;
203
204 for attempt in 0..self.config.max_retries {
205 if attempt > 0 {
206 tokio::time::sleep(self.config.retry_delay).await;
207 }
208
209 match LocalSocketStream::connect(self.name.clone()).await {
210 Ok(stream) => {
211 debug!("Created direct connection on attempt {}", attempt + 1);
212 return Ok(stream);
213 }
214 Err(e) => {
215 trace!("Connection attempt {} failed: {}", attempt + 1, e);
216 last_error = Some(e);
217 }
218 }
219 }
220
221 Err(KodeBridgeError::connection(format!(
222 "Failed to create connection after {} attempts: {}",
223 self.config.max_retries,
224 last_error
225 .map(|e| e.to_string())
226 .unwrap_or_else(|| "Unknown error".to_string())
227 )))
228 }
229
230 async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
232 let metrics = global_metrics();
233
234 if let Some(ref pool) = self.pool {
235 match pool.get_connection().await {
236 Ok(conn) => {
237 metrics.connection_created(true); Ok(Either::Pool(conn))
239 }
240 Err(e) => {
241 metrics.connection_failed();
242 Err(e)
243 }
244 }
245 } else {
246 match self.create_direct_connection().await {
247 Ok(stream) => {
248 metrics.connection_created(false); Ok(Either::Direct(stream))
250 }
251 Err(e) => {
252 metrics.connection_failed();
253 Err(e)
254 }
255 }
256 }
257 }
258
259 pub async fn request(
261 &self,
262 method: &str,
263 path: &str,
264 body: Option<&serde_json::Value>,
265 ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
266 let response = self
267 .send_request_internal(method, path, body, self.config.default_timeout)
268 .await?;
269 Ok(response.to_legacy())
270 }
271
272 async fn send_request_internal(
274 &self,
275 method: &str,
276 path: &str,
277 body: Option<&Value>,
278 timeout: Duration,
279 ) -> Result<Response> {
280 let method =
281 Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
282
283 let mut builder = RequestBuilder::new(method.clone(), path.to_string());
284
285 if let Some(json_body) = body {
289 builder = builder.json(json_body)?;
290 }
291
292 let request = builder.build()?;
293
294 self.retry_executor
296 .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
297 let result = tokio::time::timeout(timeout, async {
299 let mut connection = self.get_connection().await?;
300
301 match &mut connection {
302 Either::Pool(conn) => {
303 if let Some(stream) = conn.stream() {
304 send_request(stream, request.clone()).await
305 } else {
306 Err(KodeBridgeError::connection("Pooled connection is invalid"))
307 }
308 }
309 Either::Direct(stream) => send_request(stream, request.clone()).await,
310 }
311 })
312 .await;
313
314 match result {
315 Ok(response) => response,
316 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
317 }
318 })
319 .await
320 }
321
322 async fn send_request_with_optimization(
324 &self,
325 method: &str,
326 path: &str,
327 body: Option<&Value>,
328 headers: &[(String, String)],
329 timeout: Duration,
330 is_put_optimized: bool,
331 expected_size: Option<usize>,
332 ) -> Result<Response> {
333 let method_enum =
334 Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
335
336 let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
337
338 for (key, value) in headers {
340 builder = builder.header(key.as_str(), value.as_str());
341 }
342
343 if let Some(json_body) = body {
344 builder = builder.json(json_body)?;
345 }
346
347 let request = builder.build()?;
348
349 let retry_context = if is_put_optimized {
351 format!("PUT_OPTIMIZED {}", path)
352 } else {
353 format!("{} {}", method, path)
354 };
355
356 let retry_executor = if is_put_optimized {
358 &self.put_retry_executor } else {
360 &self.retry_executor };
362
363 retry_executor
364 .execute_with_context(&retry_context, || async {
365 let result = tokio::time::timeout(timeout, async {
367 let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
368 self.get_fresh_connection().await?
370 } else {
371 self.get_connection().await?
372 };
373
374 match &mut connection {
375 Either::Pool(conn) => {
376 if let Some(stream) = conn.stream() {
377 send_request(stream, request.clone()).await
378 } else {
379 Err(KodeBridgeError::connection("Pooled connection is invalid"))
380 }
381 }
382 Either::Direct(stream) => send_request(stream, request.clone()).await,
383 }
384 })
385 .await;
386
387 match result {
388 Ok(response) => response,
389 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
390 }
391 })
392 .await
393 }
394
395 async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
397 use interprocess::local_socket::tokio::prelude::LocalSocketStream;
398
399 if let Some(ref pool) = self.pool {
401 match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await {
402 Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
403 Ok(Err(_)) | Err(_) => {
404 }
406 }
407 }
408
409 match tokio::time::timeout(
411 Duration::from_millis(100),
412 LocalSocketStream::connect(self.name.clone()),
413 )
414 .await
415 {
416 Ok(Ok(stream)) => Ok(Either::Direct(stream)),
417 Ok(Err(_)) | Err(_) => {
418 if let Some(ref pool) = self.pool {
420 let conn = pool.get_connection().await?;
421 Ok(Either::Pool(conn))
422 } else {
423 Err(KodeBridgeError::connection("Failed to get fresh connection"))
424 }
425 }
426 }
427 }
428
429 pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
431 HttpRequestBuilder::new(self, Method::GET, path)
432 }
433
434 pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
436 HttpRequestBuilder::new(self, Method::POST, path)
437 }
438
439 pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
441 HttpRequestBuilder::new(self, Method::PUT, path)
442 }
443
444 pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
446 let batch_size = requests.len();
447 if batch_size == 0 {
448 return Ok(Vec::new());
449 }
450
451 let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
453 let mut responses = Vec::with_capacity(batch_size);
454
455 for chunk in requests.chunks(concurrent_limit) {
457 let mut futures = Vec::new();
458
459 for (path, body) in chunk {
460 let path = path.clone();
461 let body = body.clone();
462
463 let future = self.put(&path).json_body(&body).optimize_for_put().send();
464
465 futures.push(future);
466 }
467
468 let chunk_results = futures::future::join_all(futures).await;
470
471 for result in chunk_results {
472 match result {
473 Ok(response) => responses.push(response),
474 Err(e) => return Err(e),
475 }
476 }
477 }
478
479 Ok(responses)
480 }
481
482 pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
484 HttpRequestBuilder::new(self, Method::DELETE, path)
485 }
486
487 pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
489 HttpRequestBuilder::new(self, Method::PATCH, path)
490 }
491
492 pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
494 HttpRequestBuilder::new(self, Method::HEAD, path)
495 }
496
497 pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
499 HttpRequestBuilder::new(self, Method::OPTIONS, path)
500 }
501
502 pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
504 self.pool.as_ref().map(|p| p.stats())
505 }
506
507 pub fn close(&self) {
509 if let Some(ref pool) = self.pool {
510 pool.close();
511 }
512 }
513
514 pub async fn preheat_for_puts(&self, count: usize) {
516 if let Some(ref pool) = self.pool {
517 pool.preheat_for_puts(count).await;
518 }
519 }
520
521 fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
523 match method {
524 "PUT" | "POST" => {
525 match body_size {
526 Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), Some(size) if size > 1024 * 1024 => Duration::from_secs(15), Some(size) if size > 100 * 1024 => Duration::from_secs(8), Some(size) if size > 10 * 1024 => Duration::from_secs(4), _ => Duration::from_secs(2), }
532 }
533 _ => self.config.default_timeout, }
535 }
536}
537
538impl<'a> HttpRequestBuilder<'a> {
539 fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
540 let is_put = method == Method::PUT;
541 Self {
542 client,
543 method,
544 path: path.to_string(),
545 body: None,
546 timeout: None,
547 headers: Vec::new(),
548 put_optimized: is_put, expected_size: None,
550 }
551 }
552
553 pub fn json_body(mut self, body: &Value) -> Self {
555 self.body = Some(body.clone());
556
557 if self.method == Method::PUT {
559 if let Ok(json_bytes) = serde_json::to_vec(body) {
560 self.expected_size = Some(json_bytes.len());
561 }
562 }
563
564 self
565 }
566
567 pub const fn timeout(mut self, timeout: Duration) -> Self {
569 self.timeout = Some(timeout);
570 self
571 }
572
573 pub const fn expected_size(mut self, size: usize) -> Self {
575 self.expected_size = Some(size);
576 self
577 }
578
579 pub const fn optimize_for_put(mut self) -> Self {
581 self.put_optimized = true;
582 self
583 }
584
585 pub fn header<K, V>(mut self, key: K, value: V) -> Self
587 where
588 K: Into<String>,
589 V: Into<String>,
590 {
591 self.headers.push((key.into(), value.into()));
592 self
593 }
594
595 pub async fn send(self) -> Result<HttpResponse> {
597 let metrics = global_metrics();
598 let tracker = metrics.request_start(self.method.as_str());
599
600 let timeout = if self.put_optimized {
602 self.timeout.unwrap_or_else(|| {
603 self.client
604 .calculate_smart_timeout(self.method.as_str(), self.expected_size)
605 })
606 } else {
607 self.timeout.unwrap_or(self.client.config.default_timeout)
608 };
609
610 match self
611 .client
612 .send_request_with_optimization(
613 self.method.as_str(),
614 &self.path,
615 self.body.as_ref(),
616 &self.headers,
617 timeout,
618 self.put_optimized,
619 self.expected_size,
620 )
621 .await
622 {
623 Ok(response) => {
624 tracker.success(response.status_code());
625 Ok(HttpResponse::new(response))
626 }
627 Err(e) => {
628 tracker.failure(&format!("{:?}", e));
629 Err(e)
630 }
631 }
632 }
633}
634
/// A value that is one of two alternatives; used in this file to carry either
/// a pooled connection or a directly opened socket stream.
enum Either<A, B> {
    /// The value came from the connection pool.
    Pool(A),
    /// The value was created directly, outside the pool.
    Direct(B),
}
640
641impl Drop for IpcHttpClient {
642 fn drop(&mut self) {
643 self.close();
644 }
645}