kode_bridge/
ipc_http_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
19/// Configuration for IPC HTTP client
20#[derive(Debug, Clone)]
21pub struct ClientConfig {
22    /// Default timeout for requests
23    pub default_timeout: Duration,
24    /// Connection pool configuration
25    pub pool_config: PoolConfig,
26    /// Enable connection pooling
27    pub enable_pooling: bool,
28    /// Retry configuration
29    pub max_retries: usize,
30    pub retry_delay: Duration,
31    /// Concurrent request limit
32    pub max_concurrent_requests: usize,
33    /// Rate limiting: max requests per second
34    pub max_requests_per_second: Option<f64>,
35}
36
37impl Default for ClientConfig {
38    fn default() -> Self {
39        Self {
40            default_timeout: Duration::from_secs(5), // 减少默认超时到5秒
41            pool_config: PoolConfig::default(),
42            enable_pooling: true,
43            max_retries: 3,                         // 减少重试次数
44            retry_delay: Duration::from_millis(25), // 减少重试延迟
45            max_concurrent_requests: 16,            // 增加并发请求数
46            max_requests_per_second: Some(50.0),    // 增加请求速率限制
47        }
48    }
49}
50
51/// Generic IPC HTTP client that works on both Unix and Windows platforms
52///
53/// This client is optimized for request-response patterns with connection pooling support.
54/// For streaming functionality, use `IpcStreamClient` instead.
55pub struct IpcHttpClient {
56    name: Name<'static>,
57    config: ClientConfig,
58    pool: Option<ConnectionPool>,
59    retry_executor: RetryExecutor,
60    /// 专门用于PUT请求的重试执行器
61    put_retry_executor: RetryExecutor,
62}
63
64/// HTTP request builder for fluent API
65pub struct HttpRequestBuilder<'a> {
66    client: &'a IpcHttpClient,
67    method: Method,
68    path: String,
69    body: Option<Value>,
70    timeout: Option<Duration>,
71    headers: Vec<(String, String)>,
72    /// PUT专用优化标志
73    put_optimized: bool,
74    /// 预期数据大小,用于选择合适的缓冲区和超时
75    expected_size: Option<usize>,
76}
77
78/// Enhanced HTTP response wrapper with chainable methods
79#[derive(Debug)]
80pub struct HttpResponse {
81    inner: Response,
82}
83
84impl HttpResponse {
85    fn new(response: Response) -> Self {
86        Self { inner: response }
87    }
88
89    /// Get the HTTP status code
90    pub fn status(&self) -> u16 {
91        self.inner.status_code()
92    }
93
94    /// Get response headers as JSON value (for backward compatibility)
95    pub fn headers(&self) -> Value {
96        let headers_map: std::collections::HashMap<String, String> = self
97            .inner
98            .headers()
99            .iter()
100            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101            .collect();
102        serde_json::to_value(headers_map).unwrap_or(Value::Null)
103    }
104
105    /// Get response body as string
106    pub fn body(&self) -> Result<String> {
107        self.inner.text()
108    }
109
110    /// Check if response indicates success (2xx status)
111    pub fn is_success(&self) -> bool {
112        self.inner.is_success()
113    }
114
115    /// Check if response indicates client error (4xx status)
116    pub fn is_client_error(&self) -> bool {
117        self.inner.is_client_error()
118    }
119
120    /// Check if response indicates server error (5xx status)
121    pub fn is_server_error(&self) -> bool {
122        self.inner.is_server_error()
123    }
124
125    /// Get content length from headers
126    pub fn content_length(&self) -> u64 {
127        self.inner.content_length().unwrap_or(0)
128    }
129
130    /// Parse response body as JSON
131    pub fn json<T>(&self) -> Result<T>
132    where
133        T: DeserializeOwned,
134    {
135        self.inner.json()
136    }
137
138    /// Parse response body as generic JSON value
139    pub fn json_value(&self) -> Result<Value> {
140        self.inner.json_value()
141    }
142
143    /// Get the underlying modern Response
144    pub fn into_inner(self) -> Response {
145        self.inner
146    }
147
148    /// Convert to legacy Response format for backward compatibility
149    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150        self.inner.to_legacy()
151    }
152}
153
154impl IpcHttpClient {
155    /// Create a new IPC HTTP client with default configuration
156    pub fn new<P>(path: P) -> Result<Self>
157    where
158        P: AsRef<Path>,
159    {
160        Self::with_config(path, ClientConfig::default())
161    }
162
163    /// Create a new IPC HTTP client with custom configuration
164    pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165    where
166        P: AsRef<Path>,
167    {
168        let name = path
169            .as_ref()
170            .to_fs_name::<GenericFilePath>()
171            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172            .into_owned();
173
174        let pool = if config.enable_pooling {
175            Some(ConnectionPool::new(
176                name.clone(),
177                config.pool_config.clone(),
178            ))
179        } else {
180            None
181        };
182
183        // Create retry executor with optimized configuration for different request types
184        let retry_config = RetryConfig::for_network_operations()
185            .max_attempts(config.max_retries.min(3)) // 限制最大重试次数以提高响应速度
186            .base_delay(Duration::from_millis(
187                config.pool_config.retry_delay_ms.min(25),
188            )); // 更快的重试
189
190        let retry_executor = RetryExecutor::new(retry_config);
191
192        // 创建专门用于PUT请求的快速重试执行器
193        let put_retry_config = RetryConfig::for_put_requests();
194        let put_retry_executor = RetryExecutor::new(put_retry_config);
195
196        Ok(Self {
197            name,
198            config,
199            pool,
200            retry_executor,
201            put_retry_executor,
202        })
203    }
204
205    /// Create a direct connection (bypassing pool)
206    async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
207        let mut last_error = None;
208
209        for attempt in 0..self.config.max_retries {
210            if attempt > 0 {
211                tokio::time::sleep(self.config.retry_delay).await;
212            }
213
214            match LocalSocketStream::connect(self.name.clone()).await {
215                Ok(stream) => {
216                    debug!("Created direct connection on attempt {}", attempt + 1);
217                    return Ok(stream);
218                }
219                Err(e) => {
220                    trace!("Connection attempt {} failed: {}", attempt + 1, e);
221                    last_error = Some(e);
222                }
223            }
224        }
225
226        Err(KodeBridgeError::connection(format!(
227            "Failed to create connection after {} attempts: {}",
228            self.config.max_retries,
229            last_error
230                .map(|e| e.to_string())
231                .unwrap_or_else(|| "Unknown error".to_string())
232        )))
233    }
234
235    /// Get a connection (from pool or create new)
236    async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
237        let metrics = global_metrics();
238
239        if let Some(ref pool) = self.pool {
240            match pool.get_connection().await {
241                Ok(conn) => {
242                    metrics.connection_created(true); // From pool
243                    Ok(Either::Pool(conn))
244                }
245                Err(e) => {
246                    metrics.connection_failed();
247                    Err(e)
248                }
249            }
250        } else {
251            match self.create_direct_connection().await {
252                Ok(stream) => {
253                    metrics.connection_created(false); // Direct connection
254                    Ok(Either::Direct(stream))
255                }
256                Err(e) => {
257                    metrics.connection_failed();
258                    Err(e)
259                }
260            }
261        }
262    }
263
264    /// Legacy request method for backward compatibility
265    pub async fn request(
266        &self,
267        method: &str,
268        path: &str,
269        body: Option<&serde_json::Value>,
270    ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
271        let response = self
272            .send_request_internal(method, path, body, self.config.default_timeout)
273            .await?;
274        Ok(response.to_legacy())
275    }
276
277    /// Internal method to send requests with enhanced retry logic
278    async fn send_request_internal(
279        &self,
280        method: &str,
281        path: &str,
282        body: Option<&Value>,
283        timeout: Duration,
284    ) -> Result<Response> {
285        let method = Method::from_str(method)
286            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
287
288        let mut builder = RequestBuilder::new(method.clone(), path.to_string());
289
290        if let Some(json_body) = body {
291            builder = builder.json(json_body)?;
292        }
293
294        let request = builder.build()?;
295
296        // Use smart retry mechanism
297        self.retry_executor
298            .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
299                // Execute with timeout
300                let result = tokio::time::timeout(timeout, async {
301                    let mut connection = self.get_connection().await?;
302
303                    match &mut connection {
304                        Either::Pool(conn) => {
305                            if let Some(stream) = conn.stream() {
306                                send_request(stream, request.clone()).await
307                            } else {
308                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
309                            }
310                        }
311                        Either::Direct(stream) => send_request(stream, request.clone()).await,
312                    }
313                })
314                .await;
315
316                match result {
317                    Ok(response) => response,
318                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
319                }
320            })
321            .await
322    }
323
324    /// Enhanced request sending with PUT optimization support
325    async fn send_request_with_optimization(
326        &self,
327        method: &str,
328        path: &str,
329        body: Option<&Value>,
330        timeout: Duration,
331        is_put_optimized: bool,
332        expected_size: Option<usize>,
333    ) -> Result<Response> {
334        let method_enum = Method::from_str(method)
335            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
336
337        let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
338
339        if let Some(json_body) = body {
340            builder = builder.json(json_body)?;
341        }
342
343        let request = builder.build()?;
344
345        // PUT请求使用专门的重试策略
346        let retry_context = if is_put_optimized {
347            format!("PUT_OPTIMIZED {}", path)
348        } else {
349            format!("{} {}", method, path)
350        };
351
352        // Use smart retry mechanism with PUT optimization
353        let retry_executor = if is_put_optimized {
354            &self.put_retry_executor // 使用PUT专用的快速重试器
355        } else {
356            &self.retry_executor // 使用通用重试器
357        };
358
359        retry_executor
360            .execute_with_context(&retry_context, || async {
361                // Execute with timeout
362                let result = tokio::time::timeout(timeout, async {
363                    let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
364                        // 对于大的PUT请求,优先获取新连接
365                        self.get_fresh_connection().await?
366                    } else {
367                        self.get_connection().await?
368                    };
369
370                    match &mut connection {
371                        Either::Pool(conn) => {
372                            if let Some(stream) = conn.stream() {
373                                send_request(stream, request.clone()).await
374                            } else {
375                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
376                            }
377                        }
378                        Either::Direct(stream) => send_request(stream, request.clone()).await,
379                    }
380                })
381                .await;
382
383                match result {
384                    Ok(response) => response,
385                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
386                }
387            })
388            .await
389    }
390
391    /// Get a fresh connection optimized for PUT requests
392    async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
393        use interprocess::local_socket::tokio::prelude::LocalSocketStream;
394
395        // 首先尝试从连接池获取新连接
396        if let Some(ref pool) = self.pool {
397            match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await
398            {
399                Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
400                Ok(Err(_)) | Err(_) => {
401                    // 池化新连接失败,继续尝试直接连接
402                }
403            }
404        }
405
406        // 直接创建连接,使用更快的超时设置
407        match tokio::time::timeout(
408            Duration::from_millis(100),
409            LocalSocketStream::connect(self.name.clone()),
410        )
411        .await
412        {
413            Ok(Ok(stream)) => Ok(Either::Direct(stream)),
414            Ok(Err(_)) | Err(_) => {
415                // 如果直接连接失败,回退到普通池化连接
416                if let Some(ref pool) = self.pool {
417                    let conn = pool.get_connection().await?;
418                    Ok(Either::Pool(conn))
419                } else {
420                    Err(KodeBridgeError::connection(
421                        "Failed to get fresh connection",
422                    ))
423                }
424            }
425        }
426    }
427
428    /// GET request
429    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
430        HttpRequestBuilder::new(self, Method::GET, path)
431    }
432
433    /// POST request
434    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
435        HttpRequestBuilder::new(self, Method::POST, path)
436    }
437
438    /// PUT request with optimization enabled by default
439    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
440        HttpRequestBuilder::new(self, Method::PUT, path)
441    }
442
443    /// Optimized batch PUT operations
444    pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
445        let batch_size = requests.len();
446        if batch_size == 0 {
447            return Ok(Vec::new());
448        }
449
450        // 限制并发数以避免过载
451        let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
452        let mut responses = Vec::with_capacity(batch_size);
453
454        // 分批处理以控制内存使用和网络负载
455        for chunk in requests.chunks(concurrent_limit) {
456            let mut futures = Vec::new();
457
458            for (path, body) in chunk {
459                let path = path.clone();
460                let body = body.clone();
461
462                let future = self.put(&path).json_body(&body).optimize_for_put().send();
463
464                futures.push(future);
465            }
466
467            // 并发等待当前批次完成
468            let chunk_results = futures::future::join_all(futures).await;
469
470            for result in chunk_results {
471                match result {
472                    Ok(response) => responses.push(response),
473                    Err(e) => return Err(e),
474                }
475            }
476        }
477
478        Ok(responses)
479    }
480
481    /// DELETE request
482    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
483        HttpRequestBuilder::new(self, Method::DELETE, path)
484    }
485
486    /// PATCH request
487    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
488        HttpRequestBuilder::new(self, Method::PATCH, path)
489    }
490
491    /// HEAD request
492    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
493        HttpRequestBuilder::new(self, Method::HEAD, path)
494    }
495
496    /// OPTIONS request
497    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
498        HttpRequestBuilder::new(self, Method::OPTIONS, path)
499    }
500
501    /// Get pool statistics (if pooling is enabled)
502    pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
503        self.pool.as_ref().map(|p| p.stats())
504    }
505
506    /// Close the client and clean up resources
507    pub fn close(&self) {
508        if let Some(ref pool) = self.pool {
509            pool.close();
510        }
511    }
512
513    /// Preheat connections for better PUT performance
514    pub async fn preheat_for_puts(&self, count: usize) {
515        if let Some(ref pool) = self.pool {
516            pool.preheat_for_puts(count).await;
517        }
518    }
519
520    /// Smart timeout calculation based on request characteristics
521    fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
522        match method {
523            "PUT" | "POST" => {
524                match body_size {
525                    Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), // >5MB: 30s
526                    Some(size) if size > 1024 * 1024 => Duration::from_secs(15),     // >1MB: 15s
527                    Some(size) if size > 100 * 1024 => Duration::from_secs(8),       // >100KB: 8s
528                    Some(size) if size > 10 * 1024 => Duration::from_secs(4),        // >10KB: 4s
529                    _ => Duration::from_secs(2),                                     // 小请求: 2s
530                }
531            }
532            _ => self.config.default_timeout, // 其他方法使用默认超时
533        }
534    }
535}
536
537impl<'a> HttpRequestBuilder<'a> {
538    fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
539        let is_put = method == Method::PUT;
540        Self {
541            client,
542            method,
543            path: path.to_string(),
544            body: None,
545            timeout: None,
546            headers: Vec::new(),
547            put_optimized: is_put, // 自动为PUT请求启用优化
548            expected_size: None,
549        }
550    }
551
552    /// Set JSON body
553    pub fn json_body(mut self, body: &Value) -> Self {
554        self.body = Some(body.clone());
555
556        // 为PUT请求估算数据大小以优化处理
557        if self.method == Method::PUT {
558            if let Ok(json_bytes) = serde_json::to_vec(body) {
559                self.expected_size = Some(json_bytes.len());
560            }
561        }
562
563        self
564    }
565
566    /// Set custom timeout
567    pub fn timeout(mut self, timeout: Duration) -> Self {
568        self.timeout = Some(timeout);
569        self
570    }
571
572    /// 设置预期数据大小(用于PUT请求优化)
573    pub fn expected_size(mut self, size: usize) -> Self {
574        self.expected_size = Some(size);
575        self
576    }
577
578    /// 启用PUT请求专门优化
579    pub fn optimize_for_put(mut self) -> Self {
580        self.put_optimized = true;
581        self
582    }
583
584    /// Add custom header
585    pub fn header<K, V>(mut self, key: K, value: V) -> Self
586    where
587        K: Into<String>,
588        V: Into<String>,
589    {
590        self.headers.push((key.into(), value.into()));
591        self
592    }
593
594    /// Send the request
595    pub async fn send(self) -> Result<HttpResponse> {
596        let metrics = global_metrics();
597        let tracker = metrics.request_start(self.method.as_str());
598
599        // 为PUT请求优化超时设置,使用智能超时计算
600        let timeout = if self.put_optimized {
601            self.timeout.unwrap_or_else(|| {
602                self.client
603                    .calculate_smart_timeout(self.method.as_str(), self.expected_size)
604            })
605        } else {
606            self.timeout.unwrap_or(self.client.config.default_timeout)
607        };
608
609        match self
610            .client
611            .send_request_with_optimization(
612                self.method.as_str(),
613                &self.path,
614                self.body.as_ref(),
615                timeout,
616                self.put_optimized,
617                self.expected_size,
618            )
619            .await
620        {
621            Ok(response) => {
622                tracker.success(response.status_code());
623                Ok(HttpResponse::new(response))
624            }
625            Err(e) => {
626                tracker.failure(&format!("{:?}", e));
627                Err(e)
628            }
629        }
630    }
631}
632
/// A connection that was obtained in one of two ways: checked out of the
/// connection pool, or opened directly against the socket.
enum Either<A, B> {
    /// Connection that came from the connection pool.
    Pool(A),
    /// Connection that was opened directly, bypassing the pool.
    Direct(B),
}
638
639impl Drop for IpcHttpClient {
640    fn drop(&mut self) {
641        self.close();
642    }
643}