kode_bridge/
stream_client.rs

1use crate::errors::{KodeBridgeError, Result};
2use crate::http_client::RequestBuilder;
3use bytes::Bytes;
4use futures::stream::StreamExt;
5use http::{HeaderMap, Method, StatusCode, header};
6use pin_project_lite::pin_project;
7use serde::de::DeserializeOwned;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::pin::Pin;
11use std::str::FromStr;
12use std::task::{Context, Poll};
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
15use tokio_stream::Stream;
16use tokio_util::codec::{FramedRead, LinesCodec};
17use tracing::{debug, trace, warn};
18
19pin_project! {
20    /// Streaming HTTP response that yields data as it arrives
21    pub struct StreamingResponse {
22        pub status: StatusCode,
23        pub headers: HeaderMap,
24        #[pin]
25        pub stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
26    }
27}
28
29impl StreamingResponse {
30    pub fn new(
31        status: StatusCode,
32        headers: HeaderMap,
33        stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
34    ) -> Self {
35        Self {
36            status,
37            headers,
38            stream,
39        }
40    }
41
42    /// Get HTTP status code
43    pub fn status(&self) -> StatusCode {
44        self.status
45    }
46
47    /// Get status code as u16
48    pub fn status_code(&self) -> u16 {
49        self.status.as_u16()
50    }
51
52    /// Get response headers
53    pub fn headers(&self) -> &HeaderMap {
54        &self.headers
55    }
56
57    /// Check if response indicates success (2xx status)
58    pub fn is_success(&self) -> bool {
59        self.status.is_success()
60    }
61
62    /// Check if response indicates client error (4xx status)
63    pub fn is_client_error(&self) -> bool {
64        self.status.is_client_error()
65    }
66
67    /// Check if response indicates server error (5xx status)
68    pub fn is_server_error(&self) -> bool {
69        self.status.is_server_error()
70    }
71
72    /// Get content length from headers
73    pub fn content_length(&self) -> Option<u64> {
74        self.headers
75            .get(header::CONTENT_LENGTH)?
76            .to_str()
77            .ok()?
78            .parse()
79            .ok()
80    }
81
82    /// Get content type from headers
83    pub fn content_type(&self) -> Option<&str> {
84        self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
85    }
86
87    /// Elegant JSON stream processing - automatically parse and filter valid data
88    pub async fn json<T>(mut self, timeout: Duration) -> Result<Vec<T>>
89    where
90        T: DeserializeOwned + Send,
91    {
92        let mut results = Vec::new();
93        let timeout_future = tokio::time::sleep(timeout);
94        tokio::pin!(timeout_future);
95
96        loop {
97            tokio::select! {
98                line_result = self.stream.next() => {
99                    match line_result {
100                        Some(Ok(line)) => {
101                            if line.trim().is_empty() {
102                                continue;
103                            }
104                            // Auto-parse JSON, ignore failures for robustness
105                            if let Ok(parsed) = serde_json::from_str::<T>(&line) {
106                                results.push(parsed);
107                            } else {
108                                trace!("Failed to parse JSON line: {}", line);
109                            }
110                        }
111                        Some(Err(e)) => {
112                            warn!("Stream error: {}", e);
113                            break;
114                        }
115                        None => break,
116                    }
117                }
118                _ = &mut timeout_future => {
119                    debug!("Stream timeout reached after {}ms", timeout.as_millis());
120                    break;
121                }
122            }
123        }
124
125        Ok(results)
126    }
127
128    /// Process stream with custom JSON handler
129    pub async fn process_json<F, T>(mut self, timeout: Duration, mut handler: F) -> Result<Vec<T>>
130    where
131        F: FnMut(&str) -> Option<T>,
132        T: Send + 'static,
133    {
134        let mut results = Vec::new();
135        let timeout_future = tokio::time::sleep(timeout);
136        tokio::pin!(timeout_future);
137
138        loop {
139            tokio::select! {
140                line_result = self.stream.next() => {
141                    match line_result {
142                        Some(Ok(line)) => {
143                            if let Some(parsed) = handler(&line) {
144                                results.push(parsed);
145                            }
146                        }
147                        Some(Err(e)) => {
148                            warn!("Stream error: {}", e);
149                            break;
150                        }
151                        None => break,
152                    }
153                }
154                _ = &mut timeout_future => break,
155            }
156        }
157
158        Ok(results)
159    }
160
161    /// Process stream data in real-time with error handling
162    pub async fn process_lines<F>(mut self, mut handler: F) -> Result<()>
163    where
164        F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
165    {
166        while let Some(line_result) = self.stream.next().await {
167            match line_result {
168                Ok(line) => {
169                    if let Err(e) = handler(&line) {
170                        warn!("Handler error: {}", e);
171                        return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
172                    }
173                }
174                Err(e) => {
175                    warn!("Stream error: {}", e);
176                    return Err(KodeBridgeError::from(e));
177                }
178            }
179        }
180        Ok(())
181    }
182
183    /// Process stream with timeout and error handling
184    pub async fn process_lines_with_timeout<F>(
185        mut self,
186        timeout: Duration,
187        mut handler: F,
188    ) -> Result<()>
189    where
190        F: FnMut(&str) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>>, // Return false to stop
191    {
192        let timeout_future = tokio::time::sleep(timeout);
193        tokio::pin!(timeout_future);
194
195        loop {
196            tokio::select! {
197                line_result = self.stream.next() => {
198                    match line_result {
199                        Some(Ok(line)) => {
200                            match handler(&line) {
201                                Ok(continue_processing) => {
202                                    if !continue_processing {
203                                        break;
204                                    }
205                                }
206                                Err(e) => {
207                                    warn!("Handler error: {}", e);
208                                    return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
209                                }
210                            }
211                        }
212                        Some(Err(e)) => {
213                            warn!("Stream error: {}", e);
214                            return Err(KodeBridgeError::from(e));
215                        }
216                        None => break,
217                    }
218                }
219                _ = &mut timeout_future => {
220                    debug!("Processing timeout reached");
221                    break;
222                }
223            }
224        }
225
226        Ok(())
227    }
228
229    /// Collect all stream data into a string
230    pub async fn collect_text(mut self) -> Result<String> {
231        let mut body_lines = Vec::new();
232
233        while let Some(line_result) = self.stream.next().await {
234            match line_result {
235                Ok(line) => body_lines.push(line),
236                Err(e) => return Err(KodeBridgeError::from(e)),
237            }
238        }
239
240        Ok(body_lines.join("\n"))
241    }
242
243    /// Collect stream data with a timeout
244    pub async fn collect_text_with_timeout(mut self, timeout: Duration) -> Result<String> {
245        let mut body_lines = Vec::new();
246        let timeout_future = tokio::time::sleep(timeout);
247        tokio::pin!(timeout_future);
248
249        loop {
250            tokio::select! {
251                line_result = self.stream.next() => {
252                    match line_result {
253                        Some(Ok(line)) => body_lines.push(line),
254                        Some(Err(e)) => return Err(KodeBridgeError::from(e)),
255                        None => break, // Stream ended
256                    }
257                }
258                _ = &mut timeout_future => {
259                    debug!("Collection timeout reached");
260                    break; // Timeout reached
261                }
262            }
263        }
264
265        Ok(body_lines.join("\n"))
266    }
267
268    /// Convert to legacy format for compatibility
269    pub fn status_u16(&self) -> u16 {
270        self.status.as_u16()
271    }
272
273    /// Get headers as JSON value for compatibility
274    pub fn headers_json(&self) -> Value {
275        let headers_map: HashMap<String, String> = self
276            .headers
277            .iter()
278            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
279            .collect();
280        serde_json::to_value(headers_map).unwrap_or(Value::Null)
281    }
282}
283
284impl Stream for StreamingResponse {
285    type Item = std::result::Result<String, std::io::Error>;
286
287    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
288        let this = self.project();
289        this.stream.poll_next(cx)
290    }
291}
292
293/// Parse HTTP response headers and create streaming response
294pub async fn parse_streaming_response<S>(stream: S) -> Result<StreamingResponse>
295where
296    S: AsyncRead + Unpin + Send + 'static,
297{
298    let mut reader = BufReader::new(stream);
299    let mut buffer = Vec::new();
300
301    // Read until we have the complete headers
302    let mut headers_end = None;
303    loop {
304        let mut line = Vec::new();
305        let n = reader.read_until(b'\n', &mut line).await?;
306        if n == 0 {
307            return Err(KodeBridgeError::protocol("Unexpected end of stream"));
308        }
309
310        buffer.extend_from_slice(&line);
311
312        // Check for end of headers (\r\n\r\n)
313        if buffer.len() >= 4 {
314            for i in 0..buffer.len() - 3 {
315                if &buffer[i..i + 4] == b"\r\n\r\n" {
316                    headers_end = Some(i + 4);
317                    break;
318                }
319            }
320        }
321
322        if headers_end.is_some() {
323            break;
324        }
325    }
326
327    let headers_end = headers_end
328        .ok_or_else(|| KodeBridgeError::protocol("Could not find end of HTTP headers"))?;
329
330    // Parse the headers using httparse
331    let mut headers = [httparse::EMPTY_HEADER; 64];
332    let mut response = httparse::Response::new(&mut headers);
333
334    let status = match response.parse(&buffer[..headers_end])? {
335        httparse::Status::Complete(_) => response.code.unwrap(),
336        httparse::Status::Partial => {
337            return Err(KodeBridgeError::protocol("Incomplete HTTP response"));
338        }
339    };
340
341    // Build HeaderMap
342    let mut header_map = HeaderMap::new();
343    for header in response.headers {
344        let name =
345            http::HeaderName::from_str(header.name).map_err(|e| KodeBridgeError::Http(e.into()))?;
346        let value = http::HeaderValue::from_bytes(header.value)
347            .map_err(|e| KodeBridgeError::Http(e.into()))?;
348        header_map.insert(name, value);
349    }
350
351    // Create line stream from the remaining reader
352    let framed = FramedRead::new(reader, LinesCodec::new());
353    let line_stream = framed.map(|result| result.map_err(std::io::Error::other));
354
355    Ok(StreamingResponse::new(
356        StatusCode::from_u16(status)?,
357        header_map,
358        Box::pin(line_stream),
359    ))
360}
361
362/// Send HTTP request and get streaming response
363pub async fn send_streaming_request<S>(mut stream: S, request: Bytes) -> Result<StreamingResponse>
364where
365    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
366{
367    // Send request
368    stream.write_all(&request).await?;
369    stream.flush().await?;
370
371    trace!("Sent HTTP streaming request ({} bytes)", request.len());
372
373    // Parse response
374    let response = parse_streaming_response(stream).await?;
375
376    debug!(
377        "Received HTTP streaming response: {} {}",
378        response.status(),
379        response.content_length().unwrap_or(0)
380    );
381
382    Ok(response)
383}
384
385/// Convenience function for making streaming HTTP requests
386#[allow(dead_code)]
387pub async fn http_request_stream<S>(
388    stream: S,
389    method: &str,
390    path: &str,
391    body: Option<&Value>,
392) -> Result<StreamingResponse>
393where
394    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
395{
396    let method = Method::from_str(method).map_err(|e| KodeBridgeError::Http(e.into()))?;
397
398    let mut builder = RequestBuilder::new(method, path.to_string());
399
400    if let Some(json_body) = body {
401        builder = builder.json(json_body)?;
402    }
403
404    let request = builder.build()?;
405    send_streaming_request(stream, request).await
406}