Skip to main content

fastcgi_client/
response.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI response types and streaming support.
16//!
17//! This module provides structures for handling FastCGI responses,
18//! including both complete responses and streaming responses.
19
20use std::{
21    fmt::{self, Debug},
22    pin::Pin,
23    str,
24    task::{Context, Poll},
25};
26
27#[cfg(feature = "http")]
28use std::str::FromStr;
29
30use bytes::{Bytes, BytesMut};
31use futures_core::stream::Stream;
32use tracing::debug;
33
34use crate::{
35    ClientError, ClientResult,
36    io::AsyncRead,
37    meta::{EndRequestRec, HEADER_LEN, Header, RequestType},
38};
39
40#[cfg(feature = "http")]
41use crate::{HttpConversionError, HttpConversionResult};
42
43/// Output of FastCGI request, contains STDOUT and STDERR data.
44///
45/// This structure represents a complete FastCGI response with
46/// both stdout and stderr output from the FastCGI server.
47#[derive(Default, Clone)]
48#[non_exhaustive]
49pub struct Response {
50    /// The stdout output from the FastCGI server
51    pub stdout: Option<Vec<u8>>,
52    /// The stderr output from the FastCGI server
53    pub stderr: Option<Vec<u8>>,
54}
55
56impl Debug for Response {
57    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
58        f.debug_struct("Response")
59            .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
60            .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
61            .finish()
62    }
63}
64
65#[cfg(feature = "http")]
66impl<B> TryFrom<::http::Response<B>> for Response
67where
68    B: AsRef<[u8]>,
69{
70    type Error = HttpConversionError;
71
72    fn try_from(response: ::http::Response<B>) -> Result<Self, Self::Error> {
73        let (parts, body) = response.into_parts();
74        let mut stdout = Vec::new();
75
76        if parts.status != ::http::StatusCode::OK {
77            stdout.extend_from_slice(format!("Status: {}\r\n", parts.status).as_bytes());
78        }
79
80        for (name, value) in &parts.headers {
81            stdout.extend_from_slice(name.as_str().as_bytes());
82            stdout.extend_from_slice(b": ");
83            stdout.extend_from_slice(value.as_bytes());
84            stdout.extend_from_slice(b"\r\n");
85        }
86
87        stdout.extend_from_slice(b"\r\n");
88        stdout.extend_from_slice(body.as_ref());
89
90        Ok(Response {
91            stdout: Some(stdout),
92            stderr: None,
93        })
94    }
95}
96
97#[cfg(feature = "http")]
98impl TryFrom<Response> for ::http::Response<Vec<u8>> {
99    type Error = HttpConversionError;
100
101    fn try_from(response: Response) -> Result<Self, Self::Error> {
102        let stdout = response.stdout.unwrap_or_default();
103        let (header_bytes, body_bytes) = split_header_body(&stdout)?;
104        let mut status = ::http::StatusCode::OK;
105        let mut builder = ::http::Response::builder();
106
107        {
108            let headers = builder
109                .headers_mut()
110                .expect("response builder should provide headers");
111            for line in header_bytes.split(|byte| *byte == b'\n') {
112                let line = trim_cr(line);
113                if line.is_empty() {
114                    continue;
115                }
116                let Some((name, value)) = line.split_first_colon() else {
117                    return Err(HttpConversionError::MalformedHttpResponse {
118                        message: "response header line is missing ':'",
119                    });
120                };
121
122                if name.eq_ignore_ascii_case(b"Status") {
123                    status = parse_status_header(value)?;
124                    continue;
125                }
126
127                headers.append(
128                    ::http::header::HeaderName::from_bytes(name)?,
129                    ::http::header::HeaderValue::from_bytes(trim_start_ascii(value))?,
130                );
131            }
132        }
133
134        builder = builder.status(status);
135        Ok(builder.body(body_bytes.to_vec())?)
136    }
137}
138
139/// Content type from a FastCGI response stream.
140///
141/// This enum represents the different types of content that can be
142/// received from a FastCGI server during streaming.
143pub enum Content {
144    /// Standard output content from the FastCGI server
145    Stdout(Bytes),
146    /// Standard error content from the FastCGI server
147    Stderr(Bytes),
148}
149
150/// A streaming response from a FastCGI server.
151///
152/// Generated by
153/// [Client::execute_once_stream](crate::client::Client::execute_once_stream) or
154/// [Client::execute_stream](crate::client::Client::execute_stream).
155///
156/// This stream yields `Content` items as they are received from the server.
157pub struct ResponseStream<S: AsyncRead + Unpin> {
158    stream: S,
159    id: u16,
160    eof: bool,
161    header: Option<Header>,
162    buf: BytesMut,
163}
164
165impl<S: AsyncRead + Unpin> ResponseStream<S> {
166    /// Creates a new response stream.
167    ///
168    /// # Arguments
169    ///
170    /// * `stream` - The underlying stream to read from
171    /// * `id` - The request ID for this response
172    #[inline]
173    pub(crate) fn new(stream: S, id: u16) -> Self {
174        Self {
175            stream,
176            id,
177            eof: false,
178            header: None,
179            buf: BytesMut::new(),
180        }
181    }
182
183    /// Reads a FastCGI header from the buffer.
184    ///
185    /// Returns `None` if there isn't enough data in the buffer.
186    #[inline]
187    fn read_header(&mut self) -> Option<Header> {
188        if self.buf.len() < HEADER_LEN {
189            return None;
190        }
191        let buf = self.buf.split_to(HEADER_LEN);
192        let header = (&buf as &[u8]).try_into().expect("failed to read header");
193        Some(Header::new_from_buf(header))
194    }
195
196    /// Reads content from the buffer based on the current header.
197    ///
198    /// Returns `None` if there isn't enough data in the buffer.
199    #[inline]
200    fn read_content(&mut self) -> Option<Bytes> {
201        let header = self.header.as_ref().unwrap();
202        let block_length = header.content_length as usize + header.padding_length as usize;
203        if self.buf.len() < block_length {
204            return None;
205        }
206        let content = self.buf.split_to(header.content_length as usize);
207        let _ = self.buf.split_to(header.padding_length as usize);
208        self.header = None;
209        Some(content.freeze())
210    }
211
212    /// Processes a complete FastCGI message from the buffer.
213    ///
214    /// Returns `Ok(Some(Content))` if a complete message was processed,
215    /// `Ok(None)` if more data is needed, or an error if processing failed.
216    fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
217        if self.buf.is_empty() {
218            return Ok(None);
219        }
220        if self.header.is_none() {
221            match self.read_header() {
222                Some(header) => self.header = Some(header),
223                None => return Ok(None),
224            }
225        }
226        let header = self.header.as_ref().unwrap();
227        match header.r#type.clone() {
228            RequestType::Stdout => {
229                if let Some(data) = self.read_content() {
230                    return Ok(Some(Content::Stdout(data)));
231                }
232            }
233            RequestType::Stderr => {
234                if let Some(data) = self.read_content() {
235                    return Ok(Some(Content::Stderr(data)));
236                }
237            }
238            RequestType::EndRequest => {
239                let header = header.clone();
240                let Some(data) = self.read_content() else {
241                    return Ok(None);
242                };
243
244                let end = EndRequestRec::new_from_buf(header, &data);
245                debug!(id = self.id, ?end, "Receive from stream.");
246
247                self.eof = true;
248                end.end_request
249                    .protocol_status
250                    .convert_to_client_result(end.end_request.app_status)?;
251                return Ok(None);
252            }
253            r#type => {
254                self.eof = true;
255                return Err(ClientError::UnknownRequestType {
256                    request_type: r#type,
257                });
258            }
259        }
260        Ok(None)
261    }
262
263    fn poll_fill_buf(&mut self, cx: &mut Context<'_>) -> Poll<ClientResult<Option<()>>> {
264        let mut chunk = [0; 8192];
265        match Pin::new(&mut self.stream).poll_read(cx, &mut chunk) {
266            Poll::Ready(Ok(0)) => Poll::Ready(Ok(None)),
267            Poll::Ready(Ok(read)) => {
268                self.buf.extend_from_slice(&chunk[..read]);
269                Poll::Ready(Ok(Some(())))
270            }
271            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
272            Poll::Pending => Poll::Pending,
273        }
274    }
275}
276
277impl<S> Stream for ResponseStream<S>
278where
279    S: AsyncRead + Unpin,
280{
281    type Item = ClientResult<Content>;
282
283    fn poll_next(
284        mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>,
285    ) -> Poll<Option<Self::Item>> {
286        let mut pending = false;
287        loop {
288            match self.poll_fill_buf(cx) {
289                Poll::Ready(Ok(Some(()))) => match self.process_message() {
290                    Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
291                    Ok(None) if self.eof => return Poll::Ready(None),
292                    Ok(None) => continue,
293                    Err(err) => return Poll::Ready(Some(Err(err))),
294                },
295                Poll::Ready(Ok(None)) => break,
296                Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
297                Poll::Pending => {
298                    pending = true;
299                    break;
300                }
301            }
302        }
303        match self.process_message() {
304            Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
305            Ok(None) if !self.eof && pending => Poll::Pending,
306            Ok(None) => Poll::Ready(None),
307            Err(err) => Poll::Ready(Some(Err(err))),
308        }
309    }
310}
311
312#[cfg(feature = "http")]
313fn split_header_body(stdout: &[u8]) -> HttpConversionResult<(&[u8], &[u8])> {
314    if let Some(offset) = stdout.windows(4).position(|window| window == b"\r\n\r\n") {
315        return Ok((&stdout[..offset], &stdout[offset + 4..]));
316    }
317    if let Some(offset) = stdout.windows(2).position(|window| window == b"\n\n") {
318        return Ok((&stdout[..offset], &stdout[offset + 2..]));
319    }
320    Err(HttpConversionError::MalformedHttpResponse {
321        message: "response does not contain a header/body separator",
322    })
323}
324
325#[cfg(feature = "http")]
326fn parse_status_header(value: &[u8]) -> HttpConversionResult<::http::StatusCode> {
327    let value = str::from_utf8(trim_start_ascii(value)).map_err(|_| {
328        HttpConversionError::MalformedHttpResponse {
329            message: "status header is not valid UTF-8",
330        }
331    })?;
332    let Some(code) = value.split_whitespace().next() else {
333        return Err(HttpConversionError::MalformedHttpResponse {
334            message: "status header is empty",
335        });
336    };
337    Ok(::http::StatusCode::from_str(code)?)
338}
339
340#[cfg(feature = "http")]
341fn trim_cr(line: &[u8]) -> &[u8] {
342    line.strip_suffix(b"\r").unwrap_or(line)
343}
344
345#[cfg(feature = "http")]
346fn trim_start_ascii(bytes: &[u8]) -> &[u8] {
347    let index = bytes
348        .iter()
349        .position(|byte| !byte.is_ascii_whitespace())
350        .unwrap_or(bytes.len());
351    &bytes[index..]
352}
353
354#[cfg(feature = "http")]
355trait SplitFirstColon {
356    fn split_first_colon(&self) -> Option<(&[u8], &[u8])>;
357}
358
359#[cfg(feature = "http")]
360impl SplitFirstColon for [u8] {
361    fn split_first_colon(&self) -> Option<(&[u8], &[u8])> {
362        let offset = self.iter().position(|byte| *byte == b':')?;
363        Some((&self[..offset], &self[offset + 1..]))
364    }
365}
366
367#[cfg(all(test, feature = "http"))]
368mod http_tests {
369    use crate::Response;
370
371    #[test]
372    fn response_into_http_defaults_status_to_ok() {
373        let response = Response {
374            stdout: Some(b"Content-type: text/plain\r\nX-Test: 1\r\n\r\nhello".to_vec()),
375            stderr: Some(b"notice".to_vec()),
376        };
377
378        let response: ::http::Response<Vec<u8>> = response.try_into().unwrap();
379
380        assert_eq!(response.status(), ::http::StatusCode::OK);
381        assert_eq!(response.headers()["content-type"], "text/plain");
382        assert_eq!(response.body(), b"hello");
383    }
384
385    #[test]
386    fn response_from_http_serializes_status_and_headers() {
387        let response = ::http::Response::builder()
388            .status(::http::StatusCode::CREATED)
389            .header(::http::header::CONTENT_TYPE, "text/plain")
390            .body(b"hello".to_vec())
391            .unwrap();
392
393        let response = Response::try_from(response).unwrap();
394        let stdout = String::from_utf8(response.stdout.unwrap()).unwrap();
395
396        assert!(stdout.starts_with("Status: 201 Created\r\n"));
397        assert!(stdout.contains("content-type: text/plain\r\n\r\nhello"));
398    }
399}