vortex_ipc/messages/
reader_async.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::pin::Pin;
5use std::task::{Context, Poll, ready};
6
7use bytes::BytesMut;
8use futures::{AsyncRead, Stream};
9use pin_project_lite::pin_project;
10use vortex_array::ArrayRegistry;
11use vortex_error::VortexResult;
12
13use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
14
15pin_project! {
16    /// An IPC message reader backed by an `AsyncRead` stream.
17    pub struct AsyncMessageReader<R> {
18        #[pin]
19        read: R,
20        buffer: BytesMut,
21        decoder: MessageDecoder,
22        bytes_read: usize,
23    }
24}
25
26impl<R> AsyncMessageReader<R> {
27    pub fn new(read: R, registry: ArrayRegistry) -> Self {
28        AsyncMessageReader {
29            read,
30            buffer: BytesMut::new(),
31            decoder: MessageDecoder::new(registry),
32            bytes_read: 0,
33        }
34    }
35}
36
37impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
38    type Item = VortexResult<DecoderMessage>;
39
40    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41        let mut this = self.project();
42        loop {
43            match this.decoder.read_next(this.buffer)? {
44                PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
45                PollRead::NeedMore(nbytes) => {
46                    this.buffer.resize(nbytes, 0x00);
47
48                    match ready!(
49                        this.read
50                            .as_mut()
51                            .poll_read(cx, &mut this.buffer.as_mut()[*this.bytes_read..])
52                    ) {
53                        Ok(0) => {
54                            // End of file
55                            return Poll::Ready(None);
56                        }
57                        Ok(nbytes) => {
58                            *this.bytes_read += nbytes;
59                            // If we've finished the read operation, then we continue the loop
60                            // and the decoder should present us with a new response.
61                            if *this.bytes_read == nbytes {
62                                *this.bytes_read = 0;
63                            }
64                        }
65                        Err(e) => return Poll::Ready(Some(Err(e.into()))),
66                    }
67                }
68            }
69        }
70    }
71}