1use std::{
21 fmt::{self, Debug},
22 pin::Pin,
23 str,
24 task::Poll,
25};
26
27use bytes::{Bytes, BytesMut};
28use futures_util::stream::Stream;
29use tokio::io::AsyncRead;
30use tokio_util::io::ReaderStream;
31use tracing::debug;
32
33use crate::{
34 meta::{EndRequestRec, Header, RequestType, HEADER_LEN},
35 ClientError, ClientResult,
36};
37
/// Output of a completed request: the standard-output and standard-error
/// payloads, each of which may be absent.
///
/// The type is `#[non_exhaustive]`, so code outside this crate cannot build it
/// with a struct literal or match it exhaustively; further fields may be added
/// later without breaking callers.
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct Response {
    /// Bytes received on the standard-output channel, if any.
    pub stdout: Option<Bytes>,
    /// Bytes received on the standard-error channel, if any.
    pub stderr: Option<Bytes>,
}
50
51impl Debug for Response {
52 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
53 f.debug_struct("Response")
54 .field("stdout", &self.stdout.as_deref().map(str::from_utf8))
55 .field("stderr", &self.stderr.as_deref().map(str::from_utf8))
56 .finish()
57 }
58}
59
60pub enum Content {
65 Stdout(Bytes),
67 Stderr(Bytes),
69}
70
/// Incremental decoder that turns the bytes read from `S` into a stream of
/// [`Content`] items.
///
/// Bytes are accumulated in an internal buffer and decoded record by record:
/// first a fixed-size header, then the content and padding that the header
/// announces.
pub struct ResponseStream<S: AsyncRead + Unpin> {
    /// Underlying reader, wrapped so it yields chunks of bytes.
    stream: ReaderStream<S>,
    /// Request id; only used to tag log output.
    id: u16,
    /// Set once an end-request record or an unrecognised record type has been
    /// seen.
    eof: bool,
    /// Header of the record whose content has not been fully received yet.
    header: Option<Header>,
    /// Bytes read from `stream` that have not been decoded yet.
    buf: BytesMut,
}
85
86impl<S: AsyncRead + Unpin> ResponseStream<S> {
87 #[inline]
94 pub(crate) fn new(stream: S, id: u16) -> Self {
95 Self {
96 stream: ReaderStream::new(stream),
97 id,
98 eof: false,
99 header: None,
100 buf: BytesMut::new(),
101 }
102 }
103
104 #[inline]
108 fn read_header(&mut self) -> Option<Header> {
109 if self.buf.len() < HEADER_LEN {
110 return None;
111 }
112 let buf = self.buf.split_to(HEADER_LEN);
113 Some(Header::from(buf))
114 }
115
116 #[inline]
120 fn read_content(&mut self) -> Option<BytesMut> {
121 let header = self.header.as_ref().unwrap();
122 let block_length = header.content_length as usize + header.padding_length as usize;
123 if self.buf.len() < block_length {
124 return None;
125 }
126 let content = self.buf.split_to(header.content_length as usize);
127 let _ = self.buf.split_to(header.padding_length as usize);
128 self.header = None;
129 Some(content)
130 }
131
132 fn process_message(&mut self) -> Result<Option<Content>, ClientError> {
137 if self.buf.is_empty() {
138 return Ok(None);
139 }
140 if self.header.is_none() {
141 match self.read_header() {
142 Some(header) => self.header = Some(header),
143 None => return Ok(None),
144 }
145 }
146 let header = self.header.as_ref().unwrap();
147 match header.r#type.clone() {
148 RequestType::Stdout => {
149 if let Some(data) = self.read_content() {
150 return Ok(Some(Content::Stdout(data.freeze())));
151 }
152 }
153 RequestType::Stderr => {
154 if let Some(data) = self.read_content() {
155 return Ok(Some(Content::Stderr(data.freeze())));
156 }
157 }
158 RequestType::EndRequest => {
159 let header = header.clone();
160 let Some(data) = self.read_content() else {
161 return Ok(None);
162 };
163
164 let end = EndRequestRec::new_from_buf(header, data);
165 debug!(id = self.id, ?end, "Receive from stream.");
166
167 self.eof = true;
168 end.end_request
169 .protocol_status
170 .convert_to_client_result(end.end_request.app_status)?;
171 return Ok(None);
172 }
173 r#type => {
174 self.eof = true;
175 return Err(ClientError::UnknownRequestType {
176 request_type: r#type,
177 });
178 }
179 }
180 Ok(None)
181 }
182}
183
184impl<S> Stream for ResponseStream<S>
185where
186 S: AsyncRead + Unpin,
187{
188 type Item = ClientResult<Content>;
189
190 fn poll_next(
191 mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>,
192 ) -> std::task::Poll<Option<Self::Item>> {
193 let mut pending = false;
194 loop {
195 match Pin::new(&mut self.stream).poll_next(cx) {
196 Poll::Ready(Some(Ok(data))) => {
197 self.buf.extend_from_slice(&data);
198
199 match self.process_message() {
200 Ok(Some(data)) => return Poll::Ready(Some(Ok(data))),
201 Ok(None) if self.eof => return Poll::Ready(None),
202 Ok(None) => continue,
203 Err(err) => return Poll::Ready(Some(Err(err))),
204 }
205 }
206 Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
207 Poll::Ready(None) => break,
208 Poll::Pending => {
209 pending = true;
210 break;
211 }
212 }
213 }
214 match self.process_message() {
215 Ok(Some(data)) => Poll::Ready(Some(Ok(data))),
216 Ok(None) if !self.eof && pending => Poll::Pending,
217 Ok(None) => Poll::Ready(None),
218 Err(err) => Poll::Ready(Some(Err(err))),
219 }
220 }
221}