// fcgi_client/response.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! FastCGI response types and streaming support.
16//!
17//! This module provides structures for handling FastCGI responses,
18//! including both complete responses and streaming responses.
19
20use std::{
21    fmt::{self, Debug},
22    pin::Pin,
23    str,
24    task::Poll,
25};
26
27use bytes::{Bytes, BytesMut};
28use futures_util::stream::Stream;
29use tokio::io::AsyncRead;
30use tokio_util::io::ReaderStream;
31use tracing::debug;
32
33use crate::{
34    meta::{EndRequestRec, Header, RequestType, HEADER_LEN},
35    ClientError, ClientResult,
36};
37
/// Output of FastCGI request, contains STDOUT and STDERR data.
///
/// This structure represents a complete FastCGI response with
/// both stdout and stderr output from the FastCGI server.
///
/// Marked `#[non_exhaustive]` so fields can be added later without a
/// breaking change; construct via `Default` and set the fields.
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct Response {
    /// The stdout output from the FastCGI server, or `None` if none was received
    pub stdout: Option<Bytes>,
    /// The stderr output from the FastCGI server, or `None` if none was received
    pub stderr: Option<Bytes>,
}
50
51impl Debug for Response {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
53        f.debug_struct("Response")
54            .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
55            .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
56            .finish()
57    }
58}
59
/// Content type from a FastCGI response stream.
///
/// This enum represents the different types of content that can be
/// received from a FastCGI server during streaming. Each variant carries
/// the payload of one record of the corresponding type.
pub enum Content {
    /// Standard output content from the FastCGI server (payload of a `Stdout` record)
    Stdout(Bytes),
    /// Standard error content from the FastCGI server (payload of a `Stderr` record)
    Stderr(Bytes),
}
70
/// A streaming response from a FastCGI server.
///
/// Generated by
/// [Client::execute_once_stream](crate::client::Client::execute_once_stream) or
/// [Client::execute_stream](crate::client::Client::execute_stream).
///
/// This stream yields `Content` items as they are received from the server.
pub struct ResponseStream<S: AsyncRead + Unpin> {
    /// Byte stream over the underlying reader connected to the server
    stream: ReaderStream<S>,
    /// FastCGI request id of this response (used in debug logging)
    id: u16,
    /// Set once an `EndRequest` record (or an unknown record type) is seen;
    /// the stream terminates after that
    eof: bool,
    /// Header of a record whose content has not been fully buffered yet
    header: Option<Header>,
    /// Accumulates raw bytes from `stream` until whole records can be parsed
    buf: BytesMut,
}
85
86impl<S: AsyncRead + Unpin> ResponseStream<S> {
87    /// Creates a new response stream.
88    ///
89    /// # Arguments
90    ///
91    /// * `stream` - The underlying stream to read from
92    /// * `id` - The request ID for this response
93    #[inline]
94    pub(crate) fn new(stream: S, id: u16) -> Self {
95        Self {
96            stream: ReaderStream::new(stream),
97            id,
98            eof: false,
99            header: None,
100            buf: BytesMut::new(),
101        }
102    }
103
104    /// Reads a FastCGI header from the buffer.
105    ///
106    /// Returns `None` if there isn't enough data in the buffer.
107    #[inline]
108    fn read_header(&mut self) -> Option<Header> {
109        if self.buf.len() < HEADER_LEN {
110            return None;
111        }
112        let buf = self.buf.split_to(HEADER_LEN);
113        Some(Header::from(buf))
114    }
115
116    /// Reads content from the buffer based on the current header.
117    ///
118    /// Returns `None` if there isn't enough data in the buffer.
119    #[inline]
120    fn read_content(&mut self) -> Option<BytesMut> {
121        let header = self.header.as_ref().unwrap();
122        let block_length = header.content_length as usize + header.padding_length as usize;
123        if self.buf.len() < block_length {
124            return None;
125        }
126        let content = self.buf.split_to(header.content_length as usize);
127        let _ = self.buf.split_to(header.padding_length as usize);
128        self.header = None;
129        Some(content)
130    }
131
132    /// Processes a complete FastCGI message from the buffer.
133    ///
134    /// Returns `Ok(Some(Content))` if a complete message was processed,
135    /// `Ok(None)` if more data is needed, or an error if processing failed.
136    fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
137        if self.buf.is_empty() {
138            return Ok(None);
139        }
140        if self.header.is_none() {
141            match self.read_header() {
142                Some(header) => self.header = Some(header),
143                None => return Ok(None),
144            }
145        }
146        let header = self.header.as_ref().unwrap();
147        match header.r#type.clone() {
148            RequestType::Stdout => {
149                if let Some(data) = self.read_content() {
150                    return Ok(Some(Content::Stdout(data.freeze())));
151                }
152            }
153            RequestType::Stderr => {
154                if let Some(data) = self.read_content() {
155                    return Ok(Some(Content::Stderr(data.freeze())));
156                }
157            }
158            RequestType::EndRequest => {
159                let header = header.clone();
160                let Some(data) = self.read_content() else {
161                    return Ok(None);
162                };
163
164                let end = EndRequestRec::new_from_buf(header, data);
165                debug!(id = self.id, ?end, "Receive from stream.");
166
167                self.eof = true;
168                end.end_request
169                    .protocol_status
170                    .convert_to_client_result(end.end_request.app_status)?;
171                return Ok(None);
172            }
173            r#type => {
174                self.eof = true;
175                return Err(ClientError::UnknownRequestType {
176                    request_type: r#type,
177                });
178            }
179        }
180        Ok(None)
181    }
182}
183
impl<S> Stream for ResponseStream<S>
where
    S: AsyncRead + Unpin,
{
    type Item = ClientResult<Content>;

    /// Polls the underlying reader, buffering bytes until a complete FastCGI
    /// record can be decoded, and yields each record as a `Content` item.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Distinguishes "reader not ready yet" (Pending) from "reader hit
        // EOF" (Ready(None)) when deciding how to finish below.
        let mut pending = false;
        loop {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Ready(Some(Ok(data))) => {
                    self.buf.extend_from_slice(&data);

                    match self.process_message() {
                        Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
                        // `eof` is set once an EndRequest record was handled:
                        // the response is complete.
                        Ok(None) if self.eof => return Poll::Ready(None),
                        // Record still incomplete: read more bytes.
                        Ok(None) => continue,
                        Err(err) => return Poll::Ready(Some(Err(err))),
                    }
                }
                Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
                // Reader reached EOF; fall through and drain the buffer.
                Poll::Ready(None) => break,
                Poll::Pending => {
                    // Waker is registered with the reader; remember this so
                    // we can return Pending instead of ending the stream.
                    pending = true;
                    break;
                }
            }
        }
        // No new bytes right now; try to decode one record from what is
        // already buffered.
        match self.process_message() {
            Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
            // Incomplete record but the reader is still alive: wait for more.
            Ok(None) if !self.eof && pending => Poll::Pending,
            // Request ended or reader hit EOF. NOTE(review): any leftover
            // partial record bytes in `buf` are silently discarded here.
            Ok(None) => Poll::Ready(None),
            Err(err) => Poll::Ready(Some(Err(err))),
        }
    }
}
221}