kode_bridge/ipc_http/
http_over_stream.rs

1use crate::errors::AnyResult;
2use crate::types::Response;
3use std::collections::HashMap;
4use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
5use tokio::io::{AsyncRead, AsyncWrite};
6
7pub async fn send_http_over_stream<S>(
8    mut stream: S,
9    method: &str,
10    path: &str,
11    body: Option<&serde_json::Value>,
12) -> AnyResult<Response>
13where
14    S: AsyncRead + AsyncWrite + Unpin,
15{
16    let body_bytes = if let Some(b) = body {
17        Some(serde_json::to_vec(b)?)
18    } else {
19        None
20    };
21
22    let mut request = format!(
23        "{} {} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n",
24        method, path
25    );
26    if let Some(ref b) = body_bytes {
27        if body.is_some() {
28            request.push_str("Content-Type: application/json\r\n");
29        }
30        request.push_str(&format!("Content-Length: {}\r\n", b.len()));
31    }
32    request.push_str("\r\n");
33
34    if let Some(ref b) = body_bytes {
35        let mut buf = Vec::with_capacity(request.len() + b.len());
36        buf.extend_from_slice(request.as_bytes());
37        buf.extend_from_slice(b);
38        stream.write_all(&buf).await?;
39    } else {
40        stream.write_all(request.as_bytes()).await?;
41    }
42    stream.flush().await?;
43
44    let mut reader = BufReader::new(stream);
45    let mut status_line = String::new();
46    reader.read_line(&mut status_line).await?;
47    let status = status_line
48        .split_whitespace()
49        .nth(1)
50        .and_then(|s| s.parse::<u16>().ok())
51        .unwrap_or(0);
52
53    let mut headers_map = HashMap::new();
54    let mut content_length = None;
55    let mut is_chunked = false;
56    let mut line = String::with_capacity(128);
57    loop {
58        line.clear();
59        let n = reader.read_line(&mut line).await?;
60        if n == 0 || line == "\r\n" {
61            break;
62        }
63        if let Some((k, v)) = line.split_once(":") {
64            let key = k.trim().to_string();
65            let value = v.trim().to_string();
66            if key.eq_ignore_ascii_case("Content-Length") {
67                content_length = value.parse::<usize>().ok();
68            }
69            if key.eq_ignore_ascii_case("Transfer-Encoding") && value.eq_ignore_ascii_case("chunked") {
70                is_chunked = true;
71            }
72            headers_map.insert(key, value);
73        }
74    }
75    let headers = serde_json::to_value(headers_map)?;
76
77    let body_str = if is_chunked {
78        let mut body = Vec::new();
79        loop {
80            let mut size_line = String::new();
81            let n = reader.read_line(&mut size_line).await?;
82            if n == 0 {
83                break;
84            }
85            let size_line = size_line.trim();
86            if size_line.is_empty() {
87                continue;
88            }
89            let chunk_size = usize::from_str_radix(size_line, 16).unwrap_or(0);
90            if chunk_size == 0 {
91                // 读完最后一个 chunk 后还有一个空行
92                let _ = reader.read_line(&mut String::new()).await;
93                break;
94            }
95            let mut chunk = vec![0u8; chunk_size];
96            reader.read_exact(&mut chunk).await?;
97            body.extend_from_slice(&chunk);
98            // 读掉 chunk 结尾的 \r\n
99            let mut crlf = [0u8; 2];
100            reader.read_exact(&mut crlf).await?;
101        }
102        String::from_utf8(body)?
103    } else if let Some(len) = content_length {
104        let mut body = vec![0u8; len];
105        reader.read_exact(&mut body).await?;
106        String::from_utf8(body)?
107    } else {
108        let mut body = Vec::new();
109        reader.read_to_end(&mut body).await?;
110        String::from_utf8(body)?
111    };
112
113    Ok(Response {
114        status,
115        headers: headers,
116        body: body_str,
117    })
118}