fastcgi_client/
response.rs1use std::{
21 fmt::{self, Debug},
22 pin::Pin,
23 str,
24 task::Poll,
25};
26
27use bytes::{Bytes, BytesMut};
28use futures_core::stream::Stream;
29use tokio::io::AsyncRead;
30use tokio_util::io::ReaderStream;
31use tracing::debug;
32
33use crate::{
34 meta::{EndRequestRec, Header, RequestType, HEADER_LEN},
35 ClientError, ClientResult,
36};
37
/// Output of a request, held as two optional byte buffers.
///
/// Both buffers are `None` in the value produced by [`Default`]. The type is
/// marked `#[non_exhaustive]`, so code outside this crate cannot build it
/// with a struct literal or match it exhaustively.
#[derive(Clone, Default)]
#[non_exhaustive]
pub struct Response {
    /// Raw bytes for the `stdout` side of the response; `None` by default.
    pub stdout: Option<Vec<u8>>,
    /// Raw bytes for the `stderr` side of the response; `None` by default.
    pub stderr: Option<Vec<u8>>,
}
50
51impl Debug for Response {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
53 f.debug_struct("Response")
54 .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
55 .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
56 .finish()
57 }
58}
59
60pub enum Content {
65 Stdout(Bytes),
67 Stderr(Bytes),
69}
70
71pub struct ResponseStream<S: AsyncRead + Unpin> {
79 stream: ReaderStream<S>,
80 id: u16,
81 eof: bool,
82 header: Option<Header>,
83 buf: BytesMut,
84}
85
86impl<S: AsyncRead + Unpin> ResponseStream<S> {
87 #[inline]
94 pub(crate) fn new(stream: S, id: u16) -> Self {
95 Self {
96 stream: ReaderStream::new(stream),
97 id,
98 eof: false,
99 header: None,
100 buf: BytesMut::new(),
101 }
102 }
103
104 #[inline]
108 fn read_header(&mut self) -> Option<Header> {
109 if self.buf.len() < HEADER_LEN {
110 return None;
111 }
112 let buf = self.buf.split_to(HEADER_LEN);
113 let header = (&buf as &[u8]).try_into().expect("failed to read header");
114 Some(Header::new_from_buf(header))
115 }
116
117 #[inline]
121 fn read_content(&mut self) -> Option<Bytes> {
122 let header = self.header.as_ref().unwrap();
123 let block_length = header.content_length as usize + header.padding_length as usize;
124 if self.buf.len() < block_length {
125 return None;
126 }
127 let content = self.buf.split_to(header.content_length as usize);
128 let _ = self.buf.split_to(header.padding_length as usize);
129 self.header = None;
130 Some(content.freeze())
131 }
132
133 fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
138 if self.buf.is_empty() {
139 return Ok(None);
140 }
141 if self.header.is_none() {
142 match self.read_header() {
143 Some(header) => self.header = Some(header),
144 None => return Ok(None),
145 }
146 }
147 let header = self.header.as_ref().unwrap();
148 match header.r#type.clone() {
149 RequestType::Stdout => {
150 if let Some(data) = self.read_content() {
151 return Ok(Some(Content::Stdout(data)));
152 }
153 }
154 RequestType::Stderr => {
155 if let Some(data) = self.read_content() {
156 return Ok(Some(Content::Stderr(data)));
157 }
158 }
159 RequestType::EndRequest => {
160 let header = header.clone();
161 let Some(data) = self.read_content() else {
162 return Ok(None);
163 };
164
165 let end = EndRequestRec::new_from_buf(header, &data);
166 debug!(id = self.id, ?end, "Receive from stream.");
167
168 self.eof = true;
169 end.end_request
170 .protocol_status
171 .convert_to_client_result(end.end_request.app_status)?;
172 return Ok(None);
173 }
174 r#type => {
175 self.eof = true;
176 return Err(ClientError::UnknownRequestType {
177 request_type: r#type,
178 });
179 }
180 }
181 Ok(None)
182 }
183}
184
185impl<S> Stream for ResponseStream<S>
186where
187 S: AsyncRead + Unpin,
188{
189 type Item = ClientResult<Content>;
190
191 fn poll_next(
192 mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
193 ) -> std::task::Poll<Option<Self::Item>> {
194 let mut pending = false;
195 loop {
196 match Pin::new(&mut self.stream).poll_next(cx) {
197 Poll::Ready(Some(Ok(data))) => {
198 self.buf.extend_from_slice(&data);
199
200 match self.process_message() {
201 Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
202 Ok(None) if self.eof => return Poll::Ready(None),
203 Ok(None) => continue,
204 Err(err) => return Poll::Ready(Some(Err(err))),
205 }
206 }
207 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
208 Poll::Ready(None) => break,
209 Poll::Pending => {
210 pending = true;
211 break;
212 }
213 }
214 }
215 match self.process_message() {
216 Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
217 Ok(None) if !self.eof && pending => Poll::Pending,
218 Ok(None) => Poll::Ready(None),
219 Err(err) => Poll::Ready(Some(Err(err))),
220 }
221 }
222}