kode_bridge/
ipc_http_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
19/// Configuration for IPC HTTP client
20#[derive(Debug, Clone)]
21pub struct ClientConfig {
22    /// Default timeout for requests
23    pub default_timeout: Duration,
24    /// Connection pool configuration
25    pub pool_config: PoolConfig,
26    /// Enable connection pooling
27    pub enable_pooling: bool,
28    /// Retry configuration
29    pub max_retries: usize,
30    pub retry_delay: Duration,
31    /// Concurrent request limit
32    pub max_concurrent_requests: usize,
33    /// Rate limiting: max requests per second
34    pub max_requests_per_second: Option<f64>,
35}
36
37impl Default for ClientConfig {
38    fn default() -> Self {
39        Self {
40            default_timeout: Duration::from_secs(5), // 减少默认超时到5秒
41            pool_config: PoolConfig::default(),
42            enable_pooling: true,
43            max_retries: 3,                         // 减少重试次数
44            retry_delay: Duration::from_millis(25), // 减少重试延迟
45            max_concurrent_requests: 16,            // 增加并发请求数
46            max_requests_per_second: Some(50.0),    // 增加请求速率限制
47        }
48    }
49}
50
51/// Generic IPC HTTP client that works on both Unix and Windows platforms
52///
53/// This client is optimized for request-response patterns with connection pooling support.
54/// For streaming functionality, use `IpcStreamClient` instead.
55pub struct IpcHttpClient {
56    name: Name<'static>,
57    config: ClientConfig,
58    pool: Option<ConnectionPool>,
59    retry_executor: RetryExecutor,
60    /// 专门用于PUT请求的重试执行器
61    put_retry_executor: RetryExecutor,
62}
63
64/// HTTP request builder for fluent API
65pub struct HttpRequestBuilder<'a> {
66    client: &'a IpcHttpClient,
67    method: Method,
68    path: String,
69    body: Option<Value>,
70    timeout: Option<Duration>,
71    headers: Vec<(String, String)>,
72    /// PUT专用优化标志
73    put_optimized: bool,
74    /// 预期数据大小,用于选择合适的缓冲区和超时
75    expected_size: Option<usize>,
76}
77
78/// Enhanced HTTP response wrapper with chainable methods
79#[derive(Debug)]
80pub struct HttpResponse {
81    inner: Response,
82}
83
84impl HttpResponse {
85    fn new(response: Response) -> Self {
86        Self { inner: response }
87    }
88
89    /// Get the HTTP status code
90    pub fn status(&self) -> u16 {
91        self.inner.status_code()
92    }
93
94    /// Get response headers as JSON value (for backward compatibility)
95    pub fn headers(&self) -> Value {
96        let headers_map: std::collections::HashMap<String, String> = self
97            .inner
98            .headers()
99            .iter()
100            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
101            .collect();
102        serde_json::to_value(headers_map).unwrap_or(Value::Null)
103    }
104
105    /// Get response body as string
106    pub fn body(&self) -> Result<String> {
107        self.inner.text()
108    }
109
110    /// Check if response indicates success (2xx status)
111    pub fn is_success(&self) -> bool {
112        self.inner.is_success()
113    }
114
115    /// Check if response indicates client error (4xx status)
116    pub fn is_client_error(&self) -> bool {
117        self.inner.is_client_error()
118    }
119
120    /// Check if response indicates server error (5xx status)
121    pub fn is_server_error(&self) -> bool {
122        self.inner.is_server_error()
123    }
124
125    /// Get content length from headers
126    pub fn content_length(&self) -> u64 {
127        self.inner.content_length().unwrap_or(0)
128    }
129
130    /// Parse response body as JSON
131    pub fn json<T>(&self) -> Result<T>
132    where
133        T: DeserializeOwned,
134    {
135        self.inner.json()
136    }
137
138    /// Parse response body as generic JSON value
139    pub fn json_value(&self) -> Result<Value> {
140        self.inner.json_value()
141    }
142
143    /// Get the underlying modern Response
144    pub fn into_inner(self) -> Response {
145        self.inner
146    }
147
148    /// Convert to legacy Response format for backward compatibility
149    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
150        self.inner.to_legacy()
151    }
152}
153
154impl IpcHttpClient {
155    /// Create a new IPC HTTP client with default configuration
156    pub fn new<P>(path: P) -> Result<Self>
157    where
158        P: AsRef<Path>,
159    {
160        Self::with_config(path, ClientConfig::default())
161    }
162
163    /// Create a new IPC HTTP client with custom configuration
164    pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
165    where
166        P: AsRef<Path>,
167    {
168        let name = path
169            .as_ref()
170            .to_fs_name::<GenericFilePath>()
171            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
172            .into_owned();
173
174        let pool = if config.enable_pooling {
175            Some(ConnectionPool::new(
176                name.clone(),
177                config.pool_config.clone(),
178            ))
179        } else {
180            None
181        };
182
183        // Create retry executor with optimized configuration for different request types
184        let retry_config = RetryConfig::for_network_operations()
185            .max_attempts(config.max_retries)
186            .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
187
188        let retry_executor = RetryExecutor::new(retry_config);
189
190        // 创建专门用于PUT请求的快速重试执行器
191        let put_retry_config = RetryConfig::for_put_requests();
192        let put_retry_executor = RetryExecutor::new(put_retry_config);
193
194        Ok(Self {
195            name,
196            config,
197            pool,
198            retry_executor,
199            put_retry_executor,
200        })
201    }
202
203    /// Create a direct connection (bypassing pool)
204    async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
205        let mut last_error = None;
206
207        for attempt in 0..self.config.max_retries {
208            if attempt > 0 {
209                tokio::time::sleep(self.config.retry_delay).await;
210            }
211
212            match LocalSocketStream::connect(self.name.clone()).await {
213                Ok(stream) => {
214                    debug!("Created direct connection on attempt {}", attempt + 1);
215                    return Ok(stream);
216                }
217                Err(e) => {
218                    trace!("Connection attempt {} failed: {}", attempt + 1, e);
219                    last_error = Some(e);
220                }
221            }
222        }
223
224        Err(KodeBridgeError::connection(format!(
225            "Failed to create connection after {} attempts: {}",
226            self.config.max_retries,
227            last_error
228                .map(|e| e.to_string())
229                .unwrap_or_else(|| "Unknown error".to_string())
230        )))
231    }
232
233    /// Get a connection (from pool or create new)
234    async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
235        let metrics = global_metrics();
236
237        if let Some(ref pool) = self.pool {
238            match pool.get_connection().await {
239                Ok(conn) => {
240                    metrics.connection_created(true); // From pool
241                    Ok(Either::Pool(conn))
242                }
243                Err(e) => {
244                    metrics.connection_failed();
245                    Err(e)
246                }
247            }
248        } else {
249            match self.create_direct_connection().await {
250                Ok(stream) => {
251                    metrics.connection_created(false); // Direct connection
252                    Ok(Either::Direct(stream))
253                }
254                Err(e) => {
255                    metrics.connection_failed();
256                    Err(e)
257                }
258            }
259        }
260    }
261
262    /// Legacy request method for backward compatibility
263    pub async fn request(
264        &self,
265        method: &str,
266        path: &str,
267        body: Option<&serde_json::Value>,
268    ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
269        let response = self
270            .send_request_internal(method, path, body, self.config.default_timeout)
271            .await?;
272        Ok(response.to_legacy())
273    }
274
275    /// Internal method to send requests with enhanced retry logic
276    async fn send_request_internal(
277        &self,
278        method: &str,
279        path: &str,
280        body: Option<&Value>,
281        timeout: Duration,
282    ) -> Result<Response> {
283        let method = Method::from_str(method)
284            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
285
286        let mut builder = RequestBuilder::new(method.clone(), path.to_string());
287
288        // Note: This method doesn't support custom headers for backward compatibility
289        // Use the fluent API (get(), post(), etc.) for custom headers
290
291        if let Some(json_body) = body {
292            builder = builder.json(json_body)?;
293        }
294
295        let request = builder.build()?;
296
297        // Use smart retry mechanism
298        self.retry_executor
299            .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
300                // Execute with timeout
301                let result = tokio::time::timeout(timeout, async {
302                    let mut connection = self.get_connection().await?;
303
304                    match &mut connection {
305                        Either::Pool(conn) => {
306                            if let Some(stream) = conn.stream() {
307                                send_request(stream, request.clone()).await
308                            } else {
309                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
310                            }
311                        }
312                        Either::Direct(stream) => send_request(stream, request.clone()).await,
313                    }
314                })
315                .await;
316
317                match result {
318                    Ok(response) => response,
319                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
320                }
321            })
322            .await
323    }
324
325    /// Enhanced request sending with PUT optimization support
326    async fn send_request_with_optimization(
327        &self,
328        method: &str,
329        path: &str,
330        body: Option<&Value>,
331        headers: &[(String, String)],
332        timeout: Duration,
333        is_put_optimized: bool,
334        expected_size: Option<usize>,
335    ) -> Result<Response> {
336        let method_enum = Method::from_str(method)
337            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
338
339        let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
340
341        // Add custom headers
342        for (key, value) in headers {
343            builder = builder.header(key.as_str(), value.as_str());
344        }
345
346        if let Some(json_body) = body {
347            builder = builder.json(json_body)?;
348        }
349
350        let request = builder.build()?;
351
352        // PUT请求使用专门的重试策略
353        let retry_context = if is_put_optimized {
354            format!("PUT_OPTIMIZED {}", path)
355        } else {
356            format!("{} {}", method, path)
357        };
358
359        // Use smart retry mechanism with PUT optimization
360        let retry_executor = if is_put_optimized {
361            &self.put_retry_executor // 使用PUT专用的快速重试器
362        } else {
363            &self.retry_executor // 使用通用重试器
364        };
365
366        retry_executor
367            .execute_with_context(&retry_context, || async {
368                // Execute with timeout
369                let result = tokio::time::timeout(timeout, async {
370                    let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
371                        // 对于大的PUT请求,优先获取新连接
372                        self.get_fresh_connection().await?
373                    } else {
374                        self.get_connection().await?
375                    };
376
377                    match &mut connection {
378                        Either::Pool(conn) => {
379                            if let Some(stream) = conn.stream() {
380                                send_request(stream, request.clone()).await
381                            } else {
382                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
383                            }
384                        }
385                        Either::Direct(stream) => send_request(stream, request.clone()).await,
386                    }
387                })
388                .await;
389
390                match result {
391                    Ok(response) => response,
392                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
393                }
394            })
395            .await
396    }
397
398    /// Get a fresh connection optimized for PUT requests
399    async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
400        use interprocess::local_socket::tokio::prelude::LocalSocketStream;
401
402        // 首先尝试从连接池获取新连接
403        if let Some(ref pool) = self.pool {
404            match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await
405            {
406                Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
407                Ok(Err(_)) | Err(_) => {
408                    // 池化新连接失败,继续尝试直接连接
409                }
410            }
411        }
412
413        // 直接创建连接,使用更快的超时设置
414        match tokio::time::timeout(
415            Duration::from_millis(100),
416            LocalSocketStream::connect(self.name.clone()),
417        )
418        .await
419        {
420            Ok(Ok(stream)) => Ok(Either::Direct(stream)),
421            Ok(Err(_)) | Err(_) => {
422                // 如果直接连接失败,回退到普通池化连接
423                if let Some(ref pool) = self.pool {
424                    let conn = pool.get_connection().await?;
425                    Ok(Either::Pool(conn))
426                } else {
427                    Err(KodeBridgeError::connection(
428                        "Failed to get fresh connection",
429                    ))
430                }
431            }
432        }
433    }
434
435    /// GET request
436    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
437        HttpRequestBuilder::new(self, Method::GET, path)
438    }
439
440    /// POST request
441    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
442        HttpRequestBuilder::new(self, Method::POST, path)
443    }
444
445    /// PUT request with optimization enabled by default
446    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
447        HttpRequestBuilder::new(self, Method::PUT, path)
448    }
449
450    /// Optimized batch PUT operations
451    pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
452        let batch_size = requests.len();
453        if batch_size == 0 {
454            return Ok(Vec::new());
455        }
456
457        // 限制并发数以避免过载
458        let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
459        let mut responses = Vec::with_capacity(batch_size);
460
461        // 分批处理以控制内存使用和网络负载
462        for chunk in requests.chunks(concurrent_limit) {
463            let mut futures = Vec::new();
464
465            for (path, body) in chunk {
466                let path = path.clone();
467                let body = body.clone();
468
469                let future = self.put(&path).json_body(&body).optimize_for_put().send();
470
471                futures.push(future);
472            }
473
474            // 并发等待当前批次完成
475            let chunk_results = futures::future::join_all(futures).await;
476
477            for result in chunk_results {
478                match result {
479                    Ok(response) => responses.push(response),
480                    Err(e) => return Err(e),
481                }
482            }
483        }
484
485        Ok(responses)
486    }
487
488    /// DELETE request
489    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
490        HttpRequestBuilder::new(self, Method::DELETE, path)
491    }
492
493    /// PATCH request
494    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
495        HttpRequestBuilder::new(self, Method::PATCH, path)
496    }
497
498    /// HEAD request
499    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
500        HttpRequestBuilder::new(self, Method::HEAD, path)
501    }
502
503    /// OPTIONS request
504    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
505        HttpRequestBuilder::new(self, Method::OPTIONS, path)
506    }
507
508    /// Get pool statistics (if pooling is enabled)
509    pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
510        self.pool.as_ref().map(|p| p.stats())
511    }
512
513    /// Close the client and clean up resources
514    pub fn close(&self) {
515        if let Some(ref pool) = self.pool {
516            pool.close();
517        }
518    }
519
520    /// Preheat connections for better PUT performance
521    pub async fn preheat_for_puts(&self, count: usize) {
522        if let Some(ref pool) = self.pool {
523            pool.preheat_for_puts(count).await;
524        }
525    }
526
527    /// Smart timeout calculation based on request characteristics
528    fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
529        match method {
530            "PUT" | "POST" => {
531                match body_size {
532                    Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), // >5MB: 30s
533                    Some(size) if size > 1024 * 1024 => Duration::from_secs(15),     // >1MB: 15s
534                    Some(size) if size > 100 * 1024 => Duration::from_secs(8),       // >100KB: 8s
535                    Some(size) if size > 10 * 1024 => Duration::from_secs(4),        // >10KB: 4s
536                    _ => Duration::from_secs(2),                                     // 小请求: 2s
537                }
538            }
539            _ => self.config.default_timeout, // 其他方法使用默认超时
540        }
541    }
542}
543
544impl<'a> HttpRequestBuilder<'a> {
545    fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
546        let is_put = method == Method::PUT;
547        Self {
548            client,
549            method,
550            path: path.to_string(),
551            body: None,
552            timeout: None,
553            headers: Vec::new(),
554            put_optimized: is_put, // 自动为PUT请求启用优化
555            expected_size: None,
556        }
557    }
558
559    /// Set JSON body
560    pub fn json_body(mut self, body: &Value) -> Self {
561        self.body = Some(body.clone());
562
563        // 为PUT请求估算数据大小以优化处理
564        if self.method == Method::PUT {
565            if let Ok(json_bytes) = serde_json::to_vec(body) {
566                self.expected_size = Some(json_bytes.len());
567            }
568        }
569
570        self
571    }
572
573    /// Set custom timeout
574    pub fn timeout(mut self, timeout: Duration) -> Self {
575        self.timeout = Some(timeout);
576        self
577    }
578
579    /// 设置预期数据大小(用于PUT请求优化)
580    pub fn expected_size(mut self, size: usize) -> Self {
581        self.expected_size = Some(size);
582        self
583    }
584
585    /// 启用PUT请求专门优化
586    pub fn optimize_for_put(mut self) -> Self {
587        self.put_optimized = true;
588        self
589    }
590
591    /// Add custom header
592    pub fn header<K, V>(mut self, key: K, value: V) -> Self
593    where
594        K: Into<String>,
595        V: Into<String>,
596    {
597        self.headers.push((key.into(), value.into()));
598        self
599    }
600
601    /// Send the request
602    pub async fn send(self) -> Result<HttpResponse> {
603        let metrics = global_metrics();
604        let tracker = metrics.request_start(self.method.as_str());
605
606        // 为PUT请求优化超时设置,使用智能超时计算
607        let timeout = if self.put_optimized {
608            self.timeout.unwrap_or_else(|| {
609                self.client
610                    .calculate_smart_timeout(self.method.as_str(), self.expected_size)
611            })
612        } else {
613            self.timeout.unwrap_or(self.client.config.default_timeout)
614        };
615
616        match self
617            .client
618            .send_request_with_optimization(
619                self.method.as_str(),
620                &self.path,
621                self.body.as_ref(),
622                &self.headers,
623                timeout,
624                self.put_optimized,
625                self.expected_size,
626            )
627            .await
628        {
629            Ok(response) => {
630                tracker.success(response.status_code());
631                Ok(HttpResponse::new(response))
632            }
633            Err(e) => {
634                tracker.failure(&format!("{:?}", e));
635                Err(e)
636            }
637        }
638    }
639}
640
641/// Helper enum for connection types
642enum Either<A, B> {
643    Pool(A),
644    Direct(B),
645}
646
647impl Drop for IpcHttpClient {
648    fn drop(&mut self) {
649        self.close();
650    }
651}