Skip to main content

ndjson_rpc/
http_client.rs

//! HTTP-over-TCP management client. Mirrors [`crate::UnixMgmtClient`]
//! but talks to [`crate::http_server`] over `hyper::client::conn::http1`.
//!
//! One TCP connection per call: the management API is verb-at-a-time
//! and not chatty enough to amortize a connection pool. Each call opens
//! a fresh TCP stream, runs the H1 handshake, sends a single POST,
//! consumes either a one-shot JSON body or a chunked NDJSON stream,
//! then drops the connection.
9
10use std::net::SocketAddr;
11use std::sync::Arc;
12
13use bytes::Bytes;
14use http_body_util::{BodyExt, Full};
15use hyper::body::Incoming;
16use hyper::header::{AUTHORIZATION, CONTENT_TYPE, HOST};
17use hyper::{Method, StatusCode};
18use hyper_util::rt::TokioIo;
19use tokio::net::TcpStream;
20
21use crate::client::MgmtClientError;
22use crate::protocol::{Request, Response, ResponseOutcome, WireError, WireErrorKind};
23
/// Management client that speaks plaintext HTTP/1.1.
///
/// Cloning is cheap: the address is a `Copy` value and the token sits
/// behind an `Arc`, so a clone only bumps a reference count.
#[derive(Debug, Clone)]
pub struct HttpMgmtClient {
	/// Socket address every call connects to.
	addr: SocketAddr,
	/// Bearer token sent in the `Authorization` header; `None` sends no
	/// such header.
	token: Option<Arc<str>>,
}
31
32impl HttpMgmtClient {
33	/// Build a client targeting the given HTTP endpoint. The `token`
34	/// argument matches the server's `bearer_token` setting: pass
35	/// `None` only when the server is configured for anonymous access.
36	#[must_use]
37	pub fn new(addr: SocketAddr, token: Option<Arc<str>>) -> Self {
38		Self { addr, token }
39	}
40
41	/// One-shot verb call. Mirrors [`crate::UnixMgmtClient::call`].
42	///
43	/// # Errors
44	/// I/O failure ([`MgmtClientError::Io`]); a non-200 HTTP response
45	/// ([`MgmtClientError::Http`] — `401` for missing / wrong token,
46	/// `400` / `404` / `405` / `413` for malformed requests); a
47	/// structured server-side error frame ([`MgmtClientError::Server`]);
48	/// or a JSON shape mismatch ([`MgmtClientError::Decode`]).
49	pub async fn call<A, R>(&self, verb: &str, args: &A) -> Result<R, MgmtClientError>
50	where
51		A: serde::Serialize,
52		R: for<'de> serde::Deserialize<'de>,
53	{
54		let req = Request {
55			id: 1,
56			verb: verb.to_string(),
57			args: serde_json::to_value(args).map_err(MgmtClientError::Encode)?,
58		};
59		let body_bytes = Bytes::from(serde_json::to_vec(&req).map_err(MgmtClientError::Encode)?);
60		let resp = self.send(body_bytes).await?;
61		let status = resp.status();
62		let resp_body = resp
63			.into_body()
64			.collect()
65			.await
66			.map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?
67			.to_bytes();
68		if status != StatusCode::OK {
69			let body = String::from_utf8_lossy(&resp_body).into_owned();
70			return Err(MgmtClientError::Http { status: status.as_u16(), body });
71		}
72		let response: Response = serde_json::from_slice(&resp_body).map_err(MgmtClientError::Decode)?;
73		match response.outcome {
74			ResponseOutcome::Result { result } => {
75				serde_json::from_value(result).map_err(MgmtClientError::Decode)
76			}
77			ResponseOutcome::Error { error } => Err(MgmtClientError::Server(error)),
78			ResponseOutcome::Event { .. } | ResponseOutcome::End { .. } => {
79				Err(MgmtClientError::Server(WireError {
80					kind: WireErrorKind::Internal,
81					message: "received streaming frame on one-shot call".to_string(),
82				}))
83			}
84		}
85	}
86
87	/// Streaming verb call. Invokes `on_event` for each `Event` frame,
88	/// returns when the server emits `End`, the connection drops, or
89	/// the server emits `Error`.
90	///
91	/// # Errors
92	/// Same shape as [`Self::call`], plus the streaming-specific case
93	/// where the server emits a `Result` frame on what should be a
94	/// streaming verb.
95	pub async fn stream<A, F>(
96		&self,
97		verb: &str,
98		args: &A,
99		mut on_event: F,
100	) -> Result<(), MgmtClientError>
101	where
102		A: serde::Serialize,
103		F: FnMut(serde_json::Value),
104	{
105		let req = Request {
106			id: 1,
107			verb: verb.to_string(),
108			args: serde_json::to_value(args).map_err(MgmtClientError::Encode)?,
109		};
110		let body_bytes = Bytes::from(serde_json::to_vec(&req).map_err(MgmtClientError::Encode)?);
111		let resp = self.send(body_bytes).await?;
112		let status = resp.status();
113		if status != StatusCode::OK {
114			let body = resp
115				.into_body()
116				.collect()
117				.await
118				.map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?
119				.to_bytes();
120			let body = String::from_utf8_lossy(&body).into_owned();
121			return Err(MgmtClientError::Http { status: status.as_u16(), body });
122		}
123		// Drain the chunked body into a line accumulator and dispatch
124		// each complete `\n`-delimited Response frame as it lands.
125		let mut body = resp.into_body();
126		let mut buf: Vec<u8> = Vec::with_capacity(4096);
127		loop {
128			let frame = match body.frame().await {
129				Some(Ok(f)) => f,
130				Some(Err(e)) => {
131					return Err(MgmtClientError::Io(std::io::Error::other(e.to_string())));
132				}
133				None => break,
134			};
135			let Ok(data) = frame.into_data() else {
136				// Trailers / non-data frame — ignore for the NDJSON contract.
137				continue;
138			};
139			buf.extend_from_slice(&data);
140			while let Some(idx) = buf.iter().position(|b| *b == b'\n') {
141				let line: Vec<u8> = buf.drain(..=idx).collect();
142				let line = &line[..line.len() - 1]; // strip trailing '\n'
143				if line.is_empty() {
144					continue;
145				}
146				let response: Response = serde_json::from_slice(line).map_err(MgmtClientError::Decode)?;
147				match response.outcome {
148					ResponseOutcome::Event { event } => on_event(event),
149					ResponseOutcome::End { .. } => return Ok(()),
150					ResponseOutcome::Error { error } => return Err(MgmtClientError::Server(error)),
151					ResponseOutcome::Result { .. } => {
152						return Err(MgmtClientError::Server(WireError {
153							kind: WireErrorKind::Internal,
154							message: "received one-shot Result on streaming call".to_string(),
155						}));
156					}
157				}
158			}
159		}
160		// Server closed mid-stream without an End frame — treat as
161		// graceful EOF, mirroring `UnixMgmtClient::call_stream`.
162		Ok(())
163	}
164
165	/// Open a fresh TCP connection, run the H1 handshake, send the
166	/// POST, and return the response head + an in-flight body. The
167	/// caller drains the body before going out of scope; the spawned
168	/// driver task ends when the connection closes.
169	async fn send(&self, body: Bytes) -> Result<hyper::Response<Incoming>, MgmtClientError> {
170		let stream = TcpStream::connect(self.addr).await?;
171		let io = TokioIo::new(stream);
172		let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
173			.await
174			.map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?;
175		// Drive the connection to completion in the background. The
176		// task ends when `sender` is dropped (which happens after this
177		// function returns and the caller drops the response body),
178		// or when the server closes the connection.
179		tokio::spawn(async move {
180			if let Err(e) = conn.await {
181				tracing::debug!(?e, "mgmt http client connection ended");
182			}
183		});
184
185		let mut builder = http::Request::builder()
186			.method(Method::POST)
187			.uri("/")
188			.header(HOST, self.addr.to_string())
189			.header(CONTENT_TYPE, "application/json");
190		if let Some(token) = &self.token {
191			builder = builder.header(AUTHORIZATION, format!("Bearer {token}"));
192		}
193		let http_req = builder
194			.body(Full::new(body))
195			.map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))?;
196		sender
197			.send_request(http_req)
198			.await
199			.map_err(|e| MgmtClientError::Io(std::io::Error::other(e.to_string())))
200	}
201}
202
#[cfg(test)]
mod tests {
	use super::*;
	use crate::protocol::{EndMarker, ResponseOutcome};

