kode_bridge/
http_client.rs1use crate::errors::{KodeBridgeError, Result};
2use crate::parser_cache::global_parser_cache;
3use bytes::{BufMut as _, Bytes, BytesMut};
4use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version};
5use serde::{de::DeserializeOwned, Serialize};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::str::FromStr as _;
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader};
11use tokio::io::{AsyncRead, AsyncWrite};
12use tracing::{debug, trace};
13
14#[derive(Debug, Clone)]
16pub struct Response {
17 status: StatusCode,
18 version: Version,
19 headers: HeaderMap,
20 body: Bytes,
21}
22
23impl Response {
24 pub const fn new(status: StatusCode, version: Version, headers: HeaderMap, body: Bytes) -> Self {
25 Self {
26 status,
27 version,
28 headers,
29 body,
30 }
31 }
32
33 pub const fn status(&self) -> StatusCode {
35 self.status
36 }
37
38 pub const fn status_code(&self) -> u16 {
40 self.status.as_u16()
41 }
42
43 pub const fn version(&self) -> Version {
45 self.version
46 }
47
48 pub const fn headers(&self) -> &HeaderMap {
50 &self.headers
51 }
52
53 pub const fn body(&self) -> &Bytes {
55 &self.body
56 }
57
58 pub fn text(&self) -> Result<String> {
60 String::from_utf8(self.body.to_vec()).map_err(KodeBridgeError::from)
61 }
62
63 pub fn json<T>(&self) -> Result<T>
65 where
66 T: DeserializeOwned,
67 {
68 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
69 }
70
71 pub fn json_value(&self) -> Result<Value> {
73 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
74 }
75
76 pub fn is_success(&self) -> bool {
78 self.status.is_success()
79 }
80
81 pub fn is_client_error(&self) -> bool {
83 self.status.is_client_error()
84 }
85
86 pub fn is_server_error(&self) -> bool {
88 self.status.is_server_error()
89 }
90
91 pub fn is_redirection(&self) -> bool {
93 self.status.is_redirection()
94 }
95
96 pub fn content_length(&self) -> Option<u64> {
98 self.headers
99 .get(header::CONTENT_LENGTH)?
100 .to_str()
101 .ok()?
102 .parse()
103 .ok()
104 }
105
106 pub fn content_type(&self) -> Option<&str> {
108 self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
109 }
110
111 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
113 let headers_map: HashMap<String, String> = self
114 .headers
115 .iter()
116 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
117 .collect();
118
119 crate::response::LegacyResponse {
120 status: self.status.as_u16(),
121 headers: serde_json::to_value(headers_map).unwrap_or(Value::Null),
122 body: String::from_utf8_lossy(&self.body).to_string(),
123 }
124 }
125}
126
127#[derive(Debug)]
129pub struct RequestBuilder {
130 method: Method,
131 uri: String,
132 headers: HeaderMap,
133 body: Option<Bytes>,
134}
135
136impl RequestBuilder {
137 pub fn new(method: Method, uri: String) -> Self {
138 let mut headers = HeaderMap::new();
139 headers.insert(header::HOST, HeaderValue::from_static("localhost"));
140 headers.insert(header::USER_AGENT, HeaderValue::from_static("kode-bridge/0.1"));
141
142 Self {
143 method,
144 uri,
145 headers,
146 body: None,
147 }
148 }
149
150 pub fn header(mut self, key: &str, value: &str) -> Self {
152 if let (Ok(name), Ok(val)) = (HeaderName::from_bytes(key.as_bytes()), HeaderValue::from_str(value)) {
153 self.headers.insert(name, val);
154 }
155 self
156 }
157
158 pub fn body_bytes(mut self, body: Bytes, content_type: &'static str) -> Result<Self> {
160 self.headers
161 .insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type));
162
163 let content_length = body.len().to_string();
164 self.headers.insert(
165 header::CONTENT_LENGTH,
166 HeaderValue::from_str(&content_length).map_err(|e| KodeBridgeError::Http(e.into()))?,
167 );
168
169 self.body = Some(body);
170 Ok(self)
171 }
172
173 pub fn json<T>(self, body: &T) -> Result<Self>
175 where
176 T: Serialize,
177 {
178 let mut buffer = BytesMut::with_capacity(1024);
179 {
180 let writer = (&mut buffer).writer();
181 serde_json::to_writer(writer, body).map_err(KodeBridgeError::from)?;
182 }
183
184 self.body_bytes(buffer.freeze(), "application/json")
185 }
186
187 pub fn build(self) -> Result<Bytes> {
189 let mut request = BytesMut::with_capacity(1024 + self.body.as_ref().map(|b| b.len()).unwrap_or(0));
190
191 request.extend_from_slice(self.method.as_str().as_bytes());
193 request.extend_from_slice(b" ");
194 request.extend_from_slice(self.uri.as_bytes());
195 request.extend_from_slice(b" HTTP/1.1\r\n");
196
197 let mut headers_buffer = BytesMut::with_capacity(512);
199 for (key, value) in &self.headers {
200 headers_buffer.extend_from_slice(key.as_str().as_bytes());
201 headers_buffer.extend_from_slice(b": ");
202 headers_buffer.extend_from_slice(value.as_bytes());
203 headers_buffer.extend_from_slice(b"\r\n");
204 }
205 request.extend_from_slice(&headers_buffer);
206
207 request.extend_from_slice(b"\r\n");
209
210 if let Some(body) = self.body {
212 request.extend_from_slice(&body);
213 }
214
215 Ok(request.freeze())
216 }
217}
218
219pub async fn parse_response<S>(stream: S) -> Result<Response>
221where
222 S: AsyncRead + Unpin + Send,
223{
224 let mut reader = BufReader::new(stream);
225
226 let mut headers_buffer = BytesMut::with_capacity(1024);
228
229 let mut status_line = String::new();
231 reader.read_line(&mut status_line).await?;
232 headers_buffer.extend_from_slice(status_line.as_bytes());
233
234 let mut line = String::new();
236 loop {
237 line.clear();
238 let bytes_read = reader.read_line(&mut line).await?;
239 if bytes_read == 0 {
240 break;
241 }
242
243 if line == "\r\n" || line == "\n" {
245 break;
246 }
247
248 headers_buffer.extend_from_slice(line.as_bytes());
250 }
251
252 headers_buffer.extend_from_slice(b"\r\n");
254
255 let mut parser = global_parser_cache().get();
257 let (status, parsed_headers) = parser
258 .parse_response(&headers_buffer)
259 .map_err(|e| match e {
260 httparse::Error::TooManyHeaders => {
261 KodeBridgeError::protocol("Too many HTTP headers in response (limit: 64)")
262 }
263 _ => KodeBridgeError::protocol(format!("Failed to parse HTTP response: {:?}", e)),
264 })?;
265
266 let mut header_map = HeaderMap::new();
268 for (name, value) in parsed_headers {
269 let header_name = HeaderName::from_str(&name).map_err(|e| KodeBridgeError::Http(e.into()))?;
270 let header_value = HeaderValue::from_str(&value).map_err(|e| KodeBridgeError::Http(e.into()))?;
271 header_map.insert(header_name, header_value);
272 }
273
274 let content_length = header_map
276 .get(header::CONTENT_LENGTH)
277 .and_then(|v| v.to_str().ok())
278 .and_then(|s| s.parse::<usize>().ok());
279
280 let is_chunked = header_map
281 .get(header::TRANSFER_ENCODING)
282 .and_then(|v| v.to_str().ok())
283 .map(|s| s.eq_ignore_ascii_case("chunked"))
284 .unwrap_or(false);
285
286 let body = if is_chunked {
288 read_chunked_body(&mut reader).await?
289 } else if let Some(len) = content_length {
290 if len == 0 {
291 Bytes::new()
293 } else if len > 10 * 1024 * 1024 {
294 return Err(KodeBridgeError::protocol("Response body too large for memory"));
296 } else {
297 read_fixed_body(&mut reader, len).await?
298 }
299 } else {
300 read_until_end_adaptive(&mut reader).await?
302 };
303
304 Ok(Response::new(
305 StatusCode::from_u16(status)?,
306 Version::HTTP_11,
307 header_map,
308 body,
309 ))
310}
311
312async fn read_chunked_body<R>(reader: &mut BufReader<R>) -> Result<Bytes>
313where
314 R: AsyncRead + Unpin + Send,
315{
316 let mut body_buffer = BytesMut::with_capacity(8192);
317
318 loop {
319 let mut size_line = String::new();
321 reader.read_line(&mut size_line).await?;
322
323 let size_line = size_line.trim();
324 if size_line.is_empty() {
325 continue;
326 }
327
328 let chunk_size =
330 usize::from_str_radix(size_line, 16).map_err(|_| KodeBridgeError::protocol("Invalid chunk size"))?;
331
332 if chunk_size == 0 {
333 let mut final_line = String::new();
335 reader.read_line(&mut final_line).await?;
336 break;
337 }
338
339 let mut chunk = vec![0u8; chunk_size];
341 reader.read_exact(&mut chunk).await?;
342 body_buffer.extend_from_slice(&chunk);
343
344 let mut crlf = [0u8; 2];
346 reader.read_exact(&mut crlf).await?;
347 }
348
349 Ok(body_buffer.freeze())
350}
351
352async fn read_fixed_body<R>(reader: &mut BufReader<R>, len: usize) -> Result<Bytes>
353where
354 R: AsyncRead + Unpin + Send,
355{
356 let mut body = vec![0u8; len];
357 reader.read_exact(&mut body).await?;
358 Ok(Bytes::from(body))
359}
360
361async fn read_until_end_adaptive<R>(reader: &mut BufReader<R>) -> Result<Bytes>
362where
363 R: AsyncRead + Unpin + Send,
364{
365 let mut body_buffer = BytesMut::with_capacity(8192);
366 let mut read_buffer = [0u8; 512];
367 let mut consecutive_empty_reads = 0;
368
369 loop {
370 let timeout_duration = Duration::from_millis(25 + (consecutive_empty_reads * 25));
371
372 match tokio::time::timeout(timeout_duration, reader.read(&mut read_buffer)).await {
373 Ok(Ok(0)) => {
374 break;
376 }
377 Ok(Ok(n)) => {
378 body_buffer.extend_from_slice(&read_buffer[..n]);
379 consecutive_empty_reads = 0;
380 }
381 Ok(Err(e)) => {
382 return Err(KodeBridgeError::from(e));
383 }
384 Err(_) => {
385 consecutive_empty_reads += 1;
387 if consecutive_empty_reads >= 2 {
388 break;
390 }
391 continue;
392 }
393 }
394
395 if body_buffer.len() > 20 * 1024 * 1024 {
397 return Err(KodeBridgeError::protocol("Response body too large"));
398 }
399 }
400
401 Ok(body_buffer.freeze())
403}
404
405pub async fn send_request<S>(mut stream: S, request: Bytes) -> Result<Response>
407where
408 S: AsyncRead + AsyncWrite + Unpin + Send,
409{
410 stream.write_all(&request).await?;
412 stream.flush().await?;
413
414 trace!("Sent HTTP request ({} bytes)", request.len());
415
416 let response = parse_response(stream).await?;
418
419 debug!(
420 "Received HTTP response: {} {}",
421 response.status(),
422 response.content_length().unwrap_or(0)
423 );
424
425 Ok(response)
426}