kode_bridge/
http_client.rs1use crate::buffer_pool::global_pools;
2use crate::errors::{KodeBridgeError, Result};
3use crate::parser_cache::global_parser_cache;
4use bytes::{Bytes, BytesMut};
5use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version};
6use serde::{de::DeserializeOwned, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::str::FromStr;
10use std::time::Duration;
11use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
12use tokio::io::{AsyncRead, AsyncWrite};
13use tracing::{debug, trace};
14
15#[derive(Debug, Clone)]
17pub struct Response {
18 status: StatusCode,
19 version: Version,
20 headers: HeaderMap,
21 body: Bytes,
22}
23
24impl Response {
25 pub fn new(status: StatusCode, version: Version, headers: HeaderMap, body: Bytes) -> Self {
26 Self {
27 status,
28 version,
29 headers,
30 body,
31 }
32 }
33
34 pub fn status(&self) -> StatusCode {
36 self.status
37 }
38
39 pub fn status_code(&self) -> u16 {
41 self.status.as_u16()
42 }
43
44 pub fn version(&self) -> Version {
46 self.version
47 }
48
49 pub fn headers(&self) -> &HeaderMap {
51 &self.headers
52 }
53
54 pub fn body(&self) -> &Bytes {
56 &self.body
57 }
58
59 pub fn text(&self) -> Result<String> {
61 String::from_utf8(self.body.to_vec()).map_err(KodeBridgeError::from)
62 }
63
64 pub fn json<T>(&self) -> Result<T>
66 where
67 T: DeserializeOwned,
68 {
69 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
70 }
71
72 pub fn json_value(&self) -> Result<Value> {
74 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
75 }
76
77 pub fn is_success(&self) -> bool {
79 self.status.is_success()
80 }
81
82 pub fn is_client_error(&self) -> bool {
84 self.status.is_client_error()
85 }
86
87 pub fn is_server_error(&self) -> bool {
89 self.status.is_server_error()
90 }
91
92 pub fn is_redirection(&self) -> bool {
94 self.status.is_redirection()
95 }
96
97 pub fn content_length(&self) -> Option<u64> {
99 self.headers
100 .get(header::CONTENT_LENGTH)?
101 .to_str()
102 .ok()?
103 .parse()
104 .ok()
105 }
106
107 pub fn content_type(&self) -> Option<&str> {
109 self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
110 }
111
112 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
114 let headers_map: HashMap<String, String> = self
115 .headers
116 .iter()
117 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
118 .collect();
119
120 crate::response::LegacyResponse {
121 status: self.status.as_u16(),
122 headers: serde_json::to_value(headers_map).unwrap_or(Value::Null),
123 body: String::from_utf8_lossy(&self.body).to_string(),
124 }
125 }
126}
127
128#[derive(Debug)]
130pub struct RequestBuilder {
131 method: Method,
132 uri: String,
133 headers: HeaderMap,
134 body: Option<Bytes>,
135}
136
137impl RequestBuilder {
138 pub fn new(method: Method, uri: String) -> Self {
139 let mut headers = HeaderMap::new();
140 headers.insert(header::HOST, HeaderValue::from_static("localhost"));
141 headers.insert(
142 header::USER_AGENT,
143 HeaderValue::from_static("kode-bridge/0.1"),
144 );
145
146 Self {
147 method,
148 uri,
149 headers,
150 body: None,
151 }
152 }
153
154 pub fn json<T>(mut self, body: &T) -> Result<Self>
156 where
157 T: Serialize,
158 {
159 let json_bytes = serde_json::to_vec(body)?;
160 self.headers.insert(
161 header::CONTENT_TYPE,
162 HeaderValue::from_static("application/json"),
163 );
164 self.headers.insert(
165 header::CONTENT_LENGTH,
166 HeaderValue::from_str(&json_bytes.len().to_string())
167 .map_err(|e| KodeBridgeError::Http(e.into()))?,
168 );
169 self.body = Some(Bytes::from(json_bytes));
170 Ok(self)
171 }
172
173 pub fn build(self) -> Result<Bytes> {
175 let mut request = BytesMut::new();
176
177 let request_line = format!("{} {} HTTP/1.1\r\n", self.method, self.uri);
179 request.extend_from_slice(request_line.as_bytes());
180
181 for (key, value) in &self.headers {
183 let header_line = format!("{}: {}\r\n", key, value.to_str().unwrap_or(""));
184 request.extend_from_slice(header_line.as_bytes());
185 }
186
187 request.extend_from_slice(b"\r\n");
189
190 if let Some(body) = self.body {
192 request.extend_from_slice(&body);
193 }
194
195 Ok(request.freeze())
196 }
197}
198
199pub async fn parse_response<S>(stream: S) -> Result<Response>
201where
202 S: AsyncRead + Unpin,
203{
204 let mut reader = BufReader::new(stream);
205 let mut buffer = String::new();
206
207 reader.read_line(&mut buffer).await?;
209
210 let mut headers_buffer = Vec::new();
212 headers_buffer.extend_from_slice(buffer.as_bytes());
213
214 let mut line_buffer = String::new();
216 loop {
217 line_buffer.clear();
218 let n = reader.read_line(&mut line_buffer).await?;
219 if n == 0 {
220 return Err(KodeBridgeError::protocol("Unexpected end of stream"));
221 }
222
223 headers_buffer.extend_from_slice(line_buffer.as_bytes());
224
225 if line_buffer == "\r\n" {
227 break;
228 }
229
230 if headers_buffer.len() > 16384 {
232 return Err(KodeBridgeError::protocol("HTTP headers too large"));
233 }
234 }
235
236 let mut parser = global_parser_cache().get();
238 let (status, parsed_headers) = parser.parse_response(&headers_buffer).map_err(|e| {
239 KodeBridgeError::protocol(format!("Failed to parse HTTP response: {:?}", e))
240 })?;
241
242 let mut header_map = HeaderMap::new();
244 for (name, value) in parsed_headers {
245 let header_name =
246 HeaderName::from_str(&name).map_err(|e| KodeBridgeError::Http(e.into()))?;
247 let header_value =
248 HeaderValue::from_str(&value).map_err(|e| KodeBridgeError::Http(e.into()))?;
249 header_map.insert(header_name, header_value);
250 }
251
252 let content_length = header_map
254 .get(header::CONTENT_LENGTH)
255 .and_then(|v| v.to_str().ok())
256 .and_then(|s| s.parse::<usize>().ok());
257
258 let is_chunked = header_map
259 .get(header::TRANSFER_ENCODING)
260 .and_then(|v| v.to_str().ok())
261 .map(|s| s.eq_ignore_ascii_case("chunked"))
262 .unwrap_or(false);
263
264 let body = if is_chunked {
266 read_chunked_body(&mut reader).await?
267 } else if let Some(len) = content_length {
268 if len == 0 {
269 Bytes::new()
271 } else {
272 read_fixed_body(&mut reader, len).await?
273 }
274 } else {
275 read_until_end_adaptive(&mut reader).await?
277 };
278
279 Ok(Response::new(
280 StatusCode::from_u16(status)?,
281 Version::HTTP_11,
282 header_map,
283 body,
284 ))
285}
286
287async fn read_chunked_body<R>(reader: &mut BufReader<R>) -> Result<Bytes>
288where
289 R: AsyncRead + Unpin,
290{
291 let mut body_buffer = global_pools().get_large();
292
293 loop {
294 let mut size_line = String::new();
296 reader.read_line(&mut size_line).await?;
297
298 let size_line = size_line.trim();
299 if size_line.is_empty() {
300 continue;
301 }
302
303 let chunk_size = usize::from_str_radix(size_line, 16)
305 .map_err(|_| KodeBridgeError::protocol("Invalid chunk size"))?;
306
307 if chunk_size == 0 {
308 let mut final_line = String::new();
310 reader.read_line(&mut final_line).await?;
311 break;
312 }
313
314 let mut chunk = vec![0u8; chunk_size];
316 reader.read_exact(&mut chunk).await?;
317 body_buffer.extend_from_slice(&chunk);
318
319 let mut crlf = [0u8; 2];
321 reader.read_exact(&mut crlf).await?;
322 }
323
324 Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
325}
326
327async fn read_fixed_body<R>(reader: &mut BufReader<R>, len: usize) -> Result<Bytes>
328where
329 R: AsyncRead + Unpin,
330{
331 let mut body = vec![0u8; len];
332 reader.read_exact(&mut body).await?;
333 Ok(Bytes::from(body))
334}
335
336async fn read_until_end_adaptive<R>(reader: &mut BufReader<R>) -> Result<Bytes>
337where
338 R: AsyncRead + Unpin,
339{
340 let mut body_buffer = global_pools().get_medium();
341 let mut read_buffer = [0u8; 4096];
342 let mut consecutive_empty_reads = 0;
343
344 loop {
345 let timeout_duration =
347 Duration::from_millis(100 + (consecutive_empty_reads * 50).min(1000));
348
349 match tokio::time::timeout(timeout_duration, reader.read(&mut read_buffer)).await {
350 Ok(Ok(0)) => {
351 break;
353 }
354 Ok(Ok(n)) => {
355 body_buffer.extend_from_slice(&read_buffer[..n]);
356 consecutive_empty_reads = 0;
357 }
358 Ok(Err(e)) => {
359 return Err(KodeBridgeError::from(e));
360 }
361 Err(_) => {
362 consecutive_empty_reads += 1;
364 if consecutive_empty_reads >= 3 {
365 break;
367 }
368 continue;
369 }
370 }
371
372 if body_buffer.len() > 64 * 1024 * 1024 {
374 return Err(KodeBridgeError::protocol("Response body too large"));
376 }
377 }
378
379 Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
381}
382
383pub async fn send_request<S>(mut stream: S, request: Bytes) -> Result<Response>
385where
386 S: AsyncRead + AsyncWrite + Unpin,
387{
388 stream.write_all(&request).await?;
390 stream.flush().await?;
391
392 trace!("Sent HTTP request ({} bytes)", request.len());
393
394 let response = parse_response(stream).await?;
396
397 debug!(
398 "Received HTTP response: {} {}",
399 response.status(),
400 response.content_length().unwrap_or(0)
401 );
402
403 Ok(response)
404}