fastcgi_connect/
response.rs1use std::{
21 fmt::{self, Debug},
22 pin::Pin,
23 str,
24 task::Poll,
25};
26
27use bytes::{Bytes, BytesMut};
28use futures_core::stream::Stream;
29
30#[cfg(feature = "tokio")]
31use tokio::io::AsyncRead;
32#[cfg(feature = "tokio")]
33use tokio_util::io::ReaderStream;
34
35#[cfg(feature = "smol")]
36use smol::io::{AsyncRead, AsyncReadExt, Bytes as ReaderStream};
37
38use tracing::debug;
39
40use crate::{
41 meta::{EndRequestRec, Header, RequestType, HEADER_LEN},
42 ClientError, ClientResult,
43};
44
/// Output captured from a request, split by the stream it arrived on.
///
/// Both fields are raw bytes; nothing here assumes the payload is valid
/// UTF-8. `Default` produces a response with both fields set to `None`.
#[derive(Clone, Default)]
#[non_exhaustive]
pub struct Response {
    /// Bytes received on standard output, or `None` when there are none.
    pub stdout: Option<Vec<u8>>,
    /// Bytes received on standard error, or `None` when there are none.
    pub stderr: Option<Vec<u8>>,
}
57
58impl Debug for Response {
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
60 f.debug_struct("Response")
61 .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
62 .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
63 .finish()
64 }
65}
66
67pub enum Content {
72 Stdout(Bytes),
74 Stderr(Bytes),
76}
77
78pub struct ResponseStream<S: AsyncRead + Unpin> {
86 #[cfg(feature = "tokio")]
87 stream: ReaderStream<S>,
88 #[cfg(feature = "smol")]
89 stream: ReaderStream<S>,
90 id: u16,
91 eof: bool,
92 header: Option<Header>,
93 buf: BytesMut,
94}
95
96impl<S: AsyncRead + Unpin> ResponseStream<S> {
97 #[inline]
104 pub(crate) fn new(stream: S, id: u16) -> Self {
105 Self {
106 #[cfg(feature = "tokio")]
107 stream: ReaderStream::new(stream),
108 #[cfg(feature = "smol")]
109 stream: stream.bytes(),
110 id,
111 eof: false,
112 header: None,
113 buf: BytesMut::new(),
114 }
115 }
116
117 #[inline]
121 fn read_header(&mut self) -> Option<Header> {
122 if self.buf.len() < HEADER_LEN {
123 return None;
124 }
125 let buf = self.buf.split_to(HEADER_LEN);
126 let header = (&buf as &[u8]).try_into().expect("failed to read header");
127 Some(Header::new_from_buf(header))
128 }
129
130 #[inline]
134 fn read_content(&mut self) -> Option<Bytes> {
135 let header = self.header.as_ref().unwrap();
136 let block_length = header.content_length as usize + header.padding_length as usize;
137 if self.buf.len() < block_length {
138 return None;
139 }
140 let content = self.buf.split_to(header.content_length as usize);
141 let _ = self.buf.split_to(header.padding_length as usize);
142 self.header = None;
143 Some(content.freeze())
144 }
145
146 fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
151 if self.buf.is_empty() {
152 return Ok(None);
153 }
154 if self.header.is_none() {
155 match self.read_header() {
156 Some(header) => self.header = Some(header),
157 None => return Ok(None),
158 }
159 }
160 let header = self.header.as_ref().unwrap();
161 match header.r#type.clone() {
162 RequestType::Stdout => {
163 if let Some(data) = self.read_content() {
164 return Ok(Some(Content::Stdout(data)));
165 }
166 }
167 RequestType::Stderr => {
168 if let Some(data) = self.read_content() {
169 return Ok(Some(Content::Stderr(data)));
170 }
171 }
172 RequestType::EndRequest => {
173 let header = header.clone();
174 let Some(data) = self.read_content() else {
175 return Ok(None);
176 };
177
178 let end = EndRequestRec::new_from_buf(header, &data);
179 debug!(id = self.id, ?end, "Receive from stream.");
180
181 self.eof = true;
182 end.end_request
183 .protocol_status
184 .convert_to_client_result(end.end_request.app_status)?;
185 return Ok(None);
186 }
187 r#type => {
188 self.eof = true;
189 return Err(ClientError::UnknownRequestType {
190 request_type: r#type,
191 });
192 }
193 }
194 Ok(None)
195 }
196}
197
198impl<S> Stream for ResponseStream<S>
199where
200 S: AsyncRead + Unpin,
201{
202 type Item = ClientResult<Content>;
203
204 fn poll_next(
205 mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
206 ) -> std::task::Poll<Option<Self::Item>> {
207 let mut pending = false;
208 loop {
209 match Pin::new(&mut self.stream).poll_next(cx) {
210 Poll::Ready(Some(Ok(data))) => {
211 #[cfg(feature = "smol")]
212 self.buf.extend_from_slice(&[data]);
213 #[cfg(feature = "tokio")]
214 self.buf.extend_from_slice(&data);
215
216 match self.process_message() {
217 Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
218 Ok(None) if self.eof => return Poll::Ready(None),
219 Ok(None) => continue,
220 Err(err) => return Poll::Ready(Some(Err(err))),
221 }
222 }
223 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
224 Poll::Ready(None) => break,
225 Poll::Pending => {
226 pending = true;
227 break;
228 }
229 }
230 }
231 match self.process_message() {
232 Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
233 Ok(None) if !self.eof && pending => Poll::Pending,
234 Ok(None) => Poll::Ready(None),
235 Err(err) => Poll::Ready(Some(Err(err))),
236 }
237 }
238}