kode_bridge/
stream_client.rs1use crate::errors::{KodeBridgeError, Result};
2use crate::http_client::RequestBuilder;
3use bytes::Bytes;
4use futures::stream::StreamExt;
5use http::{HeaderMap, Method, StatusCode, header};
6use pin_project_lite::pin_project;
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::pin::Pin;
11use std::str::FromStr;
12use std::task::{Context, Poll};
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
15use tokio_stream::Stream;
16use tokio_util::codec::{FramedRead, LinesCodec};
17use tracing::{debug, trace, warn};
18
19pin_project! {
20 pub struct StreamingResponse {
22 pub status: StatusCode,
23 pub headers: HeaderMap,
24 #[pin]
25 pub stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
26 }
27}
28
29impl StreamingResponse {
30 pub fn new(
31 status: StatusCode,
32 headers: HeaderMap,
33 stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
34 ) -> Self {
35 Self {
36 status,
37 headers,
38 stream,
39 }
40 }
41
42 pub fn status(&self) -> StatusCode {
44 self.status
45 }
46
47 pub fn status_code(&self) -> u16 {
49 self.status.as_u16()
50 }
51
52 pub fn headers(&self) -> &HeaderMap {
54 &self.headers
55 }
56
57 pub fn is_success(&self) -> bool {
59 self.status.is_success()
60 }
61
62 pub fn is_client_error(&self) -> bool {
64 self.status.is_client_error()
65 }
66
67 pub fn is_server_error(&self) -> bool {
69 self.status.is_server_error()
70 }
71
72 pub fn content_length(&self) -> Option<u64> {
74 self.headers
75 .get(header::CONTENT_LENGTH)?
76 .to_str()
77 .ok()?
78 .parse()
79 .ok()
80 }
81
82 pub fn content_type(&self) -> Option<&str> {
84 self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
85 }
86
87 pub async fn json<T>(mut self, timeout: Duration) -> Result<Vec<T>>
89 where
90 T: DeserializeOwned + Send,
91 {
92 let mut results = Vec::new();
93 let timeout_future = tokio::time::sleep(timeout);
94 tokio::pin!(timeout_future);
95
96 loop {
97 tokio::select! {
98 line_result = self.stream.next() => {
99 match line_result {
100 Some(Ok(line)) => {
101 if line.trim().is_empty() {
102 continue;
103 }
104 if let Ok(parsed) = serde_json::from_str::<T>(&line) {
106 results.push(parsed);
107 } else {
108 trace!("Failed to parse JSON line: {}", line);
109 }
110 }
111 Some(Err(e)) => {
112 warn!("Stream error: {}", e);
113 break;
114 }
115 None => break,
116 }
117 }
118 _ = &mut timeout_future => {
119 debug!("Stream timeout reached after {}ms", timeout.as_millis());
120 break;
121 }
122 }
123 }
124
125 Ok(results)
126 }
127
128 pub async fn process_json<F, T>(mut self, timeout: Duration, mut handler: F) -> Result<Vec<T>>
130 where
131 F: FnMut(&str) -> Option<T>,
132 T: Send + 'static,
133 {
134 let mut results = Vec::new();
135 let timeout_future = tokio::time::sleep(timeout);
136 tokio::pin!(timeout_future);
137
138 loop {
139 tokio::select! {
140 line_result = self.stream.next() => {
141 match line_result {
142 Some(Ok(line)) => {
143 if let Some(parsed) = handler(&line) {
144 results.push(parsed);
145 }
146 }
147 Some(Err(e)) => {
148 warn!("Stream error: {}", e);
149 break;
150 }
151 None => break,
152 }
153 }
154 _ = &mut timeout_future => break,
155 }
156 }
157
158 Ok(results)
159 }
160
161 pub async fn process_lines<F>(mut self, mut handler: F) -> Result<()>
163 where
164 F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
165 {
166 while let Some(line_result) = self.stream.next().await {
167 match line_result {
168 Ok(line) => {
169 if let Err(e) = handler(&line) {
170 warn!("Handler error: {}", e);
171 return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
172 }
173 }
174 Err(e) => {
175 warn!("Stream error: {}", e);
176 return Err(KodeBridgeError::from(e));
177 }
178 }
179 }
180 Ok(())
181 }
182
183 pub async fn process_lines_with_timeout<F>(
185 mut self,
186 timeout: Duration,
187 mut handler: F,
188 ) -> Result<()>
189 where
190 F: FnMut(&str) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>>, {
192 let timeout_future = tokio::time::sleep(timeout);
193 tokio::pin!(timeout_future);
194
195 loop {
196 tokio::select! {
197 line_result = self.stream.next() => {
198 match line_result {
199 Some(Ok(line)) => {
200 match handler(&line) {
201 Ok(continue_processing) => {
202 if !continue_processing {
203 break;
204 }
205 }
206 Err(e) => {
207 warn!("Handler error: {}", e);
208 return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
209 }
210 }
211 }
212 Some(Err(e)) => {
213 warn!("Stream error: {}", e);
214 return Err(KodeBridgeError::from(e));
215 }
216 None => break,
217 }
218 }
219 _ = &mut timeout_future => {
220 debug!("Processing timeout reached");
221 break;
222 }
223 }
224 }
225
226 Ok(())
227 }
228
229 pub async fn collect_text(mut self) -> Result<String> {
231 let mut body_lines = Vec::new();
232
233 while let Some(line_result) = self.stream.next().await {
234 match line_result {
235 Ok(line) => body_lines.push(line),
236 Err(e) => return Err(KodeBridgeError::from(e)),
237 }
238 }
239
240 Ok(body_lines.join("\n"))
241 }
242
243 pub async fn collect_text_with_timeout(mut self, timeout: Duration) -> Result<String> {
245 let mut body_lines = Vec::new();
246 let timeout_future = tokio::time::sleep(timeout);
247 tokio::pin!(timeout_future);
248
249 loop {
250 tokio::select! {
251 line_result = self.stream.next() => {
252 match line_result {
253 Some(Ok(line)) => body_lines.push(line),
254 Some(Err(e)) => return Err(KodeBridgeError::from(e)),
255 None => break, }
257 }
258 _ = &mut timeout_future => {
259 debug!("Collection timeout reached");
260 break; }
262 }
263 }
264
265 Ok(body_lines.join("\n"))
266 }
267
268 pub fn status_u16(&self) -> u16 {
270 self.status.as_u16()
271 }
272
273 pub fn headers_json(&self) -> Value {
275 let headers_map: HashMap<String, String> = self
276 .headers
277 .iter()
278 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
279 .collect();
280 serde_json::to_value(headers_map).unwrap_or(Value::Null)
281 }
282}
283
284impl Stream for StreamingResponse {
285 type Item = std::result::Result<String, std::io::Error>;
286
287 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
288 let this = self.project();
289 this.stream.poll_next(cx)
290 }
291}
292
293pub async fn parse_streaming_response<S>(stream: S) -> Result<StreamingResponse>
295where
296 S: AsyncRead + Unpin + Send + 'static,
297{
298 let mut reader = BufReader::new(stream);
299 let mut buffer = Vec::new();
300
301 let mut headers_end = None;
303 loop {
304 let mut line = Vec::new();
305 let n = reader.read_until(b'\n', &mut line).await?;
306 if n == 0 {
307 return Err(KodeBridgeError::protocol("Unexpected end of stream"));
308 }
309
310 buffer.extend_from_slice(&line);
311
312 if buffer.len() >= 4 {
314 for i in 0..buffer.len() - 3 {
315 if &buffer[i..i + 4] == b"\r\n\r\n" {
316 headers_end = Some(i + 4);
317 break;
318 }
319 }
320 }
321
322 if headers_end.is_some() {
323 break;
324 }
325 }
326
327 let headers_end = headers_end
328 .ok_or_else(|| KodeBridgeError::protocol("Could not find end of HTTP headers"))?;
329
330 let mut headers = [httparse::EMPTY_HEADER; 64];
332 let mut response = httparse::Response::new(&mut headers);
333
334 let status = match response.parse(&buffer[..headers_end])? {
335 httparse::Status::Complete(_) => response.code.unwrap(),
336 httparse::Status::Partial => {
337 return Err(KodeBridgeError::protocol("Incomplete HTTP response"));
338 }
339 };
340
341 let mut header_map = HeaderMap::new();
343 for header in response.headers {
344 let name =
345 http::HeaderName::from_str(header.name).map_err(|e| KodeBridgeError::Http(e.into()))?;
346 let value = http::HeaderValue::from_bytes(header.value)
347 .map_err(|e| KodeBridgeError::Http(e.into()))?;
348 header_map.insert(name, value);
349 }
350
351 let framed = FramedRead::new(reader, LinesCodec::new());
353 let line_stream = framed.map(|result| result.map_err(std::io::Error::other));
354
355 Ok(StreamingResponse::new(
356 StatusCode::from_u16(status)?,
357 header_map,
358 Box::pin(line_stream),
359 ))
360}
361
362pub async fn send_streaming_request<S>(mut stream: S, request: Bytes) -> Result<StreamingResponse>
364where
365 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
366{
367 stream.write_all(&request).await?;
369 stream.flush().await?;
370
371 trace!("Sent HTTP streaming request ({} bytes)", request.len());
372
373 let response = parse_streaming_response(stream).await?;
375
376 debug!(
377 "Received HTTP streaming response: {} {}",
378 response.status(),
379 response.content_length().unwrap_or(0)
380 );
381
382 Ok(response)
383}
384
385#[allow(dead_code)]
387pub async fn http_request_stream<S>(
388 stream: S,
389 method: &str,
390 path: &str,
391 body: Option<&Value>,
392) -> Result<StreamingResponse>
393where
394 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
395{
396 let method = Method::from_str(method).map_err(|e| KodeBridgeError::Http(e.into()))?;
397
398 let mut builder = RequestBuilder::new(method, path.to_string());
399
400 if let Some(json_body) = body {
401 builder = builder.json(json_body)?;
402 }
403
404 let request = builder.build()?;
405 send_streaming_request(stream, request).await
406}