conjure_runtime/blocking/
body.rs

1// Copyright 2020 Palantir Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use bytes::{Bytes, BytesMut};
15use conjure_error::Error;
16use conjure_http::client::{AsyncWriteBody, WriteBody};
17use futures::channel::{mpsc, oneshot};
18use futures::{executor, SinkExt, Stream, StreamExt};
19use std::io::{self, BufRead, Read, Write};
20use std::pin::Pin;
21use tokio::io::{AsyncBufReadExt, AsyncReadExt};
22use tokio::runtime::Handle;
23
24pub(crate) fn shim(
25    body_writer: Box<dyn WriteBody<BodyWriter> + '_>,
26) -> (BodyWriterShim, BodyStreamer<'_>) {
27    let (sender, receiver) = mpsc::channel(1);
28    (
29        BodyWriterShim { sender },
30        BodyStreamer {
31            body_writer,
32            receiver,
33        },
34    )
35}
36
37enum ShimRequest {
38    Write(mpsc::Sender<BodyPart>),
39    Reset(oneshot::Sender<bool>),
40}
41
42pub(crate) struct BodyWriterShim {
43    sender: mpsc::Sender<ShimRequest>,
44}
45
46impl AsyncWriteBody<crate::BodyWriter> for BodyWriterShim {
47    async fn write_body(
48        mut self: Pin<&mut Self>,
49        mut w: Pin<&mut crate::BodyWriter>,
50    ) -> Result<(), Error> {
51        let (sender, mut receiver) = mpsc::channel(1);
52        self.sender
53            .send(ShimRequest::Write(sender))
54            .await
55            .map_err(Error::internal_safe)?;
56
57        loop {
58            match receiver.next().await {
59                Some(BodyPart::Data(bytes)) => w
60                    .as_mut()
61                    .write_bytes(bytes)
62                    .await
63                    .map_err(Error::internal_safe)?,
64                Some(BodyPart::Error(error)) => return Err(error),
65                Some(BodyPart::Done) => return Ok(()),
66                None => return Err(Error::internal_safe("body write aborted")),
67            }
68        }
69    }
70
71    async fn reset(mut self: Pin<&mut Self>) -> bool {
72        let (sender, receiver) = oneshot::channel();
73        if self.sender.send(ShimRequest::Reset(sender)).await.is_err() {
74            return false;
75        }
76
77        receiver.await.unwrap_or(false)
78    }
79}
80
81pub(crate) struct BodyStreamer<'a> {
82    body_writer: Box<dyn WriteBody<BodyWriter> + 'a>,
83    receiver: mpsc::Receiver<ShimRequest>,
84}
85
86impl BodyStreamer<'_> {
87    pub fn stream(mut self) {
88        while let Some(request) = executor::block_on(self.receiver.next()) {
89            match request {
90                ShimRequest::Write(sender) => {
91                    let mut writer = BodyWriter::new(sender);
92                    let _ = match self.body_writer.write_body(&mut writer) {
93                        Ok(()) => writer.finish(),
94                        Err(e) => writer.send(BodyPart::Error(e)),
95                    };
96                }
97                ShimRequest::Reset(sender) => {
98                    let reset = self.body_writer.reset();
99                    let _ = sender.send(reset);
100                }
101            }
102        }
103    }
104}
105
106enum BodyPart {
107    Data(Bytes),
108    Error(Error),
109    Done,
110}
111
112/// The blocking writer passed to [`WriteBody::write_body`].
113pub struct BodyWriter {
114    sender: mpsc::Sender<BodyPart>,
115    buf: BytesMut,
116}
117
118impl BodyWriter {
119    fn new(sender: mpsc::Sender<BodyPart>) -> BodyWriter {
120        BodyWriter {
121            sender,
122            buf: BytesMut::new(),
123        }
124    }
125
126    fn finish(mut self) -> io::Result<()> {
127        self.flush()?;
128        self.send(BodyPart::Done)
129    }
130
131    fn send(&mut self, message: BodyPart) -> io::Result<()> {
132        executor::block_on(self.sender.send(message)).map_err(io::Error::other)
133    }
134
135    /// Writes a block of body bytes.
136    ///
137    /// Compared to the [`Write`] implementation, this method avoids some copies if the caller already has the body in
138    /// [`Bytes`] objects.
139    pub fn write_bytes(&mut self, buf: Bytes) -> io::Result<()> {
140        self.flush()?;
141        self.send(BodyPart::Data(buf))
142    }
143}
144
145impl Write for BodyWriter {
146    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
147        self.buf.extend_from_slice(buf);
148        if buf.len() > 4906 {
149            self.flush()?;
150        }
151
152        Ok(buf.len())
153    }
154
155    fn flush(&mut self) -> io::Result<()> {
156        if self.buf.is_empty() {
157            return Ok(());
158        }
159
160        let bytes = self.buf.split().freeze();
161        self.send(BodyPart::Data(bytes))
162    }
163}
164
165/// A blocking streaming response body.
166pub struct ResponseBody {
167    inner: Pin<Box<crate::ResponseBody>>,
168    handle: Handle,
169}
170
171impl ResponseBody {
172    pub(crate) fn new(inner: crate::ResponseBody, handle: Handle) -> Self {
173        ResponseBody {
174            inner: Box::pin(inner),
175            handle,
176        }
177    }
178}
179
180impl Iterator for ResponseBody {
181    type Item = Result<Bytes, Error>;
182
183    fn next(&mut self) -> Option<Self::Item> {
184        self.handle.block_on(self.inner.as_mut().next())
185    }
186
187    fn size_hint(&self) -> (usize, Option<usize>) {
188        self.inner.size_hint()
189    }
190}
191
192impl Read for ResponseBody {
193    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
194        self.handle.block_on(self.inner.as_mut().read(buf))
195    }
196}
197
198impl BufRead for ResponseBody {
199    fn fill_buf(&mut self) -> io::Result<&[u8]> {
200        // lifetime shenanigans mean we can't return the value of fill_buf directly
201        self.handle.block_on(self.inner.as_mut().fill_buf())?;
202        Ok(self.inner.buffer())
203    }
204
205    fn consume(&mut self, amt: usize) {
206        self.inner.as_mut().consume(amt)
207    }
208}