kvarn_fastcgi_client/
response.rs1use crate::{
16 meta::{EndRequestRec, Header, RequestType},
17 ClientError, ClientResult,
18};
19use std::{cmp::min, fmt, fmt::Debug, str};
20use tokio::io::{AsyncRead, AsyncReadExt};
21use tracing::debug;
22
/// Holds the optional stdout and stderr byte buffers of a response.
///
/// NOTE(review): presumably filled from the `Stdout` / `Stderr` records of a
/// request — this file does not show where it is populated; confirm at the caller.
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct Response {
    /// Raw stdout bytes, or `None` when there are none.
    pub stdout: Option<Vec<u8>>,
    /// Raw stderr bytes, or `None` when there are none.
    pub stderr: Option<Vec<u8>>,
}
30
31impl Debug for Response {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
33 f.debug_struct("Response")
34 .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
35 .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
36 .finish()
37 }
38}
39
/// A borrowed chunk of response output, tagged with the kind of record it was
/// read from. Yielded by `ResponseStream::next`.
pub enum Content<'a> {
    /// Bytes read from a record whose type is `RequestType::Stdout`.
    Stdout(&'a [u8]),
    /// Bytes read from a record whose type is `RequestType::Stderr`.
    Stderr(&'a [u8]),
}
44
/// Which section of the current record `ResponseStream` reads next.
#[derive(PartialEq)]
enum ReadStep {
    /// The record's content bytes (`content_length` of the header).
    Content,
    /// The padding bytes that follow the content (`padding_length` of the header).
    Padding,
}
50
/// Incremental reader that yields the stdout/stderr content of a response
/// chunk by chunk through `next`, instead of buffering the whole response.
pub struct ResponseStream<S: AsyncRead + Unpin> {
    /// Source the record headers, content and padding are read from.
    stream: S,
    /// Request id; only used to tag the debug log entry for the end-request record.
    id: u16,

    /// Set once the end-request record was handled or an error occurred;
    /// `next` returns `None` from then on.
    ended: bool,

    /// Header of the record currently being consumed. `None` means the next
    /// call has to read a new header first.
    header: Option<Header>,

    /// Scratch buffer that every read lands in (allocated with 4096 bytes in `new`).
    content_buf: Vec<u8>,
    /// Bytes already read of the section (content or padding) in progress.
    content_read: usize,

    /// Whether the content or the padding of the current record is being read.
    read_step: ReadStep,
}
71
72impl<S: AsyncRead + Unpin + Send> ResponseStream<S> {
73 #[inline]
74 pub(crate) fn new(stream: S, id: u16) -> Self {
75 Self {
76 stream,
77 id,
78 ended: false,
79 header: None,
80 content_buf: vec![0; 4096],
81 content_read: 0,
82 read_step: ReadStep::Content,
83 }
84 }
85
86 pub async fn next(&mut self) -> Option<ClientResult<Content<'_>>> {
87 if self.ended {
88 return None;
89 }
90
91 loop {
92 if self.header.is_none() {
93 match Header::new_from_stream(&mut self.stream).await {
94 Ok(header) => {
95 self.header = Some(header);
96 }
97 Err(err) => {
98 self.ended = true;
99 return Some(Err(err.into()));
100 }
101 };
102 }
103
104 let header = self.header.as_ref().unwrap();
105
106 match header.r#type.clone() {
107 RequestType::Stdout => match self.read_step {
108 ReadStep::Content => {
109 return self
110 .read_to_content(
111 header.content_length as usize,
112 Content::Stdout,
113 Self::prepare_for_read_padding,
114 )
115 .await;
116 }
117 ReadStep::Padding => {
118 self.read_to_content(
119 header.padding_length as usize,
120 Content::Stdout,
121 Self::prepare_for_read_header,
122 )
123 .await;
124 continue;
125 }
126 },
127 RequestType::Stderr => match self.read_step {
128 ReadStep::Content => {
129 return self
130 .read_to_content(
131 header.content_length as usize,
132 Content::Stderr,
133 Self::prepare_for_read_padding,
134 )
135 .await;
136 }
137 ReadStep::Padding => {
138 self.read_to_content(
139 header.padding_length as usize,
140 Content::Stderr,
141 Self::prepare_for_read_header,
142 )
143 .await;
144 continue;
145 }
146 },
147 RequestType::EndRequest => {
148 let end_request_rec =
149 match EndRequestRec::from_header(header, &mut self.stream).await {
150 Ok(rec) => rec,
151 Err(err) => {
152 self.ended = true;
153 return Some(Err(err.into()));
154 }
155 };
156 debug!(id = self.id, ?end_request_rec, "Receive from stream.");
157
158 self.ended = true;
159
160 return match end_request_rec
161 .end_request
162 .protocol_status
163 .convert_to_client_result(end_request_rec.end_request.app_status)
164 {
165 Ok(_) => None,
166 Err(err) => Some(Err(err)),
167 };
168 }
169 r#type => {
170 self.ended = true;
171 return Some(Err(ClientError::UnknownRequestType {
172 request_type: r#type,
173 }));
174 }
175 }
176 }
177 }
178
179 async fn read_to_content<'a, T: 'a>(
180 &'a mut self, length: usize, content_fn: impl FnOnce(&'a [u8]) -> T,
181 prepare_for_next_fn: impl FnOnce(&mut Self),
182 ) -> Option<ClientResult<T>> {
183 let content_len = self.content_buf.len();
184 let read = match self
185 .stream
186 .read(&mut self.content_buf[..min(content_len, length - self.content_read)])
187 .await
188 {
189 Ok(read) => read,
190 Err(err) => {
191 self.ended = true;
192 return Some(Err(err.into()));
193 }
194 };
195
196 self.content_read += read;
197 if self.content_read >= length {
198 self.content_read = 0;
199 prepare_for_next_fn(self);
200 }
201
202 Some(Ok(content_fn(&self.content_buf[..read])))
203 }
204
205 fn prepare_for_read_padding(&mut self) {
206 self.read_step = ReadStep::Padding;
207 }
208
209 fn prepare_for_read_header(&mut self) {
210 self.header = None;
211 self.read_step = ReadStep::Content;
212 }
213}