conjure_runtime/blocking/
body.rs1use bytes::{Bytes, BytesMut};
15use conjure_error::Error;
16use conjure_http::client::{AsyncWriteBody, WriteBody};
17use futures::channel::{mpsc, oneshot};
18use futures::{executor, SinkExt, Stream, StreamExt};
19use std::io::{self, BufRead, Read, Write};
20use std::pin::Pin;
21use tokio::io::{AsyncBufReadExt, AsyncReadExt};
22use tokio::runtime::Handle;
23
24pub(crate) fn shim(
25 body_writer: Box<dyn WriteBody<BodyWriter> + '_>,
26) -> (BodyWriterShim, BodyStreamer<'_>) {
27 let (sender, receiver) = mpsc::channel(1);
28 (
29 BodyWriterShim { sender },
30 BodyStreamer {
31 body_writer,
32 receiver,
33 },
34 )
35}
36
37enum ShimRequest {
38 Write(mpsc::Sender<BodyPart>),
39 Reset(oneshot::Sender<bool>),
40}
41
42pub(crate) struct BodyWriterShim {
43 sender: mpsc::Sender<ShimRequest>,
44}
45
46impl AsyncWriteBody<crate::BodyWriter> for BodyWriterShim {
47 async fn write_body(
48 mut self: Pin<&mut Self>,
49 mut w: Pin<&mut crate::BodyWriter>,
50 ) -> Result<(), Error> {
51 let (sender, mut receiver) = mpsc::channel(1);
52 self.sender
53 .send(ShimRequest::Write(sender))
54 .await
55 .map_err(Error::internal_safe)?;
56
57 loop {
58 match receiver.next().await {
59 Some(BodyPart::Data(bytes)) => w
60 .as_mut()
61 .write_bytes(bytes)
62 .await
63 .map_err(Error::internal_safe)?,
64 Some(BodyPart::Error(error)) => return Err(error),
65 Some(BodyPart::Done) => return Ok(()),
66 None => return Err(Error::internal_safe("body write aborted")),
67 }
68 }
69 }
70
71 async fn reset(mut self: Pin<&mut Self>) -> bool {
72 let (sender, receiver) = oneshot::channel();
73 if self.sender.send(ShimRequest::Reset(sender)).await.is_err() {
74 return false;
75 }
76
77 receiver.await.unwrap_or(false)
78 }
79}
80
81pub(crate) struct BodyStreamer<'a> {
82 body_writer: Box<dyn WriteBody<BodyWriter> + 'a>,
83 receiver: mpsc::Receiver<ShimRequest>,
84}
85
86impl BodyStreamer<'_> {
87 pub fn stream(mut self) {
88 while let Some(request) = executor::block_on(self.receiver.next()) {
89 match request {
90 ShimRequest::Write(sender) => {
91 let mut writer = BodyWriter::new(sender);
92 let _ = match self.body_writer.write_body(&mut writer) {
93 Ok(()) => writer.finish(),
94 Err(e) => writer.send(BodyPart::Error(e)),
95 };
96 }
97 ShimRequest::Reset(sender) => {
98 let reset = self.body_writer.reset();
99 let _ = sender.send(reset);
100 }
101 }
102 }
103 }
104}
105
106enum BodyPart {
107 Data(Bytes),
108 Error(Error),
109 Done,
110}
111
112pub struct BodyWriter {
114 sender: mpsc::Sender<BodyPart>,
115 buf: BytesMut,
116}
117
118impl BodyWriter {
119 fn new(sender: mpsc::Sender<BodyPart>) -> BodyWriter {
120 BodyWriter {
121 sender,
122 buf: BytesMut::new(),
123 }
124 }
125
126 fn finish(mut self) -> io::Result<()> {
127 self.flush()?;
128 self.send(BodyPart::Done)
129 }
130
131 fn send(&mut self, message: BodyPart) -> io::Result<()> {
132 executor::block_on(self.sender.send(message)).map_err(io::Error::other)
133 }
134
135 pub fn write_bytes(&mut self, buf: Bytes) -> io::Result<()> {
140 self.flush()?;
141 self.send(BodyPart::Data(buf))
142 }
143}
144
145impl Write for BodyWriter {
146 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
147 self.buf.extend_from_slice(buf);
148 if buf.len() > 4906 {
149 self.flush()?;
150 }
151
152 Ok(buf.len())
153 }
154
155 fn flush(&mut self) -> io::Result<()> {
156 if self.buf.is_empty() {
157 return Ok(());
158 }
159
160 let bytes = self.buf.split().freeze();
161 self.send(BodyPart::Data(bytes))
162 }
163}
164
165pub struct ResponseBody {
167 inner: Pin<Box<crate::ResponseBody>>,
168 handle: Handle,
169}
170
171impl ResponseBody {
172 pub(crate) fn new(inner: crate::ResponseBody, handle: Handle) -> Self {
173 ResponseBody {
174 inner: Box::pin(inner),
175 handle,
176 }
177 }
178}
179
180impl Iterator for ResponseBody {
181 type Item = Result<Bytes, Error>;
182
183 fn next(&mut self) -> Option<Self::Item> {
184 self.handle.block_on(self.inner.as_mut().next())
185 }
186
187 fn size_hint(&self) -> (usize, Option<usize>) {
188 self.inner.size_hint()
189 }
190}
191
192impl Read for ResponseBody {
193 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
194 self.handle.block_on(self.inner.as_mut().read(buf))
195 }
196}
197
198impl BufRead for ResponseBody {
199 fn fill_buf(&mut self) -> io::Result<&[u8]> {
200 self.handle.block_on(self.inner.as_mut().fill_buf())?;
202 Ok(self.inner.buffer())
203 }
204
205 fn consume(&mut self, amt: usize) {
206 self.inner.as_mut().consume(amt)
207 }
208}