	/// Remove the first `\n`-terminated line from `buf` and return it
	/// without the terminator, or `None` when no complete line is
	/// buffered yet.
	fn pop_line(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
		let newline = buf.iter().position(|b| *b == b'\n')?;
		let mut line: Vec<u8> = buf.drain(..=newline).collect();
		line.pop(); // drop the trailing '\n'
		Some(line)
	}

	/// Several NDJSON frames concatenated into one byte buffer must
	/// come back out as individual `Response` values, in order. This
	/// exercises newline splitting without needing a real server.
	#[test]
	fn ndjson_line_accumulator_splits_frames() {
		let outcomes = [
			ResponseOutcome::Event { event: serde_json::json!({"i": 1}) },
			ResponseOutcome::Event { event: serde_json::json!({"i": 2}) },
			ResponseOutcome::End { end: EndMarker::default() },
		];
		let mut buf: Vec<u8> = Vec::new();
		for outcome in outcomes {
			let frame = Response { id: 1, outcome };
			buf.extend(serde_json::to_vec(&frame).unwrap());
			buf.push(b'\n');
		}
		// Peel lines off the front of the buffer until none remain.
		let mut decoded: Vec<Response> = Vec::new();
		while let Some(line) = pop_line(&mut buf) {
			decoded.push(serde_json::from_slice(&line).unwrap());
		}
		assert_eq!(decoded.len(), 3);
		assert!(matches!(decoded[0].outcome, ResponseOutcome::Event { .. }));
		assert!(matches!(decoded[2].outcome, ResponseOutcome::End { .. }));
	}

	/// A frame delivered in two pieces must stay buffered until its
	/// newline arrives, then decode as a single `Response`.
	#[test]
	fn ndjson_line_accumulator_buffers_partial_frame_until_newline() {
		let frame =
			Response { id: 7, outcome: ResponseOutcome::Result { result: serde_json::json!(42) } };
		let mut wire = serde_json::to_vec(&frame).unwrap();
		wire.push(b'\n');
		// Cut at an arbitrary offset inside the JSON text.
		let (head, tail) = wire.split_at(5);
		let mut buf: Vec<u8> = head.to_vec();
		assert!(!buf.contains(&b'\n'), "no complete frame yet");
		buf.extend_from_slice(tail);
		let line = pop_line(&mut buf).unwrap();
		let decoded: Response = serde_json::from_slice(&line).unwrap();
		assert_eq!(decoded.id, 7);
	}
}