kode_bridge/
ipc_stream_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream as _;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName as _};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::RequestBuilder;
12use crate::stream_client::{send_streaming_request, StreamingResponse};
13use http::Method;
14use std::str::FromStr as _;
15use tracing::{debug, trace};
16
/// Tunable settings for an [`IpcStreamClient`].
///
/// All fields are plain values, so the type is cheap to clone and can be
/// compared for equality (useful for detecting configuration changes and in
/// tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamClientConfig {
    /// Timeout a request builder starts with unless the caller overrides it.
    pub default_timeout: Duration,
    /// Upper bound on the number of connection attempts made per request.
    pub max_retries: usize,
    /// Pause inserted between two consecutive connection attempts.
    pub retry_delay: Duration,
    /// Buffer size for streaming.
    ///
    /// NOTE(review): not read anywhere in this file; presumably consumed by
    /// the streaming layer and measured in bytes -- confirm.
    pub buffer_size: usize,
}
29
30impl Default for StreamClientConfig {
31    fn default() -> Self {
32        Self {
33            default_timeout: Duration::from_secs(60),
34            max_retries: 3,
35            retry_delay: Duration::from_millis(100),
36            buffer_size: 8192,
37        }
38    }
39}
40
/// Specialized IPC streaming client for handling real-time data streams.
///
/// The client stores only the socket name and its configuration; a fresh
/// local-socket connection is opened for every request that is sent.
pub struct IpcStreamClient {
    /// Owned local-socket name derived from the filesystem path given at
    /// construction time.
    name: Name<'static>,
    /// Timeout and retry settings applied to requests made by this client.
    config: StreamClientConfig,
}
46
/// Stream request builder for fluent API.
///
/// Created by the verb methods on [`IpcStreamClient`] (`get`, `post`, ...)
/// and consumed by `send` or one of the convenience methods.
pub struct StreamRequestBuilder<'a> {
    /// Client that will open the connection and perform the request.
    client: &'a IpcStreamClient,
    /// HTTP method of the request.
    method: Method,
    /// Request path, stored as an owned copy of the caller's string.
    path: String,
    /// Optional JSON body; `None` until `json_body` is called.
    body: Option<Value>,
    /// Timeout for the request; starts as the client's `default_timeout`.
    timeout: Duration,
    /// Header name/value pairs accumulated through `header`.
    ///
    /// NOTE(review): `send` in this file passes only the method, path, body
    /// and timeout to the client, so these pairs are currently never placed
    /// on the outgoing request.
    headers: Vec<(String, String)>,
}
56
/// Stream response wrapper with chainable methods.
///
/// A thin newtype over [`StreamingResponse`]; every method delegates to the
/// wrapped value.
pub struct StreamResponse {
    /// The wrapped streaming response that all methods forward to.
    inner: StreamingResponse,
}
61
62impl StreamResponse {
63    const fn new(response: StreamingResponse) -> Self {
64        Self { inner: response }
65    }
66
67    /// Get the HTTP status code
68    pub const fn status(&self) -> u16 {
69        self.inner.status_code()
70    }
71
72    /// Check if response indicates success (2xx status)
73    pub fn is_success(&self) -> bool {
74        self.inner.is_success()
75    }
76
77    /// Check if response indicates client error (4xx status)
78    pub fn is_client_error(&self) -> bool {
79        self.inner.is_client_error()
80    }
81
82    /// Check if response indicates server error (5xx status)
83    pub fn is_server_error(&self) -> bool {
84        self.inner.is_server_error()
85    }
86
87    /// Convert stream to JSON objects with automatic parsing and timeout
88    pub async fn json_results<T>(self) -> Result<Vec<T>>
89    where
90        T: DeserializeOwned + Send,
91    {
92        self.inner.json(Duration::from_secs(30)).await
93    }
94
95    /// Convert stream to JSON objects with custom timeout
96    pub async fn json<T>(self, timeout: Duration) -> Result<Vec<T>>
97    where
98        T: DeserializeOwned + Send,
99    {
100        self.inner.json(timeout).await
101    }
102
103    /// Process stream in real-time with a handler
104    pub async fn process_lines<F>(self, timeout: Duration, mut handler: F) -> Result<()>
105    where
106        F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
107    {
108        self.inner
109            .process_lines_with_timeout(timeout, |line| {
110                handler(line).map(|_| true) // Convert to continue flag
111            })
112            .await
113    }
114
115    /// Process stream with custom JSON processing
116    pub async fn process_json<F, T>(self, timeout: Duration, handler: F) -> Result<Vec<T>>
117    where
118        F: FnMut(&str) -> Option<T> + Send,
119        T: Send + 'static,
120    {
121        self.inner.process_json(timeout, handler).await
122    }
123
124    /// Collect all stream content as text
125    pub async fn collect_text(self) -> Result<String> {
126        self.inner.collect_text().await
127    }
128
129    /// Collect stream content with timeout
130    pub async fn collect_text_with_timeout(self, timeout: Duration) -> Result<String> {
131        self.inner.collect_text_with_timeout(timeout).await
132    }
133
134    /// Get the underlying modern StreamingResponse
135    pub fn into_inner(self) -> StreamingResponse {
136        self.inner
137    }
138
139    /// Get headers as JSON for backward compatibility
140    pub fn headers(&self) -> Value {
141        self.inner.headers_json()
142    }
143}
144
145impl IpcStreamClient {
146    /// Create a new IPC streaming client with default configuration
147    pub fn new<P>(path: P) -> Result<Self>
148    where
149        P: AsRef<Path>,
150    {
151        Self::with_config(path, StreamClientConfig::default())
152    }
153
154    /// Create a new IPC streaming client with custom configuration
155    pub fn with_config<P>(path: P, config: StreamClientConfig) -> Result<Self>
156    where
157        P: AsRef<Path>,
158    {
159        let name = path
160            .as_ref()
161            .to_fs_name::<GenericFilePath>()
162            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
163            .into_owned();
164
165        Ok(Self { name, config })
166    }
167
168    /// Create a connection with retry logic
169    async fn create_connection(&self) -> Result<LocalSocketStream> {
170        let mut last_error = None;
171
172        for attempt in 0..self.config.max_retries {
173            if attempt > 0 {
174                tokio::time::sleep(self.config.retry_delay).await;
175            }
176
177            match LocalSocketStream::connect(self.name.clone()).await {
178                Ok(stream) => {
179                    debug!("Created streaming connection on attempt {}", attempt + 1);
180                    return Ok(stream);
181                }
182                Err(e) => {
183                    trace!("Streaming connection attempt {} failed: {}", attempt + 1, e);
184                    last_error = Some(e);
185                }
186            }
187        }
188
189        Err(KodeBridgeError::connection(format!(
190            "Failed to create streaming connection after {} attempts: {}",
191            self.config.max_retries,
192            last_error
193                .map(|e| e.to_string())
194                .unwrap_or_else(|| "Unknown error".to_string())
195        )))
196    }
197
198    /// Internal method to send streaming requests
199    async fn send_request_internal(
200        &self,
201        method: &str,
202        path: &str,
203        body: Option<&Value>,
204        timeout: Duration,
205    ) -> Result<StreamingResponse> {
206        let method =
207            Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
208
209        let mut builder = RequestBuilder::new(method, path.to_string());
210
211        if let Some(json_body) = body {
212            builder = builder.json(json_body)?;
213        }
214
215        let request = builder.build()?;
216
217        // Execute with timeout
218        let result = tokio::time::timeout(timeout, async {
219            let stream = self.create_connection().await?;
220            send_streaming_request(stream, request).await
221        })
222        .await;
223
224        match result {
225            Ok(response) => response,
226            Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
227        }
228    }
229
230    /// GET streaming request
231    pub fn get(&self, path: &str) -> StreamRequestBuilder<'_> {
232        StreamRequestBuilder::new(self, Method::GET, path)
233    }
234
235    /// POST streaming request
236    pub fn post(&self, path: &str) -> StreamRequestBuilder<'_> {
237        StreamRequestBuilder::new(self, Method::POST, path)
238    }
239
240    /// PUT streaming request
241    pub fn put(&self, path: &str) -> StreamRequestBuilder<'_> {
242        StreamRequestBuilder::new(self, Method::PUT, path)
243    }
244
245    /// DELETE streaming request
246    pub fn delete(&self, path: &str) -> StreamRequestBuilder<'_> {
247        StreamRequestBuilder::new(self, Method::DELETE, path)
248    }
249
250    /// PATCH streaming request
251    pub fn patch(&self, path: &str) -> StreamRequestBuilder<'_> {
252        StreamRequestBuilder::new(self, Method::PATCH, path)
253    }
254
255    /// HEAD streaming request
256    pub fn head(&self, path: &str) -> StreamRequestBuilder<'_> {
257        StreamRequestBuilder::new(self, Method::HEAD, path)
258    }
259
260    /// OPTIONS streaming request
261    pub fn options(&self, path: &str) -> StreamRequestBuilder<'_> {
262        StreamRequestBuilder::new(self, Method::OPTIONS, path)
263    }
264}
265
266impl<'a> StreamRequestBuilder<'a> {
267    fn new(client: &'a IpcStreamClient, method: Method, path: &str) -> Self {
268        Self {
269            client,
270            method,
271            path: path.to_string(),
272            body: None,
273            timeout: client.config.default_timeout,
274            headers: Vec::new(),
275        }
276    }
277
278    /// Set JSON body
279    pub fn json_body(mut self, body: &Value) -> Self {
280        self.body = Some(body.clone());
281        self
282    }
283
284    /// Set custom timeout
285    pub const fn timeout(mut self, timeout: Duration) -> Self {
286        self.timeout = timeout;
287        self
288    }
289
290    /// Add custom header
291    pub fn header<K, V>(mut self, key: K, value: V) -> Self
292    where
293        K: Into<String>,
294        V: Into<String>,
295    {
296        self.headers.push((key.into(), value.into()));
297        self
298    }
299
300    /// Send the streaming request
301    pub async fn send(self) -> Result<StreamResponse> {
302        let response = self
303            .client
304            .send_request_internal(self.method.as_str(), &self.path, self.body.as_ref(), self.timeout)
305            .await?;
306
307        Ok(StreamResponse::new(response))
308    }
309
310    /// Send and get JSON results automatically (convenience method)
311    pub async fn json_results<T>(self) -> Result<Vec<T>>
312    where
313        T: DeserializeOwned + Send,
314    {
315        let response = self.send().await?;
316        response.json_results().await
317    }
318
319    /// Send and process lines with a handler (convenience method)
320    pub async fn process_lines<F>(self, handler: F) -> Result<()>
321    where
322        F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
323    {
324        let timeout = self.timeout;
325        let response = self.send().await?;
326        response.process_lines(timeout, handler).await
327    }
328}