vortex_ipc/messages/
reader_async.rs1use std::pin::Pin;
2use std::task::{Context, Poll, ready};
3
4use bytes::BytesMut;
5use futures_util::{AsyncRead, Stream};
6use pin_project_lite::pin_project;
7use vortex_array::ArrayRegistry;
8use vortex_error::VortexResult;
9
10use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
11
pin_project! {
    /// Message reader that pulls bytes from a wrapped reader `R` and decodes them
    /// with a [`MessageDecoder`].
    ///
    /// When `R: AsyncRead` the reader implements [`Stream`] and yields
    /// `VortexResult<DecoderMessage>` items.
    pub struct AsyncMessageReader<R> {
        // Underlying byte source. Pinned so `poll_read` can be called through the
        // pin projection.
        #[pin]
        read: R,
        // Scratch buffer that is filled from `read` and handed to the decoder.
        buffer: BytesMut,
        // Decoder that turns the buffered bytes into `DecoderMessage`s.
        decoder: MessageDecoder,
        // Offset into `buffer` at which the next `poll_read` call writes.
        bytes_read: usize,
    }
}
22
23impl<R> AsyncMessageReader<R> {
24 pub fn new(read: R, registry: ArrayRegistry) -> Self {
25 AsyncMessageReader {
26 read,
27 buffer: BytesMut::new(),
28 decoder: MessageDecoder::new(registry),
29 bytes_read: 0,
30 }
31 }
32}
33
34impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
35 type Item = VortexResult<DecoderMessage>;
36
37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38 let mut this = self.project();
39 loop {
40 match this.decoder.read_next(this.buffer)? {
41 PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
42 PollRead::NeedMore(nbytes) => {
43 this.buffer.resize(nbytes, 0x00);
44
45 match ready!(
46 this.read
47 .as_mut()
48 .poll_read(cx, &mut this.buffer.as_mut()[*this.bytes_read..])
49 ) {
50 Ok(0) => {
51 return Poll::Ready(None);
53 }
54 Ok(nbytes) => {
55 *this.bytes_read += nbytes;
56 if *this.bytes_read == nbytes {
59 *this.bytes_read = 0;
60 }
61 }
62 Err(e) => return Poll::Ready(Some(Err(e.into()))),
63 }
64 }
65 }
66 }
67 }
68}