1use std::path::Path;
2use std::time::Duration;
3
4use interprocess::local_socket::tokio::prelude::LocalSocketStream;
5use interprocess::local_socket::traits::tokio::Stream as _;
6use interprocess::local_socket::{GenericFilePath, Name, ToFsName as _};
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9
10use crate::errors::{KodeBridgeError, Result};
11use crate::http_client::RequestBuilder;
12use crate::stream_client::{send_streaming_request, StreamingResponse};
13use http::Method;
14use std::str::FromStr as _;
15use tracing::{debug, trace};
16
17#[derive(Debug, Clone)]
19pub struct StreamClientConfig {
20 pub default_timeout: Duration,
22 pub max_retries: usize,
24 pub retry_delay: Duration,
26 pub buffer_size: usize,
28}
29
30impl Default for StreamClientConfig {
31 fn default() -> Self {
32 Self {
33 default_timeout: Duration::from_secs(60),
34 max_retries: 3,
35 retry_delay: Duration::from_millis(100),
36 buffer_size: 8192,
37 }
38 }
39}
40
41pub struct IpcStreamClient {
43 name: Name<'static>,
44 config: StreamClientConfig,
45}
46
47pub struct StreamRequestBuilder<'a> {
49 client: &'a IpcStreamClient,
50 method: Method,
51 path: String,
52 body: Option<Value>,
53 timeout: Duration,
54 headers: Vec<(String, String)>,
55}
56
57pub struct StreamResponse {
59 inner: StreamingResponse,
60}
61
62impl StreamResponse {
63 const fn new(response: StreamingResponse) -> Self {
64 Self { inner: response }
65 }
66
67 pub const fn status(&self) -> u16 {
69 self.inner.status_code()
70 }
71
72 pub fn is_success(&self) -> bool {
74 self.inner.is_success()
75 }
76
77 pub fn is_client_error(&self) -> bool {
79 self.inner.is_client_error()
80 }
81
82 pub fn is_server_error(&self) -> bool {
84 self.inner.is_server_error()
85 }
86
87 pub async fn json_results<T>(self) -> Result<Vec<T>>
89 where
90 T: DeserializeOwned + Send,
91 {
92 self.inner.json(Duration::from_secs(30)).await
93 }
94
95 pub async fn json<T>(self, timeout: Duration) -> Result<Vec<T>>
97 where
98 T: DeserializeOwned + Send,
99 {
100 self.inner.json(timeout).await
101 }
102
103 pub async fn process_lines<F>(self, timeout: Duration, mut handler: F) -> Result<()>
105 where
106 F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
107 {
108 self.inner
109 .process_lines_with_timeout(timeout, |line| {
110 handler(line).map(|_| true) })
112 .await
113 }
114
115 pub async fn process_json<F, T>(self, timeout: Duration, handler: F) -> Result<Vec<T>>
117 where
118 F: FnMut(&str) -> Option<T> + Send,
119 T: Send + 'static,
120 {
121 self.inner.process_json(timeout, handler).await
122 }
123
124 pub async fn collect_text(self) -> Result<String> {
126 self.inner.collect_text().await
127 }
128
129 pub async fn collect_text_with_timeout(self, timeout: Duration) -> Result<String> {
131 self.inner.collect_text_with_timeout(timeout).await
132 }
133
134 pub fn into_inner(self) -> StreamingResponse {
136 self.inner
137 }
138
139 pub fn headers(&self) -> Value {
141 self.inner.headers_json()
142 }
143}
144
145impl IpcStreamClient {
146 pub fn new<P>(path: P) -> Result<Self>
148 where
149 P: AsRef<Path>,
150 {
151 Self::with_config(path, StreamClientConfig::default())
152 }
153
154 pub fn with_config<P>(path: P, config: StreamClientConfig) -> Result<Self>
156 where
157 P: AsRef<Path>,
158 {
159 let name = path
160 .as_ref()
161 .to_fs_name::<GenericFilePath>()
162 .map_err(|e| KodeBridgeError::configuration(format!("Invalid path: {}", e)))?
163 .into_owned();
164
165 Ok(Self { name, config })
166 }
167
168 async fn create_connection(&self) -> Result<LocalSocketStream> {
170 let mut last_error = None;
171
172 for attempt in 0..self.config.max_retries {
173 if attempt > 0 {
174 tokio::time::sleep(self.config.retry_delay).await;
175 }
176
177 match LocalSocketStream::connect(self.name.clone()).await {
178 Ok(stream) => {
179 debug!("Created streaming connection on attempt {}", attempt + 1);
180 return Ok(stream);
181 }
182 Err(e) => {
183 trace!("Streaming connection attempt {} failed: {}", attempt + 1, e);
184 last_error = Some(e);
185 }
186 }
187 }
188
189 Err(KodeBridgeError::connection(format!(
190 "Failed to create streaming connection after {} attempts: {}",
191 self.config.max_retries,
192 last_error
193 .map(|e| e.to_string())
194 .unwrap_or_else(|| "Unknown error".to_string())
195 )))
196 }
197
198 async fn send_request_internal(
200 &self,
201 method: &str,
202 path: &str,
203 body: Option<&Value>,
204 timeout: Duration,
205 ) -> Result<StreamingResponse> {
206 let method =
207 Method::from_str(method).map_err(|e| KodeBridgeError::invalid_request(format!("Invalid method: {}", e)))?;
208
209 let mut builder = RequestBuilder::new(method, path.to_string());
210
211 if let Some(json_body) = body {
212 builder = builder.json(json_body)?;
213 }
214
215 let request = builder.build()?;
216
217 let result = tokio::time::timeout(timeout, async {
219 let stream = self.create_connection().await?;
220 send_streaming_request(stream, request).await
221 })
222 .await;
223
224 match result {
225 Ok(response) => response,
226 Err(_) => Err(KodeBridgeError::timeout(timeout.as_millis() as u64)),
227 }
228 }
229
230 pub fn get(&self, path: &str) -> StreamRequestBuilder<'_> {
232 StreamRequestBuilder::new(self, Method::GET, path)
233 }
234
235 pub fn post(&self, path: &str) -> StreamRequestBuilder<'_> {
237 StreamRequestBuilder::new(self, Method::POST, path)
238 }
239
240 pub fn put(&self, path: &str) -> StreamRequestBuilder<'_> {
242 StreamRequestBuilder::new(self, Method::PUT, path)
243 }
244
245 pub fn delete(&self, path: &str) -> StreamRequestBuilder<'_> {
247 StreamRequestBuilder::new(self, Method::DELETE, path)
248 }
249
250 pub fn patch(&self, path: &str) -> StreamRequestBuilder<'_> {
252 StreamRequestBuilder::new(self, Method::PATCH, path)
253 }
254
255 pub fn head(&self, path: &str) -> StreamRequestBuilder<'_> {
257 StreamRequestBuilder::new(self, Method::HEAD, path)
258 }
259
260 pub fn options(&self, path: &str) -> StreamRequestBuilder<'_> {
262 StreamRequestBuilder::new(self, Method::OPTIONS, path)
263 }
264}
265
266impl<'a> StreamRequestBuilder<'a> {
267 fn new(client: &'a IpcStreamClient, method: Method, path: &str) -> Self {
268 Self {
269 client,
270 method,
271 path: path.to_string(),
272 body: None,
273 timeout: client.config.default_timeout,
274 headers: Vec::new(),
275 }
276 }
277
278 pub fn json_body(mut self, body: &Value) -> Self {
280 self.body = Some(body.clone());
281 self
282 }
283
284 pub const fn timeout(mut self, timeout: Duration) -> Self {
286 self.timeout = timeout;
287 self
288 }
289
290 pub fn header<K, V>(mut self, key: K, value: V) -> Self
292 where
293 K: Into<String>,
294 V: Into<String>,
295 {
296 self.headers.push((key.into(), value.into()));
297 self
298 }
299
300 pub async fn send(self) -> Result<StreamResponse> {
302 let response = self
303 .client
304 .send_request_internal(self.method.as_str(), &self.path, self.body.as_ref(), self.timeout)
305 .await?;
306
307 Ok(StreamResponse::new(response))
308 }
309
310 pub async fn json_results<T>(self) -> Result<Vec<T>>
312 where
313 T: DeserializeOwned + Send,
314 {
315 let response = self.send().await?;
316 response.json_results().await
317 }
318
319 pub async fn process_lines<F>(self, handler: F) -> Result<()>
321 where
322 F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
323 {
324 let timeout = self.timeout;
325 let response = self.send().await?;
326 response.process_lines(timeout, handler).await
327 }
328}