http_json_stream/
json_stream.rs

1use crate::{BodyDecoder, ContentEncoding, JsonPart, PartialJson};
2use bytes::Buf;
3use futures_core::{FusedStream, Future, Stream};
4use http::{Response, StatusCode};
5use http_body::Body;
6use serde::de::DeserializeOwned;
7use std::convert::Infallible;
8use std::fmt;
9use std::future::Ready;
10use std::io::Write;
11use std::marker::Unpin;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15/// An asynchronous JSON streamer for HTTP network requests.
16#[must_use = "streams do nothing unless polled"]
17pub struct JsonStream<C, B, T> {
18    state: State<C, B, T>,
19    part: JsonPart,
20}
21
22impl<C, B, T, D, E> JsonStream<C, B, T>
23where
24    C: Future<Output = Result<D, E>> + Unpin,
25    D: Into<Response<B>>,
26    B: Body + Unpin,
27    T: DeserializeOwned,
28    E: std::error::Error + 'static,
29    B::Error: std::error::Error + 'static,
30{
31    /// Creates a new JSON streamer from the given HTTP response future.
32    ///
33    /// Completes the request and streams the body for status 2XX responses.
34    ///
35    /// Use [`JsonStream::process`] instead to customize response handling.
36    pub fn request(call: C, part: JsonPart) -> Self {
37        JsonStream {
38            state: State::Connecting(call),
39            part,
40        }
41    }
42}
43
44impl<B, T> JsonStream<Ready<Result<Response<B>, Infallible>>, B, T>
45where
46    B: Body + Unpin,
47    T: DeserializeOwned,
48    B::Error: std::error::Error + 'static,
49{
50    /// Creates a new JSON streamer from the given HTTP response.
51    pub fn process(resp: impl Into<Response<B>>, part: JsonPart) -> Self {
52        let resp = resp.into();
53        let writer = PartialJson::new(part);
54        let encoding = ContentEncoding::from(resp.headers());
55        let json = BodyDecoder::new(writer, encoding);
56        JsonStream {
57            state: State::Streaming { resp, json },
58            part,
59        }
60    }
61}
62
63enum State<C, B, T> {
64    Connecting(C),
65    Reporting {
66        resp: Response<B>,
67        text: BodyDecoder<Vec<u8>>,
68    },
69    Streaming {
70        resp: Response<B>,
71        json: BodyDecoder<PartialJson<T>>,
72    },
73    Finished,
74}
75
76unsafe impl<C, B, T> Send for State<C, B, T> {}
77unsafe impl<C, B, T> Sync for State<C, B, T> {}
78impl<C, B, T> Unpin for State<C, B, T> {}
79
80impl<C, B, T> fmt::Debug for JsonStream<C, B, T> {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        match self.state {
83            State::Connecting(_) => f.pad("JsonStream(Connecting)"),
84            State::Reporting { .. } => f.pad("JsonStream(Reporting)"),
85            State::Streaming { .. } => f.pad("JsonStream(Streaming)"),
86            State::Finished => f.pad("JsonStream(Finished)"),
87        }
88    }
89}
90
91impl<C, B, T, D, E> FusedStream for JsonStream<C, B, T>
92where
93    C: Future<Output = Result<D, E>> + Unpin,
94    D: Into<Response<B>>,
95    B: Body + Unpin,
96    T: DeserializeOwned,
97    E: std::error::Error + 'static,
98    B::Error: std::error::Error + 'static,
99{
100    fn is_terminated(&self) -> bool {
101        matches!(self.state, State::Finished)
102    }
103}
104
105impl<C, B, T, D, E> Stream for JsonStream<C, B, T>
106where
107    C: Future<Output = Result<D, E>> + Unpin,
108    D: Into<Response<B>>,
109    B: Body + Unpin,
110    T: DeserializeOwned,
111    E: std::error::Error + 'static,
112    B::Error: std::error::Error + 'static,
113{
114    type Item = crate::Result<T>;
115    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116        loop {
117            match self.as_mut().pull(cx) {
118                Pull::Pending => return Poll::Pending,
119                Pull::Ready(t) => return Poll::Ready(Some(t)),
120                Pull::Done => return Poll::Ready(None),
121                Pull::Repeat => {}
122            }
123        }
124    }
125}
126
127enum Pull<T> {
128    Pending,
129    Ready(T),
130    Repeat,
131    Done,
132}
133
134impl<C, B, T, D, E> JsonStream<C, B, T>
135where
136    C: Future<Output = Result<D, E>> + Unpin,
137    D: Into<Response<B>>,
138    B: Body + Unpin,
139    T: DeserializeOwned,
140    E: std::error::Error + 'static,
141    B::Error: std::error::Error + 'static,
142{
143    fn pull(&mut self, cx: &mut Context<'_>) -> Pull<crate::Result<T>> {
144        match &mut self.state {
145            State::Connecting(conn) => match Pin::new(conn).poll(cx) {
146                Poll::Pending => Pull::Pending,
147                Poll::Ready(Err(err)) => {
148                    self.state = State::Finished;
149                    Pull::Ready(Err(crate::Error::Conn(Box::new(err))))
150                }
151                Poll::Ready(Ok(resp)) => {
152                    let resp = resp.into();
153                    match resp.status() {
154                        StatusCode::NO_CONTENT => {
155                            self.state = State::Finished;
156                            Pull::Done
157                        }
158                        status if status.is_success() => {
159                            let writer = PartialJson::new(self.part);
160                            let encoding = ContentEncoding::from(resp.headers());
161                            let json = BodyDecoder::new(writer, encoding);
162                            self.state = State::Streaming { resp, json };
163                            Pull::Repeat
164                        }
165                        _ => {
166                            let writer = Vec::new();
167                            let encoding = ContentEncoding::from(resp.headers());
168                            let text = BodyDecoder::new(writer, encoding);
169                            self.state = State::Reporting { resp, text };
170                            Pull::Repeat
171                        }
172                    }
173                }
174            },
175            State::Reporting { resp, text } => {
176                let body = resp.body_mut();
177                match Pin::new(body).poll_frame(cx) {
178                    Poll::Pending => Pull::Pending,
179                    Poll::Ready(Some(Err(err))) => {
180                        self.state = State::Finished;
181                        Pull::Ready(Err(crate::Error::Body(Box::new(err))))
182                    }
183                    Poll::Ready(Some(Ok(buf))) => {
184                        if let Ok(mut data) = buf.into_data() {
185                            while data.remaining() > 0 {
186                                let chunk = data.chunk();
187                                if let Err(err) = text.write_all(chunk) {
188                                    return Pull::Ready(Err(err.into()));
189                                }
190                                data.advance(chunk.len());
191                            }
192                        }
193                        Pull::Repeat
194                    }
195                    Poll::Ready(None) => {
196                        if let Err(err) = text.flush().and_then(|_| text.try_finish()) {
197                            return Pull::Ready(Err(err.into()));
198                        }
199                        let status = resp.status();
200                        let mut body = Vec::new();
201                        std::mem::swap(&mut body, text.get_mut());
202                        let body = String::from_utf8(body);
203                        self.state = State::Finished;
204                        Pull::Ready(Err(crate::Error::Http(status, body)))
205                    }
206                }
207            }
208            State::Streaming { resp, json } => {
209                if let Some(item) = json.get_mut().next() {
210                    return Pull::Ready(item);
211                }
212                let body = resp.body_mut();
213                match Pin::new(body).poll_frame(cx) {
214                    Poll::Pending => Pull::Pending,
215                    Poll::Ready(Some(Err(err))) => {
216                        self.state = State::Finished;
217                        Pull::Ready(Err(crate::Error::Body(Box::new(err))))
218                    }
219                    Poll::Ready(Some(Ok(buf))) => {
220                        if let Ok(mut data) = buf.into_data() {
221                            while data.remaining() > 0 {
222                                let chunk = data.chunk();
223                                if let Err(err) = json.write_all(chunk) {
224                                    return Pull::Ready(Err(err.into()));
225                                }
226                                data.advance(chunk.len());
227                            }
228                        }
229                        if let Err(err) = json.flush() {
230                            return Pull::Ready(Err(err.into()));
231                        }
232                        match json.get_mut().next() {
233                            Some(item) => Pull::Ready(item),
234                            None => Pull::Repeat,
235                        }
236                    }
237                    Poll::Ready(None) => {
238                        if let Err(err) = json.try_finish() {
239                            return Pull::Ready(Err(err.into()));
240                        };
241                        if let Some(item) = json.get_mut().next() {
242                            return Pull::Ready(item);
243                        }
244                        let last = json.get_mut().done();
245                        self.state = State::Finished;
246                        match last {
247                            Some(last) => Pull::Ready(last),
248                            None => Pull::Done,
249                        }
250                    }
251                }
252            }
253            State::Finished => Pull::Done,
254        }
255    }
256}