vortex_ipc/messages/
reader_sync.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io::Read;
5
6use bytes::BytesMut;
7use vortex_array::ArrayRegistry;
8use vortex_error::VortexResult;
9
10use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
11
12/// An IPC message reader backed by a `Read` stream.
13pub struct SyncMessageReader<R> {
14    read: R,
15    buffer: BytesMut,
16    decoder: MessageDecoder,
17}
18
19impl<R: Read> SyncMessageReader<R> {
20    pub fn new(read: R, registry: ArrayRegistry) -> Self {
21        SyncMessageReader {
22            read,
23            buffer: BytesMut::new(),
24            decoder: MessageDecoder::new(registry),
25        }
26    }
27}
28
29impl<R: Read> Iterator for SyncMessageReader<R> {
30    type Item = VortexResult<DecoderMessage>;
31
32    fn next(&mut self) -> Option<Self::Item> {
33        loop {
34            match self.decoder.read_next(&mut self.buffer) {
35                Ok(PollRead::Some(msg)) => {
36                    return Some(Ok(msg));
37                }
38                Ok(PollRead::NeedMore(nbytes)) => {
39                    self.buffer.resize(nbytes, 0x00);
40                    match self.read.read(&mut self.buffer) {
41                        Ok(0) => {
42                            // EOF
43                            return None;
44                        }
45                        Ok(_nbytes) => {
46                            // Continue in the loop
47                        }
48                        Err(e) => return Some(Err(e.into())),
49                    }
50                }
51                Err(e) => return Some(Err(e)),
52            }
53        }
54    }
55}