Skip to main content

fastcgi_connect/
response.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI response types and streaming support.
16//!
17//! This module provides structures for handling FastCGI responses,
18//! including both complete responses and streaming responses.
19
20use std::{
21    fmt::{self, Debug},
22    pin::Pin,
23    str,
24    task::Poll,
25};
26
27use bytes::{Bytes, BytesMut};
28use futures_core::stream::Stream;
29
30#[cfg(feature = "tokio")]
31use tokio::io::AsyncRead;
32#[cfg(feature = "tokio")]
33use tokio_util::io::ReaderStream;
34
35#[cfg(feature = "smol")]
36use smol::io::{AsyncRead, AsyncReadExt, Bytes as ReaderStream};
37
38use tracing::debug;
39
40use crate::{
41    meta::{EndRequestRec, Header, RequestType, HEADER_LEN},
42    ClientError, ClientResult,
43};
44
45/// Output of FastCGI request, contains STDOUT and STDERR data.
46///
47/// This structure represents a complete FastCGI response with
48/// both stdout and stderr output from the FastCGI server.
49#[derive(Default, Clone)]
50#[non_exhaustive]
51pub struct Response {
52    /// The stdout output from the FastCGI server
53    pub stdout: Option<Vec<u8>>,
54    /// The stderr output from the FastCGI server
55    pub stderr: Option<Vec<u8>>,
56}
57
58impl Debug for Response {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
60        f.debug_struct("Response")
61            .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
62            .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
63            .finish()
64    }
65}
66
67/// Content type from a FastCGI response stream.
68///
69/// This enum represents the different types of content that can be
70/// received from a FastCGI server during streaming.
71pub enum Content {
72    /// Standard output content from the FastCGI server
73    Stdout(Bytes),
74    /// Standard error content from the FastCGI server
75    Stderr(Bytes),
76}
77
78/// A streaming response from a FastCGI server.
79///
80/// Generated by
81/// [Client::execute_once_stream](crate::client::Client::execute_once_stream) or
82/// [Client::execute_stream](crate::client::Client::execute_stream).
83///
84/// This stream yields `Content` items as they are received from the server.
85pub struct ResponseStream<S: AsyncRead + Unpin> {
86    #[cfg(feature = "tokio")]
87    stream: ReaderStream<S>,
88    #[cfg(feature = "smol")]
89    stream: ReaderStream<S>,
90    id: u16,
91    eof: bool,
92    header: Option<Header>,
93    buf: BytesMut,
94}
95
96impl<S: AsyncRead + Unpin> ResponseStream<S> {
97    /// Creates a new response stream.
98    ///
99    /// # Arguments
100    ///
101    /// * `stream` - The underlying stream to read from
102    /// * `id` - The request ID for this response
103    #[inline]
104    pub(crate) fn new(stream: S, id: u16) -> Self {
105        Self {
106            #[cfg(feature = "tokio")]
107            stream: ReaderStream::new(stream),
108            #[cfg(feature = "smol")]
109            stream: stream.bytes(),
110            id,
111            eof: false,
112            header: None,
113            buf: BytesMut::new(),
114        }
115    }
116
117    /// Reads a FastCGI header from the buffer.
118    ///
119    /// Returns `None` if there isn't enough data in the buffer.
120    #[inline]
121    fn read_header(&mut self) -> Option<Header> {
122        if self.buf.len() < HEADER_LEN {
123            return None;
124        }
125        let buf = self.buf.split_to(HEADER_LEN);
126        let header = (&buf as &[u8]).try_into().expect("failed to read header");
127        Some(Header::new_from_buf(header))
128    }
129
130    /// Reads content from the buffer based on the current header.
131    ///
132    /// Returns `None` if there isn't enough data in the buffer.
133    #[inline]
134    fn read_content(&mut self) -> Option<Bytes> {
135        let header = self.header.as_ref().unwrap();
136        let block_length = header.content_length as usize + header.padding_length as usize;
137        if self.buf.len() < block_length {
138            return None;
139        }
140        let content = self.buf.split_to(header.content_length as usize);
141        let _ = self.buf.split_to(header.padding_length as usize);
142        self.header = None;
143        Some(content.freeze())
144    }
145
146    /// Processes a complete FastCGI message from the buffer.
147    ///
148    /// Returns `Ok(Some(Content))` if a complete message was processed,
149    /// `Ok(None)` if more data is needed, or an error if processing failed.
150    fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
151        if self.buf.is_empty() {
152            return Ok(None);
153        }
154        if self.header.is_none() {
155            match self.read_header() {
156                Some(header) => self.header = Some(header),
157                None => return Ok(None),
158            }
159        }
160        let header = self.header.as_ref().unwrap();
161        match header.r#type.clone() {
162            RequestType::Stdout => {
163                if let Some(data) = self.read_content() {
164                    return Ok(Some(Content::Stdout(data)));
165                }
166            }
167            RequestType::Stderr => {
168                if let Some(data) = self.read_content() {
169                    return Ok(Some(Content::Stderr(data)));
170                }
171            }
172            RequestType::EndRequest => {
173                let header = header.clone();
174                let Some(data) = self.read_content() else {
175                    return Ok(None);
176                };
177
178                let end = EndRequestRec::new_from_buf(header, &data);
179                debug!(id = self.id, ?end, "Receive from stream.");
180
181                self.eof = true;
182                end.end_request
183                    .protocol_status
184                    .convert_to_client_result(end.end_request.app_status)?;
185                return Ok(None);
186            }
187            r#type => {
188                self.eof = true;
189                return Err(ClientError::UnknownRequestType {
190                    request_type: r#type,
191                });
192            }
193        }
194        Ok(None)
195    }
196}
197
198impl<S> Stream for ResponseStream<S>
199where
200    S: AsyncRead + Unpin,
201{
202    type Item = ClientResult<Content>;
203
204    fn poll_next(
205        mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
206    ) -> std::task::Poll<Option<Self::Item>> {
207        let mut pending = false;
208        loop {
209            match Pin::new(&mut self.stream).poll_next(cx) {
210                Poll::Ready(Some(Ok(data))) => {
211                    #[cfg(feature = "smol")]
212                    self.buf.extend_from_slice(&[data]);
213                    #[cfg(feature = "tokio")]
214                    self.buf.extend_from_slice(&data);
215
216                    match self.process_message() {
217                        Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
218                        Ok(None) if self.eof => return Poll::Ready(None),
219                        Ok(None) => continue,
220                        Err(err) => return Poll::Ready(Some(Err(err))),
221                    }
222                }
223                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
224                Poll::Ready(None) => break,
225                Poll::Pending => {
226                    pending = true;
227                    break;
228                }
229            }
230        }
231        match self.process_message() {
232            Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
233            Ok(None) if !self.eof && pending => Poll::Pending,
234            Ok(None) => Poll::Ready(None),
235            Err(err) => Poll::Ready(Some(Err(err))),
236        }
237    }
238}