conjure_runtime/blocking/
body.rs1use crate::raw::DefaultRawBody;
15use bytes::{Bytes, BytesMut};
16use conjure_error::Error;
17use conjure_http::client::{AsyncWriteBody, WriteBody};
18use futures::channel::{mpsc, oneshot};
19use futures::{executor, SinkExt, Stream, StreamExt};
20use http_body::Body;
21use std::error;
22use std::io::{self, BufRead, Read, Write};
23use std::pin::Pin;
24use tokio::io::{AsyncBufReadExt, AsyncReadExt};
25use tokio::runtime::Handle;
26
27pub(crate) fn shim(
28 body_writer: Box<dyn WriteBody<BodyWriter> + '_>,
29) -> (BodyWriterShim, BodyStreamer<'_>) {
30 let (sender, receiver) = mpsc::channel(1);
31 (
32 BodyWriterShim { sender },
33 BodyStreamer {
34 body_writer,
35 receiver,
36 },
37 )
38}
39
40enum ShimRequest {
41 Write(mpsc::Sender<BodyPart>),
42 Reset(oneshot::Sender<bool>),
43}
44
45pub(crate) struct BodyWriterShim {
46 sender: mpsc::Sender<ShimRequest>,
47}
48
49impl AsyncWriteBody<crate::BodyWriter> for BodyWriterShim {
50 async fn write_body(
51 mut self: Pin<&mut Self>,
52 mut w: Pin<&mut crate::BodyWriter>,
53 ) -> Result<(), Error> {
54 let (sender, mut receiver) = mpsc::channel(1);
55 self.sender
56 .send(ShimRequest::Write(sender))
57 .await
58 .map_err(Error::internal_safe)?;
59
60 loop {
61 match receiver.next().await {
62 Some(BodyPart::Data(bytes)) => w
63 .as_mut()
64 .write_bytes(bytes)
65 .await
66 .map_err(Error::internal_safe)?,
67 Some(BodyPart::Error(error)) => return Err(error),
68 Some(BodyPart::Done) => return Ok(()),
69 None => return Err(Error::internal_safe("body write aborted")),
70 }
71 }
72 }
73
74 async fn reset(mut self: Pin<&mut Self>) -> bool {
75 let (sender, receiver) = oneshot::channel();
76 if self.sender.send(ShimRequest::Reset(sender)).await.is_err() {
77 return false;
78 }
79
80 receiver.await.unwrap_or(false)
81 }
82}
83
84pub(crate) struct BodyStreamer<'a> {
85 body_writer: Box<dyn WriteBody<BodyWriter> + 'a>,
86 receiver: mpsc::Receiver<ShimRequest>,
87}
88
89impl BodyStreamer<'_> {
90 pub fn stream(mut self) {
91 while let Some(request) = executor::block_on(self.receiver.next()) {
92 match request {
93 ShimRequest::Write(sender) => {
94 let mut writer = BodyWriter::new(sender);
95 let _ = match self.body_writer.write_body(&mut writer) {
96 Ok(()) => writer.finish(),
97 Err(e) => writer.send(BodyPart::Error(e)),
98 };
99 }
100 ShimRequest::Reset(sender) => {
101 let reset = self.body_writer.reset();
102 let _ = sender.send(reset);
103 }
104 }
105 }
106 }
107}
108
109enum BodyPart {
110 Data(Bytes),
111 Error(Error),
112 Done,
113}
114
115pub struct BodyWriter {
117 sender: mpsc::Sender<BodyPart>,
118 buf: BytesMut,
119}
120
121impl BodyWriter {
122 fn new(sender: mpsc::Sender<BodyPart>) -> BodyWriter {
123 BodyWriter {
124 sender,
125 buf: BytesMut::new(),
126 }
127 }
128
129 fn finish(mut self) -> io::Result<()> {
130 self.flush()?;
131 self.send(BodyPart::Done)
132 }
133
134 fn send(&mut self, message: BodyPart) -> io::Result<()> {
135 executor::block_on(self.sender.send(message))
136 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
137 }
138
139 pub fn write_bytes(&mut self, buf: Bytes) -> io::Result<()> {
144 self.flush()?;
145 self.send(BodyPart::Data(buf))
146 }
147}
148
149impl Write for BodyWriter {
150 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
151 self.buf.extend_from_slice(buf);
152 if buf.len() > 4906 {
153 self.flush()?;
154 }
155
156 Ok(buf.len())
157 }
158
159 fn flush(&mut self) -> io::Result<()> {
160 if self.buf.is_empty() {
161 return Ok(());
162 }
163
164 let bytes = self.buf.split().freeze();
165 self.send(BodyPart::Data(bytes))
166 }
167}
168
169pub struct ResponseBody<B = DefaultRawBody> {
171 inner: Pin<Box<crate::ResponseBody<B>>>,
172 handle: Handle,
173}
174
175impl<B> ResponseBody<B> {
176 pub(crate) fn new(inner: crate::ResponseBody<B>, handle: Handle) -> Self {
177 ResponseBody {
178 inner: Box::pin(inner),
179 handle,
180 }
181 }
182}
183
184impl<B> Iterator for ResponseBody<B>
185where
186 B: Body<Data = Bytes>,
187 B::Error: Into<Box<dyn error::Error + Sync + Send>>,
188{
189 type Item = Result<Bytes, Error>;
190
191 fn next(&mut self) -> Option<Self::Item> {
192 self.handle.block_on(self.inner.as_mut().next())
193 }
194
195 fn size_hint(&self) -> (usize, Option<usize>) {
196 self.inner.size_hint()
197 }
198}
199
200impl<B> Read for ResponseBody<B>
201where
202 B: Body<Data = Bytes>,
203 B::Error: Into<Box<dyn error::Error + Sync + Send>>,
204{
205 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
206 self.handle.block_on(self.inner.as_mut().read(buf))
207 }
208}
209
210impl<B> BufRead for ResponseBody<B>
211where
212 B: Body<Data = Bytes>,
213 B::Error: Into<Box<dyn error::Error + Sync + Send>>,
214{
215 fn fill_buf(&mut self) -> io::Result<&[u8]> {
216 self.handle.block_on(self.inner.as_mut().fill_buf())?;
218 Ok(self.inner.buffer())
219 }
220
221 fn consume(&mut self, amt: usize) {
222 self.inner.as_mut().consume(amt)
223 }
224}