1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::RequestBuilder;
12use crate::stream_client::{StreamingResponse, send_streaming_request};
13use http::Method;
14use std::str::FromStr;
15use tracing::{debug, trace};
16
17#[derive(Debug, Clone)]
19pub struct StreamClientConfig {
20 pub default_timeout: Duration,
22 pub max_retries: usize,
24 pub retry_delay: Duration,
26 pub buffer_size: usize,
28}
29
30impl Default for StreamClientConfig {
31 fn default() -> Self {
32 Self {
33 default_timeout: Duration::from_secs(60),
34 max_retries: 3,
35 retry_delay: Duration::from_millis(100),
36 buffer_size: 8192,
37 }
38 }
39}
40
41pub struct IpcStreamClient {
43 name: Name<'static>,
44 config: StreamClientConfig,
45}
46
47pub struct StreamRequestBuilder<'a> {
49 client: &'a IpcStreamClient,
50 method: Method,
51 path: String,
52 body: Option<Value>,
53 timeout: Duration,
54 headers: Vec<(String, String)>,
55}
56
57pub struct StreamResponse {
59 inner: StreamingResponse,
60}
61
62impl StreamResponse {
63 fn new(response: StreamingResponse) -> Self {
64 Self { inner: response }
65 }
66
67 pub fn status(&self) -> u16 {
69 self.inner.status_code()
70 }
71
72 pub fn is_success(&self) -> bool {
74 self.inner.is_success()
75 }
76
77 pub fn is_client_error(&self) -> bool {
79 self.inner.is_client_error()
80 }
81
82 pub fn is_server_error(&self) -> bool {
84 self.inner.is_server_error()
85 }
86
87 pub async fn json_results<T>(self) -> Result<Vec<T>>
89 where
90 T: DeserializeOwned + Send,
91 {
92 self.inner.json(Duration::from_secs(30)).await
93 }
94
95 pub async fn json<T>(self, timeout: Duration) -> Result<Vec<T>>
97 where
98 T: DeserializeOwned + Send,
99 {
100 self.inner.json(timeout).await
101 }
102
103 pub async fn process_lines<F>(self, timeout: Duration, mut handler: F) -> Result<()>
105 where
106 F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
107 {
108 self.inner
109 .process_lines_with_timeout(timeout, |line| {
110 handler(line).map(|_| true) })
112 .await
113 }
114
115 pub async fn process_json<F, T>(self, timeout: Duration, handler: F) -> Result<Vec<T>>
117 where
118 F: FnMut(&str) -> Option<T>,
119 T: Send + 'static,
120 {
121 self.inner.process_json(timeout, handler).await
122 }
123
124 pub async fn collect_text(self) -> Result<String> {
126 self.inner.collect_text().await
127 }
128
129 pub async fn collect_text_with_timeout(self, timeout: Duration) -> Result<String> {
131 self.inner.collect_text_with_timeout(timeout).await
132 }
133
134 pub fn into_inner(self) -> StreamingResponse {
136 self.inner
137 }
138
139 pub fn headers(&self) -> Value {
141 self.inner.headers_json()
142 }
143}
144
145impl IpcStreamClient {
146 pub fn new<P>(path: P) -> Result<Self>
148 where
149 P: AsRef<Path>,
150 {
151 Self::with_config(path, StreamClientConfig::default())
152 }
153
154 pub fn with_config<P>(path: P, config: StreamClientConfig) -> Result<Self>
156 where
157 P: AsRef<Path>,
158 {
159 let name = path
160 .as_ref()
161 .to_fs_name::<GenericFilePath>()
162 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
163 .into_owned();
164
165 Ok(Self { name, config })
166 }
167
168 async fn create_connection(&self) -> Result<LocalSocketStream> {
170 let mut last_error = None;
171
172 for attempt in 0..self.config.max_retries {
173 if attempt > 0 {
174 tokio::time::sleep(self.config.retry_delay).await;
175 }
176
177 match LocalSocketStream::connect(self.name.clone()).await {
178 Ok(stream) => {
179 debug!("Created streaming connection on attempt {}", attempt + 1);
180 return Ok(stream);
181 }
182 Err(e) => {
183 trace!("Streaming connection attempt {} failed: {}", attempt + 1, e);
184 last_error = Some(e);
185 }
186 }
187 }
188
189 Err(KodeBridgeError::connection(format!(
190 "Failed to create streaming connection after {} attempts: {}",
191 self.config.max_retries,
192 last_error.unwrap()
193 )))
194 }
195
196 async fn send_request_internal(
198 &self,
199 method: &str,
200 path: &str,
201 body: Option<&Value>,
202 timeout: Duration,
203 ) -> Result<StreamingResponse> {
204 let method = Method::from_str(method)
205 .map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
206
207 let mut builder = RequestBuilder::new(method, path.to_string());
208
209 if let Some(json_body) = body {
210 builder = builder.json(json_body)?;
211 }
212
213 let request = builder.build()?;
214
215 let result = tokio::time::timeout(timeout, async {
217 let stream = self.create_connection().await?;
218 send_streaming_request(stream, request).await
219 })
220 .await;
221
222 match result {
223 Ok(response) => response,
224 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
225 }
226 }
227
228 pub fn get(&self, path: &str) -> StreamRequestBuilder<'_> {
230 StreamRequestBuilder::new(self, Method::GET, path)
231 }
232
233 pub fn post(&self, path: &str) -> StreamRequestBuilder<'_> {
235 StreamRequestBuilder::new(self, Method::POST, path)
236 }
237
238 pub fn put(&self, path: &str) -> StreamRequestBuilder<'_> {
240 StreamRequestBuilder::new(self, Method::PUT, path)
241 }
242
243 pub fn delete(&self, path: &str) -> StreamRequestBuilder<'_> {
245 StreamRequestBuilder::new(self, Method::DELETE, path)
246 }
247
248 pub fn patch(&self, path: &str) -> StreamRequestBuilder<'_> {
250 StreamRequestBuilder::new(self, Method::PATCH, path)
251 }
252
253 pub fn head(&self, path: &str) -> StreamRequestBuilder<'_> {
255 StreamRequestBuilder::new(self, Method::HEAD, path)
256 }
257
258 pub fn options(&self, path: &str) -> StreamRequestBuilder<'_> {
260 StreamRequestBuilder::new(self, Method::OPTIONS, path)
261 }
262}
263
264impl<'a> StreamRequestBuilder<'a> {
265 fn new(client: &'a IpcStreamClient, method: Method, path: &str) -> Self {
266 Self {
267 client,
268 method,
269 path: path.to_string(),
270 body: None,
271 timeout: client.config.default_timeout,
272 headers: Vec::new(),
273 }
274 }
275
276 pub fn json_body(mut self, body: &Value) -> Self {
278 self.body = Some(body.clone());
279 self
280 }
281
282 pub fn timeout(mut self, timeout: Duration) -> Self {
284 self.timeout = timeout;
285 self
286 }
287
288 pub fn header<K, V>(mut self, key: K, value: V) -> Self
290 where
291 K: Into<String>,
292 V: Into<String>,
293 {
294 self.headers.push((key.into(), value.into()));
295 self
296 }
297
298 pub async fn send(self) -> Result<StreamResponse> {
300 let response = self
301 .client
302 .send_request_internal(
303 self.method.as_str(),
304 &self.path,
305 self.body.as_ref(),
306 self.timeout,
307 )
308 .await?;
309
310 Ok(StreamResponse::new(response))
311 }
312
313 pub async fn json_results<T>(self) -> Result<Vec<T>>
315 where
316 T: DeserializeOwned + Send,
317 {
318 let response = self.send().await?;
319 response.json_results().await
320 }
321
322 pub async fn process_lines<F>(self, handler: F) -> Result<()>
324 where
325 F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
326 {
327 let timeout = self.timeout;
328 let response = self.send().await?;
329 response.process_lines(timeout, handler).await
330 }
331}