kode_bridge/
http_client.rs

1use crate::buffer_pool::global_pools;
2use crate::errors::{KodeBridgeError, Result};
3use crate::parser_cache::global_parser_cache;
4use bytes::{Bytes, BytesMut};
5use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version};
6use serde::{de::DeserializeOwned, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::str::FromStr;
10use std::time::Duration;
11use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
12use tokio::io::{AsyncRead, AsyncWrite};
13use tracing::{debug, trace};
14
15/// Enhanced HTTP response with rich functionality
16#[derive(Debug, Clone)]
17pub struct Response {
18    status: StatusCode,
19    version: Version,
20    headers: HeaderMap,
21    body: Bytes,
22}
23
24impl Response {
25    pub fn new(status: StatusCode, version: Version, headers: HeaderMap, body: Bytes) -> Self {
26        Self {
27            status,
28            version,
29            headers,
30            body,
31        }
32    }
33
34    /// Get HTTP status code
35    pub fn status(&self) -> StatusCode {
36        self.status
37    }
38
39    /// Get status code as u16
40    pub fn status_code(&self) -> u16 {
41        self.status.as_u16()
42    }
43
44    /// Get HTTP version
45    pub fn version(&self) -> Version {
46        self.version
47    }
48
49    /// Get response headers
50    pub fn headers(&self) -> &HeaderMap {
51        &self.headers
52    }
53
54    /// Get response body as bytes
55    pub fn body(&self) -> &Bytes {
56        &self.body
57    }
58
59    /// Get response body as string
60    pub fn text(&self) -> Result<String> {
61        String::from_utf8(self.body.to_vec()).map_err(KodeBridgeError::from)
62    }
63
64    /// Parse response body as JSON
65    pub fn json<T>(&self) -> Result<T>
66    where
67        T: DeserializeOwned,
68    {
69        serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
70    }
71
72    /// Parse response body as generic JSON value
73    pub fn json_value(&self) -> Result<Value> {
74        serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
75    }
76
77    /// Check if response indicates success (2xx status)
78    pub fn is_success(&self) -> bool {
79        self.status.is_success()
80    }
81
82    /// Check if response indicates client error (4xx status)
83    pub fn is_client_error(&self) -> bool {
84        self.status.is_client_error()
85    }
86
87    /// Check if response indicates server error (5xx status)
88    pub fn is_server_error(&self) -> bool {
89        self.status.is_server_error()
90    }
91
92    /// Check if response indicates redirection (3xx status)
93    pub fn is_redirection(&self) -> bool {
94        self.status.is_redirection()
95    }
96
97    /// Get content length from headers
98    pub fn content_length(&self) -> Option<u64> {
99        self.headers
100            .get(header::CONTENT_LENGTH)?
101            .to_str()
102            .ok()?
103            .parse()
104            .ok()
105    }
106
107    /// Get content type from headers
108    pub fn content_type(&self) -> Option<&str> {
109        self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
110    }
111
112    /// Convert to legacy Response format for backward compatibility
113    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
114        let headers_map: HashMap<String, String> = self
115            .headers
116            .iter()
117            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
118            .collect();
119
120        crate::response::LegacyResponse {
121            status: self.status.as_u16(),
122            headers: serde_json::to_value(headers_map).unwrap_or(Value::Null),
123            body: String::from_utf8_lossy(&self.body).to_string(),
124        }
125    }
126}
127
128/// HTTP request builder with fluent interface
129#[derive(Debug)]
130pub struct RequestBuilder {
131    method: Method,
132    uri: String,
133    headers: HeaderMap,
134    body: Option<Bytes>,
135}
136
137impl RequestBuilder {
138    pub fn new(method: Method, uri: String) -> Self {
139        let mut headers = HeaderMap::new();
140        headers.insert(header::HOST, HeaderValue::from_static("localhost"));
141        headers.insert(
142            header::USER_AGENT,
143            HeaderValue::from_static("kode-bridge/0.1"),
144        );
145
146        Self {
147            method,
148            uri,
149            headers,
150            body: None,
151        }
152    }
153
154    /// Set JSON body
155    pub fn json<T>(mut self, body: &T) -> Result<Self>
156    where
157        T: Serialize,
158    {
159        let json_bytes = serde_json::to_vec(body)?;
160        self.headers.insert(
161            header::CONTENT_TYPE,
162            HeaderValue::from_static("application/json"),
163        );
164        self.headers.insert(
165            header::CONTENT_LENGTH,
166            HeaderValue::from_str(&json_bytes.len().to_string())
167                .map_err(|e| KodeBridgeError::Http(e.into()))?,
168        );
169        self.body = Some(Bytes::from(json_bytes));
170        Ok(self)
171    }
172
173    /// Build the HTTP request as bytes
174    pub fn build(self) -> Result<Bytes> {
175        let mut request = BytesMut::new();
176
177        // Request line
178        let request_line = format!("{} {} HTTP/1.1\r\n", self.method, self.uri);
179        request.extend_from_slice(request_line.as_bytes());
180
181        // Headers
182        for (key, value) in &self.headers {
183            let header_line = format!("{}: {}\r\n", key, value.to_str().unwrap_or(""));
184            request.extend_from_slice(header_line.as_bytes());
185        }
186
187        // End of headers
188        request.extend_from_slice(b"\r\n");
189
190        // Body
191        if let Some(body) = self.body {
192            request.extend_from_slice(&body);
193        }
194
195        Ok(request.freeze())
196    }
197}
198
199/// Parse HTTP response from a stream
200pub async fn parse_response<S>(stream: S) -> Result<Response>
201where
202    S: AsyncRead + Unpin,
203{
204    let mut reader = BufReader::new(stream);
205    let mut buffer = String::new();
206
207    // Optimization: read the status line at once
208    reader.read_line(&mut buffer).await?;
209
210    // Continue reading headers
211    let mut headers_buffer = Vec::new();
212    headers_buffer.extend_from_slice(buffer.as_bytes());
213
214    // Read remaining headers until \r\n\r\n is found
215    let mut line_buffer = String::new();
216    loop {
217        line_buffer.clear();
218        let n = reader.read_line(&mut line_buffer).await?;
219        if n == 0 {
220            return Err(KodeBridgeError::protocol("Unexpected end of stream"));
221        }
222
223        headers_buffer.extend_from_slice(line_buffer.as_bytes());
224
225        // Check for empty line (just \r\n)
226        if line_buffer == "\r\n" {
227            break;
228        }
229
230        // Prevent headers from being too large
231        if headers_buffer.len() > 16384 {
232            return Err(KodeBridgeError::protocol("HTTP headers too large"));
233        }
234    }
235
236    // Use cached parser for better performance
237    let mut parser = global_parser_cache().get();
238    let (status, parsed_headers) = parser.parse_response(&headers_buffer).map_err(|e| {
239        KodeBridgeError::protocol(format!("Failed to parse HTTP response: {:?}", e))
240    })?;
241
242    // Build HeaderMap
243    let mut header_map = HeaderMap::new();
244    for (name, value) in parsed_headers {
245        let header_name =
246            HeaderName::from_str(&name).map_err(|e| KodeBridgeError::Http(e.into()))?;
247        let header_value =
248            HeaderValue::from_str(&value).map_err(|e| KodeBridgeError::Http(e.into()))?;
249        header_map.insert(header_name, header_value);
250    }
251
252    // Determine body length
253    let content_length = header_map
254        .get(header::CONTENT_LENGTH)
255        .and_then(|v| v.to_str().ok())
256        .and_then(|s| s.parse::<usize>().ok());
257
258    let is_chunked = header_map
259        .get(header::TRANSFER_ENCODING)
260        .and_then(|v| v.to_str().ok())
261        .map(|s| s.eq_ignore_ascii_case("chunked"))
262        .unwrap_or(false);
263
264    // Read body with optimized handling for empty responses (common in PUT/POST)
265    let body = if is_chunked {
266        read_chunked_body(&mut reader).await?
267    } else if let Some(len) = content_length {
268        if len == 0 {
269            // Empty response, return directly
270            Bytes::new()
271        } else {
272            read_fixed_body(&mut reader, len).await?
273        }
274    } else {
275        // For responses without Content-Length, use adaptive reading
276        read_until_end_adaptive(&mut reader).await?
277    };
278
279    Ok(Response::new(
280        StatusCode::from_u16(status)?,
281        Version::HTTP_11,
282        header_map,
283        body,
284    ))
285}
286
287async fn read_chunked_body<R>(reader: &mut BufReader<R>) -> Result<Bytes>
288where
289    R: AsyncRead + Unpin,
290{
291    let mut body_buffer = global_pools().get_large();
292
293    loop {
294        // Read chunk size line
295        let mut size_line = String::new();
296        reader.read_line(&mut size_line).await?;
297
298        let size_line = size_line.trim();
299        if size_line.is_empty() {
300            continue;
301        }
302
303        // Parse chunk size (hex)
304        let chunk_size = usize::from_str_radix(size_line, 16)
305            .map_err(|_| KodeBridgeError::protocol("Invalid chunk size"))?;
306
307        if chunk_size == 0 {
308            // Last chunk, read final CRLF
309            let mut final_line = String::new();
310            reader.read_line(&mut final_line).await?;
311            break;
312        }
313
314        // Read chunk data
315        let mut chunk = vec![0u8; chunk_size];
316        reader.read_exact(&mut chunk).await?;
317        body_buffer.extend_from_slice(&chunk);
318
319        // Read trailing CRLF
320        let mut crlf = [0u8; 2];
321        reader.read_exact(&mut crlf).await?;
322    }
323
324    Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
325}
326
327async fn read_fixed_body<R>(reader: &mut BufReader<R>, len: usize) -> Result<Bytes>
328where
329    R: AsyncRead + Unpin,
330{
331    let mut body = vec![0u8; len];
332    reader.read_exact(&mut body).await?;
333    Ok(Bytes::from(body))
334}
335
336async fn read_until_end_adaptive<R>(reader: &mut BufReader<R>) -> Result<Bytes>
337where
338    R: AsyncRead + Unpin,
339{
340    let mut body_buffer = global_pools().get_medium();
341    let mut read_buffer = [0u8; 4096];
342    let mut consecutive_empty_reads = 0;
343
344    loop {
345        // Use progressively longer timeouts to avoid premature termination
346        let timeout_duration =
347            Duration::from_millis(100 + (consecutive_empty_reads * 50).min(1000));
348
349        match tokio::time::timeout(timeout_duration, reader.read(&mut read_buffer)).await {
350            Ok(Ok(0)) => {
351                // EOF reached
352                break;
353            }
354            Ok(Ok(n)) => {
355                body_buffer.extend_from_slice(&read_buffer[..n]);
356                consecutive_empty_reads = 0;
357            }
358            Ok(Err(e)) => {
359                return Err(KodeBridgeError::from(e));
360            }
361            Err(_) => {
362                // Timeout occurred
363                consecutive_empty_reads += 1;
364                if consecutive_empty_reads >= 3 {
365                    // After 3 consecutive timeouts, assume no more data
366                    break;
367                }
368                continue;
369            }
370        }
371
372        // Safety limit to prevent unbounded memory usage
373        if body_buffer.len() > 64 * 1024 * 1024 {
374            // 64MB limit
375            return Err(KodeBridgeError::protocol("Response body too large"));
376        }
377    }
378
379    // Convert pooled buffer to Bytes for return
380    Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
381}
382
383/// Send HTTP request and parse response
384pub async fn send_request<S>(mut stream: S, request: Bytes) -> Result<Response>
385where
386    S: AsyncRead + AsyncWrite + Unpin,
387{
388    // Send request
389    stream.write_all(&request).await?;
390    stream.flush().await?;
391
392    trace!("Sent HTTP request ({} bytes)", request.len());
393
394    // Parse response
395    let response = parse_response(stream).await?;
396
397    debug!(
398        "Received HTTP response: {} {}",
399        response.status(),
400        response.content_length().unwrap_or(0)
401    );
402
403    Ok(response)
404}