1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream as _;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName as _};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use bytes::Bytes;
16use http::Method;
17use std::str::FromStr as _;
18use tracing::{debug, trace};
19
20#[derive(Debug, Clone)]
22pub struct ClientConfig {
23 pub default_timeout: Duration,
25 pub pool_config: PoolConfig,
27 pub enable_pooling: bool,
29 pub max_retries: usize,
31 pub retry_delay: Duration,
32 pub max_concurrent_requests: usize,
34 pub max_requests_per_second: Option<f64>,
36}
37
38impl Default for ClientConfig {
39 fn default() -> Self {
40 Self {
41 default_timeout: Duration::from_secs(5), pool_config: PoolConfig::default(),
43 enable_pooling: true,
44 max_retries: 3, retry_delay: Duration::from_millis(25), max_concurrent_requests: 16, max_requests_per_second: Some(50.0), }
49 }
50}
51
52pub struct IpcHttpClient {
57 name: Name<'static>,
58 config: ClientConfig,
59 pool: Option<ConnectionPool>,
60 retry_executor: RetryExecutor,
61 put_retry_executor: RetryExecutor,
63}
64
65pub struct HttpRequestBuilder<'a> {
67 client: &'a IpcHttpClient,
68 method: Method,
69 path: String,
70 body: Option<RequestBody>,
71 timeout: Option<Duration>,
72 headers: Vec<(String, String)>,
73 put_optimized: bool,
75 expected_size: Option<usize>,
77}
78
79enum RequestBody {
80 Json(Value),
81 JsonBytes(Bytes),
82}
83
84#[derive(Debug)]
86pub struct HttpResponse {
87 inner: Response,
88}
89
90impl HttpResponse {
91 const fn new(response: Response) -> Self {
92 Self { inner: response }
93 }
94
95 pub const fn status(&self) -> u16 {
97 self.inner.status_code()
98 }
99
100 pub fn headers(&self) -> Value {
102 let headers_map: std::collections::HashMap<String, String> = self
103 .inner
104 .headers()
105 .iter()
106 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
107 .collect();
108 serde_json::to_value(headers_map).unwrap_or(Value::Null)
109 }
110
111 pub fn body(&self) -> Result<String> {
113 self.inner.text()
114 }
115
116 pub fn is_success(&self) -> bool {
118 self.inner.is_success()
119 }
120
121 pub fn is_client_error(&self) -> bool {
123 self.inner.is_client_error()
124 }
125
126 pub fn is_server_error(&self) -> bool {
128 self.inner.is_server_error()
129 }
130
131 pub fn content_length(&self) -> u64 {
133 self.inner.content_length().unwrap_or(0)
134 }
135
136 pub fn json<T>(&self) -> Result<T>
138 where
139 T: DeserializeOwned,
140 {
141 self.inner.json()
142 }
143
144 pub fn json_value(&self) -> Result<Value> {
146 self.inner.json_value()
147 }
148
149 pub fn into_inner(self) -> Response {
151 self.inner
152 }
153
154 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
156 self.inner.to_legacy()
157 }
158}
159
160impl IpcHttpClient {
161 pub fn new<P>(path: P) -> Result<Self>
163 where
164 P: AsRef<Path>,
165 {
166 Self::with_config(path, ClientConfig::default())
167 }
168
169 pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
171 where
172 P: AsRef<Path>,
173 {
174 let name = path
175 .as_ref()
176 .to_fs_name::<GenericFilePath>()
177 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
178 .into_owned();
179
180 let pool = if config.enable_pooling {
181 Some(ConnectionPool::new(name.clone(), config.pool_config.clone()))
182 } else {
183 None
184 };
185
186 let retry_config = RetryConfig::for_network_operations()
188 .max_attempts(config.max_retries)
189 .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
190
191 let retry_executor = RetryExecutor::new(retry_config);
192
193 let put_retry_config = RetryConfig::for_put_requests();
195 let put_retry_executor = RetryExecutor::new(put_retry_config);
196
197 Ok(Self {
198 name,
199 config,
200 pool,
201 retry_executor,
202 put_retry_executor,
203 })
204 }
205
206 async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
208 let mut last_error = None;
209
210 for attempt in 0..self.config.max_retries {
211 if attempt > 0 {
212 tokio::time::sleep(self.config.retry_delay).await;
213 }
214
215 match LocalSocketStream::connect(self.name.clone()).await {
216 Ok(stream) => {
217 debug!("Created direct connection on attempt {}", attempt + 1);
218 return Ok(stream);
219 }
220 Err(e) => {
221 trace!("Connection attempt {} failed: {}", attempt + 1, e);
222 last_error = Some(e);
223 }
224 }
225 }
226
227 Err(KodeBridgeError::connection(format!(
228 "Failed to create connection after {} attempts: {}",
229 self.config.max_retries,
230 last_error
231 .map(|e| e.to_string())
232 .unwrap_or_else(|| "Unknown error".to_string())
233 )))
234 }
235
236 async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
238 let metrics = global_metrics();
239
240 if let Some(ref pool) = self.pool {
241 match pool.get_connection().await {
242 Ok(conn) => {
243 metrics.connection_created(true); Ok(Either::Pool(conn))
245 }
246 Err(e) => {
247 metrics.connection_failed();
248 Err(e)
249 }
250 }
251 } else {
252 match self.create_direct_connection().await {
253 Ok(stream) => {
254 metrics.connection_created(false); Ok(Either::Direct(stream))
256 }
257 Err(e) => {
258 metrics.connection_failed();
259 Err(e)
260 }
261 }
262 }
263 }
264
265 pub async fn request(
267 &self,
268 method: &str,
269 path: &str,
270 body: Option<&serde_json::Value>,
271 ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
272 let response = self
273 .send_request_internal(method, path, body, self.config.default_timeout)
274 .await?;
275 Ok(response.to_legacy())
276 }
277
278 async fn send_request_internal(
280 &self,
281 method: &str,
282 path: &str,
283 body: Option<&Value>,
284 timeout: Duration,
285 ) -> Result<Response> {
286 let method =
287 Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
288
289 let mut builder = RequestBuilder::new(method.clone(), path.to_string());
290
291 if let Some(json_body) = body {
295 builder = builder.json(json_body)?;
296 }
297
298 let request = builder.build()?;
299
300 self.retry_executor
302 .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
303 let result = tokio::time::timeout(timeout, async {
305 let mut connection = self.get_connection().await?;
306
307 match &mut connection {
308 Either::Pool(conn) => {
309 if let Some(stream) = conn.stream() {
310 let result = send_request(stream, request.clone()).await;
311 if result.is_err() {
312 conn.invalidate();
313 }
314 result
315 } else {
316 conn.invalidate();
317 Err(KodeBridgeError::connection("Pooled connection is invalid"))
318 }
319 }
320 Either::Direct(stream) => send_request(stream, request.clone()).await,
321 }
322 })
323 .await;
324
325 match result {
326 Ok(response) => response,
327 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
328 }
329 })
330 .await
331 }
332
333 async fn send_request_with_optimization(
335 &self,
336 method: &str,
337 path: &str,
338 body: Option<&RequestBody>,
339 headers: &[(String, String)],
340 timeout: Duration,
341 is_put_optimized: bool,
342 expected_size: Option<usize>,
343 ) -> Result<Response> {
344 let method_enum =
345 Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
346
347 let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
348
349 for (key, value) in headers {
351 builder = builder.header(key.as_str(), value.as_str());
352 }
353
354 if let Some(body) = body {
355 builder = match body {
356 RequestBody::Json(value) => builder.json(value)?,
357 RequestBody::JsonBytes(bytes) => builder.body_bytes(bytes.clone(), "application/json")?,
358 };
359 }
360
361 let request = builder.build()?;
362
363 let retry_context = if is_put_optimized {
365 format!("PUT_OPTIMIZED {}", path)
366 } else {
367 format!("{} {}", method, path)
368 };
369
370 let retry_executor = if is_put_optimized {
372 &self.put_retry_executor } else {
374 &self.retry_executor };
376
377 retry_executor
378 .execute_with_context(&retry_context, || async {
379 let result = tokio::time::timeout(timeout, async {
381 let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
382 self.get_fresh_connection().await?
384 } else {
385 self.get_connection().await?
386 };
387
388 match &mut connection {
389 Either::Pool(conn) => {
390 if let Some(stream) = conn.stream() {
391 let result = send_request(stream, request.clone()).await;
392 if result.is_err() {
393 conn.invalidate();
394 }
395 result
396 } else {
397 conn.invalidate();
398 Err(KodeBridgeError::connection("Pooled connection is invalid"))
399 }
400 }
401 Either::Direct(stream) => send_request(stream, request.clone()).await,
402 }
403 })
404 .await;
405
406 match result {
407 Ok(response) => response,
408 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
409 }
410 })
411 .await
412 }
413
414 async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
416 use interprocess::local_socket::tokio::prelude::LocalSocketStream;
417
418 if let Some(ref pool) = self.pool {
420 match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await {
421 Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
422 Ok(Err(_)) | Err(_) => {
423 }
425 }
426 }
427
428 match tokio::time::timeout(
430 Duration::from_millis(100),
431 LocalSocketStream::connect(self.name.clone()),
432 )
433 .await
434 {
435 Ok(Ok(stream)) => Ok(Either::Direct(stream)),
436 Ok(Err(_)) | Err(_) => {
437 if let Some(ref pool) = self.pool {
439 let conn = pool.get_connection().await?;
440 Ok(Either::Pool(conn))
441 } else {
442 Err(KodeBridgeError::connection("Failed to get fresh connection"))
443 }
444 }
445 }
446 }
447
448 pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
450 HttpRequestBuilder::new(self, Method::GET, path)
451 }
452
453 pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
455 HttpRequestBuilder::new(self, Method::POST, path)
456 }
457
458 pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
460 HttpRequestBuilder::new(self, Method::PUT, path)
461 }
462
463 pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
465 let batch_size = requests.len();
466 if batch_size == 0 {
467 return Ok(Vec::new());
468 }
469
470 let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
471 let mut responses = Vec::with_capacity(batch_size);
472 let mut pending = Vec::with_capacity(concurrent_limit);
473
474 for (path, body) in requests {
475 let body = Bytes::from(body.to_string());
476 pending.push(self.put(&path).json_bytes(body).optimize_for_put().send());
477
478 if pending.len() == concurrent_limit {
479 let chunk_results = futures::future::join_all(std::mem::take(&mut pending)).await;
480 for result in chunk_results {
481 match result {
482 Ok(response) => responses.push(response),
483 Err(e) => return Err(e),
484 }
485 }
486 }
487 }
488
489 if !pending.is_empty() {
490 let chunk_results = futures::future::join_all(pending).await;
491 for result in chunk_results {
492 match result {
493 Ok(response) => responses.push(response),
494 Err(e) => return Err(e),
495 }
496 }
497 }
498
499 Ok(responses)
500 }
501
502 pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
504 HttpRequestBuilder::new(self, Method::DELETE, path)
505 }
506
507 pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
509 HttpRequestBuilder::new(self, Method::PATCH, path)
510 }
511
512 pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
514 HttpRequestBuilder::new(self, Method::HEAD, path)
515 }
516
517 pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
519 HttpRequestBuilder::new(self, Method::OPTIONS, path)
520 }
521
522 pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
524 self.pool.as_ref().map(|p| p.stats())
525 }
526
527 pub fn close(&self) {
529 if let Some(ref pool) = self.pool {
530 pool.close();
531 }
532 }
533
534 pub async fn preheat_for_puts(&self, count: usize) {
536 if let Some(ref pool) = self.pool {
537 pool.preheat_for_puts(count).await;
538 }
539 }
540
541 fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
543 match method {
544 "PUT" | "POST" => {
545 match body_size {
546 Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), Some(size) if size > 1024 * 1024 => Duration::from_secs(15), Some(size) if size > 100 * 1024 => Duration::from_secs(8), Some(size) if size > 10 * 1024 => Duration::from_secs(4), _ => Duration::from_secs(2), }
552 }
553 _ => self.config.default_timeout, }
555 }
556}
557
558impl<'a> HttpRequestBuilder<'a> {
559 fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
560 let is_put = method == Method::PUT;
561 Self {
562 client,
563 method,
564 path: path.to_string(),
565 body: None,
566 timeout: None,
567 headers: Vec::new(),
568 put_optimized: is_put, expected_size: None,
570 }
571 }
572
573 pub fn json_body(mut self, body: &Value) -> Self {
575 self.body = Some(RequestBody::Json(body.clone()));
576 self
577 }
578
579 pub fn json_bytes(mut self, body: Bytes) -> Self {
581 self.expected_size = Some(body.len());
582 self.body = Some(RequestBody::JsonBytes(body));
583 self
584 }
585
586 pub const fn timeout(mut self, timeout: Duration) -> Self {
588 self.timeout = Some(timeout);
589 self
590 }
591
592 pub const fn expected_size(mut self, size: usize) -> Self {
594 self.expected_size = Some(size);
595 self
596 }
597
598 pub const fn optimize_for_put(mut self) -> Self {
600 self.put_optimized = true;
601 self
602 }
603
604 pub fn header<K, V>(mut self, key: K, value: V) -> Self
606 where
607 K: Into<String>,
608 V: Into<String>,
609 {
610 self.headers.push((key.into(), value.into()));
611 self
612 }
613
614 pub async fn send(self) -> Result<HttpResponse> {
616 let metrics = global_metrics();
617 let tracker = metrics.request_start(self.method.as_str());
618
619 let timeout = if self.put_optimized {
621 self.timeout.unwrap_or_else(|| {
622 self.client
623 .calculate_smart_timeout(self.method.as_str(), self.expected_size)
624 })
625 } else {
626 self.timeout.unwrap_or(self.client.config.default_timeout)
627 };
628
629 match self
630 .client
631 .send_request_with_optimization(
632 self.method.as_str(),
633 &self.path,
634 self.body.as_ref(),
635 &self.headers,
636 timeout,
637 self.put_optimized,
638 self.expected_size,
639 )
640 .await
641 {
642 Ok(response) => {
643 tracker.success(response.status_code());
644 Ok(HttpResponse::new(response))
645 }
646 Err(e) => {
647 tracker.failure(&format!("{:?}", e));
648 Err(e)
649 }
650 }
651 }
652}
653
654enum Either<A, B> {
656 Pool(A),
657 Direct(B),
658}
659
660impl Drop for IpcHttpClient {
661 fn drop(&mut self) {
662 self.close();
663 }
664}