ndjson_rpc/
http_client.rs1use std::net::SocketAddr;
11use std::sync::Arc;
12
13use bytes::Bytes;
14use http_body_util::{BodyExt, Full};
15use hyper::body::Incoming;
16use hyper::header::{AUTHORIZATION, CONTENT_TYPE, HOST};
17use hyper::{Method, StatusCode};
18use hyper_util::rt::TokioIo;
19use tokio::net::TcpStream;
20
21use crate::client::{CONNECT_TIMEOUT, MgmtClientError, ONESHOT_TIMEOUT};
22use crate::protocol::{Request, Response, ResponseOutcome, WireError, WireErrorKind};
23
24#[derive(Clone, Debug)]
27pub struct HttpMgmtClient {
28 addr: SocketAddr,
29 token: Option<Arc<str>>,
30}
31
32impl HttpMgmtClient {
33 #[must_use]
37 pub fn new(addr: SocketAddr, token: Option<Arc<str>>) -> Self {
38 Self { addr, token }
39 }
40
41 pub async fn call<A, R>(&self, verb: &str, args: &A) -> Result<R, MgmtClientError>
50 where
51 A: serde::Serialize,
52 R: for<'de> serde::Deserialize<'de>,
53 {
54 let req = Request {
55 id: 1,
56 verb: verb.to_string(),
57 args: serde_json::to_value(args).map_err(MgmtClientError::Encode)?,
58 };
59 let body_bytes = Bytes::from(serde_json::to_vec(&req).map_err(MgmtClientError::Encode)?);
60 let resp = self.send(body_bytes).await?;
61 let status = resp.status();
62 let resp_body = tokio::time::timeout(ONESHOT_TIMEOUT, resp.into_body().collect())
63 .await
64 .map_err(|_| MgmtClientError::Timeout("read"))?
65 .map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?
66 .to_bytes();
67 if status != StatusCode::OK {
68 let body = String::from_utf8_lossy(&resp_body).into_owned();
69 return Err(MgmtClientError::Http { status: status.as_u16(), body });
70 }
71 let response: Response = serde_json::from_slice(&resp_body).map_err(MgmtClientError::Decode)?;
72 match response.outcome {
73 ResponseOutcome::Result { result } => {
74 serde_json::from_value(result).map_err(MgmtClientError::Decode)
75 }
76 ResponseOutcome::Error { error } => Err(MgmtClientError::Server(error)),
77 ResponseOutcome::Event { .. } | ResponseOutcome::End { .. } => Err(MgmtClientError::Server(
78 WireError::new(WireErrorKind::Internal, "received streaming frame on one-shot call"),
79 )),
80 }
81 }
82
83 pub async fn stream<A, F>(
92 &self,
93 verb: &str,
94 args: &A,
95 mut on_event: F,
96 ) -> Result<(), MgmtClientError>
97 where
98 A: serde::Serialize,
99 F: FnMut(serde_json::Value),
100 {
101 let req = Request {
102 id: 1,
103 verb: verb.to_string(),
104 args: serde_json::to_value(args).map_err(MgmtClientError::Encode)?,
105 };
106 let body_bytes = Bytes::from(serde_json::to_vec(&req).map_err(MgmtClientError::Encode)?);
107 let resp = self.send(body_bytes).await?;
108 let status = resp.status();
109 if status != StatusCode::OK {
110 let body = resp
111 .into_body()
112 .collect()
113 .await
114 .map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?
115 .to_bytes();
116 let body = String::from_utf8_lossy(&body).into_owned();
117 return Err(MgmtClientError::Http { status: status.as_u16(), body });
118 }
119 let mut body = resp.into_body();
122 let mut buf: Vec<u8> = Vec::with_capacity(4096);
123 loop {
124 let frame = match body.frame().await {
125 Some(Ok(f)) => f,
126 Some(Err(e)) => {
127 return Err(MgmtClientError::Io(std::io::Error::other(e.to_string())));
128 }
129 None => break,
130 };
131 let Ok(data) = frame.into_data() else {
132 continue;
134 };
135 buf.extend_from_slice(&data);
136 while let Some(idx) = buf.iter().position(|b| *b == b'\n') {
137 let line: Vec<u8> = buf.drain(..=idx).collect();
138 let line = &line[..line.len() - 1]; if line.is_empty() {
140 continue;
141 }
142 let response: Response = serde_json::from_slice(line).map_err(MgmtClientError::Decode)?;
143 match response.outcome {
144 ResponseOutcome::Event { event } => on_event(event),
145 ResponseOutcome::End { .. } => return Ok(()),
146 ResponseOutcome::Error { error } => return Err(MgmtClientError::Server(error)),
147 ResponseOutcome::Result { .. } => {
148 return Err(MgmtClientError::Server(WireError::new(
149 WireErrorKind::Internal,
150 "received one-shot Result on streaming call",
151 )));
152 }
153 }
154 }
155 }
156 Ok(())
159 }
160
161 async fn send(&self, body: Bytes) -> Result<hyper::Response<Incoming>, MgmtClientError> {
166 let stream = tokio::time::timeout(CONNECT_TIMEOUT, TcpStream::connect(self.addr))
167 .await
168 .map_err(|_| MgmtClientError::Timeout("connect"))??;
169 let io = TokioIo::new(stream);
170 let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
171 .await
172 .map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?;
173 tokio::spawn(async move {
178 if let Err(e) = conn.await {
179 tracing::debug!(?e, "mgmt http client connection ended");
180 }
181 });
182
183 let mut builder = http::Request::builder()
184 .method(Method::POST)
185 .uri("/")
186 .header(HOST, self.addr.to_string())
187 .header(CONTENT_TYPE, "application/json");
188 if let Some(token) = &self.token {
189 builder = builder.header(AUTHORIZATION, format!("Bearer {token}"));
190 }
191 let http_req = builder
192 .body(Full::new(body))
193 .map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?;
194 sender
195 .send_request(http_req)
196 .await
197 .map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))
198 }
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204 use crate::protocol::{EndMarker, ResponseOutcome};
205
206 #[test]
210 fn ndjson_line_accumulator_splits_frames() {
211 let frames = vec![
212 Response { id: 1, outcome: ResponseOutcome::Event { event: serde_json::json!({"i": 1}) } },
213 Response { id: 1, outcome: ResponseOutcome::Event { event: serde_json::json!({"i": 2}) } },
214 Response { id: 1, outcome: ResponseOutcome::End { end: EndMarker::default() } },
215 ];
216 let mut wire: Vec<u8> = Vec::new();
217 for f in &frames {
218 wire.extend(serde_json::to_vec(f).unwrap());
219 wire.push(b'\n');
220 }
221 let mut buf = wire;
223 let mut decoded: Vec<Response> = Vec::new();
224 while let Some(idx) = buf.iter().position(|b| *b == b'\n') {
225 let line: Vec<u8> = buf.drain(..=idx).collect();
226 let body = &line[..line.len() - 1];
227 let r: Response = serde_json::from_slice(body).unwrap();
228 decoded.push(r);
229 }
230 assert_eq!(decoded.len(), 3);
231 assert!(matches!(decoded[0].outcome, ResponseOutcome::Event { .. }));
232 assert!(matches!(decoded[2].outcome, ResponseOutcome::End { .. }));
233 }
234
235 #[test]
236 fn ndjson_line_accumulator_buffers_partial_frame_until_newline() {
237 let frame =
240 Response { id: 7, outcome: ResponseOutcome::Result { result: serde_json::json!(42) } };
241 let mut wire = serde_json::to_vec(&frame).unwrap();
242 wire.push(b'\n');
243 let (a, b) = wire.split_at(5);
244 let mut buf: Vec<u8> = Vec::new();
245 buf.extend_from_slice(a);
246 assert!(!buf.contains(&b'\n'), "no complete frame yet");
247 buf.extend_from_slice(b);
248 let idx = buf.iter().position(|x| *x == b'\n').unwrap();
249 let line: Vec<u8> = buf.drain(..=idx).collect();
250 let body = &line[..line.len() - 1];
251 let r: Response = serde_json::from_slice(body).unwrap();
252 assert_eq!(r.id, 7);
253 }
254}