Skip to main content

kode_bridge/
ipc_http_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream as _;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName as _};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use bytes::Bytes;
16use http::Method;
17use std::str::FromStr as _;
18use tracing::{debug, trace};
19
20/// Configuration for IPC HTTP client
21#[derive(Debug, Clone)]
22pub struct ClientConfig {
23    /// Default timeout for requests
24    pub default_timeout: Duration,
25    /// Connection pool configuration
26    pub pool_config: PoolConfig,
27    /// Enable connection pooling
28    pub enable_pooling: bool,
29    /// Retry configuration
30    pub max_retries: usize,
31    pub retry_delay: Duration,
32    /// Concurrent request limit
33    pub max_concurrent_requests: usize,
34    /// Rate limiting: max requests per second
35    pub max_requests_per_second: Option<f64>,
36}
37
38impl Default for ClientConfig {
39    fn default() -> Self {
40        Self {
41            default_timeout: Duration::from_secs(5), // 减少默认超时到5秒
42            pool_config: PoolConfig::default(),
43            enable_pooling: true,
44            max_retries: 3,                         // 减少重试次数
45            retry_delay: Duration::from_millis(25), // 减少重试延迟
46            max_concurrent_requests: 16,            // 增加并发请求数
47            max_requests_per_second: Some(50.0),    // 增加请求速率限制
48        }
49    }
50}
51
52/// Generic IPC HTTP client that works on both Unix and Windows platforms
53///
54/// This client is optimized for request-response patterns with connection pooling support.
55/// For streaming functionality, use `IpcStreamClient` instead.
56pub struct IpcHttpClient {
57    name: Name<'static>,
58    config: ClientConfig,
59    pool: Option<ConnectionPool>,
60    retry_executor: RetryExecutor,
61    /// 专门用于PUT请求的重试执行器
62    put_retry_executor: RetryExecutor,
63}
64
65/// HTTP request builder for fluent API
66pub struct HttpRequestBuilder<'a> {
67    client: &'a IpcHttpClient,
68    method: Method,
69    path: String,
70    body: Option<RequestBody>,
71    timeout: Option<Duration>,
72    headers: Vec<(String, String)>,
73    /// PUT专用优化标志
74    put_optimized: bool,
75    /// 预期数据大小,用于选择合适的缓冲区和超时
76    expected_size: Option<usize>,
77}
78
79enum RequestBody {
80    Json(Value),
81    JsonBytes(Bytes),
82}
83
84/// Enhanced HTTP response wrapper with chainable methods
85#[derive(Debug)]
86pub struct HttpResponse {
87    inner: Response,
88}
89
90impl HttpResponse {
91    const fn new(response: Response) -> Self {
92        Self { inner: response }
93    }
94
95    /// Get the HTTP status code
96    pub const fn status(&self) -> u16 {
97        self.inner.status_code()
98    }
99
100    /// Get response headers as JSON value (for backward compatibility)
101    pub fn headers(&self) -> Value {
102        let headers_map: std::collections::HashMap<String, String> = self
103            .inner
104            .headers()
105            .iter()
106            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
107            .collect();
108        serde_json::to_value(headers_map).unwrap_or(Value::Null)
109    }
110
111    /// Get response body as string
112    pub fn body(&self) -> Result<String> {
113        self.inner.text()
114    }
115
116    /// Check if response indicates success (2xx status)
117    pub fn is_success(&self) -> bool {
118        self.inner.is_success()
119    }
120
121    /// Check if response indicates client error (4xx status)
122    pub fn is_client_error(&self) -> bool {
123        self.inner.is_client_error()
124    }
125
126    /// Check if response indicates server error (5xx status)
127    pub fn is_server_error(&self) -> bool {
128        self.inner.is_server_error()
129    }
130
131    /// Get content length from headers
132    pub fn content_length(&self) -> u64 {
133        self.inner.content_length().unwrap_or(0)
134    }
135
136    /// Parse response body as JSON
137    pub fn json<T>(&self) -> Result<T>
138    where
139        T: DeserializeOwned,
140    {
141        self.inner.json()
142    }
143
144    /// Parse response body as generic JSON value
145    pub fn json_value(&self) -> Result<Value> {
146        self.inner.json_value()
147    }
148
149    /// Get the underlying modern Response
150    pub fn into_inner(self) -> Response {
151        self.inner
152    }
153
154    /// Convert to legacy Response format for backward compatibility
155    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
156        self.inner.to_legacy()
157    }
158}
159
160impl IpcHttpClient {
161    /// Create a new IPC HTTP client with default configuration
162    pub fn new<P>(path: P) -> Result<Self>
163    where
164        P: AsRef<Path>,
165    {
166        Self::with_config(path, ClientConfig::default())
167    }
168
169    /// Create a new IPC HTTP client with custom configuration
170    pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
171    where
172        P: AsRef<Path>,
173    {
174        let name = path
175            .as_ref()
176            .to_fs_name::<GenericFilePath>()
177            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
178            .into_owned();
179
180        let pool = if config.enable_pooling {
181            Some(ConnectionPool::new(name.clone(), config.pool_config.clone()))
182        } else {
183            None
184        };
185
186        // Create retry executor with optimized configuration for different request types
187        let retry_config = RetryConfig::for_network_operations()
188            .max_attempts(config.max_retries)
189            .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
190
191        let retry_executor = RetryExecutor::new(retry_config);
192
193        // 创建专门用于PUT请求的快速重试执行器
194        let put_retry_config = RetryConfig::for_put_requests();
195        let put_retry_executor = RetryExecutor::new(put_retry_config);
196
197        Ok(Self {
198            name,
199            config,
200            pool,
201            retry_executor,
202            put_retry_executor,
203        })
204    }
205
206    /// Create a direct connection (bypassing pool)
207    async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
208        let mut last_error = None;
209
210        for attempt in 0..self.config.max_retries {
211            if attempt > 0 {
212                tokio::time::sleep(self.config.retry_delay).await;
213            }
214
215            match LocalSocketStream::connect(self.name.clone()).await {
216                Ok(stream) => {
217                    debug!("Created direct connection on attempt {}", attempt + 1);
218                    return Ok(stream);
219                }
220                Err(e) => {
221                    trace!("Connection attempt {} failed: {}", attempt + 1, e);
222                    last_error = Some(e);
223                }
224            }
225        }
226
227        Err(KodeBridgeError::connection(format!(
228            "Failed to create connection after {} attempts: {}",
229            self.config.max_retries,
230            last_error
231                .map(|e| e.to_string())
232                .unwrap_or_else(|| "Unknown error".to_string())
233        )))
234    }
235
236    /// Get a connection (from pool or create new)
237    async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
238        let metrics = global_metrics();
239
240        if let Some(ref pool) = self.pool {
241            match pool.get_connection().await {
242                Ok(conn) => {
243                    metrics.connection_created(true); // From pool
244                    Ok(Either::Pool(conn))
245                }
246                Err(e) => {
247                    metrics.connection_failed();
248                    Err(e)
249                }
250            }
251        } else {
252            match self.create_direct_connection().await {
253                Ok(stream) => {
254                    metrics.connection_created(false); // Direct connection
255                    Ok(Either::Direct(stream))
256                }
257                Err(e) => {
258                    metrics.connection_failed();
259                    Err(e)
260                }
261            }
262        }
263    }
264
265    /// Legacy request method for backward compatibility
266    pub async fn request(
267        &self,
268        method: &str,
269        path: &str,
270        body: Option<&serde_json::Value>,
271    ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
272        let response = self
273            .send_request_internal(method, path, body, self.config.default_timeout)
274            .await?;
275        Ok(response.to_legacy())
276    }
277
278    /// Internal method to send requests with enhanced retry logic
279    async fn send_request_internal(
280        &self,
281        method: &str,
282        path: &str,
283        body: Option<&Value>,
284        timeout: Duration,
285    ) -> Result<Response> {
286        let method =
287            Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
288
289        let mut builder = RequestBuilder::new(method.clone(), path.to_string());
290
291        // Note: This method doesn't support custom headers for backward compatibility
292        // Use the fluent API (get(), post(), etc.) for custom headers
293
294        if let Some(json_body) = body {
295            builder = builder.json(json_body)?;
296        }
297
298        let request = builder.build()?;
299
300        // Use smart retry mechanism
301        self.retry_executor
302            .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
303                // Execute with timeout
304                let result = tokio::time::timeout(timeout, async {
305                    let mut connection = self.get_connection().await?;
306
307                    match &mut connection {
308                        Either::Pool(conn) => {
309                            if let Some(stream) = conn.stream() {
310                                let result = send_request(stream, request.clone()).await;
311                                if result.is_err() {
312                                    conn.invalidate();
313                                }
314                                result
315                            } else {
316                                conn.invalidate();
317                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
318                            }
319                        }
320                        Either::Direct(stream) => send_request(stream, request.clone()).await,
321                    }
322                })
323                .await;
324
325                match result {
326                    Ok(response) => response,
327                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
328                }
329            })
330            .await
331    }
332
333    /// Enhanced request sending with PUT optimization support
334    async fn send_request_with_optimization(
335        &self,
336        method: &str,
337        path: &str,
338        body: Option<&RequestBody>,
339        headers: &[(String, String)],
340        timeout: Duration,
341        is_put_optimized: bool,
342        expected_size: Option<usize>,
343    ) -> Result<Response> {
344        let method_enum =
345            Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
346
347        let mut builder = RequestBuilder::new(method_enum.clone(), path.to_string());
348
349        // Add custom headers
350        for (key, value) in headers {
351            builder = builder.header(key.as_str(), value.as_str());
352        }
353
354        if let Some(body) = body {
355            builder = match body {
356                RequestBody::Json(value) => builder.json(value)?,
357                RequestBody::JsonBytes(bytes) => builder.body_bytes(bytes.clone(), "application/json")?,
358            };
359        }
360
361        let request = builder.build()?;
362
363        // PUT请求使用专门的重试策略
364        let retry_context = if is_put_optimized {
365            format!("PUT_OPTIMIZED {}", path)
366        } else {
367            format!("{} {}", method, path)
368        };
369
370        // Use smart retry mechanism with PUT optimization
371        let retry_executor = if is_put_optimized {
372            &self.put_retry_executor // 使用PUT专用的快速重试器
373        } else {
374            &self.retry_executor // 使用通用重试器
375        };
376
377        retry_executor
378            .execute_with_context(&retry_context, || async {
379                // Execute with timeout
380                let result = tokio::time::timeout(timeout, async {
381                    let mut connection = if is_put_optimized && expected_size.unwrap_or(0) > 10240 {
382                        // 对于大的PUT请求,优先获取新连接
383                        self.get_fresh_connection().await?
384                    } else {
385                        self.get_connection().await?
386                    };
387
388                    match &mut connection {
389                        Either::Pool(conn) => {
390                            if let Some(stream) = conn.stream() {
391                                let result = send_request(stream, request.clone()).await;
392                                if result.is_err() {
393                                    conn.invalidate();
394                                }
395                                result
396                            } else {
397                                conn.invalidate();
398                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
399                            }
400                        }
401                        Either::Direct(stream) => send_request(stream, request.clone()).await,
402                    }
403                })
404                .await;
405
406                match result {
407                    Ok(response) => response,
408                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
409                }
410            })
411            .await
412    }
413
414    /// Get a fresh connection optimized for PUT requests
415    async fn get_fresh_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
416        use interprocess::local_socket::tokio::prelude::LocalSocketStream;
417
418        // 首先尝试从连接池获取新连接
419        if let Some(ref pool) = self.pool {
420            match tokio::time::timeout(Duration::from_millis(20), pool.get_fresh_connection()).await {
421                Ok(Ok(conn)) => return Ok(Either::Pool(conn)),
422                Ok(Err(_)) | Err(_) => {
423                    // 池化新连接失败,继续尝试直接连接
424                }
425            }
426        }
427
428        // 直接创建连接,使用更快的超时设置
429        match tokio::time::timeout(
430            Duration::from_millis(100),
431            LocalSocketStream::connect(self.name.clone()),
432        )
433        .await
434        {
435            Ok(Ok(stream)) => Ok(Either::Direct(stream)),
436            Ok(Err(_)) | Err(_) => {
437                // 如果直接连接失败,回退到普通池化连接
438                if let Some(ref pool) = self.pool {
439                    let conn = pool.get_connection().await?;
440                    Ok(Either::Pool(conn))
441                } else {
442                    Err(KodeBridgeError::connection("Failed to get fresh connection"))
443                }
444            }
445        }
446    }
447
448    /// GET request
449    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
450        HttpRequestBuilder::new(self, Method::GET, path)
451    }
452
453    /// POST request
454    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
455        HttpRequestBuilder::new(self, Method::POST, path)
456    }
457
458    /// PUT request with optimization enabled by default
459    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
460        HttpRequestBuilder::new(self, Method::PUT, path)
461    }
462
463    /// Optimized batch PUT operations
464    pub async fn put_batch(&self, requests: Vec<(String, Value)>) -> Result<Vec<HttpResponse>> {
465        let batch_size = requests.len();
466        if batch_size == 0 {
467            return Ok(Vec::new());
468        }
469
470        let concurrent_limit = std::cmp::min(self.config.max_concurrent_requests, batch_size);
471        let mut responses = Vec::with_capacity(batch_size);
472        let mut pending = Vec::with_capacity(concurrent_limit);
473
474        for (path, body) in requests {
475            let body = Bytes::from(body.to_string());
476            pending.push(self.put(&path).json_bytes(body).optimize_for_put().send());
477
478            if pending.len() == concurrent_limit {
479                let chunk_results = futures::future::join_all(std::mem::take(&mut pending)).await;
480                for result in chunk_results {
481                    match result {
482                        Ok(response) => responses.push(response),
483                        Err(e) => return Err(e),
484                    }
485                }
486            }
487        }
488
489        if !pending.is_empty() {
490            let chunk_results = futures::future::join_all(pending).await;
491            for result in chunk_results {
492                match result {
493                    Ok(response) => responses.push(response),
494                    Err(e) => return Err(e),
495                }
496            }
497        }
498
499        Ok(responses)
500    }
501
502    /// DELETE request
503    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
504        HttpRequestBuilder::new(self, Method::DELETE, path)
505    }
506
507    /// PATCH request
508    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
509        HttpRequestBuilder::new(self, Method::PATCH, path)
510    }
511
512    /// HEAD request
513    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
514        HttpRequestBuilder::new(self, Method::HEAD, path)
515    }
516
517    /// OPTIONS request
518    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
519        HttpRequestBuilder::new(self, Method::OPTIONS, path)
520    }
521
522    /// Get pool statistics (if pooling is enabled)
523    pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
524        self.pool.as_ref().map(|p| p.stats())
525    }
526
527    /// Close the client and clean up resources
528    pub fn close(&self) {
529        if let Some(ref pool) = self.pool {
530            pool.close();
531        }
532    }
533
534    /// Preheat connections for better PUT performance
535    pub async fn preheat_for_puts(&self, count: usize) {
536        if let Some(ref pool) = self.pool {
537            pool.preheat_for_puts(count).await;
538        }
539    }
540
541    /// Smart timeout calculation based on request characteristics
542    fn calculate_smart_timeout(&self, method: &str, body_size: Option<usize>) -> Duration {
543        match method {
544            "PUT" | "POST" => {
545                match body_size {
546                    Some(size) if size > 5 * 1024 * 1024 => Duration::from_secs(30), // >5MB: 30s
547                    Some(size) if size > 1024 * 1024 => Duration::from_secs(15),     // >1MB: 15s
548                    Some(size) if size > 100 * 1024 => Duration::from_secs(8),       // >100KB: 8s
549                    Some(size) if size > 10 * 1024 => Duration::from_secs(4),        // >10KB: 4s
550                    _ => Duration::from_secs(2),                                     // 小请求: 2s
551                }
552            }
553            _ => self.config.default_timeout, // 其他方法使用默认超时
554        }
555    }
556}
557
558impl<'a> HttpRequestBuilder<'a> {
559    fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
560        let is_put = method == Method::PUT;
561        Self {
562            client,
563            method,
564            path: path.to_string(),
565            body: None,
566            timeout: None,
567            headers: Vec::new(),
568            put_optimized: is_put, // 自动为PUT请求启用优化
569            expected_size: None,
570        }
571    }
572
573    /// Set JSON body
574    pub fn json_body(mut self, body: &Value) -> Self {
575        self.body = Some(RequestBody::Json(body.clone()));
576        self
577    }
578
579    /// Set a pre-serialized JSON body.
580    pub fn json_bytes(mut self, body: Bytes) -> Self {
581        self.expected_size = Some(body.len());
582        self.body = Some(RequestBody::JsonBytes(body));
583        self
584    }
585
586    /// Set custom timeout
587    pub const fn timeout(mut self, timeout: Duration) -> Self {
588        self.timeout = Some(timeout);
589        self
590    }
591
592    /// 设置预期数据大小(用于PUT请求优化)
593    pub const fn expected_size(mut self, size: usize) -> Self {
594        self.expected_size = Some(size);
595        self
596    }
597
598    /// 启用PUT请求专门优化
599    pub const fn optimize_for_put(mut self) -> Self {
600        self.put_optimized = true;
601        self
602    }
603
604    /// Add custom header
605    pub fn header<K, V>(mut self, key: K, value: V) -> Self
606    where
607        K: Into<String>,
608        V: Into<String>,
609    {
610        self.headers.push((key.into(), value.into()));
611        self
612    }
613
614    /// Send the request
615    pub async fn send(self) -> Result<HttpResponse> {
616        let metrics = global_metrics();
617        let tracker = metrics.request_start(self.method.as_str());
618
619        // 为PUT请求优化超时设置,使用智能超时计算
620        let timeout = if self.put_optimized {
621            self.timeout.unwrap_or_else(|| {
622                self.client
623                    .calculate_smart_timeout(self.method.as_str(), self.expected_size)
624            })
625        } else {
626            self.timeout.unwrap_or(self.client.config.default_timeout)
627        };
628
629        match self
630            .client
631            .send_request_with_optimization(
632                self.method.as_str(),
633                &self.path,
634                self.body.as_ref(),
635                &self.headers,
636                timeout,
637                self.put_optimized,
638                self.expected_size,
639            )
640            .await
641        {
642            Ok(response) => {
643                tracker.success(response.status_code());
644                Ok(HttpResponse::new(response))
645            }
646            Err(e) => {
647                tracker.failure(&format!("{:?}", e));
648                Err(e)
649            }
650        }
651    }
652}
653
654/// Helper enum for connection types
655enum Either<A, B> {
656    Pool(A),
657    Direct(B),
658}
659
660impl Drop for IpcHttpClient {
661    fn drop(&mut self) {
662        self.close();
663    }
664}