kode_bridge/
http_client.rs

1use crate::buffer_pool::global_pools;
2use crate::errors::{KodeBridgeError, Result};
3use crate::parser_cache::global_parser_cache;
4use bytes::Bytes;
5use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version};
6use serde::{de::DeserializeOwned, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::str::FromStr;
10use std::time::Duration;
11use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
12use tokio::io::{AsyncRead, AsyncWrite};
13use tracing::{debug, trace};
14
15/// Enhanced HTTP response with rich functionality
16#[derive(Debug, Clone)]
17pub struct Response {
18    status: StatusCode,
19    version: Version,
20    headers: HeaderMap,
21    body: Bytes,
22}
23
24impl Response {
25    pub fn new(status: StatusCode, version: Version, headers: HeaderMap, body: Bytes) -> Self {
26        Self {
27            status,
28            version,
29            headers,
30            body,
31        }
32    }
33
34    /// Get HTTP status code
35    pub fn status(&self) -> StatusCode {
36        self.status
37    }
38
39    /// Get status code as u16
40    pub fn status_code(&self) -> u16 {
41        self.status.as_u16()
42    }
43
44    /// Get HTTP version
45    pub fn version(&self) -> Version {
46        self.version
47    }
48
49    /// Get response headers
50    pub fn headers(&self) -> &HeaderMap {
51        &self.headers
52    }
53
54    /// Get response body as bytes
55    pub fn body(&self) -> &Bytes {
56        &self.body
57    }
58
59    /// Get response body as string
60    pub fn text(&self) -> Result<String> {
61        String::from_utf8(self.body.to_vec()).map_err(KodeBridgeError::from)
62    }
63
64    /// Parse response body as JSON
65    pub fn json<T>(&self) -> Result<T>
66    where
67        T: DeserializeOwned,
68    {
69        serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
70    }
71
72    /// Parse response body as generic JSON value
73    pub fn json_value(&self) -> Result<Value> {
74        serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
75    }
76
77    /// Check if response indicates success (2xx status)
78    pub fn is_success(&self) -> bool {
79        self.status.is_success()
80    }
81
82    /// Check if response indicates client error (4xx status)
83    pub fn is_client_error(&self) -> bool {
84        self.status.is_client_error()
85    }
86
87    /// Check if response indicates server error (5xx status)
88    pub fn is_server_error(&self) -> bool {
89        self.status.is_server_error()
90    }
91
92    /// Check if response indicates redirection (3xx status)
93    pub fn is_redirection(&self) -> bool {
94        self.status.is_redirection()
95    }
96
97    /// Get content length from headers
98    pub fn content_length(&self) -> Option<u64> {
99        self.headers
100            .get(header::CONTENT_LENGTH)?
101            .to_str()
102            .ok()?
103            .parse()
104            .ok()
105    }
106
107    /// Get content type from headers
108    pub fn content_type(&self) -> Option<&str> {
109        self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
110    }
111
112    /// Convert to legacy Response format for backward compatibility
113    pub fn to_legacy(&self) -> crate::response::LegacyResponse {
114        let headers_map: HashMap<String, String> = self
115            .headers
116            .iter()
117            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
118            .collect();
119
120        crate::response::LegacyResponse {
121            status: self.status.as_u16(),
122            headers: serde_json::to_value(headers_map).unwrap_or(Value::Null),
123            body: String::from_utf8_lossy(&self.body).to_string(),
124        }
125    }
126}
127
128/// HTTP request builder with fluent interface
129#[derive(Debug)]
130pub struct RequestBuilder {
131    method: Method,
132    uri: String,
133    headers: HeaderMap,
134    body: Option<Bytes>,
135}
136
137impl RequestBuilder {
138    pub fn new(method: Method, uri: String) -> Self {
139        let mut headers = HeaderMap::new();
140        headers.insert(header::HOST, HeaderValue::from_static("localhost"));
141        headers.insert(
142            header::USER_AGENT,
143            HeaderValue::from_static("kode-bridge/0.1"),
144        );
145
146        Self {
147            method,
148            uri,
149            headers,
150            body: None,
151        }
152    }
153
154    /// Add a custom header
155    pub fn header(mut self, key: &str, value: &str) -> Self {
156        if let (Ok(name), Ok(val)) = (
157            HeaderName::from_bytes(key.as_bytes()),
158            HeaderValue::from_str(value),
159        ) {
160            self.headers.insert(name, val);
161        }
162        self
163    }
164
165    /// Set JSON body with optimized serialization
166    pub fn json<T>(mut self, body: &T) -> Result<Self>
167    where
168        T: Serialize,
169    {
170        // 使用全局缓冲池来减少内存分配
171        let mut buffer = global_pools().get_medium();
172
173        // 直接序列化到缓冲区,避免中间Vec分配
174        serde_json::to_writer(buffer.as_mut_vec(), body)?;
175
176        self.headers.insert(
177            header::CONTENT_TYPE,
178            HeaderValue::from_static("application/json"),
179        );
180
181        // 避免字符串分配,直接使用itoa
182        let content_length = buffer.len().to_string();
183        self.headers.insert(
184            header::CONTENT_LENGTH,
185            HeaderValue::from_str(&content_length).map_err(|e| KodeBridgeError::Http(e.into()))?,
186        );
187
188        // 零拷贝转换到Bytes
189        self.body = Some(Bytes::copy_from_slice(&buffer));
190        Ok(self)
191    }
192
193    /// Build the HTTP request as bytes with optimized allocation
194    pub fn build(self) -> Result<Bytes> {
195        // 预估请求大小以选择合适的缓冲区
196        let estimated_size = 200 + // 基础头部大小
197            self.body.as_ref().map(|b| b.len()).unwrap_or(0);
198
199        let mut request = if estimated_size > 8192 {
200            global_pools().get_large()
201        } else {
202            global_pools().get_medium()
203        };
204
205        // Request line - 使用更高效的写入方式
206        request.extend_from_slice(self.method.as_str().as_bytes());
207        request.extend_from_slice(b" ");
208        request.extend_from_slice(self.uri.as_bytes());
209        request.extend_from_slice(b" HTTP/1.1\r\n");
210
211        // Headers - 批量写入减少系统调用
212        let mut headers_buffer = Vec::with_capacity(512);
213        for (key, value) in &self.headers {
214            headers_buffer.extend_from_slice(key.as_str().as_bytes());
215            headers_buffer.extend_from_slice(b": ");
216            headers_buffer.extend_from_slice(value.as_bytes());
217            headers_buffer.extend_from_slice(b"\r\n");
218        }
219        request.extend_from_slice(&headers_buffer);
220
221        // End of headers
222        request.extend_from_slice(b"\r\n");
223
224        // Body
225        if let Some(body) = self.body {
226            request.extend_from_slice(&body);
227        }
228
229        Ok(Bytes::copy_from_slice(&request))
230    }
231}
232
233/// Parse HTTP response from a stream with optimized parsing
234pub async fn parse_response<S>(stream: S) -> Result<Response>
235where
236    S: AsyncRead + Unpin,
237{
238    let mut reader = BufReader::new(stream);
239
240    // 使用预分配的缓冲区避免多次分配
241    let mut headers_buffer = global_pools().get_medium();
242
243    // 先读取状态行
244    let mut status_line = String::new();
245    reader.read_line(&mut status_line).await?;
246    headers_buffer.extend_from_slice(status_line.as_bytes());
247
248    // 读取HTTP头部直到遇到空行
249    let mut line = String::new();
250    loop {
251        line.clear();
252        let bytes_read = reader.read_line(&mut line).await?;
253        if bytes_read == 0 {
254            break;
255        }
256
257        // 检查是否是头部结束(空行)
258        if line == "\r\n" || line == "\n" {
259            break;
260        }
261
262        // 将头部行添加到缓冲区
263        headers_buffer.extend_from_slice(line.as_bytes());
264    }
265
266    // 添加最后的 \r\n 来标记头部结束
267    headers_buffer.extend_from_slice(b"\r\n");
268
269    // Use cached parser for better performance
270    let mut parser = global_parser_cache().get();
271    let (status, parsed_headers) = parser
272        .parse_response(&headers_buffer)
273        .map_err(|e| match e {
274            httparse::Error::TooManyHeaders => {
275                KodeBridgeError::protocol("Too many HTTP headers in response (limit: 64)")
276            }
277            _ => KodeBridgeError::protocol(format!("Failed to parse HTTP response: {:?}", e)),
278        })?;
279
280    // Build HeaderMap
281    let mut header_map = HeaderMap::new();
282    for (name, value) in parsed_headers {
283        let header_name =
284            HeaderName::from_str(&name).map_err(|e| KodeBridgeError::Http(e.into()))?;
285        let header_value =
286            HeaderValue::from_str(&value).map_err(|e| KodeBridgeError::Http(e.into()))?;
287        header_map.insert(header_name, header_value);
288    }
289
290    // Determine body length
291    let content_length = header_map
292        .get(header::CONTENT_LENGTH)
293        .and_then(|v| v.to_str().ok())
294        .and_then(|s| s.parse::<usize>().ok());
295
296    let is_chunked = header_map
297        .get(header::TRANSFER_ENCODING)
298        .and_then(|v| v.to_str().ok())
299        .map(|s| s.eq_ignore_ascii_case("chunked"))
300        .unwrap_or(false);
301
302    // Read body with optimized handling for empty responses (common in PUT/POST)
303    let body = if is_chunked {
304        read_chunked_body(&mut reader).await?
305    } else if let Some(len) = content_length {
306        if len == 0 {
307            // Empty response, return directly - optimize for PUT responses
308            Bytes::new()
309        } else if len > 10 * 1024 * 1024 {
310            // Use streaming for very large responses (>10MB)
311            return Err(KodeBridgeError::protocol(
312                "Response body too large for memory",
313            ));
314        } else {
315            read_fixed_body(&mut reader, len).await?
316        }
317    } else {
318        // For responses without Content-Length, use adaptive reading with shorter timeouts for PUT
319        read_until_end_adaptive(&mut reader).await?
320    };
321
322    Ok(Response::new(
323        StatusCode::from_u16(status)?,
324        Version::HTTP_11,
325        header_map,
326        body,
327    ))
328}
329
330async fn read_chunked_body<R>(reader: &mut BufReader<R>) -> Result<Bytes>
331where
332    R: AsyncRead + Unpin,
333{
334    let mut body_buffer = global_pools().get_large(); // 开始就使用大缓冲区
335
336    loop {
337        // Read chunk size line
338        let mut size_line = String::new();
339        reader.read_line(&mut size_line).await?;
340
341        let size_line = size_line.trim();
342        if size_line.is_empty() {
343            continue;
344        }
345
346        // Parse chunk size (hex)
347        let chunk_size = usize::from_str_radix(size_line, 16)
348            .map_err(|_| KodeBridgeError::protocol("Invalid chunk size"))?;
349
350        if chunk_size == 0 {
351            // Last chunk, read final CRLF
352            let mut final_line = String::new();
353            reader.read_line(&mut final_line).await?;
354            break;
355        }
356
357        // 对大块数据使用更大的缓冲区
358        if chunk_size > 65536 && body_buffer.capacity() < 131072 {
359            // 如果遇到大块,升级到更大的缓冲区
360            let mut larger_buffer = global_pools().get_extra_large();
361            larger_buffer.extend_from_slice(&body_buffer);
362            body_buffer = larger_buffer;
363        }
364
365        // Read chunk data
366        let mut chunk = vec![0u8; chunk_size];
367        reader.read_exact(&mut chunk).await?;
368        body_buffer.extend_from_slice(&chunk);
369
370        // Read trailing CRLF
371        let mut crlf = [0u8; 2];
372        reader.read_exact(&mut crlf).await?;
373
374        // 安全限制 - 针对PUT请求增加到20MB
375        if body_buffer.len() > 20 * 1024 * 1024 {
376            return Err(KodeBridgeError::protocol("Chunked response body too large"));
377        }
378    }
379
380    Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
381}
382
383async fn read_fixed_body<R>(reader: &mut BufReader<R>, len: usize) -> Result<Bytes>
384where
385    R: AsyncRead + Unpin,
386{
387    let mut body = vec![0u8; len];
388    reader.read_exact(&mut body).await?;
389    Ok(Bytes::from(body))
390}
391
392async fn read_until_end_adaptive<R>(reader: &mut BufReader<R>) -> Result<Bytes>
393where
394    R: AsyncRead + Unpin,
395{
396    let mut body_buffer = global_pools().get_medium(); // 从中等缓冲区开始
397    let mut read_buffer = [0u8; 8192]; // 增加读缓冲区大小
398    let mut consecutive_empty_reads = 0;
399
400    loop {
401        // 使用更短的超时时间,更快地检测结束
402        let timeout_duration = Duration::from_millis(50 + (consecutive_empty_reads * 25));
403
404        match tokio::time::timeout(timeout_duration, reader.read(&mut read_buffer)).await {
405            Ok(Ok(0)) => {
406                // EOF reached
407                break;
408            }
409            Ok(Ok(n)) => {
410                // 如果数据量大,自动升级缓冲区
411                if body_buffer.len() + n > body_buffer.capacity() * 3 / 4
412                    && body_buffer.capacity() < 131072
413                {
414                    let mut larger_buffer = if body_buffer.capacity() < 16384 {
415                        global_pools().get_large()
416                    } else {
417                        global_pools().get_extra_large()
418                    };
419                    larger_buffer.extend_from_slice(&body_buffer);
420                    body_buffer = larger_buffer;
421                }
422
423                body_buffer.extend_from_slice(&read_buffer[..n]);
424                consecutive_empty_reads = 0;
425            }
426            Ok(Err(e)) => {
427                return Err(KodeBridgeError::from(e));
428            }
429            Err(_) => {
430                // Timeout occurred
431                consecutive_empty_reads += 1;
432                if consecutive_empty_reads >= 2 {
433                    // 减少到2次连续超时就结束,提高响应速度
434                    break;
435                }
436                continue;
437            }
438        }
439
440        // Safety limit to prevent unbounded memory usage - 增加到20MB
441        if body_buffer.len() > 20 * 1024 * 1024 {
442            return Err(KodeBridgeError::protocol("Response body too large"));
443        }
444    }
445
446    // Convert pooled buffer to Bytes for return
447    Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
448}
449
450/// Send HTTP request and parse response
451pub async fn send_request<S>(mut stream: S, request: Bytes) -> Result<Response>
452where
453    S: AsyncRead + AsyncWrite + Unpin,
454{
455    // Send request
456    stream.write_all(&request).await?;
457    stream.flush().await?;
458
459    trace!("Sent HTTP request ({} bytes)", request.len());
460
461    // Parse response
462    let response = parse_response(stream).await?;
463
464    debug!(
465        "Received HTTP response: {} {}",
466        response.status(),
467        response.content_length().unwrap_or(0)
468    );
469
470    Ok(response)
471}