kode_bridge/
stream_client.rs1use crate::errors::{KodeBridgeError, Result};
2use bytes::Bytes;
3use futures::stream::StreamExt;
4use http::{header, HeaderMap, StatusCode};
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use serde_json::Value;
8use std::collections::HashMap;
9use std::pin::Pin;
10use std::str::FromStr;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
14use tokio_stream::Stream;
15use tokio_util::codec::{FramedRead, LinesCodec};
16use tracing::{debug, trace, warn};
17
18pin_project! {
19 pub struct StreamingResponse {
21 pub status: StatusCode,
22 pub headers: HeaderMap,
23 #[pin]
24 pub stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
25 }
26}
27
28impl StreamingResponse {
29 pub fn new(
30 status: StatusCode,
31 headers: HeaderMap,
32 stream: Pin<Box<dyn Stream<Item = std::result::Result<String, std::io::Error>> + Send>>,
33 ) -> Self {
34 Self {
35 status,
36 headers,
37 stream,
38 }
39 }
40
41 pub fn status(&self) -> StatusCode {
43 self.status
44 }
45
46 pub fn status_code(&self) -> u16 {
48 self.status.as_u16()
49 }
50
51 pub fn headers(&self) -> &HeaderMap {
53 &self.headers
54 }
55
56 pub fn is_success(&self) -> bool {
58 self.status.is_success()
59 }
60
61 pub fn is_client_error(&self) -> bool {
63 self.status.is_client_error()
64 }
65
66 pub fn is_server_error(&self) -> bool {
68 self.status.is_server_error()
69 }
70
71 pub fn content_length(&self) -> Option<u64> {
73 self.headers
74 .get(header::CONTENT_LENGTH)?
75 .to_str()
76 .ok()?
77 .parse()
78 .ok()
79 }
80
81 pub fn content_type(&self) -> Option<&str> {
83 self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
84 }
85
86 pub async fn json<T>(mut self, timeout: Duration) -> Result<Vec<T>>
88 where
89 T: DeserializeOwned + Send,
90 {
91 let mut results = Vec::new();
92 let timeout_future = tokio::time::sleep(timeout);
93 tokio::pin!(timeout_future);
94
95 loop {
96 tokio::select! {
97 line_result = self.stream.next() => {
98 match line_result {
99 Some(Ok(line)) => {
100 if line.trim().is_empty() {
101 continue;
102 }
103 if let Ok(parsed) = serde_json::from_str::<T>(&line) {
105 results.push(parsed);
106 } else {
107 trace!("Failed to parse JSON line: {}", line);
108 }
109 }
110 Some(Err(e)) => {
111 warn!("Stream error: {}", e);
112 break;
113 }
114 None => break,
115 }
116 }
117 _ = &mut timeout_future => {
118 debug!("Stream timeout reached after {}ms", timeout.as_millis());
119 break;
120 }
121 }
122 }
123
124 Ok(results)
125 }
126
127 pub async fn process_json<F, T>(mut self, timeout: Duration, mut handler: F) -> Result<Vec<T>>
129 where
130 F: FnMut(&str) -> Option<T>,
131 T: Send + 'static,
132 {
133 let mut results = Vec::new();
134 let timeout_future = tokio::time::sleep(timeout);
135 tokio::pin!(timeout_future);
136
137 loop {
138 tokio::select! {
139 line_result = self.stream.next() => {
140 match line_result {
141 Some(Ok(line)) => {
142 if let Some(parsed) = handler(&line) {
143 results.push(parsed);
144 }
145 }
146 Some(Err(e)) => {
147 warn!("Stream error: {}", e);
148 break;
149 }
150 None => break,
151 }
152 }
153 _ = &mut timeout_future => break,
154 }
155 }
156
157 Ok(results)
158 }
159
160 pub async fn process_lines<F>(mut self, mut handler: F) -> Result<()>
162 where
163 F: FnMut(&str) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync>>,
164 {
165 while let Some(line_result) = self.stream.next().await {
166 match line_result {
167 Ok(line) => {
168 if let Err(e) = handler(&line) {
169 warn!("Handler error: {}", e);
170 return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
171 }
172 }
173 Err(e) => {
174 warn!("Stream error: {}", e);
175 return Err(KodeBridgeError::from(e));
176 }
177 }
178 }
179 Ok(())
180 }
181
182 pub async fn process_lines_with_timeout<F>(
184 mut self,
185 timeout: Duration,
186 mut handler: F,
187 ) -> Result<()>
188 where
189 F: FnMut(&str) -> std::result::Result<bool, Box<dyn std::error::Error + Send + Sync>>, {
191 let timeout_future = tokio::time::sleep(timeout);
192 tokio::pin!(timeout_future);
193
194 loop {
195 tokio::select! {
196 line_result = self.stream.next() => {
197 match line_result {
198 Some(Ok(line)) => {
199 match handler(&line) {
200 Ok(continue_processing) => {
201 if !continue_processing {
202 break;
203 }
204 }
205 Err(e) => {
206 warn!("Handler error: {}", e);
207 return Err(KodeBridgeError::custom(format!("Handler error: {}", e)));
208 }
209 }
210 }
211 Some(Err(e)) => {
212 warn!("Stream error: {}", e);
213 return Err(KodeBridgeError::from(e));
214 }
215 None => break,
216 }
217 }
218 _ = &mut timeout_future => {
219 debug!("Processing timeout reached");
220 break;
221 }
222 }
223 }
224
225 Ok(())
226 }
227
228 pub async fn collect_text(mut self) -> Result<String> {
230 let mut body_lines = Vec::new();
231
232 while let Some(line_result) = self.stream.next().await {
233 match line_result {
234 Ok(line) => body_lines.push(line),
235 Err(e) => return Err(KodeBridgeError::from(e)),
236 }
237 }
238
239 Ok(body_lines.join("\n"))
240 }
241
242 pub async fn collect_text_with_timeout(mut self, timeout: Duration) -> Result<String> {
244 let mut body_lines = Vec::new();
245 let timeout_future = tokio::time::sleep(timeout);
246 tokio::pin!(timeout_future);
247
248 loop {
249 tokio::select! {
250 line_result = self.stream.next() => {
251 match line_result {
252 Some(Ok(line)) => body_lines.push(line),
253 Some(Err(e)) => return Err(KodeBridgeError::from(e)),
254 None => break, }
256 }
257 _ = &mut timeout_future => {
258 debug!("Collection timeout reached");
259 break; }
261 }
262 }
263
264 Ok(body_lines.join("\n"))
265 }
266
267 pub fn status_u16(&self) -> u16 {
269 self.status.as_u16()
270 }
271
272 pub fn headers_json(&self) -> Value {
274 let headers_map: HashMap<String, String> = self
275 .headers
276 .iter()
277 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
278 .collect();
279 serde_json::to_value(headers_map).unwrap_or(Value::Null)
280 }
281}
282
283impl Stream for StreamingResponse {
284 type Item = std::result::Result<String, std::io::Error>;
285
286 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
287 let this = self.project();
288 this.stream.poll_next(cx)
289 }
290}
291
292pub async fn parse_streaming_response<S>(stream: S) -> Result<StreamingResponse>
294where
295 S: AsyncRead + Unpin + Send + 'static,
296{
297 let mut reader = BufReader::new(stream);
298 let mut buffer = Vec::new();
299
300 let mut headers_end = None;
302 loop {
303 let mut line = Vec::new();
304 let n = reader.read_until(b'\n', &mut line).await?;
305 if n == 0 {
306 return Err(KodeBridgeError::protocol("Unexpected end of stream"));
307 }
308
309 buffer.extend_from_slice(&line);
310
311 if buffer.len() >= 4 {
313 for i in 0..buffer.len() - 3 {
314 if &buffer[i..i + 4] == b"\r\n\r\n" {
315 headers_end = Some(i + 4);
316 break;
317 }
318 }
319 }
320
321 if headers_end.is_some() {
322 break;
323 }
324 }
325
326 let headers_end = headers_end
327 .ok_or_else(|| KodeBridgeError::protocol("Could not find end of HTTP headers"))?;
328
329 let mut headers = [httparse::EMPTY_HEADER; 64];
331 let mut response = httparse::Response::new(&mut headers);
332
333 let status = match response.parse(&buffer[..headers_end])? {
334 httparse::Status::Complete(_) => response
335 .code
336 .ok_or_else(|| KodeBridgeError::protocol("HTTP response missing status code"))?,
337 httparse::Status::Partial => {
338 return Err(KodeBridgeError::protocol("Incomplete HTTP response"));
339 }
340 };
341
342 let mut header_map = HeaderMap::new();
344 for header in response.headers {
345 let name =
346 http::HeaderName::from_str(header.name).map_err(|e| KodeBridgeError::Http(e.into()))?;
347 let value = http::HeaderValue::from_bytes(header.value)
348 .map_err(|e| KodeBridgeError::Http(e.into()))?;
349 header_map.insert(name, value);
350 }
351
352 let framed = FramedRead::new(reader, LinesCodec::new());
354 let line_stream = framed.map(|result| result.map_err(std::io::Error::other));
355
356 Ok(StreamingResponse::new(
357 StatusCode::from_u16(status)?,
358 header_map,
359 Box::pin(line_stream),
360 ))
361}
362
363pub async fn send_streaming_request<S>(mut stream: S, request: Bytes) -> Result<StreamingResponse>
365where
366 S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
367{
368 stream.write_all(&request).await?;
370 stream.flush().await?;
371
372 trace!("Sent HTTP streaming request ({} bytes)", request.len());
373
374 let response = parse_streaming_response(stream).await?;
376
377 debug!(
378 "Received HTTP streaming response: {} {}",
379 response.status(),
380 response.content_length().unwrap_or(0)
381 );
382
383 Ok(response)
384}