kode_bridge/
ipc_http_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{send_request, RequestBuilder, Response};
12use crate::metrics::global_metrics;
13use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
14use crate::retry::{RetryConfig, RetryExecutor};
15use http::Method;
16use std::str::FromStr;
17use tracing::{debug, trace};
18
19/// Configuration for IPC HTTP client
20#[derive(Debug, Clone)]
21pub struct ClientConfig {
22    /// Default timeout for requests
23    pub default_timeout: Duration,
24    /// Connection pool configuration
25    pub pool_config: PoolConfig,
26    /// Enable connection pooling
27    pub enable_pooling: bool,
28    /// Retry configuration
29    pub max_retries: usize,
30    pub retry_delay: Duration,
31    /// Concurrent request limit
32    pub max_concurrent_requests: usize,
33    /// Rate limiting: max requests per second
34    pub max_requests_per_second: Option<f64>,
35}
36
37impl Default for ClientConfig {
38    fn default() -> Self {
39        Self {
40            default_timeout: Duration::from_secs(10), // Reduce default timeout
41            pool_config: PoolConfig::default(),
42            enable_pooling: true,
43            max_retries: 5,                         // Increase retry attempts
44            retry_delay: Duration::from_millis(50), // Reduce retry delay
45            max_concurrent_requests: 8,             // Limit concurrent requests
46            max_requests_per_second: Some(10.0),    // Limit requests per second
47        }
48    }
49}
50
51/// Generic IPC HTTP client that works on both Unix and Windows platforms
52///
53/// This client is optimized for request-response patterns with connection pooling support.
54/// For streaming functionality, use `IpcStreamClient` instead.
55pub struct IpcHttpClient {
56    name: Name<'static>,
57    config: ClientConfig,
58    pool: Option<ConnectionPool>,
59    retry_executor: RetryExecutor,
60}
61
62/// HTTP request builder for fluent API
63pub struct HttpRequestBuilder<'a> {
64    client: &'a IpcHttpClient,
65    method: Method,
66    path: String,
67    body: Option<Value>,
68    timeout: Option<Duration>,
69    headers: Vec<(String, String)>,
70}
71
72/// Enhanced HTTP response wrapper with chainable methods
73#[derive(Debug)]
74pub struct HttpResponse {
75    inner: Response,
76}
77
78impl HttpResponse {
79    fn new(response: Response) -> Self {
80        Self { inner: response }
81    }
82
83    /// Get the HTTP status code
84    pub fn status(&self) -> u16 {
85        self.inner.status_code()
86    }
87
88    /// Get response headers as JSON value (for backward compatibility)
89    pub fn headers(&self) -> Value {
90        let headers_map: std::collections::HashMap<String, String> = self
91            .inner
92            .headers()
93            .iter()
94            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
95            .collect();
96        serde_json::to_value(headers_map).unwrap_or(Value::Null)
97    }
98
99    /// Get response body as string
100    pub fn body(&self) -> Result<String> {
101        self.inner.text()
102    }
103
104    /// Check if response indicates success (2xx status)
105    pub fn is_success(&self) -> bool {
106        self.inner.is_success()
107    }
108
109    /// Check if response indicates client error (4xx status)
110    pub fn is_client_error(&self) -> bool {
111        self.inner.is_client_error()
112    }
113
114    /// Check if response indicates server error (5xx status)
115    pub fn is_server_error(&self) -> bool {
116        self.inner.is_server_error()
117    }
118
119    /// Get content length from headers
120    pub fn content_length(&self) -> u64 {
121        self.inner.content_length().unwrap_or(0)
122    }
123
124    /// Parse response body as JSON
125    pub fn json<T>(&self) -> Result<T>
126    where
127        T: DeserializeOwned,
128    {
129        self.inner.json()
130    }
131
132    /// Parse response body as generic JSON value
133    pub fn json_value(&self) -> Result<Value> {
134        self.inner.json_value()
135    }
136
137    /// Get the underlying modern Response
138    pub fn into_inner(self) -> Response {
139        self.inner
140    }
141
142    /// Convert to legacy Response format for backward compatibility
143    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
144        self.inner.to_legacy()
145    }
146}
147
148impl IpcHttpClient {
149    /// Create a new IPC HTTP client with default configuration
150    pub fn new<P>(path: P) -> Result<Self>
151    where
152        P: AsRef<Path>,
153    {
154        Self::with_config(path, ClientConfig::default())
155    }
156
157    /// Create a new IPC HTTP client with custom configuration
158    pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
159    where
160        P: AsRef<Path>,
161    {
162        let name = path
163            .as_ref()
164            .to_fs_name::<GenericFilePath>()
165            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
166            .into_owned();
167
168        let pool = if config.enable_pooling {
169            Some(ConnectionPool::new(
170                name.clone(),
171                config.pool_config.clone(),
172            ))
173        } else {
174            None
175        };
176
177        // Create retry executor with appropriate configuration
178        let retry_config = RetryConfig::for_network_operations()
179            .max_attempts(config.max_retries)
180            .base_delay(Duration::from_millis(config.pool_config.retry_delay_ms));
181
182        let retry_executor = RetryExecutor::new(retry_config);
183
184        Ok(Self {
185            name,
186            config,
187            pool,
188            retry_executor,
189        })
190    }
191
192    /// Create a direct connection (bypassing pool)
193    async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
194        let mut last_error = None;
195
196        for attempt in 0..self.config.max_retries {
197            if attempt > 0 {
198                tokio::time::sleep(self.config.retry_delay).await;
199            }
200
201            match LocalSocketStream::connect(self.name.clone()).await {
202                Ok(stream) => {
203                    debug!("Created direct connection on attempt {}", attempt + 1);
204                    return Ok(stream);
205                }
206                Err(e) => {
207                    trace!("Connection attempt {} failed: {}", attempt + 1, e);
208                    last_error = Some(e);
209                }
210            }
211        }
212
213        Err(KodeBridgeError::connection(format!(
214            "Failed to create connection after {} attempts: {}",
215            self.config.max_retries,
216            last_error
217                .map(|e| e.to_string())
218                .unwrap_or_else(|| "Unknown error".to_string())
219        )))
220    }
221
222    /// Get a connection (from pool or create new)
223    async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
224        let metrics = global_metrics();
225
226        if let Some(ref pool) = self.pool {
227            match pool.get_connection().await {
228                Ok(conn) => {
229                    metrics.connection_created(true); // From pool
230                    Ok(Either::Pool(conn))
231                }
232                Err(e) => {
233                    metrics.connection_failed();
234                    Err(e)
235                }
236            }
237        } else {
238            match self.create_direct_connection().await {
239                Ok(stream) => {
240                    metrics.connection_created(false); // Direct connection
241                    Ok(Either::Direct(stream))
242                }
243                Err(e) => {
244                    metrics.connection_failed();
245                    Err(e)
246                }
247            }
248        }
249    }
250
251    /// Legacy request method for backward compatibility
252    pub async fn request(
253        &self,
254        method: &str,
255        path: &str,
256        body: Option<&serde_json::Value>,
257    ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
258        let response = self
259            .send_request_internal(method, path, body, self.config.default_timeout)
260            .await?;
261        Ok(response.to_legacy())
262    }
263
264    /// Internal method to send requests with enhanced retry logic
265    async fn send_request_internal(
266        &self,
267        method: &str,
268        path: &str,
269        body: Option<&Value>,
270        timeout: Duration,
271    ) -> Result<Response> {
272        let method = Method::from_str(method)
273            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
274
275        let mut builder = RequestBuilder::new(method.clone(), path.to_string());
276
277        if let Some(json_body) = body {
278            builder = builder.json(json_body)?;
279        }
280
281        let request = builder.build()?;
282
283        // Use smart retry mechanism
284        self.retry_executor
285            .execute_with_context(&format!("{} {}", method.as_str(), path), || async {
286                // Execute with timeout
287                let result = tokio::time::timeout(timeout, async {
288                    let mut connection = self.get_connection().await?;
289
290                    match &mut connection {
291                        Either::Pool(conn) => {
292                            if let Some(stream) = conn.stream() {
293                                send_request(stream, request.clone()).await
294                            } else {
295                                Err(KodeBridgeError::connection("Pooled connection is invalid"))
296                            }
297                        }
298                        Either::Direct(stream) => send_request(stream, request.clone()).await,
299                    }
300                })
301                .await;
302
303                match result {
304                    Ok(response) => response,
305                    Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
306                }
307            })
308            .await
309    }
310
311    /// GET request
312    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
313        HttpRequestBuilder::new(self, Method::GET, path)
314    }
315
316    /// POST request
317    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
318        HttpRequestBuilder::new(self, Method::POST, path)
319    }
320
321    /// PUT request
322    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
323        HttpRequestBuilder::new(self, Method::PUT, path)
324    }
325
326    /// DELETE request
327    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
328        HttpRequestBuilder::new(self, Method::DELETE, path)
329    }
330
331    /// PATCH request
332    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
333        HttpRequestBuilder::new(self, Method::PATCH, path)
334    }
335
336    /// HEAD request
337    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
338        HttpRequestBuilder::new(self, Method::HEAD, path)
339    }
340
341    /// OPTIONS request
342    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
343        HttpRequestBuilder::new(self, Method::OPTIONS, path)
344    }
345
346    /// Get pool statistics (if pooling is enabled)
347    pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
348        self.pool.as_ref().map(|p| p.stats())
349    }
350
351    /// Close the client and clean up resources
352    pub fn close(&self) {
353        if let Some(ref pool) = self.pool {
354            pool.close();
355        }
356    }
357}
358
359impl<'a> HttpRequestBuilder<'a> {
360    fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
361        Self {
362            client,
363            method,
364            path: path.to_string(),
365            body: None,
366            timeout: None,
367            headers: Vec::new(),
368        }
369    }
370
371    /// Set JSON body
372    pub fn json_body(mut self, body: &Value) -> Self {
373        self.body = Some(body.clone());
374        self
375    }
376
377    /// Set custom timeout
378    pub fn timeout(mut self, timeout: Duration) -> Self {
379        self.timeout = Some(timeout);
380        self
381    }
382
383    /// Add custom header
384    pub fn header<K, V>(mut self, key: K, value: V) -> Self
385    where
386        K: Into<String>,
387        V: Into<String>,
388    {
389        self.headers.push((key.into(), value.into()));
390        self
391    }
392
393    /// Send the request
394    pub async fn send(self) -> Result<HttpResponse> {
395        let metrics = global_metrics();
396        let tracker = metrics.request_start(self.method.as_str());
397
398        let timeout = self.timeout.unwrap_or(self.client.config.default_timeout);
399
400        match self
401            .client
402            .send_request_internal(
403                self.method.as_str(),
404                &self.path,
405                self.body.as_ref(),
406                timeout,
407            )
408            .await
409        {
410            Ok(response) => {
411                tracker.success(response.status_code());
412                Ok(HttpResponse::new(response))
413            }
414            Err(e) => {
415                tracker.failure(&format!("{:?}", e));
416                Err(e)
417            }
418        }
419    }
420}
421
/// Two-way sum type used to carry either kind of connection through the
/// same code path.
enum Either<A, B> {
    /// Value obtained from the connection pool.
    Pool(A),
    /// Value created directly, bypassing the pool.
    Direct(B),
}
427
428impl Drop for IpcHttpClient {
429    fn drop(&mut self) {
430        self.close();
431    }
432}