kode_bridge/
stream_client.rs1use crate::errors::{KodeBridgeError, Result};
2use bytes::Bytes;
3use futures::stream::StreamExt as _;
4use http::{header, HeaderMap, StatusCode};
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::str::FromStr as _;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::io::{AsyncBufReadExt as _, AsyncRead, AsyncWrite, AsyncWriteExt as _, BufReader};
14use tokio_stream::Stream;
15use tokio_util::codec::{FramedRead, LinesCodec};
16use tracing::{debug, trace, warn};
17
18pin_project! {
19 pub struct StreamingResponse {
21 pub status: StatusCode,
22 pub headers: HeaderMap,
23 #[pin]
24 pub stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
25 }
26}
27
28impl StreamingResponse {
29 pub fn new(
30 status: StatusCode,
31 headers: HeaderMap,
32 stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
33 ) -> Self {
34 Self {
35 status,
36 headers,
37 stream,
38 }
39 }
40
41 pub const fn status(&self) -> StatusCode {
43 self.status
44 }
45
46 pub const fn status_code(&self) -> u16 {
48 self.status.as_u16()
49 }
50
51 pub const fn headers(&self) -> &HeaderMap {
53 &self.headers
54 }
55
56 pub fn is_success(&self) -> bool {
58 self.status.is_success()
59 }
60
61 pub fn is_client_error(&self) -> bool {
63 self.status.is_client_error()
64 }
65
66 pub fn is_server_error(&self) -> bool {
68 self.status.is_server_error()
69 }
70
71 pub fn content_length(&self) -> Option<u64> {
73 self.headers
74 .get(header::CONTENT_LENGTH)?
75 .to_str()
76 .ok()?
77 .parse()
78 .ok()
79 }
80
81 pub fn content_type(&self) -> Option<&str> {
83 self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
84 }
85
86 pub async fn json<T>(mut self, timeout: Duration) -> Result<Vec<T>>
88 where
89 T: DeserializeOwned + Send,
90 {
91 let mut results = Vec::new();
92 let timeout_future = tokio::time::sleep(timeout);
93 tokio::pin!(timeout_future);
94
95 loop {
96 tokio::select! {
97 line_result = self.stream.next() => {
98 match line_result {
99 Some(Ok(line)) => {
100 if line.trim().is_empty() {
101 continue;
102 }
103 if let Ok(parsed) = serde_json::from_str::<T>(&line) {
105 results.push(parsed);
106 } else {
107 trace!("Failed to parse JSON line: {}", line);
108 }
109 }
110 Some(Err(e)) => {
111 warn!("Stream error: {}", e);
112 break;
113 }
114 None => break,
115 }
116 }
117 _ = &mut timeout_future => {
118 debug!("Stream timeout reached after {}ms", timeout.as_millis());
119 break;
120 }
121 }
122 }
123
124 Ok(results)
125 }
126
127 pub async fn process_json<F, T>(mut self, timeout: Duration, mut handler: F) -> Result<Vec<T>>
129 where
130 F: FnMut(&str) -> Option<T> + Send,
131 T: Send + 'static,
132 {
133 let mut results = Vec::new();
134 let timeout_future = tokio::time::sleep(timeout);
135 tokio::pin!(timeout_future);
136
137 loop {
138 tokio::select! {
139 line_result = self.stream.next() => {
140 match line_result {
141 Some(Ok(line)) => {
142 if let Some(parsed) = handler(&line) {
143 results.push(parsed);
144 }
145 }
146 Some(Err(e)) => {
147 warn!("Stream error: {}", e);
148 break;
149 }
150 None => break,
151 }
152 }
153 _ = &mut timeout_future => break,
154 }
155 }
156
157 Ok(results)
158 }
159
160 pub async fn process_lines<F>(mut self, mut handler: F) -> Result<()>
162 where
163 F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
164 {
165 while let Some(line_result) = self.stream.next().await {
166 match line_result {
167 Ok(line) => {
168 if let Err(e) = handler(&line) {
169 warn!("Handler error: {}", e);
170 return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
171 }
172 }
173 Err(e) => {
174 warn!("Stream error: {}", e);
175 return Err(KodeBridgeError::from(e));
176 }
177 }
178 }
179 Ok(())
180 }
181
182 pub async fn process_lines_with_timeout<F>(mut self, timeout: Duration, mut handler: F) -> Result<()>
184 where
185 F: FnMut(&str) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>> + Send, {
187 let optimized_timeout = std::cmp::min(timeout, Duration::from_secs(5));
189 let timeout_future = tokio::time::sleep(optimized_timeout);
190 tokio::pin!(timeout_future);
191
192 loop {
193 tokio::select! {
194 line_result = self.stream.next() => {
195 match line_result {
196 Some(Ok(line)) => {
197 match handler(&line) {
198 Ok(continue_processing) => {
199 if !continue_processing {
200 break;
201 }
202 timeout_future.as_mut().reset(tokio::time::Instant::now() + optimized_timeout);
204 }
205 Err(e) => {
206 warn!("Handler error: {}", e);
207 return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
208 }
209 }
210 }
211 Some(Err(e)) => {
212 warn!("Stream error: {}", e);
213 return Err(KodeBridgeError::from(e));
214 }
215 None => break,
216 }
217 }
218 _ = &mut timeout_future => {
219 debug!("Processing timeout reached ({:?})", optimized_timeout);
220 break;
221 }
222 }
223 }
224
225 Ok(())
226 }
227
228 pub async fn collect_text(mut self) -> Result<String> {
230 let mut body_lines = Vec::new();
231
232 while let Some(line_result) = self.stream.next().await {
233 match line_result {
234 Ok(line) => body_lines.push(line),
235 Err(e) => return Err(KodeBridgeError::from(e)),
236 }
237 }
238
239 Ok(body_lines.join("\n"))
240 }
241
242 pub async fn collect_text_with_timeout(mut self, timeout: Duration) -> Result<String> {
244 let mut body_lines = Vec::new();
245
246 let optimized_timeout = std::cmp::min(timeout, Duration::from_secs(30));
248 let timeout_future = tokio::time::sleep(optimized_timeout);
249 tokio::pin!(timeout_future);
250
251 loop {
252 tokio::select! {
253 line_result = self.stream.next() => {
254 match line_result {
255 Some(Ok(line)) => {
256 body_lines.push(line);
257 timeout_future.as_mut().reset(tokio::time::Instant::now() + optimized_timeout);
259 }
260 Some(Err(e)) => return Err(KodeBridgeError::from(e)),
261 None => break, }
263 }
264 _ = &mut timeout_future => {
265 debug!("Collection timeout reached");
266 break; }
268 }
269 }
270
271 Ok(body_lines.join("\n"))
272 }
273
274 pub const fn status_u16(&self) -> u16 {
276 self.status.as_u16()
277 }
278
279 pub fn headers_json(&self) -> Value {
281 let headers_map: HashMap<String, String> = self
282 .headers
283 .iter()
284 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
285 .collect();
286 serde_json::to_value(headers_map).unwrap_or(Value::Null)
287 }
288}
289
290impl Stream for StreamingResponse {
291 type Item = std::result::Result<String, std::io::Error>;
292
293 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
294 let this = self.project();
295 this.stream.poll_next(cx)
296 }
297}
298
299pub async fn parse_streaming_response<S>(stream: S) -> Result<StreamingResponse>
301where
302 S: AsyncRead + Unpin + Send + 'static,
303{
304 let mut reader = BufReader::new(stream);
305 let mut buffer = Vec::new();
306
307 let mut headers_end = None;
309 loop {
310 let mut line = Vec::new();
311 let n = reader.read_until(b'\n', &mut line).await?;
312 if n == 0 {
313 return Err(KodeBridgeError::protocol("Unexpected end of stream"));
314 }
315
316 buffer.extend_from_slice(&line);
317
318 if buffer.len() >= 4 {
320 for i in 0..buffer.len() - 3 {
321 if &buffer[i..i + 4] == b"\r\n\r\n" {
322 headers_end = Some(i + 4);
323 break;
324 }
325 }
326 }
327
328 if headers_end.is_some() {
329 break;
330 }
331 }
332
333 let headers_end = headers_end.ok_or_else(|| KodeBridgeError::protocol("Could not find end of HTTP headers"))?;
334
335 let mut headers = vec![httparse::EMPTY_HEADER; 64];
337 let mut response = httparse::Response::new(headers.as_mut_slice());
338
339 let status = match response.parse(&buffer[..headers_end])? {
340 httparse::Status::Complete(_) => response
341 .code
342 .ok_or_else(|| KodeBridgeError::protocol("HTTP response missing status code"))?,
343 httparse::Status::Partial => {
344 return Err(KodeBridgeError::protocol("Incomplete HTTP response"));
345 }
346 };
347
348 let mut header_map = HeaderMap::new();
350 for header in response.headers {
351 let name = http::HeaderName::from_str(header.name).map_err(|e| KodeBridgeError::Http(e.into()))?;
352 let value = http::HeaderValue::from_bytes(header.value).map_err(|e| KodeBridgeError::Http(e.into()))?;
353 header_map.insert(name, value);
354 }
355
356 let framed = FramedRead::new(reader, LinesCodec::new());
358 let line_stream = framed.map(|result| result.map_err(std::io::Error::other));
359
360 Ok(StreamingResponse::new(
361 StatusCode::from_u16(status)?,
362 header_map,
363 Box::pin(line_stream),
364 ))
365}
366
367pub async fn send_streaming_request<S>(mut stream: S, request: Bytes) -> Result<StreamingResponse>
369where
370 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
371{
372 stream.write_all(&request).await?;
374 stream.flush().await?;
375
376 trace!("Sent HTTP streaming request ({} bytes)", request.len());
377
378 let response = parse_streaming_response(stream).await?;
380
381 debug!(
382 "Received HTTP streaming response: {} {}",
383 response.status(),
384 response.content_length().unwrap_or(0)
385 );
386
387 Ok(response)
388}