1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
19#[derive(Debug, Clone)]
21pub struct ClientConfig {
22 pub default_timeout: Duration,
24 pub pool_config: PoolConfig,
26 pub enable_pooling: bool,
28 pub max_retries: usize,
30 pub retry_delay: Duration,
31 pub max_concurrent_requests: usize,
33 pub max_requests_per_second: Option<f64>,
35}
36
37impl Default for ClientConfig {
38 fn default() -> Self {
39 Self {
40 default_timeout: Duration::from_secs(5), pool_config: PoolConfig::default(),
42 enable_pooling: true,
43 max_retries: 3, retry_delay: Duration::from_millis(25), max_concurrent_requests: 16, max_requests_per_second: Some(50.0), }
48 }
49}
50
51pub struct IpcHttpClient {
56 name: Name<'static>,
57 config: ClientConfig,
58 pool: Option<ConnectionPool>,
59 retry_executor: RetryExecutor,
60 put_retry_executor: RetryExecutor,
62}
63
64pub struct HttpRequestBuilder<'a> {
66 client: &'a IpcHttpClient,
67 method: Method,
68 path: String,
69 body: Option<Value>,
70 timeout: Option<Duration>,
71 headers: Vec<(String, String)>,
72 put_optimized: bool,
74 expected_size: Option<usize>,
76}
77
78#[derive(Debug)]
80pub struct HttpResponse {
81 inner: Response,
82}
83
84impl HttpResponse {
85 fn new(response: Response) -> Self {
86 Self { inner: response }
87 }
88
89 pub fn status(&self) -> u16 {
91 self.inner.status_code()
92 }
93
94 pub fn headers(&self) -> Value {
96 let headers_map: std::collections::HashMap<String, String> = self
97 .inner
98 .headers()
99 .iter()
100 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101 .collect();
102 serde_json::to_value(headers_map).unwrap_or(Value::Null)
103 }
104
105 pub fn body(&self) -> Result<String> {
107 self.inner.text()
108 }
109
110 pub fn is_success(&self) -> bool {
112 self.inner.is_success()
113 }
114
115 pub fn is_client_error(&self) -> bool {
117 self.inner.is_client_error()
118 }
119
120 pub fn is_server_error(&self) -> bool {
122 self.inner.is_server_error()
123 }
124
125 pub fn content_length(&self) -> u64 {
127 self.inner.content_length().unwrap_or(0)
128 }
129
130 pub fn json<T>(&self) -> Result<T>
132 where
133 T: DeserializeOwned,
134 {
135 self.inner.json()
136 }
137
138 pub fn json_value(&self) -> Result<Value> {
140 self.inner.json_value()
141 }
142
143 pub fn into_inner(self) -> Response {
145 self.inner
146 }
147
148 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150 self.inner.to_legacy()
151 }
152}
153
154impl IpcHttpClient {
155 pub fn new<P>(path: P) -> Result<Self>
157 where
158 P: AsRef<Path>,
159 {
160 Self::with_config(path, ClientConfig::default())
161 }
162
163 pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165 where
166 P: AsRef<Path>,
167 {
168 let name = path
169 .as_ref()
170 .to_fs_name::<GenericFilePath>()
171 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172 .into_owned();
173
174 let pool = if config.enable_pooling {
175 Some(ConnectionPool::new(
176 name.clone(),
177 config.pool_config.clone(),
178 ))
179 } else {
180 None
181 };
182
183 let retry_config = RetryConfig::for_network_operations()
185 .max_attempts(config.max_retries)
186 .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
187
188 let retry_executor = RetryExecutor::new(retry_config);
189
190 let put_retry_config = RetryConfig::for_put_requests();
192 let put_retry_executor = RetryExecutor::new(put_retry_config);
193
194 Ok(Self {
195 name,
196 config,
197 pool,
198 retry_executor,
199 put_retry_executor,
200 })
201 }
202
203 async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
205 let mut last_error = None;
206
207 for attempt in 0..self.config.max_retries {
208 if attempt > 0 {
209 tokio::time::sleep(self.config.retry_delay).await;
210 }
211
212 match LocalSocketStream::connect(self.name.clone()).await {
213 Ok(stream) => {
214 debug!("Created direct connection on attempt {}", attempt + 1);
215 return Ok(stream);
216 }
217 Err(e) => {
218 trace!("Connection attempt {} failed: {}", attempt + 1, e);
219 last_error = Some(e);
220 }
221 }
222 }
223
224 Err(KodeBridgeError::connection(format!(
225 "Failed to create connection after {} attempts: {}",
226 self.config.max_retries,
227 last_error
228 .map(|e| e.to_string())
229 .unwrap_or_else(|| "Unknown error".to_string())
230 )))
231 }
232
233 async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
235 let metrics = global_metrics();
236
237 if let Some(ref pool) = self.pool {
238 match pool.get_connection().await {
239 Ok(conn) => {
240 metrics.connection_created(true); Ok(Either::Pool(conn))
242 }
243 Err(e) => {
244 metrics.connection_failed();
245 Err(e)
246 }
247 }
248 } else {
249 match self.create_direct_connection().await {
250 Ok(stream) => {
251 metrics.connection_created(false); Ok(Either::Direct(stream))
253 }
254 Err(e) => {
255 metrics.connection_failed();
256 Err(e)
257 }
258 }
259 }
260 }
261
262 pub async fn request(
264 &self,
265 method: &str,
266 path: &str,
267 body: Option<&serde_json::Value>,
268 ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
269 let response = self
270 .send_request_internal(method, path, body, self.config.default_timeout)
271 .await?;
272 Ok(response.to_legacy())
273 }
274
275 async fn send_request_internal(
277 &self,
278 method: &str,
279 path: &str,
280 body: Option<&Value>,
281 timeout: Duration,
282 ) -> Result<Response> {
283 let method = Method::from_str(method)
284 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
285
286 let mut builder = RequestBuilder::new(method.clone(), path.to_string());
287
288 if let Some(json_body) = body {
292 builder = builder.json(json_body)?;
293 }
294
295 let request = builder.build()?;
296
297 self.retry_executor
299 .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
300 let result = tokio::time::timeout(timeout, async {
302 let mut connection = self.get_connection().await?;
303
304 match &mut connection {
305 Either::Pool(conn) => {
306 if let Some(stream) = conn.stream() {
307 send_request(stream, request.clone()).await
308 } else {
309 Err(KodeBridgeError::connection("Pooled connection is invalid"))
310 }
311 }
312 Either::Direct(stream) => send_request(stream, request.clone()).await,
313 }
314 })
315 .await;
316
317 match result {
318 Ok(response) => response,
319 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
320 }
321 })
322 .await
323 }
324
325 async fn send_request_with_optimization(
327 &self,
328 method: &str,
329 path: &str,
330 body: Option<&Value>,
331 headers: &[(String, String)],
332 timeout: Duration,
333 is_put_optimized: bool,
334 expected_size: Option<usize>,
335 ) -> Result<Response> {
336 let method_enum = Method::from_str(method)
337 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
338
339 let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
340
341 for (key, value) in headers {
343 builder = builder.header(key.as_str(), value.as_str());
344 }
345
346 if let Some(json_body) = body {
347 builder = builder.json(json_body)?;
348 }
349
350 let request = builder.build()?;
351
352 let retry_context = if is_put_optimized {
354 format!("PUT_OPTIMIZED {}", path)
355 } else {
356 format!("{} {}", method, path)
357 };
358
359 let retry_executor = if is_put_optimized {
361 &self.put_retry_executor } else {
363 &self.retry_executor };
365
366 retry_executor
367 .execute_with_context(&retry_context, || async {
368 let result = tokio::time::timeout(timeout, async {
370 let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
371 self.get_fresh_connection().await?
373 } else {
374 self.get_connection().await?
375 };
376
377 match &mut connection {
378 Either::Pool(conn) => {
379 if let Some(stream) = conn.stream() {
380 send_request(stream, request.clone()).await
381 } else {
382 Err(KodeBridgeError::connection("Pooled connection is invalid"))
383 }
384 }
385 Either::Direct(stream) => send_request(stream, request.clone()).await,
386 }
387 })
388 .await;
389
390 match result {
391 Ok(response) => response,
392 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
393 }
394 })
395 .await
396 }
397
398 async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
400 use interprocess::local_socket::tokio::prelude::LocalSocketStream;
401
402 if let Some(ref pool) = self.pool {
404 match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await
405 {
406 Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
407 Ok(Err(_)) | Err(_) => {
408 }
410 }
411 }
412
413 match tokio::time::timeout(
415 Duration::from_millis(100),
416 LocalSocketStream::connect(self.name.clone()),
417 )
418 .await
419 {
420 Ok(Ok(stream)) => Ok(Either::Direct(stream)),
421 Ok(Err(_)) | Err(_) => {
422 if let Some(ref pool) = self.pool {
424 let conn = pool.get_connection().await?;
425 Ok(Either::Pool(conn))
426 } else {
427 Err(KodeBridgeError::connection(
428 "Failed to get fresh connection",
429 ))
430 }
431 }
432 }
433 }
434
435 pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
437 HttpRequestBuilder::new(self, Method::GET, path)
438 }
439
440 pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
442 HttpRequestBuilder::new(self, Method::POST, path)
443 }
444
445 pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
447 HttpRequestBuilder::new(self, Method::PUT, path)
448 }
449
450 pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
452 let batch_size = requests.len();
453 if batch_size == 0 {
454 return Ok(Vec::new());
455 }
456
457 let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
459 let mut responses = Vec::with_capacity(batch_size);
460
461 for chunk in requests.chunks(concurrent_limit) {
463 let mut futures = Vec::new();
464
465 for (path, body) in chunk {
466 let path = path.clone();
467 let body = body.clone();
468
469 let future = self.put(&path).json_body(&body).optimize_for_put().send();
470
471 futures.push(future);
472 }
473
474 let chunk_results = futures::future::join_all(futures).await;
476
477 for result in chunk_results {
478 match result {
479 Ok(response) => responses.push(response),
480 Err(e) => return Err(e),
481 }
482 }
483 }
484
485 Ok(responses)
486 }
487
488 pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
490 HttpRequestBuilder::new(self, Method::DELETE, path)
491 }
492
493 pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
495 HttpRequestBuilder::new(self, Method::PATCH, path)
496 }
497
498 pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
500 HttpRequestBuilder::new(self, Method::HEAD, path)
501 }
502
503 pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
505 HttpRequestBuilder::new(self, Method::OPTIONS, path)
506 }
507
508 pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
510 self.pool.as_ref().map(|p| p.stats())
511 }
512
513 pub fn close(&self) {
515 if let Some(ref pool) = self.pool {
516 pool.close();
517 }
518 }
519
520 pub async fn preheat_for_puts(&self, count: usize) {
522 if let Some(ref pool) = self.pool {
523 pool.preheat_for_puts(count).await;
524 }
525 }
526
527 fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
529 match method {
530 "PUT" | "POST" => {
531 match body_size {
532 Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), Some(size) if size > 1024 * 1024 => Duration::from_secs(15), Some(size) if size > 100 * 1024 => Duration::from_secs(8), Some(size) if size > 10 * 1024 => Duration::from_secs(4), _ => Duration::from_secs(2), }
538 }
539 _ => self.config.default_timeout, }
541 }
542}
543
544impl<'a> HttpRequestBuilder<'a> {
545 fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
546 let is_put = method == Method::PUT;
547 Self {
548 client,
549 method,
550 path: path.to_string(),
551 body: None,
552 timeout: None,
553 headers: Vec::new(),
554 put_optimized: is_put, expected_size: None,
556 }
557 }
558
559 pub fn json_body(mut self, body: &Value) -> Self {
561 self.body = Some(body.clone());
562
563 if self.method == Method::PUT {
565 if let Ok(json_bytes) = serde_json::to_vec(body) {
566 self.expected_size = Some(json_bytes.len());
567 }
568 }
569
570 self
571 }
572
573 pub fn timeout(mut self, timeout: Duration) -> Self {
575 self.timeout = Some(timeout);
576 self
577 }
578
579 pub fn expected_size(mut self, size: usize) -> Self {
581 self.expected_size = Some(size);
582 self
583 }
584
585 pub fn optimize_for_put(mut self) -> Self {
587 self.put_optimized = true;
588 self
589 }
590
591 pub fn header<K, V>(mut self, key: K, value: V) -> Self
593 where
594 K: Into<String>,
595 V: Into<String>,
596 {
597 self.headers.push((key.into(), value.into()));
598 self
599 }
600
601 pub async fn send(self) -> Result<HttpResponse> {
603 let metrics = global_metrics();
604 let tracker = metrics.request_start(self.method.as_str());
605
606 let timeout = if self.put_optimized {
608 self.timeout.unwrap_or_else(|| {
609 self.client
610 .calculate_smart_timeout(self.method.as_str(), self.expected_size)
611 })
612 } else {
613 self.timeout.unwrap_or(self.client.config.default_timeout)
614 };
615
616 match self
617 .client
618 .send_request_with_optimization(
619 self.method.as_str(),
620 &self.path,
621 self.body.as_ref(),
622 &self.headers,
623 timeout,
624 self.put_optimized,
625 self.expected_size,
626 )
627 .await
628 {
629 Ok(response) => {
630 tracker.success(response.status_code());
631 Ok(HttpResponse::new(response))
632 }
633 Err(e) => {
634 tracker.failure(&format!("{:?}", e));
635 Err(e)
636 }
637 }
638 }
639}
640
641enum Either<A, B> {
643 Pool(A),
644 Direct(B),
645}
646
647impl Drop for IpcHttpClient {
648 fn drop(&mut self) {
649 self.close();
650 }
651}