kvarn_fastcgi_client/
response.rs

1// Copyright 2022 jmjoy
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{
16    meta::{EndRequestRec, Header, RequestType},
17    ClientError, ClientResult,
18};
19use std::{cmp::min, fmt, fmt::Debug, str};
20use tokio::io::{AsyncRead, AsyncReadExt};
21use tracing::debug;
22
/// Collected output of a FastCGI request.
///
/// Holds the bytes the application wrote to STDOUT and to STDERR. Either
/// field is `None` when no value was stored for it.
#[non_exhaustive]
#[derive(Clone, Default)]
pub struct Response {
    /// Raw STDOUT bytes, if any.
    pub stdout: Option<Vec<u8>>,
    /// Raw STDERR bytes, if any.
    pub stderr: Option<Vec<u8>>,
}
30
31impl Debug for Response {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
33        f.debug_struct("Response")
34            .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
35            .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
36            .finish()
37    }
38}
39
/// A chunk of response payload yielded by [`ResponseStream::next`], tagged
/// with the FastCGI stream it was read from.
///
/// The slice borrows the stream's internal buffer, so it is only valid until
/// the next call to [`ResponseStream::next`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Content<'a> {
    /// Bytes read from a STDOUT record.
    Stdout(&'a [u8]),
    /// Bytes read from a STDERR record.
    Stderr(&'a [u8]),
}
44
/// Which part of the current record is being consumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadStep {
    /// The record's content bytes are being read.
    Content,
    /// The record's trailing padding bytes are being read.
    Padding,
}
50
51/// Generated by
52/// [Client::execute_once_stream](crate::client::Client::execute_once_stream) or
53/// [Client::execute_stream](crate::client::Client::execute_stream).
54///
55/// The [ResponseStream] does not implement `futures::Stream`, because
56/// `futures::Stream` does not yet support GAT, so manually provide the
57/// [next](ResponseStream::next) method, which support the `while let` syntax.
58pub struct ResponseStream<S: AsyncRead + Unpin> {
59    stream: S,
60    id: u16,
61
62    ended: bool,
63
64    header: Option<Header>,
65
66    content_buf: Vec<u8>,
67    content_read: usize,
68
69    read_step: ReadStep,
70}
71
72impl<S: AsyncRead + Unpin + Send> ResponseStream<S> {
73    #[inline]
74    pub(crate) fn new(stream: S, id: u16) -> Self {
75        Self {
76            stream,
77            id,
78            ended: false,
79            header: None,
80            content_buf: vec![0; 4096],
81            content_read: 0,
82            read_step: ReadStep::Content,
83        }
84    }
85
86    pub async fn next(&mut self) -> Option<ClientResult<Content<'_>>> {
87        if self.ended {
88            return None;
89        }
90
91        loop {
92            if self.header.is_none() {
93                match Header::new_from_stream(&mut self.stream).await {
94                    Ok(header) => {
95                        self.header = Some(header);
96                    }
97                    Err(err) => {
98                        self.ended = true;
99                        return Some(Err(err.into()));
100                    }
101                };
102            }
103
104            let header = self.header.as_ref().unwrap();
105
106            match header.r#type.clone() {
107                RequestType::Stdout => match self.read_step {
108                    ReadStep::Content => {
109                        return self
110                            .read_to_content(
111                                header.content_length as usize,
112                                Content::Stdout,
113                                Self::prepare_for_read_padding,
114                            )
115                            .await;
116                    }
117                    ReadStep::Padding => {
118                        self.read_to_content(
119                            header.padding_length as usize,
120                            Content::Stdout,
121                            Self::prepare_for_read_header,
122                        )
123                        .await;
124                        continue;
125                    }
126                },
127                RequestType::Stderr => match self.read_step {
128                    ReadStep::Content => {
129                        return self
130                            .read_to_content(
131                                header.content_length as usize,
132                                Content::Stderr,
133                                Self::prepare_for_read_padding,
134                            )
135                            .await;
136                    }
137                    ReadStep::Padding => {
138                        self.read_to_content(
139                            header.padding_length as usize,
140                            Content::Stderr,
141                            Self::prepare_for_read_header,
142                        )
143                        .await;
144                        continue;
145                    }
146                },
147                RequestType::EndRequest => {
148                    let end_request_rec =
149                        match EndRequestRec::from_header(header, &mut self.stream).await {
150                            Ok(rec) => rec,
151                            Err(err) => {
152                                self.ended = true;
153                                return Some(Err(err.into()));
154                            }
155                        };
156                    debug!(id = self.id, ?end_request_rec, "Receive from stream.");
157
158                    self.ended = true;
159
160                    return match end_request_rec
161                        .end_request
162                        .protocol_status
163                        .convert_to_client_result(end_request_rec.end_request.app_status)
164                    {
165                        Ok(_) => None,
166                        Err(err) => Some(Err(err)),
167                    };
168                }
169                r#type => {
170                    self.ended = true;
171                    return Some(Err(ClientError::UnknownRequestType {
172                        request_type: r#type,
173                    }));
174                }
175            }
176        }
177    }
178
179    async fn read_to_content<'a, T: 'a>(
180        &'a mut self, length: usize, content_fn: impl FnOnce(&'a [u8]) -> T,
181        prepare_for_next_fn: impl FnOnce(&mut Self),
182    ) -> Option<ClientResult<T>> {
183        let content_len = self.content_buf.len();
184        let read = match self
185            .stream
186            .read(&mut self.content_buf[..min(content_len, length - self.content_read)])
187            .await
188        {
189            Ok(read) => read,
190            Err(err) => {
191                self.ended = true;
192                return Some(Err(err.into()));
193            }
194        };
195
196        self.content_read += read;
197        if self.content_read >= length {
198            self.content_read = 0;
199            prepare_for_next_fn(self);
200        }
201
202        Some(Ok(content_fn(&self.content_buf[..read])))
203    }
204
205    fn prepare_for_read_padding(&mut self) {
206        self.read_step = ReadStep::Padding;
207    }
208
209    fn prepare_for_read_header(&mut self) {
210        self.header = None;
211        self.read_step = ReadStep::Content;
212    }
213}