kode_bridge/
stream_client.rs

1use crate::errors::{KodeBridgeError, Result};
2use bytes::Bytes;
3use futures::stream::StreamExt;
4use http::{header, HeaderMap, StatusCode};
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::str::FromStr;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
14use tokio_stream::Stream;
15use tokio_util::codec::{FramedRead, LinesCodec};
16use tracing::{debug, trace, warn};
17
pin_project! {
    /// Streaming HTTP response that yields data as it arrives
    ///
    /// Bundles the parsed status line and headers with a boxed stream of body items.
    /// Each body item is either a `String` or the `std::io::Error` that interrupted it.
    pub struct StreamingResponse {
        // Status code of the response.
        pub status: StatusCode,
        // Response headers.
        pub headers: HeaderMap,
        // Body stream. It is marked `#[pin]` so the `Stream` impl for this type can
        // forward `poll_next` to it through the generated projection.
        #[pin]
        pub stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
    }
}
27
28impl StreamingResponse {
29    pub fn new(
30        status: StatusCode,
31        headers: HeaderMap,
32        stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
33    ) -> Self {
34        Self {
35            status,
36            headers,
37            stream,
38        }
39    }
40
    /// Get HTTP status code
    ///
    /// Returns a copy of the `status` field.
    pub fn status(&self) -> StatusCode {
        self.status
    }
45
46    /// Get status code as u16
47    pub fn status_code(&self) -> u16 {
48        self.status.as_u16()
49    }
50
    /// Get response headers
    ///
    /// Returns a shared reference to the `headers` field; nothing is cloned.
    pub fn headers(&self) -> &HeaderMap {
        &self.headers
    }
55
56    /// Check if response indicates success (2xx status)
57    pub fn is_success(&self) -> bool {
58        self.status.is_success()
59    }
60
61    /// Check if response indicates client error (4xx status)
62    pub fn is_client_error(&self) -> bool {
63        self.status.is_client_error()
64    }
65
66    /// Check if response indicates server error (5xx status)
67    pub fn is_server_error(&self) -> bool {
68        self.status.is_server_error()
69    }
70
71    /// Get content length from headers
72    pub fn content_length(&self) -> Option<u64> {
73        self.headers
74            .get(header::CONTENT_LENGTH)?
75            .to_str()
76            .ok()?
77            .parse()
78            .ok()
79    }
80
81    /// Get content type from headers
82    pub fn content_type(&self) -> Option<&str> {
83        self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
84    }
85
86    /// Elegant JSON stream processing - automatically parse and filter valid data
87    pub async fn json<T>(mut self, timeout: Duration) -> Result<Vec<T>>
88    where
89        T: DeserializeOwned + Send,
90    {
91        let mut results = Vec::new();
92        let timeout_future = tokio::time::sleep(timeout);
93        tokio::pin!(timeout_future);
94
95        loop {
96            tokio::select! {
97                line_result = self.stream.next() => {
98                    match line_result {
99                        Some(Ok(line)) => {
100                            if line.trim().is_empty() {
101                                continue;
102                            }
103                            // Auto-parse JSON, ignore failures for robustness
104                            if let Ok(parsed) = serde_json::from_str::<T>(&line) {
105                                results.push(parsed);
106                            } else {
107                                trace!("Failed to parse JSON line: {}", line);
108                            }
109                        }
110                        Some(Err(e)) => {
111                            warn!("Stream error: {}", e);
112                            break;
113                        }
114                        None => break,
115                    }
116                }
117                _ = &mut timeout_future => {
118                    debug!("Stream timeout reached after {}ms", timeout.as_millis());
119                    break;
120                }
121            }
122        }
123
124        Ok(results)
125    }
126
127    /// Process stream with custom JSON handler
128    pub async fn process_json<F, T>(mut self, timeout: Duration, mut handler: F) -> Result<Vec<T>>
129    where
130        F: FnMut(&str) -> Option<T>,
131        T: Send + 'static,
132    {
133        let mut results = Vec::new();
134        let timeout_future = tokio::time::sleep(timeout);
135        tokio::pin!(timeout_future);
136
137        loop {
138            tokio::select! {
139                line_result = self.stream.next() => {
140                    match line_result {
141                        Some(Ok(line)) => {
142                            if let Some(parsed) = handler(&line) {
143                                results.push(parsed);
144                            }
145                        }
146                        Some(Err(e)) => {
147                            warn!("Stream error: {}", e);
148                            break;
149                        }
150                        None => break,
151                    }
152                }
153                _ = &mut timeout_future => break,
154            }
155        }
156
157        Ok(results)
158    }
159
160    /// Process stream data in real-time with error handling
161    pub async fn process_lines<F>(mut self, mut handler: F) -> Result<()>
162    where
163        F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
164    {
165        while let Some(line_result) = self.stream.next().await {
166            match line_result {
167                Ok(line) => {
168                    if let Err(e) = handler(&line) {
169                        warn!("Handler error: {}", e);
170                        return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
171                    }
172                }
173                Err(e) => {
174                    warn!("Stream error: {}", e);
175                    return Err(KodeBridgeError::from(e));
176                }
177            }
178        }
179        Ok(())
180    }
181
182    /// Process stream with timeout and error handling
183    pub async fn process_lines_with_timeout<F>(
184        mut self,
185        timeout: Duration,
186        mut handler: F,
187    ) -> Result<()>
188    where
189        F: FnMut(&str) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>>, // Return false to stop
190    {
191        let timeout_future = tokio::time::sleep(timeout);
192        tokio::pin!(timeout_future);
193
194        loop {
195            tokio::select! {
196                line_result = self.stream.next() => {
197                    match line_result {
198                        Some(Ok(line)) => {
199                            match handler(&line) {
200                                Ok(continue_processing) => {
201                                    if !continue_processing {
202                                        break;
203                                    }
204                                }
205                                Err(e) => {
206                                    warn!("Handler error: {}", e);
207                                    return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
208                                }
209                            }
210                        }
211                        Some(Err(e)) => {
212                            warn!("Stream error: {}", e);
213                            return Err(KodeBridgeError::from(e));
214                        }
215                        None => break,
216                    }
217                }
218                _ = &mut timeout_future => {
219                    debug!("Processing timeout reached");
220                    break;
221                }
222            }
223        }
224
225        Ok(())
226    }
227
228    /// Collect all stream data into a string
229    pub async fn collect_text(mut self) -> Result<String> {
230        let mut body_lines = Vec::new();
231
232        while let Some(line_result) = self.stream.next().await {
233            match line_result {
234                Ok(line) => body_lines.push(line),
235                Err(e) => return Err(KodeBridgeError::from(e)),
236            }
237        }
238
239        Ok(body_lines.join("\n"))
240    }
241
242    /// Collect stream data with a timeout
243    pub async fn collect_text_with_timeout(mut self, timeout: Duration) -> Result<String> {
244        let mut body_lines = Vec::new();
245        let timeout_future = tokio::time::sleep(timeout);
246        tokio::pin!(timeout_future);
247
248        loop {
249            tokio::select! {
250                line_result = self.stream.next() => {
251                    match line_result {
252                        Some(Ok(line)) => body_lines.push(line),
253                        Some(Err(e)) => return Err(KodeBridgeError::from(e)),
254                        None => break, // Stream ended
255                    }
256                }
257                _ = &mut timeout_future => {
258                    debug!("Collection timeout reached");
259                    break; // Timeout reached
260                }
261            }
262        }
263
264        Ok(body_lines.join("\n"))
265    }
266
267    /// Convert to legacy format for compatibility
268    pub fn status_u16(&self) -> u16 {
269        self.status.as_u16()
270    }
271
272    /// Get headers as JSON value for compatibility
273    pub fn headers_json(&self) -> Value {
274        let headers_map: HashMap<String, String> = self
275            .headers
276            .iter()
277            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
278            .collect();
279        serde_json::to_value(headers_map).unwrap_or(Value::Null)
280    }
281}
282
283impl Stream for StreamingResponse {
284    type Item = std::result::Result<String, std::io::Error>;
285
286    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
287        let this = self.project();
288        this.stream.poll_next(cx)
289    }
290}
291
292/// Parse HTTP response headers and create streaming response
293pub async fn parse_streaming_response<S>(stream: S) -> Result<StreamingResponse>
294where
295    S: AsyncRead + Unpin + Send + 'static,
296{
297    let mut reader = BufReader::new(stream);
298    let mut buffer = Vec::new();
299
300    // Read until we have the complete headers
301    let mut headers_end = None;
302    loop {
303        let mut line = Vec::new();
304        let n = reader.read_until(b'\n', &mut line).await?;
305        if n == 0 {
306            return Err(KodeBridgeError::protocol("Unexpected end of stream"));
307        }
308
309        buffer.extend_from_slice(&line);
310
311        // Check for end of headers (\r\n\r\n)
312        if buffer.len() >= 4 {
313            for i in 0..buffer.len() - 3 {
314                if &buffer[i..i + 4] == b"\r\n\r\n" {
315                    headers_end = Some(i + 4);
316                    break;
317                }
318            }
319        }
320
321        if headers_end.is_some() {
322            break;
323        }
324    }
325
326    let headers_end = headers_end
327        .ok_or_else(|| KodeBridgeError::protocol("Could not find end of HTTP headers"))?;
328
329    // Parse the headers using httparse
330    let mut headers = [httparse::EMPTY_HEADER; 64];
331    let mut response = httparse::Response::new(&mut headers);
332
333    let status = match response.parse(&buffer[..headers_end])? {
334        httparse::Status::Complete(_) => response
335            .code
336            .ok_or_else(|| KodeBridgeError::protocol("HTTP response missing status code"))?,
337        httparse::Status::Partial => {
338            return Err(KodeBridgeError::protocol("Incomplete HTTP response"));
339        }
340    };
341
342    // Build HeaderMap
343    let mut header_map = HeaderMap::new();
344    for header in response.headers {
345        let name =
346            http::HeaderName::from_str(header.name).map_err(|e| KodeBridgeError::Http(e.into()))?;
347        let value = http::HeaderValue::from_bytes(header.value)
348            .map_err(|e| KodeBridgeError::Http(e.into()))?;
349        header_map.insert(name, value);
350    }
351
352    // Create line stream from the remaining reader
353    let framed = FramedRead::new(reader, LinesCodec::new());
354    let line_stream = framed.map(|result| result.map_err(std::io::Error::other));
355
356    Ok(StreamingResponse::new(
357        StatusCode::from_u16(status)?,
358        header_map,
359        Box::pin(line_stream),
360    ))
361}
362
363/// Send HTTP request and get streaming response
364pub async fn send_streaming_request<S>(mut stream: S, request: Bytes) -> Result<StreamingResponse>
365where
366    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
367{
368    // Send request
369    stream.write_all(&request).await?;
370    stream.flush().await?;
371
372    trace!("Sent HTTP streaming request ({} bytes)", request.len());
373
374    // Parse response
375    let response = parse_streaming_response(stream).await?;
376
377    debug!(
378        "Received HTTP streaming response: {} {}",
379        response.status(),
380        response.content_length().unwrap_or(0)
381    );
382
383    Ok(response)
384}