1use crate::{BodyDecoder, ContentEncoding, JsonPart, PartialJson};
2use bytes::Buf;
3use futures_core::{FusedStream, Future, Stream};
4use http::{Response, StatusCode};
5use http_body::Body;
6use serde::de::DeserializeOwned;
7use std::convert::Infallible;
8use std::fmt;
9use std::future::Ready;
10use std::io::Write;
11use std::marker::Unpin;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
/// A stream of values of type `T` produced from an HTTP response body.
///
/// The stream is driven by a small state machine (see `State`): it first
/// awaits the call future `C`, then reads the response body `B`, and yields
/// `crate::Result<T>` items as they become available.
#[must_use = "streams do nothing unless polled"]
pub struct JsonStream<C, B, T> {
    /// Current phase of the request/response lifecycle.
    state: State<C, B, T>,
    /// Value handed to `PartialJson::new` when the body starts streaming.
    part: JsonPart,
}
21
22impl<C, B, T, D, E> JsonStream<C, B, T>
23where
24 C: Future<Output = Result<D, E>> + Unpin,
25 D: Into<Response<B>>,
26 B: Body + Unpin,
27 T: DeserializeOwned,
28 E: std::error::Error + 'static,
29 B::Error: std::error::Error + 'static,
30{
31 pub fn request(call: C, part: JsonPart) -> Self {
37 JsonStream {
38 state: State::Connecting(call),
39 part,
40 }
41 }
42}
43
44impl<B, T> JsonStream<Ready<Result<Response<B>, Infallible>>, B, T>
45where
46 B: Body + Unpin,
47 T: DeserializeOwned,
48 B::Error: std::error::Error + 'static,
49{
50 pub fn process(resp: impl Into<Response<B>>, part: JsonPart) -> Self {
52 let resp = resp.into();
53 let writer = PartialJson::new(part);
54 let encoding = ContentEncoding::from(resp.headers());
55 let json = BodyDecoder::new(writer, encoding);
56 JsonStream {
57 state: State::Streaming { resp, json },
58 part,
59 }
60 }
61}
62
/// Phases a `JsonStream` moves through while it is polled.
enum State<C, B, T> {
    /// The call future has not yet resolved to a response.
    Connecting(C),
    /// The response had a non-success status; its body is being collected
    /// into a byte buffer so it can be reported as `crate::Error::Http`.
    Reporting {
        resp: Response<B>,
        text: BodyDecoder<Vec<u8>>,
    },
    /// The response body is being decoded and fed into a `PartialJson<T>`,
    /// from which items are pulled.
    Streaming {
        resp: Response<B>,
        json: BodyDecoder<PartialJson<T>>,
    },
    /// Terminal state: the stream yields nothing further.
    Finished,
}
75
// NOTE(review): these `Send`/`Sync` impls are unconditional — they apply for
// every `C`, `B` and `T`, including types that are not `Send`/`Sync`
// themselves. No safety argument is visible in this file; confirm the
// invariant that justifies them, or constrain the type parameters.
unsafe impl<C, B, T> Send for State<C, B, T> {}
unsafe impl<C, B, T> Sync for State<C, B, T> {}
// `State` is declared `Unpin` for all parameters. Within this file the only
// pinning is `Pin::new` on `C` and `B`, both of which are bounded by `Unpin`
// wherever the stream is polled.
impl<C, B, T> Unpin for State<C, B, T> {}
79
80impl<C, B, T> fmt::Debug for JsonStream<C, B, T> {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 match self.state {
83 State::Connecting(_) => f.pad("JsonStream(Connecting)"),
84 State::Reporting { .. } => f.pad("JsonStream(Reporting)"),
85 State::Streaming { .. } => f.pad("JsonStream(Streaming)"),
86 State::Finished => f.pad("JsonStream(Finished)"),
87 }
88 }
89}
90
91impl<C, B, T, D, E> FusedStream for JsonStream<C, B, T>
92where
93 C: Future<Output = Result<D, E>> + Unpin,
94 D: Into<Response<B>>,
95 B: Body + Unpin,
96 T: DeserializeOwned,
97 E: std::error::Error + 'static,
98 B::Error: std::error::Error + 'static,
99{
100 fn is_terminated(&self) -> bool {
101 matches!(self.state, State::Finished)
102 }
103}
104
105impl<C, B, T, D, E> Stream for JsonStream<C, B, T>
106where
107 C: Future<Output = Result<D, E>> + Unpin,
108 D: Into<Response<B>>,
109 B: Body + Unpin,
110 T: DeserializeOwned,
111 E: std::error::Error + 'static,
112 B::Error: std::error::Error + 'static,
113{
114 type Item = crate::Result<T>;
115 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116 loop {
117 match self.as_mut().pull(cx) {
118 Pull::Pending => return Poll::Pending,
119 Pull::Ready(t) => return Poll::Ready(Some(t)),
120 Pull::Done => return Poll::Ready(None),
121 Pull::Repeat => {}
122 }
123 }
124 }
125}
126
/// Outcome of a single `JsonStream::pull` step, translated into a `Poll` by
/// `poll_next`.
enum Pull<T> {
    /// An underlying future or body is not ready; maps to `Poll::Pending`.
    Pending,
    /// An item is available; maps to `Poll::Ready(Some(_))`.
    Ready(T),
    /// No output was produced in this step; `poll_next` calls `pull` again.
    Repeat,
    /// The stream is over; maps to `Poll::Ready(None)`.
    Done,
}
133
134impl<C, B, T, D, E> JsonStream<C, B, T>
135where
136 C: Future<Output = Result<D, E>> + Unpin,
137 D: Into<Response<B>>,
138 B: Body + Unpin,
139 T: DeserializeOwned,
140 E: std::error::Error + 'static,
141 B::Error: std::error::Error + 'static,
142{
143 fn pull(&mut self, cx: &mut Context<'_>) -> Pull<crate::Result<T>> {
144 match &mut self.state {
145 State::Connecting(conn) => match Pin::new(conn).poll(cx) {
146 Poll::Pending => Pull::Pending,
147 Poll::Ready(Err(err)) => {
148 self.state = State::Finished;
149 Pull::Ready(Err(crate::Error::Conn(Box::new(err))))
150 }
151 Poll::Ready(Ok(resp)) => {
152 let resp = resp.into();
153 match resp.status() {
154 StatusCode::NO_CONTENT => {
155 self.state = State::Finished;
156 Pull::Done
157 }
158 status if status.is_success() => {
159 let writer = PartialJson::new(self.part);
160 let encoding = ContentEncoding::from(resp.headers());
161 let json = BodyDecoder::new(writer, encoding);
162 self.state = State::Streaming { resp, json };
163 Pull::Repeat
164 }
165 _ => {
166 let writer = Vec::new();
167 let encoding = ContentEncoding::from(resp.headers());
168 let text = BodyDecoder::new(writer, encoding);
169 self.state = State::Reporting { resp, text };
170 Pull::Repeat
171 }
172 }
173 }
174 },
175 State::Reporting { resp, text } => {
176 let body = resp.body_mut();
177 match Pin::new(body).poll_frame(cx) {
178 Poll::Pending => Pull::Pending,
179 Poll::Ready(Some(Err(err))) => {
180 self.state = State::Finished;
181 Pull::Ready(Err(crate::Error::Body(Box::new(err))))
182 }
183 Poll::Ready(Some(Ok(buf))) => {
184 if let Ok(mut data) = buf.into_data() {
185 while data.remaining() > 0 {
186 let chunk = data.chunk();
187 if let Err(err) = text.write_all(chunk) {
188 return Pull::Ready(Err(err.into()));
189 }
190 data.advance(chunk.len());
191 }
192 }
193 Pull::Repeat
194 }
195 Poll::Ready(None) => {
196 if let Err(err) = text.flush().and_then(|_| text.try_finish()) {
197 return Pull::Ready(Err(err.into()));
198 }
199 let status = resp.status();
200 let mut body = Vec::new();
201 std::mem::swap(&mut body, text.get_mut());
202 let body = String::from_utf8(body);
203 self.state = State::Finished;
204 Pull::Ready(Err(crate::Error::Http(status, body)))
205 }
206 }
207 }
208 State::Streaming { resp, json } => {
209 if let Some(item) = json.get_mut().next() {
210 return Pull::Ready(item);
211 }
212 let body = resp.body_mut();
213 match Pin::new(body).poll_frame(cx) {
214 Poll::Pending => Pull::Pending,
215 Poll::Ready(Some(Err(err))) => {
216 self.state = State::Finished;
217 Pull::Ready(Err(crate::Error::Body(Box::new(err))))
218 }
219 Poll::Ready(Some(Ok(buf))) => {
220 if let Ok(mut data) = buf.into_data() {
221 while data.remaining() > 0 {
222 let chunk = data.chunk();
223 if let Err(err) = json.write_all(chunk) {
224 return Pull::Ready(Err(err.into()));
225 }
226 data.advance(chunk.len());
227 }
228 }
229 if let Err(err) = json.flush() {
230 return Pull::Ready(Err(err.into()));
231 }
232 match json.get_mut().next() {
233 Some(item) => Pull::Ready(item),
234 None => Pull::Repeat,
235 }
236 }
237 Poll::Ready(None) => {
238 if let Err(err) = json.try_finish() {
239 return Pull::Ready(Err(err.into()));
240 };
241 if let Some(item) = json.get_mut().next() {
242 return Pull::Ready(item);
243 }
244 let last = json.get_mut().done();
245 self.state = State::Finished;
246 match last {
247 Some(last) => Pull::Ready(last),
248 None => Pull::Done,
249 }
250 }
251 }
252 }
253 State::Finished => Pull::Done,
254 }
255 }
256}