vortex_ipc/messages/
reader_async.rs

1use std::pin::Pin;
2use std::task::{Context, Poll, ready};
3
4use bytes::BytesMut;
5use futures_util::{AsyncRead, Stream};
6use pin_project_lite::pin_project;
7use vortex_array::ArrayRegistry;
8use vortex_error::VortexResult;
9
10use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
11
12pin_project! {
13    /// An IPC message reader backed by an `AsyncRead` stream.
14    pub struct AsyncMessageReader<R> {
15        #[pin]
16        read: R,
17        buffer: BytesMut,
18        decoder: MessageDecoder,
19        bytes_read: usize,
20    }
21}
22
23impl<R> AsyncMessageReader<R> {
24    pub fn new(read: R, registry: ArrayRegistry) -> Self {
25        AsyncMessageReader {
26            read,
27            buffer: BytesMut::new(),
28            decoder: MessageDecoder::new(registry),
29            bytes_read: 0,
30        }
31    }
32}
33
34impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
35    type Item = VortexResult<DecoderMessage>;
36
37    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38        let mut this = self.project();
39        loop {
40            match this.decoder.read_next(this.buffer)? {
41                PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
42                PollRead::NeedMore(nbytes) => {
43                    this.buffer.resize(nbytes, 0x00);
44
45                    match ready!(
46                        this.read
47                            .as_mut()
48                            .poll_read(cx, &mut this.buffer.as_mut()[*this.bytes_read..])
49                    ) {
50                        Ok(0) => {
51                            // End of file
52                            return Poll::Ready(None);
53                        }
54                        Ok(nbytes) => {
55                            *this.bytes_read += nbytes;
56                            // If we've finished the read operation, then we continue the loop
57                            // and the decoder should present us with a new response.
58                            if *this.bytes_read == nbytes {
59                                *this.bytes_read = 0;
60                            }
61                        }
62                        Err(e) => return Poll::Ready(Some(Err(e.into()))),
63                    }
64                }
65            }
66        }
67    }
68}