kode_bridge/
ipc_stream_client.rs

1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::RequestBuilder;
12use crate::stream_client::{StreamingResponse, send_streaming_request};
13use http::Method;
14use std::str::FromStr;
15use tracing::{debug, trace};
16
17/// Configuration for IPC streaming client
18#[derive(Debug, Clone)]
19pub struct StreamClientConfig {
20    /// Default timeout for connections
21    pub default_timeout: Duration,
22    /// Maximum number of retry attempts
23    pub max_retries: usize,
24    /// Delay between retry attempts
25    pub retry_delay: Duration,
26    /// Buffer size for streaming
27    pub buffer_size: usize,
28}
29
30impl Default for StreamClientConfig {
31    fn default() -> Self {
32        Self {
33            default_timeout: Duration::from_secs(60),
34            max_retries: 3,
35            retry_delay: Duration::from_millis(100),
36            buffer_size: 8192,
37        }
38    }
39}
40
41/// Specialized IPC streaming client for handling real-time data streams
42pub struct IpcStreamClient {
43    name: Name<'static>,
44    config: StreamClientConfig,
45}
46
47/// Stream request builder for fluent API
48pub struct StreamRequestBuilder<'a> {
49    client: &'a IpcStreamClient,
50    method: Method,
51    path: String,
52    body: Option<Value>,
53    timeout: Duration,
54    headers: Vec<(String, String)>,
55}
56
57/// Stream response wrapper with chainable methods
58pub struct StreamResponse {
59    inner: StreamingResponse,
60}
61
62impl StreamResponse {
63    fn new(response: StreamingResponse) -> Self {
64        Self { inner: response }
65    }
66
67    /// Get the HTTP status code
68    pub fn status(&self) -> u16 {
69        self.inner.status_code()
70    }
71
72    /// Check if response indicates success (2xx status)
73    pub fn is_success(&self) -> bool {
74        self.inner.is_success()
75    }
76
77    /// Check if response indicates client error (4xx status)
78    pub fn is_client_error(&self) -> bool {
79        self.inner.is_client_error()
80    }
81
82    /// Check if response indicates server error (5xx status)
83    pub fn is_server_error(&self) -> bool {
84        self.inner.is_server_error()
85    }
86
87    /// Convert stream to JSON objects with automatic parsing and timeout
88    pub async fn json_results<T>(self) -> Result<Vec<T>>
89    where
90        T: DeserializeOwned + Send,
91    {
92        self.inner.json(Duration::from_secs(30)).await
93    }
94
95    /// Convert stream to JSON objects with custom timeout
96    pub async fn json<T>(self, timeout: Duration) -> Result<Vec<T>>
97    where
98        T: DeserializeOwned + Send,
99    {
100        self.inner.json(timeout).await
101    }
102
103    /// Process stream in real-time with a handler
104    pub async fn process_lines<F>(self, timeout: Duration, mut handler: F) -> Result<()>
105    where
106        F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
107    {
108        self.inner
109            .process_lines_with_timeout(timeout, |line| {
110                handler(line).map(|_| true) // Convert to continue flag
111            })
112            .await
113    }
114
115    /// Process stream with custom JSON processing
116    pub async fn process_json<F, T>(self, timeout: Duration, handler: F) -> Result<Vec<T>>
117    where
118        F: FnMut(&str) -> Option<T>,
119        T: Send + 'static,
120    {
121        self.inner.process_json(timeout, handler).await
122    }
123
124    /// Collect all stream content as text
125    pub async fn collect_text(self) -> Result<String> {
126        self.inner.collect_text().await
127    }
128
129    /// Collect stream content with timeout
130    pub async fn collect_text_with_timeout(self, timeout: Duration) -> Result<String> {
131        self.inner.collect_text_with_timeout(timeout).await
132    }
133
134    /// Get the underlying modern StreamingResponse
135    pub fn into_inner(self) -> StreamingResponse {
136        self.inner
137    }
138
139    /// Get headers as JSON for backward compatibility
140    pub fn headers(&self) -> Value {
141        self.inner.headers_json()
142    }
143}
144
145impl IpcStreamClient {
146    /// Create a new IPC streaming client with default configuration
147    pub fn new<P>(path: P) -> Result<Self>
148    where
149        P: AsRef<Path>,
150    {
151        Self::with_config(path, StreamClientConfig::default())
152    }
153
154    /// Create a new IPC streaming client with custom configuration
155    pub fn with_config<P>(path: P, config: StreamClientConfig) -> Result<Self>
156    where
157        P: AsRef<Path>,
158    {
159        let name = path
160            .as_ref()
161            .to_fs_name::<GenericFilePath>()
162            .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
163            .into_owned();
164
165        Ok(Self { name, config })
166    }
167
168    /// Create a connection with retry logic
169    async fn create_connection(&self) -> Result<LocalSocketStream> {
170        let mut last_error = None;
171
172        for attempt in 0..self.config.max_retries {
173            if attempt > 0 {
174                tokio::time::sleep(self.config.retry_delay).await;
175            }
176
177            match LocalSocketStream::connect(self.name.clone()).await {
178                Ok(stream) => {
179                    debug!("Created streaming connection on attempt {}", attempt + 1);
180                    return Ok(stream);
181                }
182                Err(e) => {
183                    trace!("Streaming connection attempt {} failed: {}", attempt + 1, e);
184                    last_error = Some(e);
185                }
186            }
187        }
188
189        Err(KodeBridgeError::connection(format!(
190            "Failed to create streaming connection after {} attempts: {}",
191            self.config.max_retries,
192            last_error.unwrap()
193        )))
194    }
195
196    /// Internal method to send streaming requests
197    async fn send_request_internal(
198        &self,
199        method: &str,
200        path: &str,
201        body: Option<&Value>,
202        timeout: Duration,
203    ) -> Result<StreamingResponse> {
204        let method = Method::from_str(method)
205            .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
206
207        let mut builder = RequestBuilder::new(method, path.to_string());
208
209        if let Some(json_body) = body {
210            builder = builder.json(json_body)?;
211        }
212
213        let request = builder.build()?;
214
215        // Execute with timeout
216        let result = tokio::time::timeout(timeout, async {
217            let stream = self.create_connection().await?;
218            send_streaming_request(stream, request).await
219        })
220        .await;
221
222        match result {
223            Ok(response) => response,
224            Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
225        }
226    }
227
228    /// GET streaming request
229    pub fn get(&self, path: &str) -> StreamRequestBuilder<'_> {
230        StreamRequestBuilder::new(self, Method::GET, path)
231    }
232
233    /// POST streaming request
234    pub fn post(&self, path: &str) -> StreamRequestBuilder<'_> {
235        StreamRequestBuilder::new(self, Method::POST, path)
236    }
237
238    /// PUT streaming request
239    pub fn put(&self, path: &str) -> StreamRequestBuilder<'_> {
240        StreamRequestBuilder::new(self, Method::PUT, path)
241    }
242
243    /// DELETE streaming request
244    pub fn delete(&self, path: &str) -> StreamRequestBuilder<'_> {
245        StreamRequestBuilder::new(self, Method::DELETE, path)
246    }
247
248    /// PATCH streaming request
249    pub fn patch(&self, path: &str) -> StreamRequestBuilder<'_> {
250        StreamRequestBuilder::new(self, Method::PATCH, path)
251    }
252
253    /// HEAD streaming request
254    pub fn head(&self, path: &str) -> StreamRequestBuilder<'_> {
255        StreamRequestBuilder::new(self, Method::HEAD, path)
256    }
257
258    /// OPTIONS streaming request
259    pub fn options(&self, path: &str) -> StreamRequestBuilder<'_> {
260        StreamRequestBuilder::new(self, Method::OPTIONS, path)
261    }
262}
263
264impl<'a> StreamRequestBuilder<'a> {
265    fn new(client: &'a IpcStreamClient, method: Method, path: &str) -> Self {
266        Self {
267            client,
268            method,
269            path: path.to_string(),
270            body: None,
271            timeout: client.config.default_timeout,
272            headers: Vec::new(),
273        }
274    }
275
276    /// Set JSON body
277    pub fn json_body(mut self, body: &Value) -> Self {
278        self.body = Some(body.clone());
279        self
280    }
281
282    /// Set custom timeout
283    pub fn timeout(mut self, timeout: Duration) -> Self {
284        self.timeout = timeout;
285        self
286    }
287
288    /// Add custom header
289    pub fn header<K, V>(mut self, key: K, value: V) -> Self
290    where
291        K: Into<String>,
292        V: Into<String>,
293    {
294        self.headers.push((key.into(), value.into()));
295        self
296    }
297
298    /// Send the streaming request
299    pub async fn send(self) -> Result<StreamResponse> {
300        let response = self
301            .client
302            .send_request_internal(
303                self.method.as_str(),
304                &self.path,
305                self.body.as_ref(),
306                self.timeout,
307            )
308            .await?;
309
310        Ok(StreamResponse::new(response))
311    }
312
313    /// Send and get JSON results automatically (convenience method)
314    pub async fn json_results<T>(self) -> Result<Vec<T>>
315    where
316        T: DeserializeOwned + Send,
317    {
318        let response = self.send().await?;
319        response.json_results().await
320    }
321
322    /// Send and process lines with a handler (convenience method)
323    pub async fn process_lines<F>(self, handler: F) -> Result<()>
324    where
325        F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
326    {
327        let timeout = self.timeout;
328        let response = self.send().await?;
329        response.process_lines(timeout, handler).await
330    }
331}