1use std::{
21 fmt::{self, Debug},
22 pin::Pin,
23 str,
24 task::{Context, Poll},
25};
26
27#[cfg(feature = "http")]
28use std::str::FromStr;
29
30use bytes::{Bytes, BytesMut};
31use futures_core::stream::Stream;
32use tracing::debug;
33
34use crate::{
35 ClientError, ClientResult,
36 io::AsyncRead,
37 meta::{EndRequestRec, HEADER_LEN, Header, RequestType},
38};
39
40#[cfg(feature = "http")]
41use crate::{HttpConversionError, HttpConversionResult};
42
43#[derive(Default, Clone)]
48#[non_exhaustive]
49pub struct Response {
50 pub stdout: Option<Vec<u8>>,
52 pub stderr: Option<Vec<u8>>,
54}
55
56impl Debug for Response {
57 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
58 f.debug_struct("Response")
59 .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
60 .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
61 .finish()
62 }
63}
64
65#[cfg(feature = "http")]
66impl<B> TryFrom<::http::Response<B>> for Response
67where
68 B: AsRef<[u8]>,
69{
70 type Error = HttpConversionError;
71
72 fn try_from(response: ::http::Response<B>) -> Result<Self, Self::Error> {
73 let (parts, body) = response.into_parts();
74 let mut stdout = Vec::new();
75
76 if parts.status != ::http::StatusCode::OK {
77 stdout.extend_from_slice(format!("Status: {}\r\n", parts.status).as_bytes());
78 }
79
80 for (name, value) in &parts.headers {
81 stdout.extend_from_slice(name.as_str().as_bytes());
82 stdout.extend_from_slice(b": ");
83 stdout.extend_from_slice(value.as_bytes());
84 stdout.extend_from_slice(b"\r\n");
85 }
86
87 stdout.extend_from_slice(b"\r\n");
88 stdout.extend_from_slice(body.as_ref());
89
90 Ok(Response {
91 stdout: Some(stdout),
92 stderr: None,
93 })
94 }
95}
96
97#[cfg(feature = "http")]
98impl TryFrom<Response> for ::http::Response<Vec<u8>> {
99 type Error = HttpConversionError;
100
101 fn try_from(response: Response) -> Result<Self, Self::Error> {
102 let stdout = response.stdout.unwrap_or_default();
103 let (header_bytes, body_bytes) = split_header_body(&stdout)?;
104 let mut status = ::http::StatusCode::OK;
105 let mut builder = ::http::Response::builder();
106
107 {
108 let headers = builder
109 .headers_mut()
110 .expect("response builder should provide headers");
111 for line in header_bytes.split(|byte| *byte == b'\n') {
112 let line = trim_cr(line);
113 if line.is_empty() {
114 continue;
115 }
116 let Some((name, value)) = line.split_first_colon() else {
117 return Err(HttpConversionError::MalformedHttpResponse {
118 message: "response header line is missing ':'",
119 });
120 };
121
122 if name.eq_ignore_ascii_case(b"Status") {
123 status = parse_status_header(value)?;
124 continue;
125 }
126
127 headers.append(
128 ::http::header::HeaderName::from_bytes(name)?,
129 ::http::header::HeaderValue::from_bytes(trim_start_ascii(value))?,
130 );
131 }
132 }
133
134 builder = builder.status(status);
135 Ok(builder.body(body_bytes.to_vec())?)
136 }
137}
138
139pub enum Content {
144 Stdout(Bytes),
146 Stderr(Bytes),
148}
149
150pub struct ResponseStream<S: AsyncRead + Unpin> {
158 stream: S,
159 id: u16,
160 eof: bool,
161 header: Option<Header>,
162 buf: BytesMut,
163}
164
165impl<S: AsyncRead + Unpin> ResponseStream<S> {
166 #[inline]
173 pub(crate) fn new(stream: S, id: u16) -> Self {
174 Self {
175 stream,
176 id,
177 eof: false,
178 header: None,
179 buf: BytesMut::new(),
180 }
181 }
182
183 #[inline]
187 fn read_header(&mut self) -> Option<Header> {
188 if self.buf.len() < HEADER_LEN {
189 return None;
190 }
191 let buf = self.buf.split_to(HEADER_LEN);
192 let header = (&buf as &[u8]).try_into().expect("failed to read header");
193 Some(Header::new_from_buf(header))
194 }
195
196 #[inline]
200 fn read_content(&mut self) -> Option<Bytes> {
201 let header = self.header.as_ref().unwrap();
202 let block_length = header.content_length as usize + header.padding_length as usize;
203 if self.buf.len() < block_length {
204 return None;
205 }
206 let content = self.buf.split_to(header.content_length as usize);
207 let _ = self.buf.split_to(header.padding_length as usize);
208 self.header = None;
209 Some(content.freeze())
210 }
211
212 fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
217 if self.buf.is_empty() {
218 return Ok(None);
219 }
220 if self.header.is_none() {
221 match self.read_header() {
222 Some(header) => self.header = Some(header),
223 None => return Ok(None),
224 }
225 }
226 let header = self.header.as_ref().unwrap();
227 match header.r#type.clone() {
228 RequestType::Stdout => {
229 if let Some(data) = self.read_content() {
230 return Ok(Some(Content::Stdout(data)));
231 }
232 }
233 RequestType::Stderr => {
234 if let Some(data) = self.read_content() {
235 return Ok(Some(Content::Stderr(data)));
236 }
237 }
238 RequestType::EndRequest => {
239 let header = header.clone();
240 let Some(data) = self.read_content() else {
241 return Ok(None);
242 };
243
244 let end = EndRequestRec::new_from_buf(header, &data);
245 debug!(id = self.id, ?end, "Receive from stream.");
246
247 self.eof = true;
248 end.end_request
249 .protocol_status
250 .convert_to_client_result(end.end_request.app_status)?;
251 return Ok(None);
252 }
253 r#type => {
254 self.eof = true;
255 return Err(ClientError::UnknownRequestType {
256 request_type: r#type,
257 });
258 }
259 }
260 Ok(None)
261 }
262
263 fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<ClientResult<Option<()>>> {
264 let mut chunk = [0; 8192];
265 match Pin::new(&mut self.stream).poll_read(cx, &mut chunk) {
266 Poll::Ready(Ok(0)) => Poll::Ready(Ok(None)),
267 Poll::Ready(Ok(read)) => {
268 self.buf.extend_from_slice(&chunk[..read]);
269 Poll::Ready(Ok(Some(())))
270 }
271 Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
272 Poll::Pending => Poll::Pending,
273 }
274 }
275}
276
277impl<S> Stream for ResponseStream<S>
278where
279 S: AsyncRead + Unpin,
280{
281 type Item = ClientResult<Content>;
282
283 fn poll_next(
284 mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>,
285 ) -> Poll<Option<Self::Item>> {
286 let mut pending = false;
287 loop {
288 match self.poll_fill_buf(cx) {
289 Poll::Ready(Ok(Some(()))) => match self.process_message() {
290 Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
291 Ok(None) if self.eof => return Poll::Ready(None),
292 Ok(None) => continue,
293 Err(err) => return Poll::Ready(Some(Err(err))),
294 },
295 Poll::Ready(Ok(None)) => break,
296 Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
297 Poll::Pending => {
298 pending = true;
299 break;
300 }
301 }
302 }
303 match self.process_message() {
304 Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
305 Ok(None) if !self.eof && pending => Poll::Pending,
306 Ok(None) => Poll::Ready(None),
307 Err(err) => Poll::Ready(Some(Err(err))),
308 }
309 }
310}
311
312#[cfg(feature = "http")]
313fn split_header_body(stdout: &[u8]) -> HttpConversionResult<(&[u8], &[u8])> {
314 if let Some(offset) = stdout.windows(4).position(|window| window == b"\r\n\r\n") {
315 return Ok((&stdout[..offset], &stdout[offset + 4..]));
316 }
317 if let Some(offset) = stdout.windows(2).position(|window| window == b"\n\n") {
318 return Ok((&stdout[..offset], &stdout[offset + 2..]));
319 }
320 Err(HttpConversionError::MalformedHttpResponse {
321 message: "response does not contain a header/body separator",
322 })
323}
324
325#[cfg(feature = "http")]
326fn parse_status_header(value: &[u8]) -> HttpConversionResult<::http::StatusCode> {
327 let value = str::from_utf8(trim_start_ascii(value)).map_err(|_| {
328 HttpConversionError::MalformedHttpResponse {
329 message: "status header is not valid UTF-8",
330 }
331 })?;
332 let Some(code) = value.split_whitespace().next() else {
333 return Err(HttpConversionError::MalformedHttpResponse {
334 message: "status header is empty",
335 });
336 };
337 Ok(::http::StatusCode::from_str(code)?)
338}
339
340#[cfg(feature = "http")]
341fn trim_cr(line: &[u8]) -> &[u8] {
342 line.strip_suffix(b"\r").unwrap_or(line)
343}
344
345#[cfg(feature = "http")]
346fn trim_start_ascii(bytes: &[u8]) -> &[u8] {
347 let index = bytes
348 .iter()
349 .position(|byte| !byte.is_ascii_whitespace())
350 .unwrap_or(bytes.len());
351 &bytes[index..]
352}
353
354#[cfg(feature = "http")]
355trait SplitFirstColon {
356 fn split_first_colon(&self) -> Option<(&[u8], &[u8])>;
357}
358
359#[cfg(feature = "http")]
360impl SplitFirstColon for [u8] {
361 fn split_first_colon(&self) -> Option<(&[u8], &[u8])> {
362 let offset = self.iter().position(|byte| *byte == b':')?;
363 Some((&self[..offset], &self[offset + 1..]))
364 }
365}
366
367#[cfg(all(test, feature = "http"))]
368mod http_tests {
369 use crate::Response;
370
371 #[test]
372 fn response_into_http_defaults_status_to_ok() {
373 let response = Response {
374 stdout: Some(b"Content-type: text/plain\r\nX-Test: 1\r\n\r\nhello".to_vec()),
375 stderr: Some(b"notice".to_vec()),
376 };
377
378 let response: ::http::Response<Vec<u8>> = response.try_into().unwrap();
379
380 assert_eq!(response.status(), ::http::StatusCode::OK);
381 assert_eq!(response.headers()["content-type"], "text/plain");
382 assert_eq!(response.body(), b"hello");
383 }
384
385 #[test]
386 fn response_from_http_serializes_status_and_headers() {
387 let response = ::http::Response::builder()
388 .status(::http::StatusCode::CREATED)
389 .header(::http::header::CONTENT_TYPE, "text/plain")
390 .body(b"hello".to_vec())
391 .unwrap();
392
393 let response = Response::try_from(response).unwrap();
394 let stdout = String::from_utf8(response.stdout.unwrap()).unwrap();
395
396 assert!(stdout.starts_with("Status: 201 Created\r\n"));
397 assert!(stdout.contains("content-type: text/plain\r\n\r\nhello"));
398 }
399}