kode_bridge/
stream_client.rs

1use crate::errors::{KodeBridgeError, Result};
2use bytes::Bytes;
3use futures::stream::StreamExt as _;
4use http::{header, HeaderMap, StatusCode};
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::str::FromStr as _;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::io::{AsyncBufReadExt as _, AsyncRead, AsyncWrite, AsyncWriteExt as _, BufReader};
14use tokio_stream::Stream;
15use tokio_util::codec::{FramedRead, LinesCodec};
16use tracing::{debug, trace, warn};
17
18pin_project! {
19    /// Streaming HTTP response that yields data as it arrives
20    pub struct StreamingResponse {
21        pub status: StatusCode,
22        pub headers: HeaderMap,
23        #[pin]
24        pub stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
25    }
26}
27
28impl StreamingResponse {
29    pub fn new(
30        status: StatusCode,
31        headers: HeaderMap,
32        stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
33    ) -> Self {
34        Self {
35            status,
36            headers,
37            stream,
38        }
39    }
40
41    /// Get HTTP status code
42    pub const fn status(&self) -> StatusCode {
43        self.status
44    }
45
46    /// Get status code as u16
47    pub const fn status_code(&self) -> u16 {
48        self.status.as_u16()
49    }
50
51    /// Get response headers
52    pub const fn headers(&self) -> &HeaderMap {
53        &self.headers
54    }
55
56    /// Check if response indicates success (2xx status)
57    pub fn is_success(&self) -> bool {
58        self.status.is_success()
59    }
60
61    /// Check if response indicates client error (4xx status)
62    pub fn is_client_error(&self) -> bool {
63        self.status.is_client_error()
64    }
65
66    /// Check if response indicates server error (5xx status)
67    pub fn is_server_error(&self) -> bool {
68        self.status.is_server_error()
69    }
70
71    /// Get content length from headers
72    pub fn content_length(&self) -> Option<u64> {
73        self.headers
74            .get(header::CONTENT_LENGTH)?
75            .to_str()
76            .ok()?
77            .parse()
78            .ok()
79    }
80
81    /// Get content type from headers
82    pub fn content_type(&self) -> Option<&str> {
83        self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
84    }
85
86    /// Elegant JSON stream processing - automatically parse and filter valid data
87    pub async fn json<T>(mut self, timeout: Duration) -> Result<Vec<T>>
88    where
89        T: DeserializeOwned + Send,
90    {
91        let mut results = Vec::new();
92        let timeout_future = tokio::time::sleep(timeout);
93        tokio::pin!(timeout_future);
94
95        loop {
96            tokio::select! {
97                line_result = self.stream.next() => {
98                    match line_result {
99                        Some(Ok(line)) => {
100                            if line.trim().is_empty() {
101                                continue;
102                            }
103                            // Auto-parse JSON, ignore failures for robustness
104                            if let Ok(parsed) = serde_json::from_str::<T>(&line) {
105                                results.push(parsed);
106                            } else {
107                                trace!("Failed to parse JSON line: {}", line);
108                            }
109                        }
110                        Some(Err(e)) => {
111                            warn!("Stream error: {}", e);
112                            break;
113                        }
114                        None => break,
115                    }
116                }
117                _ = &mut timeout_future => {
118                    debug!("Stream timeout reached after {}ms", timeout.as_millis());
119                    break;
120                }
121            }
122        }
123
124        Ok(results)
125    }
126
127    /// Process stream with custom JSON handler
128    pub async fn process_json<F, T>(mut self, timeout: Duration, mut handler: F) -> Result<Vec<T>>
129    where
130        F: FnMut(&str) -> Option<T> + Send,
131        T: Send + 'static,
132    {
133        let mut results = Vec::new();
134        let timeout_future = tokio::time::sleep(timeout);
135        tokio::pin!(timeout_future);
136
137        loop {
138            tokio::select! {
139                line_result = self.stream.next() => {
140                    match line_result {
141                        Some(Ok(line)) => {
142                            if let Some(parsed) = handler(&line) {
143                                results.push(parsed);
144                            }
145                        }
146                        Some(Err(e)) => {
147                            warn!("Stream error: {}", e);
148                            break;
149                        }
150                        None => break,
151                    }
152                }
153                _ = &mut timeout_future => break,
154            }
155        }
156
157        Ok(results)
158    }
159
160    /// Process stream data in real-time with error handling
161    pub async fn process_lines<F>(mut self, mut handler: F) -> Result<()>
162    where
163        F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>> + Send,
164    {
165        while let Some(line_result) = self.stream.next().await {
166            match line_result {
167                Ok(line) => {
168                    if let Err(e) = handler(&line) {
169                        warn!("Handler error: {}", e);
170                        return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
171                    }
172                }
173                Err(e) => {
174                    warn!("Stream error: {}", e);
175                    return Err(KodeBridgeError::from(e));
176                }
177            }
178        }
179        Ok(())
180    }
181
182    /// Process stream with timeout and error handling - optimized for better performance
183    pub async fn process_lines_with_timeout<F>(mut self, timeout: Duration, mut handler: F) -> Result<()>
184    where
185        F: FnMut(&str) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>> + Send, // Return false to stop
186    {
187        // 使用更短的超时避免长时间的waker等待
188        let optimized_timeout = std::cmp::min(timeout, Duration::from_secs(5));
189        let timeout_future = tokio::time::sleep(optimized_timeout);
190        tokio::pin!(timeout_future);
191
192        loop {
193            tokio::select! {
194                line_result = self.stream.next() => {
195                    match line_result {
196                        Some(Ok(line)) => {
197                            match handler(&line) {
198                                Ok(continue_processing) => {
199                                    if !continue_processing {
200                                        break;
201                                    }
202                                    // 重置超时计时器以避免不必要的超时
203                                    timeout_future.as_mut().reset(tokio::time::Instant::now() + optimized_timeout);
204                                }
205                                Err(e) => {
206                                    warn!("Handler error: {}", e);
207                                    return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
208                                }
209                            }
210                        }
211                        Some(Err(e)) => {
212                            warn!("Stream error: {}", e);
213                            return Err(KodeBridgeError::from(e));
214                        }
215                        None => break,
216                    }
217                }
218                _ = &mut timeout_future => {
219                    debug!("Processing timeout reached ({:?})", optimized_timeout);
220                    break;
221                }
222            }
223        }
224
225        Ok(())
226    }
227
228    /// Collect all stream data into a string
229    pub async fn collect_text(mut self) -> Result<String> {
230        let mut body_lines = Vec::new();
231
232        while let Some(line_result) = self.stream.next().await {
233            match line_result {
234                Ok(line) => body_lines.push(line),
235                Err(e) => return Err(KodeBridgeError::from(e)),
236            }
237        }
238
239        Ok(body_lines.join("\n"))
240    }
241
242    /// Collect stream data with a timeout - optimized for better performance
243    pub async fn collect_text_with_timeout(mut self, timeout: Duration) -> Result<String> {
244        let mut body_lines = Vec::new();
245
246        // 限制最大超时时间避免长时间waker等待
247        let optimized_timeout = std::cmp::min(timeout, Duration::from_secs(30));
248        let timeout_future = tokio::time::sleep(optimized_timeout);
249        tokio::pin!(timeout_future);
250
251        loop {
252            tokio::select! {
253                line_result = self.stream.next() => {
254                    match line_result {
255                        Some(Ok(line)) => {
256                            body_lines.push(line);
257                            // 收到数据后重置超时,避免不必要的超时
258                            timeout_future.as_mut().reset(tokio::time::Instant::now() + optimized_timeout);
259                        }
260                        Some(Err(e)) => return Err(KodeBridgeError::from(e)),
261                        None => break, // Stream ended
262                    }
263                }
264                _ = &mut timeout_future => {
265                    debug!("Collection timeout reached");
266                    break; // Timeout reached
267                }
268            }
269        }
270
271        Ok(body_lines.join("\n"))
272    }
273
274    /// Convert to legacy format for compatibility
275    pub const fn status_u16(&self) -> u16 {
276        self.status.as_u16()
277    }
278
279    /// Get headers as JSON value for compatibility
280    pub fn headers_json(&self) -> Value {
281        let headers_map: HashMap<String, String> = self
282            .headers
283            .iter()
284            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
285            .collect();
286        serde_json::to_value(headers_map).unwrap_or(Value::Null)
287    }
288}
289
290impl Stream for StreamingResponse {
291    type Item = std::result::Result<String, std::io::Error>;
292
293    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
294        let this = self.project();
295        this.stream.poll_next(cx)
296    }
297}
298
299/// Parse HTTP response headers and create streaming response
300pub async fn parse_streaming_response<S>(stream: S) -> Result<StreamingResponse>
301where
302    S: AsyncRead + Unpin + Send + 'static,
303{
304    let mut reader = BufReader::new(stream);
305    let mut buffer = Vec::new();
306
307    // Read until we have the complete headers
308    let mut headers_end = None;
309    loop {
310        let mut line = Vec::new();
311        let n = reader.read_until(b'\n', &mut line).await?;
312        if n == 0 {
313            return Err(KodeBridgeError::protocol("Unexpected end of stream"));
314        }
315
316        buffer.extend_from_slice(&line);
317
318        // Check for end of headers (\r\n\r\n)
319        if buffer.len() >= 4 {
320            for i in 0..buffer.len() - 3 {
321                if &buffer[i..i + 4] == b"\r\n\r\n" {
322                    headers_end = Some(i + 4);
323                    break;
324                }
325            }
326        }
327
328        if headers_end.is_some() {
329            break;
330        }
331    }
332
333    let headers_end = headers_end.ok_or_else(|| KodeBridgeError::protocol("Could not find end of HTTP headers"))?;
334
335    // Parse the headers using httparse
336    let mut headers = vec![httparse::EMPTY_HEADER; 64];
337    let mut response = httparse::Response::new(headers.as_mut_slice());
338
339    let status = match response.parse(&buffer[..headers_end])? {
340        httparse::Status::Complete(_) => response
341            .code
342            .ok_or_else(|| KodeBridgeError::protocol("HTTP response missing status code"))?,
343        httparse::Status::Partial => {
344            return Err(KodeBridgeError::protocol("Incomplete HTTP response"));
345        }
346    };
347
348    // Build HeaderMap
349    let mut header_map = HeaderMap::new();
350    for header in response.headers {
351        let name = http::HeaderName::from_str(header.name).map_err(|e| KodeBridgeError::Http(e.into()))?;
352        let value = http::HeaderValue::from_bytes(header.value).map_err(|e| KodeBridgeError::Http(e.into()))?;
353        header_map.insert(name, value);
354    }
355
356    // Create line stream from the remaining reader
357    let framed = FramedRead::new(reader, LinesCodec::new());
358    let line_stream = framed.map(|result| result.map_err(std::io::Error::other));
359
360    Ok(StreamingResponse::new(
361        StatusCode::from_u16(status)?,
362        header_map,
363        Box::pin(line_stream),
364    ))
365}
366
367/// Send HTTP request and get streaming response
368pub async fn send_streaming_request<S>(mut stream: S, request: Bytes) -> Result<StreamingResponse>
369where
370    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
371{
372    // Send request
373    stream.write_all(&request).await?;
374    stream.flush().await?;
375
376    trace!("Sent HTTP streaming request ({} bytes)", request.len());
377
378    // Parse response
379    let response = parse_streaming_response(stream).await?;
380
381    debug!(
382        "Received HTTP streaming response: {} {}",
383        response.status(),
384        response.content_length().unwrap_or(0)
385    );
386
387    Ok(response)
388}