1use crate::errors::{KodeBridgeError, Result};
2use bytes::{Bytes, BytesMut};
3use http::{HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Version, header};
4use serde::{Serialize, de::DeserializeOwned};
5use serde_json::Value;
6use std::collections::HashMap;
7use std::str::FromStr;
8use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
9use tokio::io::{AsyncRead, AsyncWrite};
10use tracing::{debug, trace};
11
12#[derive(Debug, Clone)]
14pub struct Response {
15 status: StatusCode,
16 version: Version,
17 headers: HeaderMap,
18 body: Bytes,
19}
20
21impl Response {
22 pub fn new(status: StatusCode, version: Version, headers: HeaderMap, body: Bytes) -> Self {
23 Self {
24 status,
25 version,
26 headers,
27 body,
28 }
29 }
30
31 pub fn status(&self) -> StatusCode {
33 self.status
34 }
35
36 pub fn status_code(&self) -> u16 {
38 self.status.as_u16()
39 }
40
41 pub fn version(&self) -> Version {
43 self.version
44 }
45
46 pub fn headers(&self) -> &HeaderMap {
48 &self.headers
49 }
50
51 pub fn body(&self) -> &Bytes {
53 &self.body
54 }
55
56 pub fn text(&self) -> Result<String> {
58 String::from_utf8(self.body.to_vec()).map_err(KodeBridgeError::from)
59 }
60
61 pub fn json<T>(&self) -> Result<T>
63 where
64 T: DeserializeOwned,
65 {
66 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
67 }
68
69 pub fn json_value(&self) -> Result<Value> {
71 serde_json::from_slice(&self.body).map_err(KodeBridgeError::from)
72 }
73
74 pub fn is_success(&self) -> bool {
76 self.status.is_success()
77 }
78
79 pub fn is_client_error(&self) -> bool {
81 self.status.is_client_error()
82 }
83
84 pub fn is_server_error(&self) -> bool {
86 self.status.is_server_error()
87 }
88
89 pub fn is_redirection(&self) -> bool {
91 self.status.is_redirection()
92 }
93
94 pub fn content_length(&self) -> Option<u64> {
96 self.headers
97 .get(header::CONTENT_LENGTH)?
98 .to_str()
99 .ok()?
100 .parse()
101 .ok()
102 }
103
104 pub fn content_type(&self) -> Option<&str> {
106 self.headers.get(header::CONTENT_TYPE)?.to_str().ok()
107 }
108
109 pub fn to_legacy(&self) -> crate::response::LegacyResponse {
111 let headers_map: HashMap<String, String> = self
112 .headers
113 .iter()
114 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
115 .collect();
116
117 crate::response::LegacyResponse {
118 status: self.status.as_u16(),
119 headers: serde_json::to_value(headers_map).unwrap_or(Value::Null),
120 body: String::from_utf8_lossy(&self.body).to_string(),
121 }
122 }
123}
124
125#[derive(Debug)]
127pub struct RequestBuilder {
128 method: Method,
129 uri: String,
130 headers: HeaderMap,
131 body: Option<Bytes>,
132}
133
134impl RequestBuilder {
135 pub fn new(method: Method, uri: String) -> Self {
136 let mut headers = HeaderMap::new();
137 headers.insert(header::HOST, HeaderValue::from_static("localhost"));
138 headers.insert(
139 header::USER_AGENT,
140 HeaderValue::from_static("kode-bridge/0.1"),
141 );
142
143 Self {
144 method,
145 uri,
146 headers,
147 body: None,
148 }
149 }
150
151 #[allow(dead_code)]
153 pub fn header<K, V>(mut self, key: K, value: V) -> Result<Self>
154 where
155 HeaderName: TryFrom<K>,
156 <HeaderName as TryFrom<K>>::Error: Into<http::Error>,
157 HeaderValue: TryFrom<V>,
158 <HeaderValue as TryFrom<V>>::Error: Into<http::Error>,
159 {
160 let key = HeaderName::try_from(key).map_err(|e| KodeBridgeError::Http(e.into()))?;
161 let value = HeaderValue::try_from(value).map_err(|e| KodeBridgeError::Http(e.into()))?;
162 self.headers.insert(key, value);
163 Ok(self)
164 }
165
166 pub fn json<T>(mut self, body: &T) -> Result<Self>
168 where
169 T: Serialize,
170 {
171 let json_bytes = serde_json::to_vec(body)?;
172 self.headers.insert(
173 header::CONTENT_TYPE,
174 HeaderValue::from_static("application/json"),
175 );
176 self.headers.insert(
177 header::CONTENT_LENGTH,
178 HeaderValue::from_str(&json_bytes.len().to_string())
179 .map_err(|e| KodeBridgeError::Http(e.into()))?,
180 );
181 self.body = Some(Bytes::from(json_bytes));
182 Ok(self)
183 }
184
185 #[allow(dead_code)]
187 pub fn text<T: AsRef<str>>(mut self, body: T) -> Result<Self> {
188 let text_bytes = body.as_ref().as_bytes().to_vec();
189 self.headers.insert(
190 header::CONTENT_TYPE,
191 HeaderValue::from_static("text/plain; charset=utf-8"),
192 );
193 self.headers.insert(
194 header::CONTENT_LENGTH,
195 HeaderValue::from_str(&text_bytes.len().to_string())
196 .map_err(|e| KodeBridgeError::Http(e.into()))?,
197 );
198 self.body = Some(Bytes::from(text_bytes));
199 Ok(self)
200 }
201
202 #[allow(dead_code)]
204 pub fn bytes(mut self, body: Bytes) -> Result<Self> {
205 self.headers.insert(
206 header::CONTENT_TYPE,
207 HeaderValue::from_static("application/octet-stream"),
208 );
209 self.headers.insert(
210 header::CONTENT_LENGTH,
211 HeaderValue::from_str(&body.len().to_string())
212 .map_err(|e| KodeBridgeError::Http(e.into()))?,
213 );
214 self.body = Some(body);
215 Ok(self)
216 }
217
218 pub fn build(self) -> Result<Bytes> {
220 let mut request = BytesMut::new();
221
222 let request_line = format!("{} {} HTTP/1.1\r\n", self.method, self.uri);
224 request.extend_from_slice(request_line.as_bytes());
225
226 for (key, value) in &self.headers {
228 let header_line = format!("{}: {}\r\n", key, value.to_str().unwrap_or(""));
229 request.extend_from_slice(header_line.as_bytes());
230 }
231
232 request.extend_from_slice(b"\r\n");
234
235 if let Some(body) = self.body {
237 request.extend_from_slice(&body);
238 }
239
240 Ok(request.freeze())
241 }
242}
243
244pub async fn parse_response<S>(stream: S) -> Result<Response>
246where
247 S: AsyncRead + Unpin,
248{
249 let mut reader = BufReader::new(stream);
250 let mut buffer = Vec::new();
251
252 let mut headers_end = None;
254 loop {
255 let mut line = Vec::new();
256 let n = reader.read_until(b'\n', &mut line).await?;
257 if n == 0 {
258 return Err(KodeBridgeError::protocol("Unexpected end of stream"));
259 }
260
261 buffer.extend_from_slice(&line);
262
263 if buffer.len() >= 4 {
265 for i in 0..buffer.len() - 3 {
266 if &buffer[i..i + 4] == b"\r\n\r\n" {
267 headers_end = Some(i + 4);
268 break;
269 }
270 }
271 }
272
273 if headers_end.is_some() {
274 break;
275 }
276 }
277
278 let headers_end = headers_end
279 .ok_or_else(|| KodeBridgeError::protocol("Could not find end of HTTP headers"))?;
280
281 let mut headers = [httparse::EMPTY_HEADER; 64];
283 let mut response = httparse::Response::new(&mut headers);
284
285 let status = match response.parse(&buffer[..headers_end])? {
286 httparse::Status::Complete(_) => response.code.unwrap(),
287 httparse::Status::Partial => {
288 return Err(KodeBridgeError::protocol("Incomplete HTTP response"));
289 }
290 };
291
292 let mut header_map = HeaderMap::new();
294 for header in response.headers {
295 let name =
296 HeaderName::from_str(header.name).map_err(|e| KodeBridgeError::Http(e.into()))?;
297 let value =
298 HeaderValue::from_bytes(header.value).map_err(|e| KodeBridgeError::Http(e.into()))?;
299 header_map.insert(name, value);
300 }
301
302 let content_length = header_map
304 .get(header::CONTENT_LENGTH)
305 .and_then(|v| v.to_str().ok())
306 .and_then(|s| s.parse::<usize>().ok());
307
308 let is_chunked = header_map
309 .get(header::TRANSFER_ENCODING)
310 .and_then(|v| v.to_str().ok())
311 .map(|s| s.eq_ignore_ascii_case("chunked"))
312 .unwrap_or(false);
313
314 let body = if is_chunked {
316 read_chunked_body(&mut reader).await?
317 } else if let Some(len) = content_length {
318 read_fixed_body(&mut reader, len).await?
319 } else {
320 let mut body = Vec::new();
322 reader.read_to_end(&mut body).await?;
323 Bytes::from(body)
324 };
325
326 Ok(Response::new(
327 StatusCode::from_u16(status)?,
328 Version::HTTP_11,
329 header_map,
330 body,
331 ))
332}
333
334async fn read_chunked_body<R>(reader: &mut BufReader<R>) -> Result<Bytes>
335where
336 R: AsyncRead + Unpin,
337{
338 let mut body = BytesMut::new();
339
340 loop {
341 let mut size_line = String::new();
343 reader.read_line(&mut size_line).await?;
344
345 let size_line = size_line.trim();
346 if size_line.is_empty() {
347 continue;
348 }
349
350 let chunk_size = usize::from_str_radix(size_line, 16)
352 .map_err(|_| KodeBridgeError::protocol("Invalid chunk size"))?;
353
354 if chunk_size == 0 {
355 let mut final_line = String::new();
357 reader.read_line(&mut final_line).await?;
358 break;
359 }
360
361 let mut chunk = vec![0u8; chunk_size];
363 reader.read_exact(&mut chunk).await?;
364 body.extend_from_slice(&chunk);
365
366 let mut crlf = [0u8; 2];
368 reader.read_exact(&mut crlf).await?;
369 }
370
371 Ok(body.freeze())
372}
373
374async fn read_fixed_body<R>(reader: &mut BufReader<R>, len: usize) -> Result<Bytes>
375where
376 R: AsyncRead + Unpin,
377{
378 let mut body = vec![0u8; len];
379 reader.read_exact(&mut body).await?;
380 Ok(Bytes::from(body))
381}
382
383pub async fn send_request<S>(mut stream: S, request: Bytes) -> Result<Response>
385where
386 S: AsyncRead + AsyncWrite + Unpin,
387{
388 stream.write_all(&request).await?;
390 stream.flush().await?;
391
392 trace!("Sent HTTP request ({} bytes)", request.len());
393
394 let response = parse_response(stream).await?;
396
397 debug!(
398 "Received HTTP response: {} {}",
399 response.status(),
400 response.content_length().unwrap_or(0)
401 );
402
403 Ok(response)
404}
405
406#[allow(dead_code)]
408pub async fn http_request<S>(
409 stream: S,
410 method: &str,
411 path: &str,
412 body: Option<&Value>,
413) -> Result<Response>
414where
415 S: AsyncRead + AsyncWrite + Unpin,
416{
417 let method = Method::from_str(method).map_err(|e| KodeBridgeError::Http(e.into()))?;
418
419 let mut builder = RequestBuilder::new(method, path.to_string());
420
421 if let Some(json_body) = body {
422 builder = builder.json(json_body)?;
423 }
424
425 let request = builder.build()?;
426 send_request(stream, request).await
427}