kode_bridge/
ipc_http_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::{RequestBuilder, Response, send_request};
12use crate::pool::{ConnectionPool, PoolConfig, PooledConnection};
13use http::Method;
14use std::str::FromStr;
15use tracing::{debug, trace};
16
17/// Configuration for IPC HTTP client
18#[derive(Debug, Clone)]
19pub struct ClientConfig {
20    /// Default timeout for requests
21    pub default_timeout: Duration,
22    /// Connection pool configuration
23    pub pool_config: PoolConfig,
24    /// Enable connection pooling
25    pub enable_pooling: bool,
26    /// Retry configuration
27    pub max_retries: usize,
28    pub retry_delay: Duration,
29}
30
31impl Default for ClientConfig {
32    fn default() -> Self {
33        Self {
34            default_timeout: Duration::from_secs(30),
35            pool_config: PoolConfig::default(),
36            enable_pooling: true,
37            max_retries: 3,
38            retry_delay: Duration::from_millis(100),
39        }
40    }
41}
42
43/// Generic IPC HTTP client that works on both Unix and Windows platforms
44///
45/// This client is optimized for request-response patterns with connection pooling support.
46/// For streaming functionality, use `IpcStreamClient` instead.
47pub struct IpcHttpClient {
48    name: Name<'static>,
49    config: ClientConfig,
50    pool: Option<ConnectionPool>,
51}
52
53/// HTTP request builder for fluent API
54pub struct HttpRequestBuilder<'a> {
55    client: &'a IpcHttpClient,
56    method: Method,
57    path: String,
58    body: Option<Value>,
59    timeout: Option<Duration>,
60    headers: Vec<(String, String)>,
61}
62
63/// Enhanced HTTP response wrapper with chainable methods
64#[derive(Debug)]
65pub struct HttpResponse {
66    inner: Response,
67}
68
69impl HttpResponse {
70    fn new(response: Response) -> Self {
71        Self { inner: response }
72    }
73
74    /// Get the HTTP status code
75    pub fn status(&self) -> u16 {
76        self.inner.status_code()
77    }
78
79    /// Get response headers as JSON value (for backward compatibility)
80    pub fn headers(&self) -> Value {
81        let headers_map: std::collections::HashMap<String, String> = self
82            .inner
83            .headers()
84            .iter()
85            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
86            .collect();
87        serde_json::to_value(headers_map).unwrap_or(Value::Null)
88    }
89
90    /// Get response body as string
91    pub fn body(&self) -> Result<String> {
92        self.inner.text()
93    }
94
95    /// Check if response indicates success (2xx status)
96    pub fn is_success(&self) -> bool {
97        self.inner.is_success()
98    }
99
100    /// Check if response indicates client error (4xx status)
101    pub fn is_client_error(&self) -> bool {
102        self.inner.is_client_error()
103    }
104
105    /// Check if response indicates server error (5xx status)
106    pub fn is_server_error(&self) -> bool {
107        self.inner.is_server_error()
108    }
109
110    /// Get content length from headers
111    pub fn content_length(&self) -> u64 {
112        self.inner.content_length().unwrap_or(0)
113    }
114
115    /// Parse response body as JSON
116    pub fn json<T>(&self) -> Result<T>
117    where
118        T: DeserializeOwned,
119    {
120        self.inner.json()
121    }
122
123    /// Parse response body as generic JSON value
124    pub fn json_value(&self) -> Result<Value> {
125        self.inner.json_value()
126    }
127
128    /// Get the underlying modern Response
129    pub fn into_inner(self) -> Response {
130        self.inner
131    }
132
133    /// Convert to legacy Response format for backward compatibility
134    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
135        self.inner.to_legacy()
136    }
137}
138
139impl IpcHttpClient {
140    /// Create a new IPC HTTP client with default configuration
141    pub fn new<P>(path: P) -> Result<Self>
142    where
143        P: AsRef<Path>,
144    {
145        Self::with_config(path, ClientConfig::default())
146    }
147
148    /// Create a new IPC HTTP client with custom configuration
149    pub fn with_config<P>(path: P, config: ClientConfig) -> Result<Self>
150    where
151        P: AsRef<Path>,
152    {
153        let name = path
154            .as_ref()
155            .to_fs_name::<GenericFilePath>()
156            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
157            .into_owned();
158
159        let pool = if config.enable_pooling {
160            Some(ConnectionPool::new(
161                name.clone(),
162                config.pool_config.clone(),
163            ))
164        } else {
165            None
166        };
167
168        Ok(Self { name, config, pool })
169    }
170
171    /// Create a direct connection (bypassing pool)
172    async fn create_direct_connection(&self) -> Result<LocalSocketStream> {
173        let mut last_error = None;
174
175        for attempt in 0..self.config.max_retries {
176            if attempt > 0 {
177                tokio::time::sleep(self.config.retry_delay).await;
178            }
179
180            match LocalSocketStream::connect(self.name.clone()).await {
181                Ok(stream) => {
182                    debug!("Created direct connection on attempt {}", attempt + 1);
183                    return Ok(stream);
184                }
185                Err(e) => {
186                    trace!("Connection attempt {} failed: {}", attempt + 1, e);
187                    last_error = Some(e);
188                }
189            }
190        }
191
192        Err(KodeBridgeError::connection(format!(
193            "Failed to create connection after {} attempts: {}",
194            self.config.max_retries,
195            last_error.unwrap()
196        )))
197    }
198
199    /// Get a connection (from pool or create new)
200    async fn get_connection(&self) -> Result<Either<PooledConnection, LocalSocketStream>> {
201        if let Some(ref pool) = self.pool {
202            pool.get_connection().await.map(Either::Pool)
203        } else {
204            self.create_direct_connection().await.map(Either::Direct)
205        }
206    }
207
208    /// Legacy request method for backward compatibility
209    pub async fn request(
210        &self,
211        method: &str,
212        path: &str,
213        body: Option<&serde_json::Value>,
214    ) -> crate::errors::AnyResult<crate::response::LegacyResponse> {
215        let response = self
216            .send_request_internal(method, path, body, self.config.default_timeout)
217            .await?;
218        Ok(response.to_legacy())
219    }
220
221    /// Internal method to send requests
222    async fn send_request_internal(
223        &self,
224        method: &str,
225        path: &str,
226        body: Option<&Value>,
227        timeout: Duration,
228    ) -> Result<Response> {
229        let method = Method::from_str(method)
230            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
231
232        let mut builder = RequestBuilder::new(method, path.to_string());
233
234        if let Some(json_body) = body {
235            builder = builder.json(json_body)?;
236        }
237
238        let request = builder.build()?;
239
240        // Execute with timeout
241        let result = tokio::time::timeout(timeout, async {
242            let mut connection = self.get_connection().await?;
243
244            match &mut connection {
245                Either::Pool(conn) => {
246                    if let Some(stream) = conn.stream() {
247                        send_request(stream, request).await
248                    } else {
249                        Err(KodeBridgeError::connection("Pooled connection is invalid"))
250                    }
251                }
252                Either::Direct(stream) => send_request(stream, request).await,
253            }
254        })
255        .await;
256
257        match result {
258            Ok(response) => response,
259            Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
260        }
261    }
262
263    /// GET request
264    pub fn get(&self, path: &str) -> HttpRequestBuilder<'_> {
265        HttpRequestBuilder::new(self, Method::GET, path)
266    }
267
268    /// POST request
269    pub fn post(&self, path: &str) -> HttpRequestBuilder<'_> {
270        HttpRequestBuilder::new(self, Method::POST, path)
271    }
272
273    /// PUT request
274    pub fn put(&self, path: &str) -> HttpRequestBuilder<'_> {
275        HttpRequestBuilder::new(self, Method::PUT, path)
276    }
277
278    /// DELETE request
279    pub fn delete(&self, path: &str) -> HttpRequestBuilder<'_> {
280        HttpRequestBuilder::new(self, Method::DELETE, path)
281    }
282
283    /// PATCH request
284    pub fn patch(&self, path: &str) -> HttpRequestBuilder<'_> {
285        HttpRequestBuilder::new(self, Method::PATCH, path)
286    }
287
288    /// HEAD request
289    pub fn head(&self, path: &str) -> HttpRequestBuilder<'_> {
290        HttpRequestBuilder::new(self, Method::HEAD, path)
291    }
292
293    /// OPTIONS request
294    pub fn options(&self, path: &str) -> HttpRequestBuilder<'_> {
295        HttpRequestBuilder::new(self, Method::OPTIONS, path)
296    }
297
298    /// Get pool statistics (if pooling is enabled)
299    pub fn pool_stats(&self) -> Option<crate::pool::PoolStats> {
300        self.pool.as_ref().map(|p| p.stats())
301    }
302
303    /// Close the client and clean up resources
304    pub fn close(&self) {
305        if let Some(ref pool) = self.pool {
306            pool.close();
307        }
308    }
309}
310
311impl<'a> HttpRequestBuilder<'a> {
312    fn new(client: &'a IpcHttpClient, method: Method, path: &str) -> Self {
313        Self {
314            client,
315            method,
316            path: path.to_string(),
317            body: None,
318            timeout: None,
319            headers: Vec::new(),
320        }
321    }
322
323    /// Set JSON body
324    pub fn json_body(mut self, body: &Value) -> Self {
325        self.body = Some(body.clone());
326        self
327    }
328
329    /// Set custom timeout
330    pub fn timeout(mut self, timeout: Duration) -> Self {
331        self.timeout = Some(timeout);
332        self
333    }
334
335    /// Add custom header
336    pub fn header<K, V>(mut self, key: K, value: V) -> Self
337    where
338        K: Into<String>,
339        V: Into<String>,
340    {
341        self.headers.push((key.into(), value.into()));
342        self
343    }
344
345    /// Send the request
346    pub async fn send(self) -> Result<HttpResponse> {
347        let timeout = self.timeout.unwrap_or(self.client.config.default_timeout);
348        let response = self
349            .client
350            .send_request_internal(
351                self.method.as_str(),
352                &self.path,
353                self.body.as_ref(),
354                timeout,
355            )
356            .await?;
357
358        Ok(HttpResponse::new(response))
359    }
360}
361
/// Distinguishes the two ways a connection can be obtained.
enum Either<P, D> {
    /// A connection borrowed from the pool.
    Pool(P),
    /// A connection opened directly, bypassing the pool.
    Direct(D),
}
367
368impl Drop for IpcHttpClient {
369    fn drop(&mut self) {
370        self.close();
371    }
372}