kode_bridge/
ipc_http_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream as _;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName as _};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr as _;
17use tracing::{debug, trace};
18
/// Configuration for IPC HTTP client
///
/// Consumed by [`IpcHttpClient::with_config`]; `Default` provides the stock values.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Default timeout for requests
    ///
    /// Used whenever a request does not carry its own timeout and is not
    /// subject to the size-based PUT/POST timeout calculation.
    pub default_timeout: Duration,
    /// Connection pool configuration
    ///
    /// Passed to `ConnectionPool::new` when pooling is enabled; its
    /// `retry_delay_ms` field is also used as the base delay of the general
    /// retry executor.
    pub pool_config: PoolConfig,
    /// Enable connection pooling
    ///
    /// When `false`, no pool is created and every request opens a direct
    /// local-socket connection.
    pub enable_pooling: bool,
    /// Retry configuration
    ///
    /// Number of attempts made when opening a direct connection; also passed
    /// as `max_attempts` to the general retry executor.
    pub max_retries: usize,
    /// Pause between consecutive direct-connection attempts.
    pub retry_delay: Duration,
    /// Concurrent request limit
    ///
    /// Used as the chunk size (number of requests in flight at once) by
    /// `IpcHttpClient::put_batch`.
    pub max_concurrent_requests: usize,
    /// Rate limiting: max requests per second
    ///
    /// NOTE(review): this field is not read anywhere in this file; confirm
    /// where (or whether) the limit is enforced.
    pub max_requests_per_second: Option<f64>,
}
36
37impl Default for ClientConfig {
38    fn default() -> Self {
39        Self {
40            default_timeout: Duration::from_secs(5), // 减少默认超时到5秒
41            pool_config: PoolConfig::default(),
42            enable_pooling: true,
43            max_retries: 3,                         // 减少重试次数
44            retry_delay: Duration::from_millis(25), // 减少重试延迟
45            max_concurrent_requests: 16,            // 增加并发请求数
46            max_requests_per_second: Some(50.0),    // 增加请求速率限制
47        }
48    }
49}
50
/// Generic IPC HTTP client that works on both Unix and Windows platforms
///
/// This client is optimized for request-response patterns with connection pooling support.
/// For streaming functionality, use `IpcStreamClient` instead.
pub struct IpcHttpClient {
    /// Local-socket name derived from the filesystem path given at construction.
    name: Name<'static>,
    /// Configuration the client was built with.
    config: ClientConfig,
    /// Connection pool; `None` when `config.enable_pooling` is `false`.
    pool: Option<ConnectionPool>,
    /// Retry executor used for every request that is not PUT-optimized.
    retry_executor: RetryExecutor,
    /// Retry executor dedicated to PUT-optimized requests
    /// (built from `RetryConfig::for_put_requests()`).
    put_retry_executor: RetryExecutor,
}
63
/// HTTP request builder for fluent API
///
/// Created by the verb methods on [`IpcHttpClient`] (`get`, `post`, `put`, ...)
/// and consumed by `send`.
pub struct HttpRequestBuilder<'a> {
    /// Client that will execute the request.
    client: &'a IpcHttpClient,
    /// HTTP method of the request.
    method: Method,
    /// Request path.
    path: String,
    /// Optional JSON body.
    body: Option<Value>,
    /// Per-request timeout override; `None` lets `send` pick one.
    timeout: Option<Duration>,
    /// Custom headers as `(name, value)` pairs, in insertion order.
    headers: Vec<(String, String)>,
    /// PUT-specific optimization flag (set automatically for PUT requests).
    put_optimized: bool,
    /// Expected payload size in bytes; used to pick the timeout and to decide
    /// whether a fresh connection is requested for large PUT payloads.
    expected_size: Option<usize>,
}
77
/// Enhanced HTTP response wrapper with chainable methods
///
/// Thin wrapper around the crate's [`Response`]; every accessor delegates to it.
#[derive(Debug)]
pub struct HttpResponse {
    /// The wrapped low-level response.
    inner: Response,
}
83
84impl HttpResponse {
85    const fn new(response: Response) -> Self {
86        Self { inner: response }
87    }
88
89    /// Get the HTTP status code
90    pub const fn status(&self) -> u16 {
91        self.inner.status_code()
92    }
93
94    /// Get response headers as JSON value (for backward compatibility)
95    pub fn headers(&self) -> Value {
96        let headers_map: std::collections::HashMap<String, String> = self
97            .inner
98            .headers()
99            .iter()
100            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101            .collect();
102        serde_json::to_value(headers_map).unwrap_or(Value::Null)
103    }
104
105    /// Get response body as string
106    pub fn body(&self) -> Result<String> {
107        self.inner.text()
108    }
109
110    /// Check if response indicates success (2xx status)
111    pub fn is_success(&self) -> bool {
112        self.inner.is_success()
113    }
114
115    /// Check if response indicates client error (4xx status)
116    pub fn is_client_error(&self) -> bool {
117        self.inner.is_client_error()
118    }
119
120    /// Check if response indicates server error (5xx status)
121    pub fn is_server_error(&self) -> bool {
122        self.inner.is_server_error()
123    }
124
125    /// Get content length from headers
126    pub fn content_length(&self) -> u64 {
127        self.inner.content_length().unwrap_or(0)
128    }
129
130    /// Parse response body as JSON
131    pub fn json<T>(&self) -> Result<T>
132    where
133        T: DeserializeOwned,
134    {
135        self.inner.json()
136    }
137
138    /// Parse response body as generic JSON value
139    pub fn json_value(&self) -> Result<Value> {
140        self.inner.json_value()
141    }
142
143    /// Get the underlying modern Response
144    pub fn into_inner(self) -> Response {
145        self.inner
146    }
147
148    /// Convert to legacy Response format for backward compatibility
149    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150        self.inner.to_legacy()
151    }
152}
153
154impl IpcHttpClient {
155    /// Create a new IPC HTTP client with default configuration
156    pub fn new<P>(path: P) -> Result<Self>
157    where
158        P: AsRef<Path>,
159    {
160        Self::with_config(path, ClientConfig::default())
161    }
162
163    /// Create a new IPC HTTP client with custom configuration
164    pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165    where
166        P: AsRef<Path>,
167    {
168        let name = path
169            .as_ref()
170            .to_fs_name::<GenericFilePath>()
171            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172            .into_owned();
173
174        let pool = if config.enable_pooling {
175            Some(ConnectionPool::new(name.clone(), config.pool_config.clone()))
176        } else {
177            None
178        };
179
180        // Create retry executor with optimized configuration for different request types
181        let retry_config = RetryConfig::for_network_operations()
182            .max_attempts(config.max_retries)
183            .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
184
185        let retry_executor = RetryExecutor::new(retry_config);
186
187        // 创建专门用于PUT请求的快速重试执行器
188        let put_retry_config = RetryConfig::for_put_requests();
189        let put_retry_executor = RetryExecutor::new(put_retry_config);
190
191        Ok(Self {
192            name,
193            config,
194            pool,
195            retry_executor,
196            put_retry_executor,
197        })
198    }
199
200    /// Create a direct connection (bypassing pool)
201    async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
202        let mut last_error = None;
203
204        for attempt in 0..self.config.max_retries {
205            if attempt > 0 {
206                tokio::time::sleep(self.config.retry_delay).await;
207            }
208
209            match LocalSocketStream::connect(self.name.clone()).await {
210                Ok(stream) => {
211                    debug!("Created direct connection on attempt {}", attempt + 1);
212                    return Ok(stream);
213                }
214                Err(e) => {
215                    trace!("Connection attempt {} failed: {}", attempt + 1, e);
216                    last_error = Some(e);
217                }
218            }
219        }
220
221        Err(KodeBridgeError::connection(format!(
222            "Failed to create connection after {} attempts: {}",
223            self.config.max_retries,
224            last_error
225                .map(|e| e.to_string())
226                .unwrap_or_else(|| "Unknown error".to_string())
227        )))
228    }
229
230    /// Get a connection (from pool or create new)
231    async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
232        let metrics = global_metrics();
233
234        if let Some(ref pool) = self.pool {
235            match pool.get_connection().await {
236                Ok(conn) => {
237                    metrics.connection_created(true); // From pool
238                    Ok(Either::Pool(conn))
239                }
240                Err(e) => {
241                    metrics.connection_failed();
242                    Err(e)
243                }
244            }
245        } else {
246            match self.create_direct_connection().await {
247                Ok(stream) => {
248                    metrics.connection_created(false); // Direct connection
249                    Ok(Either::Direct(stream))
250                }
251                Err(e) => {
252                    metrics.connection_failed();
253                    Err(e)
254                }
255            }
256        }
257    }
258
259    /// Legacy request method for backward compatibility
260    pub async fn request(
261        &self,
262        method: &str,
263        path: &str,
264        body: Option<&serde_json::Value>,
265    ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
266        let response = self
267            .send_request_internal(method, path, body, self.config.default_timeout)
268            .await?;
269        Ok(response.to_legacy())
270    }
271
272    /// Internal method to send requests with enhanced retry logic
273    async fn send_request_internal(
274        &self,
275        method: &str,
276        path: &str,
277        body: Option<&Value>,
278        timeout: Duration,
279    ) -> Result<Response> {
280        let method =
281            Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
282
283        let mut builder = RequestBuilder::new(method.clone(), path.to_string());
284
285        // Note: This method doesn't support custom headers for backward compatibility
286        // Use the fluent API (get(), post(), etc.) for custom headers
287
288        if let Some(json_body) = body {
289            builder = builder.json(json_body)?;
290        }
291
292        let request = builder.build()?;
293
294        // Use smart retry mechanism
295        self.retry_executor
296            .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
297                // Execute with timeout
298                let result = tokio::time::timeout(timeout, async {
299                    let mut connection = self.get_connection().await?;
300
301                    match &mut connection {
302                        Either::Pool(conn) => {
303                            if let Some(stream) = conn.stream() {
304                                send_request(stream, request.clone()).await
305                            } else {
306                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
307                            }
308                        }
309                        Either::Direct(stream) => send_request(stream, request.clone()).await,
310                    }
311                })
312                .await;
313
314                match result {
315                    Ok(response) => response,
316                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
317                }
318            })
319            .await
320    }
321
322    /// Enhanced request sending with PUT optimization support
323    async fn send_request_with_optimization(
324        &self,
325        method: &str,
326        path: &str,
327        body: Option<&Value>,
328        headers: &[(String, String)],
329        timeout: Duration,
330        is_put_optimized: bool,
331        expected_size: Option<usize>,
332    ) -> Result<Response> {
333        let method_enum =
334            Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
335
336        let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
337
338        // Add custom headers
339        for (key, value) in headers {
340            builder = builder.header(key.as_str(), value.as_str());
341        }
342
343        if let Some(json_body) = body {
344            builder = builder.json(json_body)?;
345        }
346
347        let request = builder.build()?;
348
349        // PUT请求使用专门的重试策略
350        let retry_context = if is_put_optimized {
351            format!("PUT_OPTIMIZED {}", path)
352        } else {
353            format!("{} {}", method, path)
354        };
355
356        // Use smart retry mechanism with PUT optimization
357        let retry_executor = if is_put_optimized {
358            &self.put_retry_executor // 使用PUT专用的快速重试器
359        } else {
360            &self.retry_executor // 使用通用重试器
361        };
362
363        retry_executor
364            .execute_with_context(&retry_context, || async {
365                // Execute with timeout
366                let result = tokio::time::timeout(timeout, async {
367                    let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
368                        // 对于大的PUT请求,优先获取新连接
369                        self.get_fresh_connection().await?
370                    } else {
371                        self.get_connection().await?
372                    };
373
374                    match &mut connection {
375                        Either::Pool(conn) => {
376                            if let Some(stream) = conn.stream() {
377                                send_request(stream, request.clone()).await
378                            } else {
379                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
380                            }
381                        }
382                        Either::Direct(stream) => send_request(stream, request.clone()).await,
383                    }
384                })
385                .await;
386
387                match result {
388                    Ok(response) => response,
389                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
390                }
391            })
392            .await
393    }
394
395    /// Get a fresh connection optimized for PUT requests
396    async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
397        use interprocess::local_socket::tokio::prelude::LocalSocketStream;
398
399        // 首先尝试从连接池获取新连接
400        if let Some(ref pool) = self.pool {
401            match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await {
402                Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
403                Ok(Err(_)) | Err(_) => {
404                    // 池化新连接失败,继续尝试直接连接
405                }
406            }
407        }
408
409        // 直接创建连接,使用更快的超时设置
410        match tokio::time::timeout(
411            Duration::from_millis(100),
412            LocalSocketStream::connect(self.name.clone()),
413        )
414        .await
415        {
416            Ok(Ok(stream)) => Ok(Either::Direct(stream)),
417            Ok(Err(_)) | Err(_) => {
418                // 如果直接连接失败,回退到普通池化连接
419                if let Some(ref pool) = self.pool {
420                    let conn = pool.get_connection().await?;
421                    Ok(Either::Pool(conn))
422                } else {
423                    Err(KodeBridgeError::connection("Failed to get fresh connection"))
424                }
425            }
426        }
427    }
428
429    /// GET request
430    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
431        HttpRequestBuilder::new(self, Method::GET, path)
432    }
433
434    /// POST request
435    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
436        HttpRequestBuilder::new(self, Method::POST, path)
437    }
438
439    /// PUT request with optimization enabled by default
440    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
441        HttpRequestBuilder::new(self, Method::PUT, path)
442    }
443
444    /// Optimized batch PUT operations
445    pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
446        let batch_size = requests.len();
447        if batch_size == 0 {
448            return Ok(Vec::new());
449        }
450
451        // 限制并发数以避免过载
452        let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
453        let mut responses = Vec::with_capacity(batch_size);
454
455        // 分批处理以控制内存使用和网络负载
456        for chunk in requests.chunks(concurrent_limit) {
457            let mut futures = Vec::new();
458
459            for (path, body) in chunk {
460                let path = path.clone();
461                let body = body.clone();
462
463                let future = self.put(&path).json_body(&body).optimize_for_put().send();
464
465                futures.push(future);
466            }
467
468            // 并发等待当前批次完成
469            let chunk_results = futures::future::join_all(futures).await;
470
471            for result in chunk_results {
472                match result {
473                    Ok(response) => responses.push(response),
474                    Err(e) => return Err(e),
475                }
476            }
477        }
478
479        Ok(responses)
480    }
481
482    /// DELETE request
483    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
484        HttpRequestBuilder::new(self, Method::DELETE, path)
485    }
486
487    /// PATCH request
488    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
489        HttpRequestBuilder::new(self, Method::PATCH, path)
490    }
491
492    /// HEAD request
493    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
494        HttpRequestBuilder::new(self, Method::HEAD, path)
495    }
496
497    /// OPTIONS request
498    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
499        HttpRequestBuilder::new(self, Method::OPTIONS, path)
500    }
501
502    /// Get pool statistics (if pooling is enabled)
503    pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
504        self.pool.as_ref().map(|p| p.stats())
505    }
506
507    /// Close the client and clean up resources
508    pub fn close(&self) {
509        if let Some(ref pool) = self.pool {
510            pool.close();
511        }
512    }
513
514    /// Preheat connections for better PUT performance
515    pub async fn preheat_for_puts(&self, count: usize) {
516        if let Some(ref pool) = self.pool {
517            pool.preheat_for_puts(count).await;
518        }
519    }
520
521    /// Smart timeout calculation based on request characteristics
522    fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
523        match method {
524            "PUT" | "POST" => {
525                match body_size {
526                    Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), // >5MB: 30s
527                    Some(size) if size > 1024 * 1024 => Duration::from_secs(15),     // >1MB: 15s
528                    Some(size) if size > 100 * 1024 => Duration::from_secs(8),       // >100KB: 8s
529                    Some(size) if size > 10 * 1024 => Duration::from_secs(4),        // >10KB: 4s
530                    _ => Duration::from_secs(2),                                     // 小请求: 2s
531                }
532            }
533            _ => self.config.default_timeout, // 其他方法使用默认超时
534        }
535    }
536}
537
538impl<'a> HttpRequestBuilder<'a> {
539    fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
540        let is_put = method == Method::PUT;
541        Self {
542            client,
543            method,
544            path: path.to_string(),
545            body: None,
546            timeout: None,
547            headers: Vec::new(),
548            put_optimized: is_put, // 自动为PUT请求启用优化
549            expected_size: None,
550        }
551    }
552
553    /// Set JSON body
554    pub fn json_body(mut self, body: &Value) -> Self {
555        self.body = Some(body.clone());
556
557        // 为PUT请求估算数据大小以优化处理
558        if self.method == Method::PUT {
559            if let Ok(json_bytes) = serde_json::to_vec(body) {
560                self.expected_size = Some(json_bytes.len());
561            }
562        }
563
564        self
565    }
566
567    /// Set custom timeout
568    pub const fn timeout(mut self, timeout: Duration) -> Self {
569        self.timeout = Some(timeout);
570        self
571    }
572
573    /// 设置预期数据大小(用于PUT请求优化)
574    pub const fn expected_size(mut self, size: usize) -> Self {
575        self.expected_size = Some(size);
576        self
577    }
578
579    /// 启用PUT请求专门优化
580    pub const fn optimize_for_put(mut self) -> Self {
581        self.put_optimized = true;
582        self
583    }
584
585    /// Add custom header
586    pub fn header<K, V>(mut self, key: K, value: V) -> Self
587    where
588        K: Into<String>,
589        V: Into<String>,
590    {
591        self.headers.push((key.into(), value.into()));
592        self
593    }
594
595    /// Send the request
596    pub async fn send(self) -> Result<HttpResponse> {
597        let metrics = global_metrics();
598        let tracker = metrics.request_start(self.method.as_str());
599
600        // 为PUT请求优化超时设置,使用智能超时计算
601        let timeout = if self.put_optimized {
602            self.timeout.unwrap_or_else(|| {
603                self.client
604                    .calculate_smart_timeout(self.method.as_str(), self.expected_size)
605            })
606        } else {
607            self.timeout.unwrap_or(self.client.config.default_timeout)
608        };
609
610        match self
611            .client
612            .send_request_with_optimization(
613                self.method.as_str(),
614                &self.path,
615                self.body.as_ref(),
616                &self.headers,
617                timeout,
618                self.put_optimized,
619                self.expected_size,
620            )
621            .await
622        {
623            Ok(response) => {
624                tracker.success(response.status_code());
625                Ok(HttpResponse::new(response))
626            }
627            Err(e) => {
628                tracker.failure(&format!("{:?}", e));
629                Err(e)
630            }
631        }
632    }
633}
634
/// Helper enum for connection types
///
/// Lets the client return either a pooled connection or a directly opened
/// one from the same function.
enum Either<A, B> {
    /// Connection obtained from the connection pool.
    Pool(A),
    /// Connection opened directly, bypassing the pool.
    Direct(B),
}
640
641impl Drop for IpcHttpClient {
642    fn drop(&mut self) {
643        self.close();
644    }
645}