fastcgi_client/response.rs

// Copyright 2022 jmjoy
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! FastCGI response types and streaming support.
//!
//! This module provides structures for handling FastCGI responses,
//! including both complete responses and streaming responses.
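//!
//! A minimal sketch of inspecting a complete [`Response`] once a request has
//! finished (how the response is obtained is up to the caller; the
//! non-streaming `Client` call mentioned in the comment is an assumption, not
//! part of this module):
//!
//! ```ignore
//! // `response: Response` was returned by a non-streaming `Client` execute call.
//! if let Some(stdout) = &response.stdout {
//!     println!("stdout: {}", String::from_utf8_lossy(stdout));
//! }
//! if let Some(stderr) = &response.stderr {
//!     eprintln!("stderr: {}", String::from_utf8_lossy(stderr));
//! }
//! ```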

use std::{
    fmt::{self, Debug},
    pin::Pin,
    str,
    task::Poll,
};

use bytes::{Bytes, BytesMut};
use futures_core::stream::Stream;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use tracing::debug;

use crate::{
    meta::{EndRequestRec, Header, RequestType, HEADER_LEN},
    ClientError, ClientResult,
};

/// Output of a FastCGI request, containing STDOUT and STDERR data.
///
/// This structure represents a complete FastCGI response with
/// both stdout and stderr output from the FastCGI server.
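///
/// # Examples
///
/// A minimal sketch of splitting the captured stdout into CGI-style headers
/// and body (`response` is assumed to be a `Response` value; the split is left
/// to the caller and the exact layout depends on the FastCGI application):
///
/// ```ignore
/// if let Some(stdout) = &response.stdout {
///     let text = String::from_utf8_lossy(stdout);
///     if let Some((headers, body)) = text.split_once("\r\n\r\n") {
///         println!("headers:\n{headers}");
///         println!("body:\n{body}");
///     }
/// }
/// ```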
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct Response {
    /// The stdout output from the FastCGI server
    pub stdout: Option<Vec<u8>>,
    /// The stderr output from the FastCGI server
    pub stderr: Option<Vec<u8>>,
}

impl Debug for Response {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Response")
            .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
            .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
            .finish()
    }
}

/// A piece of content received from a FastCGI response stream.
///
/// This enum represents the different types of content that can be
/// received from a FastCGI server during streaming.
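///
/// # Examples
///
/// A sketch of handling one received item (how `content` is obtained is shown
/// in the [`ResponseStream`] docs):
///
/// ```ignore
/// match content {
///     Content::Stdout(bytes) => println!("stdout chunk: {} bytes", bytes.len()),
///     Content::Stderr(bytes) => eprintln!("stderr chunk: {} bytes", bytes.len()),
/// }
/// ```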
pub enum Content {
    /// Standard output content from the FastCGI server
    Stdout(Bytes),
    /// Standard error content from the FastCGI server
    Stderr(Bytes),
}

/// A streaming response from a FastCGI server.
///
/// Generated by
/// [Client::execute_once_stream](crate::client::Client::execute_once_stream) or
/// [Client::execute_stream](crate::client::Client::execute_stream).
///
/// This stream yields [`Content`] items as they are received from the server.
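///
/// # Examples
///
/// A minimal sketch of draining the stream (assumes the `futures` crate for
/// `StreamExt::next`; `stream` would come from one of the `Client` methods
/// linked above):
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut stdout = Vec::new();
/// while let Some(content) = stream.next().await {
///     match content? {
///         Content::Stdout(bytes) => stdout.extend_from_slice(&bytes),
///         Content::Stderr(bytes) => eprintln!("{}", String::from_utf8_lossy(&bytes)),
///     }
/// }
/// ```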
pub struct ResponseStream<S: AsyncRead + Unpin> {
    stream: ReaderStream<S>,
    id: u16,
    eof: bool,
    header: Option<Header>,
    buf: BytesMut,
}

impl<S: AsyncRead + Unpin> ResponseStream<S> {
    /// Creates a new response stream.
    ///
    /// # Arguments
    ///
    /// * `stream` - The underlying stream to read from
    /// * `id` - The request ID for this response
    #[inline]
    pub(crate) fn new(stream: S, id: u16) -> Self {
        Self {
            stream: ReaderStream::new(stream),
            id,
            eof: false,
            header: None,
            buf: BytesMut::new(),
        }
    }

    /// Reads a FastCGI header from the buffer.
    ///
    /// Returns `None` if there isn't enough data in the buffer.
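    ///
    /// The header occupies `HEADER_LEN` bytes; once that many bytes are
    /// buffered they are split off and parsed with `Header::new_from_buf`.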
    #[inline]
    fn read_header(&mut self) -> Option<Header> {
        if self.buf.len() < HEADER_LEN {
            return None;
        }
        let buf = self.buf.split_to(HEADER_LEN);
        let header = (&buf as &[u8]).try_into().expect("failed to read header");
        Some(Header::new_from_buf(header))
    }

    /// Reads content from the buffer based on the current header.
    ///
    /// Returns `None` if there isn't enough data in the buffer.
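    ///
    /// The record body is `content_length` bytes of payload followed by
    /// `padding_length` bytes of padding; both must be buffered before the
    /// payload is split off and the padding discarded.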
    #[inline]
    fn read_content(&mut self) -> Option<Bytes> {
        let header = self.header.as_ref().unwrap();
        let block_length = header.content_length as usize + header.padding_length as usize;
        if self.buf.len() < block_length {
            return None;
        }
        let content = self.buf.split_to(header.content_length as usize);
        let _ = self.buf.split_to(header.padding_length as usize);
        self.header = None;
        Some(content.freeze())
    }

    /// Processes a complete FastCGI message from the buffer.
    ///
    /// Returns `Ok(Some(Content))` if a complete message was processed,
    /// `Ok(None)` if more data is needed, or an error if processing failed.
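    ///
    /// `Stdout` and `Stderr` records are yielded as [`Content`]; an
    /// `EndRequest` record ends the response and has its protocol status
    /// converted into a client result; any other record type is an error.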
    fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
        if self.buf.is_empty() {
            return Ok(None);
        }
        if self.header.is_none() {
            match self.read_header() {
                Some(header) => self.header = Some(header),
                None => return Ok(None),
            }
        }
        let header = self.header.as_ref().unwrap();
        match header.r#type.clone() {
            RequestType::Stdout => {
                if let Some(data) = self.read_content() {
                    return Ok(Some(Content::Stdout(data)));
                }
            }
            RequestType::Stderr => {
                if let Some(data) = self.read_content() {
                    return Ok(Some(Content::Stderr(data)));
                }
            }
            RequestType::EndRequest => {
                let header = header.clone();
                let Some(data) = self.read_content() else {
                    return Ok(None);
                };

                let end = EndRequestRec::new_from_buf(header, &data);
                debug!(id = self.id, ?end, "Received end request from stream.");

                self.eof = true;
                end.end_request
                    .protocol_status
                    .convert_to_client_result(end.end_request.app_status)?;
                return Ok(None);
            }
            r#type => {
                self.eof = true;
                return Err(ClientError::UnknownRequestType {
                    request_type: r#type,
                });
            }
        }
        Ok(None)
    }
}

impl<S> Stream for ResponseStream<S>
where
    S: AsyncRead + Unpin,
{
    type Item = ClientResult<Content>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let mut pending = false;
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Ready(Some(Ok(data))) => {
                    self.buf.extend_from_slice(&data);

                    match self.process_message() {
                        Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
                        Ok(None) if self.eof => return Poll::Ready(None),
                        Ok(None) => continue,
                        Err(err) => return Poll::Ready(Some(Err(err))),
                    }
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
                Poll::Ready(None) => break,
                Poll::Pending => {
                    pending = true;
                    break;
                }
            }
        }
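        // The underlying reader is finished or pending: try to drain one more
        // complete record that may already be sitting in the buffer before
        // deciding whether to yield an item, return `Pending`, or end the
        // stream.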
        match self.process_message() {
            Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
            Ok(None) if !self.eof && pending => Poll::Pending,
            Ok(None) => Poll::Ready(None),
            Err(err) => Poll::Ready(Some(Err(err))),
        }
    }
}