vortex_ipc/messages/
reader_sync.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io::Read;
5
6use bytes::BytesMut;
7use vortex_array::session::ArrayRegistry;
8use vortex_error::VortexResult;
9
10use crate::messages::DecoderMessage;
11use crate::messages::MessageDecoder;
12use crate::messages::PollRead;
13
14/// An IPC message reader backed by a `Read` stream.
15pub struct SyncMessageReader<R> {
16    read: R,
17    buffer: BytesMut,
18    decoder: MessageDecoder,
19}
20
21impl<R: Read> SyncMessageReader<R> {
22    pub fn new(read: R, registry: ArrayRegistry) -> Self {
23        SyncMessageReader {
24            read,
25            buffer: BytesMut::new(),
26            decoder: MessageDecoder::new(registry),
27        }
28    }
29}
30
31impl<R: Read> Iterator for SyncMessageReader<R> {
32    type Item = VortexResult<DecoderMessage>;
33
34    fn next(&mut self) -> Option<Self::Item> {
35        loop {
36            match self.decoder.read_next(&mut self.buffer) {
37                Ok(PollRead::Some(msg)) => {
38                    return Some(Ok(msg));
39                }
40                Ok(PollRead::NeedMore(nbytes)) => {
41                    self.buffer.resize(nbytes, 0x00);
42                    match self.read.read(&mut self.buffer) {
43                        Ok(0) => {
44                            // EOF
45                            return None;
46                        }
47                        Ok(_nbytes) => {
48                            // Continue in the loop
49                        }
50                        Err(e) => return Some(Err(e.into())),
51                    }
52                }
53                Err(e) => return Some(Err(e)),
54            }
55        }
56    }
57}