kode_bridge/
http_client.rs1use crate::buffer_pool::global_pools;
2use crate::errors::{KodeBridgeError, Result};
3use crate::parser_cache::global_parser_cache;
4use bytes::Bytes;
5use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version};
6use serde::{de::DeserializeOwned, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9use std::str::FromStr as _;
10use std::time::Duration;
11use tokio::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader};
12use tokio::io::{AsyncRead, AsyncWrite};
13use tracing::{debug, trace};
14
/// A fully buffered HTTP response: status, protocol version, header fields and body bytes.
#[derive(Debug, Clone)]
pub struct Response {
    /// Status code of the response.
    status: StatusCode,
    /// HTTP protocol version recorded for the response.
    version: Version,
    /// Header fields of the response.
    headers: HeaderMap,
    /// Raw body bytes.
    body: Bytes,
}
23
24impl Response {
25 pub const fn new(status: StatusCode, version: Version, headers: HeaderMap, body: Bytes) -> Self {
26 Self {
27 status,
28 version,
29 headers,
30 body,
31 }
32 }
33
34 pub const fn status(&self) -> StatusCode {
36 self.status
37 }
38
39 pub const fn status_code(&self) -> u16 {
41 self.status.as_u16()
42 }
43
44 pub const fn version(&self) -> Version {
46 self.version
47 }
48
49 pub const fn headers(&self) -> &HeaderMap {
51 &self.headers
52 }
53
54 pub const fn body(&self) -> &Bytes {
56 &self.body
57 }
58
59 pub fn text(&self) -> Result<String> {
61 String::from_utf8(self.body.to_vec()).map_err(KodeBridgeError::from)
62 }
63
64 pub fn json<T>(&self) -> Result<T>
66 where
67 T: DeserializeOwned,
68 {
69 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
70 }
71
72 pub fn json_value(&self) -> Result<Value> {
74 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
75 }
76
77 pub fn is_success(&self) -> bool {
79 self.status.is_success()
80 }
81
82 pub fn is_client_error(&self) -> bool {
84 self.status.is_client_error()
85 }
86
87 pub fn is_server_error(&self) -> bool {
89 self.status.is_server_error()
90 }
91
92 pub fn is_redirection(&self) -> bool {
94 self.status.is_redirection()
95 }
96
97 pub fn content_length(&self) -> Option<u64> {
99 self.headers
100 .get(header::CONTENT_LENGTH)?
101 .to_str()
102 .ok()?
103 .parse()
104 .ok()
105 }
106
107 pub fn content_type(&self) -> Option<&str> {
109 self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
110 }
111
112 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
114 let headers_map: HashMap<String, String> = self
115 .headers
116 .iter()
117 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
118 .collect();
119
120 crate::response::LegacyResponse {
121 status: self.status.as_u16(),
122 headers: serde_json::to_value(headers_map).unwrap_or(Value::Null),
123 body: String::from_utf8_lossy(&self.body).to_string(),
124 }
125 }
126}
127
/// Collects the parts of an HTTP request (method, target, headers, optional body) before they
/// are serialized into raw bytes.
#[derive(Debug)]
pub struct RequestBuilder {
    /// Request method written at the start of the request line.
    method: Method,
    /// Request target written verbatim after the method.
    uri: String,
    /// Header fields to send with the request.
    headers: HeaderMap,
    /// Optional request body.
    body: Option<Bytes>,
}
136
137impl RequestBuilder {
138 pub fn new(method: Method, uri: String) -> Self {
139 let mut headers = HeaderMap::new();
140 headers.insert(header::HOST, HeaderValue::from_static("localhost"));
141 headers.insert(header::USER_AGENT, HeaderValue::from_static("kode-bridge/0.1"));
142
143 Self {
144 method,
145 uri,
146 headers,
147 body: None,
148 }
149 }
150
151 pub fn header(mut self, key: &str, value: &str) -> Self {
153 if let (Ok(name), Ok(val)) = (HeaderName::from_bytes(key.as_bytes()), HeaderValue::from_str(value)) {
154 self.headers.insert(name, val);
155 }
156 self
157 }
158
159 pub fn json<T>(mut self, body: &T) -> Result<Self>
161 where
162 T: Serialize,
163 {
164 let mut buffer = global_pools().get_medium();
166
167 serde_json::to_writer(buffer.as_mut_vec(), body)?;
169
170 self.headers
171 .insert(header::CONTENT_TYPE, HeaderValue::from_static("application/json"));
172
173 let content_length = buffer.len().to_string();
175 self.headers.insert(
176 header::CONTENT_LENGTH,
177 HeaderValue::from_str(&content_length).map_err(|e| KodeBridgeError::Http(e.into()))?,
178 );
179
180 self.body = Some(Bytes::copy_from_slice(&buffer));
182 Ok(self)
183 }
184
185 pub fn build(self) -> Result<Bytes> {
187 let estimated_size = 200 + self.body.as_ref().map(|b| b.len()).unwrap_or(0);
190
191 let mut request = if estimated_size > 8192 {
192 global_pools().get_large()
193 } else {
194 global_pools().get_medium()
195 };
196
197 request.extend_from_slice(self.method.as_str().as_bytes());
199 request.extend_from_slice(b" ");
200 request.extend_from_slice(self.uri.as_bytes());
201 request.extend_from_slice(b" HTTP/1.1\r\n");
202
203 let mut headers_buffer = Vec::with_capacity(512);
205 for (key, value) in &self.headers {
206 headers_buffer.extend_from_slice(key.as_str().as_bytes());
207 headers_buffer.extend_from_slice(b": ");
208 headers_buffer.extend_from_slice(value.as_bytes());
209 headers_buffer.extend_from_slice(b"\r\n");
210 }
211 request.extend_from_slice(&headers_buffer);
212
213 request.extend_from_slice(b"\r\n");
215
216 if let Some(body) = self.body {
218 request.extend_from_slice(&body);
219 }
220
221 Ok(Bytes::copy_from_slice(&request))
222 }
223}
224
225pub async fn parse_response<S>(stream: S) -> Result<Response>
227where
228 S: AsyncRead + Unpin + Send,
229{
230 let mut reader = BufReader::new(stream);
231
232 let mut headers_buffer = global_pools().get_medium();
234
235 let mut status_line = String::new();
237 reader.read_line(&mut status_line).await?;
238 headers_buffer.extend_from_slice(status_line.as_bytes());
239
240 let mut line = String::new();
242 loop {
243 line.clear();
244 let bytes_read = reader.read_line(&mut line).await?;
245 if bytes_read == 0 {
246 break;
247 }
248
249 if line == "\r\n" || line == "\n" {
251 break;
252 }
253
254 headers_buffer.extend_from_slice(line.as_bytes());
256 }
257
258 headers_buffer.extend_from_slice(b"\r\n");
260
261 let mut parser = global_parser_cache().get();
263 let (status, parsed_headers) = parser
264 .parse_response(&headers_buffer)
265 .map_err(|e| match e {
266 httparse::Error::TooManyHeaders => {
267 KodeBridgeError::protocol("Too many HTTP headers in response (limit: 64)")
268 }
269 _ => KodeBridgeError::protocol(format!("Failed to parse HTTP response: {:?}", e)),
270 })?;
271
272 let mut header_map = HeaderMap::new();
274 for (name, value) in parsed_headers {
275 let header_name = HeaderName::from_str(&name).map_err(|e| KodeBridgeError::Http(e.into()))?;
276 let header_value = HeaderValue::from_str(&value).map_err(|e| KodeBridgeError::Http(e.into()))?;
277 header_map.insert(header_name, header_value);
278 }
279
280 let content_length = header_map
282 .get(header::CONTENT_LENGTH)
283 .and_then(|v| v.to_str().ok())
284 .and_then(|s| s.parse::<usize>().ok());
285
286 let is_chunked = header_map
287 .get(header::TRANSFER_ENCODING)
288 .and_then(|v| v.to_str().ok())
289 .map(|s| s.eq_ignore_ascii_case("chunked"))
290 .unwrap_or(false);
291
292 let body = if is_chunked {
294 read_chunked_body(&mut reader).await?
295 } else if let Some(len) = content_length {
296 if len == 0 {
297 Bytes::new()
299 } else if len > 10 * 1024 * 1024 {
300 return Err(KodeBridgeError::protocol("Response body too large for memory"));
302 } else {
303 read_fixed_body(&mut reader, len).await?
304 }
305 } else {
306 read_until_end_adaptive(&mut reader).await?
308 };
309
310 Ok(Response::new(
311 StatusCode::from_u16(status)?,
312 Version::HTTP_11,
313 header_map,
314 body,
315 ))
316}
317
318async fn read_chunked_body<R>(reader: &mut BufReader<R>) -> Result<Bytes>
319where
320 R: AsyncRead + Unpin + Send,
321{
322 let mut body_buffer = global_pools().get_large(); loop {
325 let mut size_line = String::new();
327 reader.read_line(&mut size_line).await?;
328
329 let size_line = size_line.trim();
330 if size_line.is_empty() {
331 continue;
332 }
333
334 let chunk_size =
336 usize::from_str_radix(size_line, 16).map_err(|_| KodeBridgeError::protocol("Invalid chunk size"))?;
337
338 if chunk_size == 0 {
339 let mut final_line = String::new();
341 reader.read_line(&mut final_line).await?;
342 break;
343 }
344
345 if chunk_size > 65536 && body_buffer.capacity() < 131072 {
347 let mut larger_buffer = global_pools().get_extra_large();
349 larger_buffer.extend_from_slice(&body_buffer);
350 body_buffer = larger_buffer;
351 }
352
353 let mut chunk = vec![0u8; chunk_size];
355 reader.read_exact(&mut chunk).await?;
356 body_buffer.extend_from_slice(&chunk);
357
358 let mut crlf = [0u8; 2];
360 reader.read_exact(&mut crlf).await?;
361
362 if body_buffer.len() > 20 * 1024 * 1024 {
364 return Err(KodeBridgeError::protocol("Chunked response body too large"));
365 }
366 }
367
368 Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
369}
370
371async fn read_fixed_body<R>(reader: &mut BufReader<R>, len: usize) -> Result<Bytes>
372where
373 R: AsyncRead + Unpin + Send,
374{
375 let mut body = vec![0u8; len];
376 reader.read_exact(&mut body).await?;
377 Ok(Bytes::from(body))
378}
379
380async fn read_until_end_adaptive<R>(reader: &mut BufReader<R>) -> Result<Bytes>
381where
382 R: AsyncRead + Unpin + Send,
383{
384 let mut body_buffer = global_pools().get_medium();
385 let mut read_buffer = [0u8; 512];
386 let mut consecutive_empty_reads = 0;
387
388 loop {
389 let timeout_duration = Duration::from_millis(50 + (consecutive_empty_reads * 25));
390
391 match tokio::time::timeout(timeout_duration, reader.read(&mut read_buffer)).await {
392 Ok(Ok(0)) => {
393 break;
395 }
396 Ok(Ok(n)) => {
397 if body_buffer.len() + n > body_buffer.capacity() * 3 / 4 && body_buffer.capacity() < 131072 {
399 let mut larger_buffer = if body_buffer.capacity() < 16384 {
400 global_pools().get_large()
401 } else {
402 global_pools().get_extra_large()
403 };
404 larger_buffer.extend_from_slice(&body_buffer);
405 body_buffer = larger_buffer;
406 }
407
408 body_buffer.extend_from_slice(&read_buffer[..n]);
409 consecutive_empty_reads = 0;
410 }
411 Ok(Err(e)) => {
412 return Err(KodeBridgeError::from(e));
413 }
414 Err(_) => {
415 consecutive_empty_reads += 1;
417 if consecutive_empty_reads >= 2 {
418 break;
420 }
421 continue;
422 }
423 }
424
425 if body_buffer.len() > 20 * 1024 * 1024 {
427 return Err(KodeBridgeError::protocol("Response body too large"));
428 }
429 }
430
431 Ok(Bytes::copy_from_slice(body_buffer.as_slice()))
433}
434
435pub async fn send_request<S>(mut stream: S, request: Bytes) -> Result<Response>
437where
438 S: AsyncRead + AsyncWrite + Unpin + Send,
439{
440 stream.write_all(&request).await?;
442 stream.flush().await?;
443
444 trace!("Sent HTTP request ({} bytes)", request.len());
445
446 let response = parse_response(stream).await?;
448
449 debug!(
450 "Received HTTP response: {} {}",
451 response.status(),
452 response.content_length().unwrap_or(0)
453 );
454
455 Ok(response)
456}