Skip to main content

kode_bridge/
http_client.rs

1use crate::errors::{KodeBridgeError, Result};
2use crate::parser_cache::global_parser_cache;
3use bytes::{BufMut as _, Bytes, BytesMut};
4use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version};
5use serde::{de::DeserializeOwned, Serialize};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::str::FromStr as _;
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader};
11use tokio::io::{AsyncRead, AsyncWrite};
12use tracing::{debug, trace};
13
14/// Enhanced HTTP response with rich functionality
15#[derive(Debug, Clone)]
16pub struct Response {
17    status: StatusCode,
18    version: Version,
19    headers: HeaderMap,
20    body: Bytes,
21}
22
23impl Response {
24    pub const fn new(status: StatusCode, version: Version, headers: HeaderMap, body: Bytes) -> Self {
25        Self {
26            status,
27            version,
28            headers,
29            body,
30        }
31    }
32
33    /// Get HTTP status code
34    pub const fn status(&self) -> StatusCode {
35        self.status
36    }
37
38    /// Get status code as u16
39    pub const fn status_code(&self) -> u16 {
40        self.status.as_u16()
41    }
42
43    /// Get HTTP version
44    pub const fn version(&self) -> Version {
45        self.version
46    }
47
48    /// Get response headers
49    pub const fn headers(&self) -> &HeaderMap {
50        &self.headers
51    }
52
53    /// Get response body as bytes
54    pub const fn body(&self) -> &Bytes {
55        &self.body
56    }
57
58    /// Get response body as string
59    pub fn text(&self) -> Result<String> {
60        String::from_utf8(self.body.to_vec()).map_err(KodeBridgeError::from)
61    }
62
63    /// Parse response body as JSON
64    pub fn json<T>(&self) -> Result<T>
65    where
66        T: DeserializeOwned,
67    {
68        serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
69    }
70
71    /// Parse response body as generic JSON value
72    pub fn json_value(&self) -> Result<Value> {
73        serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
74    }
75
76    /// Check if response indicates success (2xx status)
77    pub fn is_success(&self) -> bool {
78        self.status.is_success()
79    }
80
81    /// Check if response indicates client error (4xx status)
82    pub fn is_client_error(&self) -> bool {
83        self.status.is_client_error()
84    }
85
86    /// Check if response indicates server error (5xx status)
87    pub fn is_server_error(&self) -> bool {
88        self.status.is_server_error()
89    }
90
91    /// Check if response indicates redirection (3xx status)
92    pub fn is_redirection(&self) -> bool {
93        self.status.is_redirection()
94    }
95
96    /// Get content length from headers
97    pub fn content_length(&self) -> Option<u64> {
98        self.headers
99            .get(header::CONTENT_LENGTH)?
100            .to_str()
101            .ok()?
102            .parse()
103            .ok()
104    }
105
106    /// Get content type from headers
107    pub fn content_type(&self) -> Option<&str> {
108        self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
109    }
110
111    /// Convert to legacy Response format for backward compatibility
112    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
113        let headers_map: HashMap<String, String> = self
114            .headers
115            .iter()
116            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
117            .collect();
118
119        crate::response::LegacyResponse {
120            status: self.status.as_u16(),
121            headers: serde_json::to_value(headers_map).unwrap_or(Value::Null),
122            body: String::from_utf8_lossy(&self.body).to_string(),
123        }
124    }
125}
126
127/// HTTP request builder with fluent interface
128#[derive(Debug)]
129pub struct RequestBuilder {
130    method: Method,
131    uri: String,
132    headers: HeaderMap,
133    body: Option<Bytes>,
134}
135
136impl RequestBuilder {
137    pub fn new(method: Method, uri: String) -> Self {
138        let mut headers = HeaderMap::new();
139        headers.insert(header::HOST, HeaderValue::from_static("localhost"));
140        headers.insert(header::USER_AGENT, HeaderValue::from_static("kode-bridge/0.1"));
141
142        Self {
143            method,
144            uri,
145            headers,
146            body: None,
147        }
148    }
149
150    /// Add a custom header
151    pub fn header(mut self, key: &str, value: &str) -> Self {
152        if let (Ok(name), Ok(val)) = (HeaderName::from_bytes(key.as_bytes()), HeaderValue::from_str(value)) {
153            self.headers.insert(name, val);
154        }
155        self
156    }
157
158    /// Set a pre-serialized body and content type.
159    pub fn body_bytes(mut self, body: Bytes, content_type: &'static str) -> Result<Self> {
160        self.headers
161            .insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type));
162
163        let content_length = body.len().to_string();
164        self.headers.insert(
165            header::CONTENT_LENGTH,
166            HeaderValue::from_str(&content_length).map_err(|e| KodeBridgeError::Http(e.into()))?,
167        );
168
169        self.body = Some(body);
170        Ok(self)
171    }
172
173    /// Set JSON body with optimized serialization
174    pub fn json<T>(self, body: &T) -> Result<Self>
175    where
176        T: Serialize,
177    {
178        let mut buffer = BytesMut::with_capacity(1024);
179        {
180            let writer = (&mut buffer).writer();
181            serde_json::to_writer(writer, body).map_err(KodeBridgeError::from)?;
182        }
183
184        self.body_bytes(buffer.freeze(), "application/json")
185    }
186
187    /// Build the HTTP request as bytes with optimized allocation
188    pub fn build(self) -> Result<Bytes> {
189        let mut request = BytesMut::with_capacity(1024 + self.body.as_ref().map(|b| b.len()).unwrap_or(0));
190
191        // Request line - 使用更高效的写入方式
192        request.extend_from_slice(self.method.as_str().as_bytes());
193        request.extend_from_slice(b" ");
194        request.extend_from_slice(self.uri.as_bytes());
195        request.extend_from_slice(b" HTTP/1.1\r\n");
196
197        // Headers - 批量写入减少系统调用
198        let mut headers_buffer = BytesMut::with_capacity(512);
199        for (key, value) in &self.headers {
200            headers_buffer.extend_from_slice(key.as_str().as_bytes());
201            headers_buffer.extend_from_slice(b": ");
202            headers_buffer.extend_from_slice(value.as_bytes());
203            headers_buffer.extend_from_slice(b"\r\n");
204        }
205        request.extend_from_slice(&headers_buffer);
206
207        // End of headers
208        request.extend_from_slice(b"\r\n");
209
210        // Body
211        if let Some(body) = self.body {
212            request.extend_from_slice(&body);
213        }
214
215        Ok(request.freeze())
216    }
217}
218
219/// Parse HTTP response from a stream with optimized parsing
220pub async fn parse_response<S>(stream: S) -> Result<Response>
221where
222    S: AsyncRead + Unpin + Send,
223{
224    let mut reader = BufReader::new(stream);
225
226    // 使用预分配的缓冲区避免多次分配
227    let mut headers_buffer = BytesMut::with_capacity(1024);
228
229    // 先读取状态行
230    let mut status_line = String::new();
231    reader.read_line(&mut status_line).await?;
232    headers_buffer.extend_from_slice(status_line.as_bytes());
233
234    // 读取HTTP头部直到遇到空行
235    let mut line = String::new();
236    loop {
237        line.clear();
238        let bytes_read = reader.read_line(&mut line).await?;
239        if bytes_read == 0 {
240            break;
241        }
242
243        // 检查是否是头部结束(空行)
244        if line == "\r\n" || line == "\n" {
245            break;
246        }
247
248        // 将头部行添加到缓冲区
249        headers_buffer.extend_from_slice(line.as_bytes());
250    }
251
252    // 添加最后的 \r\n 来标记头部结束
253    headers_buffer.extend_from_slice(b"\r\n");
254
255    // Use cached parser for better performance
256    let mut parser = global_parser_cache().get();
257    let (status, parsed_headers) = parser
258        .parse_response(&headers_buffer)
259        .map_err(|e| match e {
260            httparse::Error::TooManyHeaders => {
261                KodeBridgeError::protocol("Too many HTTP headers in response (limit: 64)")
262            }
263            _ => KodeBridgeError::protocol(format!("Failed to parse HTTP response: {:?}", e)),
264        })?;
265
266    // Build HeaderMap
267    let mut header_map = HeaderMap::new();
268    for (name, value) in parsed_headers {
269        let header_name = HeaderName::from_str(&name).map_err(|e| KodeBridgeError::Http(e.into()))?;
270        let header_value = HeaderValue::from_str(&value).map_err(|e| KodeBridgeError::Http(e.into()))?;
271        header_map.insert(header_name, header_value);
272    }
273
274    // Determine body length
275    let content_length = header_map
276        .get(header::CONTENT_LENGTH)
277        .and_then(|v| v.to_str().ok())
278        .and_then(|s| s.parse::<usize>().ok());
279
280    let is_chunked = header_map
281        .get(header::TRANSFER_ENCODING)
282        .and_then(|v| v.to_str().ok())
283        .map(|s| s.eq_ignore_ascii_case("chunked"))
284        .unwrap_or(false);
285
286    // Read body with optimized handling for empty responses (common in PUT/POST)
287    let body = if is_chunked {
288        read_chunked_body(&mut reader).await?
289    } else if let Some(len) = content_length {
290        if len == 0 {
291            // Empty response, return directly - optimize for PUT responses
292            Bytes::new()
293        } else if len > 10 * 1024 * 1024 {
294            // Use streaming for very large responses (>10MB)
295            return Err(KodeBridgeError::protocol("Response body too large for memory"));
296        } else {
297            read_fixed_body(&mut reader, len).await?
298        }
299    } else {
300        // For responses without Content-Length, use adaptive reading with shorter timeouts for PUT
301        read_until_end_adaptive(&mut reader).await?
302    };
303
304    Ok(Response::new(
305        StatusCode::from_u16(status)?,
306        Version::HTTP_11,
307        header_map,
308        body,
309    ))
310}
311
312async fn read_chunked_body<R>(reader: &mut BufReader<R>) -> Result<Bytes>
313where
314    R: AsyncRead + Unpin + Send,
315{
316    let mut body_buffer = BytesMut::with_capacity(8192);
317
318    loop {
319        // Read chunk size line
320        let mut size_line = String::new();
321        reader.read_line(&mut size_line).await?;
322
323        let size_line = size_line.trim();
324        if size_line.is_empty() {
325            continue;
326        }
327
328        // Parse chunk size (hex)
329        let chunk_size =
330            usize::from_str_radix(size_line, 16).map_err(|_| KodeBridgeError::protocol("Invalid chunk size"))?;
331
332        if chunk_size == 0 {
333            // Last chunk, read final CRLF
334            let mut final_line = String::new();
335            reader.read_line(&mut final_line).await?;
336            break;
337        }
338
339        // Read chunk data
340        let mut chunk = vec![0u8; chunk_size];
341        reader.read_exact(&mut chunk).await?;
342        body_buffer.extend_from_slice(&chunk);
343
344        // Read trailing CRLF
345        let mut crlf = [0u8; 2];
346        reader.read_exact(&mut crlf).await?;
347    }
348
349    Ok(body_buffer.freeze())
350}
351
352async fn read_fixed_body<R>(reader: &mut BufReader<R>, len: usize) -> Result<Bytes>
353where
354    R: AsyncRead + Unpin + Send,
355{
356    let mut body = vec![0u8; len];
357    reader.read_exact(&mut body).await?;
358    Ok(Bytes::from(body))
359}
360
361async fn read_until_end_adaptive<R>(reader: &mut BufReader<R>) -> Result<Bytes>
362where
363    R: AsyncRead + Unpin + Send,
364{
365    let mut body_buffer = BytesMut::with_capacity(8192);
366    let mut read_buffer = [0u8; 512];
367    let mut consecutive_empty_reads = 0;
368
369    loop {
370        let timeout_duration = Duration::from_millis(25 + (consecutive_empty_reads * 25));
371
372        match tokio::time::timeout(timeout_duration, reader.read(&mut read_buffer)).await {
373            Ok(Ok(0)) => {
374                // EOF reached
375                break;
376            }
377            Ok(Ok(n)) => {
378                body_buffer.extend_from_slice(&read_buffer[..n]);
379                consecutive_empty_reads = 0;
380            }
381            Ok(Err(e)) => {
382                return Err(KodeBridgeError::from(e));
383            }
384            Err(_) => {
385                // Timeout occurred
386                consecutive_empty_reads += 1;
387                if consecutive_empty_reads >= 2 {
388                    // 减少到2次连续超时就结束,提高响应速度
389                    break;
390                }
391                continue;
392            }
393        }
394
395        // Safety limit to prevent unbounded memory usage - 增加到20MB
396        if body_buffer.len() > 20 * 1024 * 1024 {
397            return Err(KodeBridgeError::protocol("Response body too large"));
398        }
399    }
400
401    // Convert pooled buffer to Bytes for return
402    Ok(body_buffer.freeze())
403}
404
405/// Send HTTP request and parse response
406pub async fn send_request<S>(mut stream: S, request: Bytes) -> Result<Response>
407where
408    S: AsyncRead + AsyncWrite + Unpin + Send,
409{
410    // Send request
411    stream.write_all(&request).await?;
412    stream.flush().await?;
413
414    trace!("Sent HTTP request ({} bytes)", request.len());
415
416    // Parse response
417    let response = parse_response(stream).await?;
418
419    debug!(
420        "Received HTTP response: {} {}",
421        response.status(),
422        response.content_length().unwrap_or(0)
423    );
424
425    Ok(response)
426}