conjure_runtime/blocking/
body.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use crate::raw::DefaultRawBody;
15use bytes::{Bytes, BytesMut};
16use conjure_error::Error;
17use conjure_http::client::{AsyncWriteBody, WriteBody};
18use futures::channel::{mpsc, oneshot};
19use futures::{executor, SinkExt, Stream, StreamExt};
20use http_body::Body;
21use std::error;
22use std::io::{self, BufRead, Read, Write};
23use std::pin::Pin;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt};
25use tokio::runtime::Handle;
26
27pub(crate) fn shim(
28    body_writer: Box<dyn WriteBody<BodyWriter> + '_>,
29) -> (BodyWriterShim, BodyStreamer<'_>) {
30    let (sender, receiver) = mpsc::channel(1);
31    (
32        BodyWriterShim { sender },
33        BodyStreamer {
34            body_writer,
35            receiver,
36        },
37    )
38}
39
/// A request sent from [`BodyWriterShim`] to [`BodyStreamer`].
enum ShimRequest {
    /// Asks the streamer to run the blocking body writer, delivering the resulting [`BodyPart`]s
    /// through the enclosed sender.
    Write(mpsc::Sender<BodyPart>),
    /// Asks the streamer to reset the blocking body writer and report the outcome through the
    /// enclosed one-shot sender.
    Reset(oneshot::Sender<bool>),
}
44
45pub(crate) struct BodyWriterShim {
46    sender: mpsc::Sender<ShimRequest>,
47}
48
49impl AsyncWriteBody<crate::BodyWriter> for BodyWriterShim {
50    async fn write_body(
51        mut self: Pin<&mut Self>,
52        mut w: Pin<&mut crate::BodyWriter>,
53    ) -> Result<(), Error> {
54        let (sender, mut receiver) = mpsc::channel(1);
55        self.sender
56            .send(ShimRequest::Write(sender))
57            .await
58            .map_err(Error::internal_safe)?;
59
60        loop {
61            match receiver.next().await {
62                Some(BodyPart::Data(bytes)) => w
63                    .as_mut()
64                    .write_bytes(bytes)
65                    .await
66                    .map_err(Error::internal_safe)?,
67                Some(BodyPart::Error(error)) => return Err(error),
68                Some(BodyPart::Done) => return Ok(()),
69                None => return Err(Error::internal_safe("body write aborted")),
70            }
71        }
72    }
73
74    async fn reset(mut self: Pin<&mut Self>) -> bool {
75        let (sender, receiver) = oneshot::channel();
76        if self.sender.send(ShimRequest::Reset(sender)).await.is_err() {
77            return false;
78        }
79
80        receiver.await.unwrap_or(false)
81    }
82}
83
84pub(crate) struct BodyStreamer<'a> {
85    body_writer: Box<dyn WriteBody<BodyWriter> + 'a>,
86    receiver: mpsc::Receiver<ShimRequest>,
87}
88
89impl BodyStreamer<'_> {
90    pub fn stream(mut self) {
91        while let Some(request) = executor::block_on(self.receiver.next()) {
92            match request {
93                ShimRequest::Write(sender) => {
94                    let mut writer = BodyWriter::new(sender);
95                    let _ = match self.body_writer.write_body(&mut writer) {
96                        Ok(()) => writer.finish(),
97                        Err(e) => writer.send(BodyPart::Error(e)),
98                    };
99                }
100                ShimRequest::Reset(sender) => {
101                    let reset = self.body_writer.reset();
102                    let _ = sender.send(reset);
103                }
104            }
105        }
106    }
107}
108
/// A message sent from the blocking [`BodyWriter`] side to the async [`BodyWriterShim`].
enum BodyPart {
    /// A chunk of body bytes to be written to the request.
    Data(Bytes),
    /// The blocking body writer failed with this error.
    Error(Error),
    /// The blocking body writer completed and all buffered bytes have been sent.
    Done,
}
114
115/// The blocking writer passed to [`WriteBody::write_body`].
116pub struct BodyWriter {
117    sender: mpsc::Sender<BodyPart>,
118    buf: BytesMut,
119}
120
121impl BodyWriter {
122    fn new(sender: mpsc::Sender<BodyPart>) -> BodyWriter {
123        BodyWriter {
124            sender,
125            buf: BytesMut::new(),
126        }
127    }
128
129    fn finish(mut self) -> io::Result<()> {
130        self.flush()?;
131        self.send(BodyPart::Done)
132    }
133
134    fn send(&mut self, message: BodyPart) -> io::Result<()> {
135        executor::block_on(self.sender.send(message))
136            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
137    }
138
139    /// Writes a block of body bytes.
140    ///
141    /// Compared to the [`Write`] implementation, this method avoids some copies if the caller already has the body in
142    /// [`Bytes`] objects.
143    pub fn write_bytes(&mut self, buf: Bytes) -> io::Result<()> {
144        self.flush()?;
145        self.send(BodyPart::Data(buf))
146    }
147}
148
149impl Write for BodyWriter {
150    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
151        self.buf.extend_from_slice(buf);
152        if buf.len() > 4906 {
153            self.flush()?;
154        }
155
156        Ok(buf.len())
157    }
158
159    fn flush(&mut self) -> io::Result<()> {
160        if self.buf.is_empty() {
161            return Ok(());
162        }
163
164        let bytes = self.buf.split().freeze();
165        self.send(BodyPart::Data(bytes))
166    }
167}
168
169/// A blocking streaming response body.
170pub struct ResponseBody<B = DefaultRawBody> {
171    inner: Pin<Box<crate::ResponseBody<B>>>,
172    handle: Handle,
173}
174
175impl<B> ResponseBody<B> {
176    pub(crate) fn new(inner: crate::ResponseBody<B>, handle: Handle) -> Self {
177        ResponseBody {
178            inner: Box::pin(inner),
179            handle,
180        }
181    }
182}
183
184impl<B> Iterator for ResponseBody<B>
185where
186    B: Body<Data = Bytes>,
187    B::Error: Into<Box<dyn error::Error + Sync + Send>>,
188{
189    type Item = Result<Bytes, Error>;
190
191    fn next(&mut self) -> Option<Self::Item> {
192        self.handle.block_on(self.inner.as_mut().next())
193    }
194
195    fn size_hint(&self) -> (usize, Option<usize>) {
196        self.inner.size_hint()
197    }
198}
199
200impl<B> Read for ResponseBody<B>
201where
202    B: Body<Data = Bytes>,
203    B::Error: Into<Box<dyn error::Error + Sync + Send>>,
204{
205    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
206        self.handle.block_on(self.inner.as_mut().read(buf))
207    }
208}
209
210impl<B> BufRead for ResponseBody<B>
211where
212    B: Body<Data = Bytes>,
213    B::Error: Into<Box<dyn error::Error + Sync + Send>>,
214{
215    fn fill_buf(&mut self) -> io::Result<&[u8]> {
216        // lifetime shenanigans mean we can't return the value of fill_buf directly
217        self.handle.block_on(self.inner.as_mut().fill_buf())?;
218        Ok(self.inner.buffer())
219    }
220
221    fn consume(&mut self, amt: usize) {
222        self.inner.as_mut().consume(amt)
223    }
224}