kode_bridge/
ipc_http_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
/// Configuration for IPC HTTP client
///
/// Passed to `IpcHttpClient::with_config`; `ClientConfig::default()` supplies
/// the baseline used by `IpcHttpClient::new`.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Default timeout for requests that do not set one through the builder
    pub default_timeout: Duration,
    /// Connection pool configuration (also the source of the retry base delay
    /// via its `retry_delay_ms` field)
    pub pool_config: PoolConfig,
    /// Enable connection pooling; when `false` the client holds no pool and
    /// opens direct connections instead
    pub enable_pooling: bool,
    /// Retry configuration: attempt budget when opening a direct connection;
    /// the request retry executor uses this value capped at 3
    pub max_retries: usize,
    /// Pause between consecutive direct-connection attempts
    pub retry_delay: Duration,
    /// Concurrent request limit (used as the chunk size of `put_batch`)
    pub max_concurrent_requests: usize,
    /// Rate limiting: max requests per second
    // NOTE(review): not read anywhere in this file — presumably enforced elsewhere; confirm.
    pub max_requests_per_second: Option<f64>,
}
36
37impl Default for ClientConfig {
38    fn default() -> Self {
39        Self {
40            default_timeout: Duration::from_secs(5), // 减少默认超时到5秒
41            pool_config: PoolConfig::default(),
42            enable_pooling: true,
43            max_retries: 3,                         // 减少重试次数
44            retry_delay: Duration::from_millis(25), // 减少重试延迟
45            max_concurrent_requests: 16,            // 增加并发请求数
46            max_requests_per_second: Some(50.0),    // 增加请求速率限制
47        }
48    }
49}
50
/// Generic IPC HTTP client that works on both Unix and Windows platforms
///
/// This client is optimized for request-response patterns with connection pooling support.
/// For streaming functionality, use `IpcStreamClient` instead.
pub struct IpcHttpClient {
    /// Local-socket name derived from the filesystem path given at construction
    name: Name<'static>,
    /// Settings supplied by the caller (timeouts, retry budget, concurrency)
    config: ClientConfig,
    /// Connection pool; `None` when `config.enable_pooling` is `false`
    pool: Option<ConnectionPool>,
    /// General-purpose retry executor used for non-optimized requests
    retry_executor: RetryExecutor,
    /// Retry executor dedicated to PUT requests
    put_retry_executor: RetryExecutor,
}
63
/// HTTP request builder for fluent API
///
/// Created by the verb helpers on `IpcHttpClient` (`get`, `post`, `put`, ...)
/// and consumed by `send`.
pub struct HttpRequestBuilder<'a> {
    /// Client that will execute the request
    client: &'a IpcHttpClient,
    /// HTTP method of the request
    method: Method,
    /// Request path
    path: String,
    /// Optional JSON body
    body: Option<Value>,
    /// Per-request timeout override; `None` defers to the client's choice
    timeout: Option<Duration>,
    /// Custom headers, in the order they were added
    headers: Vec<(String, String)>,
    /// PUT-specific optimization flag
    put_optimized: bool,
    /// Expected payload size in bytes; feeds the timeout calculation and the
    /// decision to request a fresh connection
    expected_size: Option<usize>,
}
77
/// Enhanced HTTP response wrapper with chainable methods
///
/// Thin wrapper: every accessor forwards to the wrapped `Response`.
#[derive(Debug)]
pub struct HttpResponse {
    /// The underlying response produced by the HTTP layer
    inner: Response,
}
83
84impl HttpResponse {
85    fn new(response: Response) -> Self {
86        Self { inner: response }
87    }
88
89    /// Get the HTTP status code
90    pub fn status(&self) -> u16 {
91        self.inner.status_code()
92    }
93
94    /// Get response headers as JSON value (for backward compatibility)
95    pub fn headers(&self) -> Value {
96        let headers_map: std::collections::HashMap<String, String> = self
97            .inner
98            .headers()
99            .iter()
100            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101            .collect();
102        serde_json::to_value(headers_map).unwrap_or(Value::Null)
103    }
104
105    /// Get response body as string
106    pub fn body(&self) -> Result<String> {
107        self.inner.text()
108    }
109
110    /// Check if response indicates success (2xx status)
111    pub fn is_success(&self) -> bool {
112        self.inner.is_success()
113    }
114
115    /// Check if response indicates client error (4xx status)
116    pub fn is_client_error(&self) -> bool {
117        self.inner.is_client_error()
118    }
119
120    /// Check if response indicates server error (5xx status)
121    pub fn is_server_error(&self) -> bool {
122        self.inner.is_server_error()
123    }
124
125    /// Get content length from headers
126    pub fn content_length(&self) -> u64 {
127        self.inner.content_length().unwrap_or(0)
128    }
129
130    /// Parse response body as JSON
131    pub fn json<T>(&self) -> Result<T>
132    where
133        T: DeserializeOwned,
134    {
135        self.inner.json()
136    }
137
138    /// Parse response body as generic JSON value
139    pub fn json_value(&self) -> Result<Value> {
140        self.inner.json_value()
141    }
142
143    /// Get the underlying modern Response
144    pub fn into_inner(self) -> Response {
145        self.inner
146    }
147
148    /// Convert to legacy Response format for backward compatibility
149    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150        self.inner.to_legacy()
151    }
152}
153
154impl IpcHttpClient {
155    /// Create a new IPC HTTP client with default configuration
156    pub fn new<P>(path: P) -> Result<Self>
157    where
158        P: AsRef<Path>,
159    {
160        Self::with_config(path, ClientConfig::default())
161    }
162
163    /// Create a new IPC HTTP client with custom configuration
164    pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165    where
166        P: AsRef<Path>,
167    {
168        let name = path
169            .as_ref()
170            .to_fs_name::<GenericFilePath>()
171            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172            .into_owned();
173
174        let pool = if config.enable_pooling {
175            Some(ConnectionPool::new(
176                name.clone(),
177                config.pool_config.clone(),
178            ))
179        } else {
180            None
181        };
182
183        // Create retry executor with optimized configuration for different request types
184        let retry_config = RetryConfig::for_network_operations()
185            .max_attempts(config.max_retries.min(3)) // 限制最大重试次数以提高响应速度
186            .base_delay(Duration::from_millis(
187                config.pool_config.retry_delay_ms.min(25),
188            )); // 更快的重试
189
190        let retry_executor = RetryExecutor::new(retry_config);
191
192        // 创建专门用于PUT请求的快速重试执行器
193        let put_retry_config = RetryConfig::for_put_requests();
194        let put_retry_executor = RetryExecutor::new(put_retry_config);
195
196        Ok(Self {
197            name,
198            config,
199            pool,
200            retry_executor,
201            put_retry_executor,
202        })
203    }
204
205    /// Create a direct connection (bypassing pool)
206    async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
207        let mut last_error = None;
208
209        for attempt in 0..self.config.max_retries {
210            if attempt > 0 {
211                tokio::time::sleep(self.config.retry_delay).await;
212            }
213
214            match LocalSocketStream::connect(self.name.clone()).await {
215                Ok(stream) => {
216                    debug!("Created direct connection on attempt {}", attempt + 1);
217                    return Ok(stream);
218                }
219                Err(e) => {
220                    trace!("Connection attempt {} failed: {}", attempt + 1, e);
221                    last_error = Some(e);
222                }
223            }
224        }
225
226        Err(KodeBridgeError::connection(format!(
227            "Failed to create connection after {} attempts: {}",
228            self.config.max_retries,
229            last_error
230                .map(|e| e.to_string())
231                .unwrap_or_else(|| "Unknown error".to_string())
232        )))
233    }
234
235    /// Get a connection (from pool or create new)
236    async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
237        let metrics = global_metrics();
238
239        if let Some(ref pool) = self.pool {
240            match pool.get_connection().await {
241                Ok(conn) => {
242                    metrics.connection_created(true); // From pool
243                    Ok(Either::Pool(conn))
244                }
245                Err(e) => {
246                    metrics.connection_failed();
247                    Err(e)
248                }
249            }
250        } else {
251            match self.create_direct_connection().await {
252                Ok(stream) => {
253                    metrics.connection_created(false); // Direct connection
254                    Ok(Either::Direct(stream))
255                }
256                Err(e) => {
257                    metrics.connection_failed();
258                    Err(e)
259                }
260            }
261        }
262    }
263
264    /// Legacy request method for backward compatibility
265    pub async fn request(
266        &self,
267        method: &str,
268        path: &str,
269        body: Option<&serde_json::Value>,
270    ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
271        let response = self
272            .send_request_internal(method, path, body, self.config.default_timeout)
273            .await?;
274        Ok(response.to_legacy())
275    }
276
277    /// Internal method to send requests with enhanced retry logic
278    async fn send_request_internal(
279        &self,
280        method: &str,
281        path: &str,
282        body: Option<&Value>,
283        timeout: Duration,
284    ) -> Result<Response> {
285        let method = Method::from_str(method)
286            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
287
288        let mut builder = RequestBuilder::new(method.clone(), path.to_string());
289
290        // Note: This method doesn't support custom headers for backward compatibility
291        // Use the fluent API (get(), post(), etc.) for custom headers
292
293        if let Some(json_body) = body {
294            builder = builder.json(json_body)?;
295        }
296
297        let request = builder.build()?;
298
299        // Use smart retry mechanism
300        self.retry_executor
301            .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
302                // Execute with timeout
303                let result = tokio::time::timeout(timeout, async {
304                    let mut connection = self.get_connection().await?;
305
306                    match &mut connection {
307                        Either::Pool(conn) => {
308                            if let Some(stream) = conn.stream() {
309                                send_request(stream, request.clone()).await
310                            } else {
311                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
312                            }
313                        }
314                        Either::Direct(stream) => send_request(stream, request.clone()).await,
315                    }
316                })
317                .await;
318
319                match result {
320                    Ok(response) => response,
321                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
322                }
323            })
324            .await
325    }
326
327    /// Enhanced request sending with PUT optimization support
328    async fn send_request_with_optimization(
329        &self,
330        method: &str,
331        path: &str,
332        body: Option<&Value>,
333        headers: &[(String, String)],
334        timeout: Duration,
335        is_put_optimized: bool,
336        expected_size: Option<usize>,
337    ) -> Result<Response> {
338        let method_enum = Method::from_str(method)
339            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
340
341        let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
342
343        // Add custom headers
344        for (key, value) in headers {
345            builder = builder.header(key.as_str(), value.as_str());
346        }
347
348        if let Some(json_body) = body {
349            builder = builder.json(json_body)?;
350        }
351
352        let request = builder.build()?;
353
354        // PUT请求使用专门的重试策略
355        let retry_context = if is_put_optimized {
356            format!("PUT_OPTIMIZED {}", path)
357        } else {
358            format!("{} {}", method, path)
359        };
360
361        // Use smart retry mechanism with PUT optimization
362        let retry_executor = if is_put_optimized {
363            &self.put_retry_executor // 使用PUT专用的快速重试器
364        } else {
365            &self.retry_executor // 使用通用重试器
366        };
367
368        retry_executor
369            .execute_with_context(&retry_context, || async {
370                // Execute with timeout
371                let result = tokio::time::timeout(timeout, async {
372                    let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
373                        // 对于大的PUT请求,优先获取新连接
374                        self.get_fresh_connection().await?
375                    } else {
376                        self.get_connection().await?
377                    };
378
379                    match &mut connection {
380                        Either::Pool(conn) => {
381                            if let Some(stream) = conn.stream() {
382                                send_request(stream, request.clone()).await
383                            } else {
384                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
385                            }
386                        }
387                        Either::Direct(stream) => send_request(stream, request.clone()).await,
388                    }
389                })
390                .await;
391
392                match result {
393                    Ok(response) => response,
394                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
395                }
396            })
397            .await
398    }
399
400    /// Get a fresh connection optimized for PUT requests
401    async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
402        use interprocess::local_socket::tokio::prelude::LocalSocketStream;
403
404        // 首先尝试从连接池获取新连接
405        if let Some(ref pool) = self.pool {
406            match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await
407            {
408                Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
409                Ok(Err(_)) | Err(_) => {
410                    // 池化新连接失败,继续尝试直接连接
411                }
412            }
413        }
414
415        // 直接创建连接,使用更快的超时设置
416        match tokio::time::timeout(
417            Duration::from_millis(100),
418            LocalSocketStream::connect(self.name.clone()),
419        )
420        .await
421        {
422            Ok(Ok(stream)) => Ok(Either::Direct(stream)),
423            Ok(Err(_)) | Err(_) => {
424                // 如果直接连接失败,回退到普通池化连接
425                if let Some(ref pool) = self.pool {
426                    let conn = pool.get_connection().await?;
427                    Ok(Either::Pool(conn))
428                } else {
429                    Err(KodeBridgeError::connection(
430                        "Failed to get fresh connection",
431                    ))
432                }
433            }
434        }
435    }
436
437    /// GET request
438    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
439        HttpRequestBuilder::new(self, Method::GET, path)
440    }
441
442    /// POST request
443    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
444        HttpRequestBuilder::new(self, Method::POST, path)
445    }
446
447    /// PUT request with optimization enabled by default
448    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
449        HttpRequestBuilder::new(self, Method::PUT, path)
450    }
451
452    /// Optimized batch PUT operations
453    pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
454        let batch_size = requests.len();
455        if batch_size == 0 {
456            return Ok(Vec::new());
457        }
458
459        // 限制并发数以避免过载
460        let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
461        let mut responses = Vec::with_capacity(batch_size);
462
463        // 分批处理以控制内存使用和网络负载
464        for chunk in requests.chunks(concurrent_limit) {
465            let mut futures = Vec::new();
466
467            for (path, body) in chunk {
468                let path = path.clone();
469                let body = body.clone();
470
471                let future = self.put(&path).json_body(&body).optimize_for_put().send();
472
473                futures.push(future);
474            }
475
476            // 并发等待当前批次完成
477            let chunk_results = futures::future::join_all(futures).await;
478
479            for result in chunk_results {
480                match result {
481                    Ok(response) => responses.push(response),
482                    Err(e) => return Err(e),
483                }
484            }
485        }
486
487        Ok(responses)
488    }
489
490    /// DELETE request
491    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
492        HttpRequestBuilder::new(self, Method::DELETE, path)
493    }
494
495    /// PATCH request
496    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
497        HttpRequestBuilder::new(self, Method::PATCH, path)
498    }
499
500    /// HEAD request
501    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
502        HttpRequestBuilder::new(self, Method::HEAD, path)
503    }
504
505    /// OPTIONS request
506    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
507        HttpRequestBuilder::new(self, Method::OPTIONS, path)
508    }
509
510    /// Get pool statistics (if pooling is enabled)
511    pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
512        self.pool.as_ref().map(|p| p.stats())
513    }
514
515    /// Close the client and clean up resources
516    pub fn close(&self) {
517        if let Some(ref pool) = self.pool {
518            pool.close();
519        }
520    }
521
522    /// Preheat connections for better PUT performance
523    pub async fn preheat_for_puts(&self, count: usize) {
524        if let Some(ref pool) = self.pool {
525            pool.preheat_for_puts(count).await;
526        }
527    }
528
529    /// Smart timeout calculation based on request characteristics
530    fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
531        match method {
532            "PUT" | "POST" => {
533                match body_size {
534                    Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), // >5MB: 30s
535                    Some(size) if size > 1024 * 1024 => Duration::from_secs(15),     // >1MB: 15s
536                    Some(size) if size > 100 * 1024 => Duration::from_secs(8),       // >100KB: 8s
537                    Some(size) if size > 10 * 1024 => Duration::from_secs(4),        // >10KB: 4s
538                    _ => Duration::from_secs(2),                                     // 小请求: 2s
539                }
540            }
541            _ => self.config.default_timeout, // 其他方法使用默认超时
542        }
543    }
544}
545
546impl<'a> HttpRequestBuilder<'a> {
547    fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
548        let is_put = method == Method::PUT;
549        Self {
550            client,
551            method,
552            path: path.to_string(),
553            body: None,
554            timeout: None,
555            headers: Vec::new(),
556            put_optimized: is_put, // 自动为PUT请求启用优化
557            expected_size: None,
558        }
559    }
560
561    /// Set JSON body
562    pub fn json_body(mut self, body: &Value) -> Self {
563        self.body = Some(body.clone());
564
565        // 为PUT请求估算数据大小以优化处理
566        if self.method == Method::PUT {
567            if let Ok(json_bytes) = serde_json::to_vec(body) {
568                self.expected_size = Some(json_bytes.len());
569            }
570        }
571
572        self
573    }
574
575    /// Set custom timeout
576    pub fn timeout(mut self, timeout: Duration) -> Self {
577        self.timeout = Some(timeout);
578        self
579    }
580
581    /// 设置预期数据大小(用于PUT请求优化)
582    pub fn expected_size(mut self, size: usize) -> Self {
583        self.expected_size = Some(size);
584        self
585    }
586
587    /// 启用PUT请求专门优化
588    pub fn optimize_for_put(mut self) -> Self {
589        self.put_optimized = true;
590        self
591    }
592
593    /// Add custom header
594    pub fn header<K, V>(mut self, key: K, value: V) -> Self
595    where
596        K: Into<String>,
597        V: Into<String>,
598    {
599        self.headers.push((key.into(), value.into()));
600        self
601    }
602
603    /// Send the request
604    pub async fn send(self) -> Result<HttpResponse> {
605        let metrics = global_metrics();
606        let tracker = metrics.request_start(self.method.as_str());
607
608        // 为PUT请求优化超时设置,使用智能超时计算
609        let timeout = if self.put_optimized {
610            self.timeout.unwrap_or_else(|| {
611                self.client
612                    .calculate_smart_timeout(self.method.as_str(), self.expected_size)
613            })
614        } else {
615            self.timeout.unwrap_or(self.client.config.default_timeout)
616        };
617
618        match self
619            .client
620            .send_request_with_optimization(
621                self.method.as_str(),
622                &self.path,
623                self.body.as_ref(),
624                &self.headers,
625                timeout,
626                self.put_optimized,
627                self.expected_size,
628            )
629            .await
630        {
631            Ok(response) => {
632                tracker.success(response.status_code());
633                Ok(HttpResponse::new(response))
634            }
635            Err(e) => {
636                tracker.failure(&format!("{:?}", e));
637                Err(e)
638            }
639        }
640    }
641}
642
/// Helper enum for connection types
///
/// Lets the client return either kind of connection from one function; in
/// this file `A` is `PooledConnection` and `B` is `LocalSocketStream`.
enum Either<A, B> {
    /// Connection obtained from the connection pool
    Pool(A),
    /// Connection opened directly, bypassing the pool
    Direct(B),
}
648
649impl Drop for IpcHttpClient {
650    fn drop(&mut self) {
651        self.close();
652    }
653}