Skip to main content

vortex_ipc/messages/
reader_sync.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::io::Read;
5
6use bytes::BytesMut;
7use vortex_error::VortexResult;
8
9use crate::messages::DecoderMessage;
10use crate::messages::MessageDecoder;
11use crate::messages::PollRead;
12
13/// An IPC message reader backed by a `Read` stream.
14pub struct SyncMessageReader<R> {
15    read: R,
16    buffer: BytesMut,
17    decoder: MessageDecoder,
18}
19
20impl<R: Read> SyncMessageReader<R> {
21    pub fn new(read: R) -> Self {
22        SyncMessageReader {
23            read,
24            buffer: BytesMut::new(),
25            decoder: MessageDecoder::default(),
26        }
27    }
28}
29
30impl<R: Read> Iterator for SyncMessageReader<R> {
31    type Item = VortexResult<DecoderMessage>;
32
33    fn next(&mut self) -> Option<Self::Item> {
34        loop {
35            match self.decoder.read_next(&mut self.buffer) {
36                Ok(PollRead::Some(msg)) => {
37                    return Some(Ok(msg));
38                }
39                Ok(PollRead::NeedMore(nbytes)) => {
40                    self.buffer.resize(nbytes, 0x00);
41                    match self.read.read(&mut self.buffer) {
42                        Ok(0) => {
43                            // EOF
44                            return None;
45                        }
46                        Ok(_nbytes) => {
47                            // Continue in the loop
48                        }
49                        Err(e) => return Some(Err(e.into())),
50                    }
51                }
52                Err(e) => return Some(Err(e)),
53            }
54        }
55    }
56}