vortex_ipc/messages/
reader_sync.rs

1use std::io::Read;
2
3use bytes::BytesMut;
4use vortex_array::ArrayRegistry;
5use vortex_error::VortexResult;
6
7use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
8
9/// An IPC message reader backed by a `Read` stream.
10pub struct SyncMessageReader<R> {
11    read: R,
12    buffer: BytesMut,
13    decoder: MessageDecoder,
14}
15
16impl<R: Read> SyncMessageReader<R> {
17    pub fn new(read: R, registry: ArrayRegistry) -> Self {
18        SyncMessageReader {
19            read,
20            buffer: BytesMut::new(),
21            decoder: MessageDecoder::new(registry),
22        }
23    }
24}
25
26impl<R: Read> Iterator for SyncMessageReader<R> {
27    type Item = VortexResult<DecoderMessage>;
28
29    fn next(&mut self) -> Option<Self::Item> {
30        loop {
31            match self.decoder.read_next(&mut self.buffer) {
32                Ok(PollRead::Some(msg)) => {
33                    return Some(Ok(msg));
34                }
35                Ok(PollRead::NeedMore(nbytes)) => {
36                    self.buffer.resize(nbytes, 0x00);
37                    match self.read.read(&mut self.buffer) {
38                        Ok(0) => {
39                            // EOF
40                            return None;
41                        }
42                        Ok(_nbytes) => {
43                            // Continue in the loop
44                        }
45                        Err(e) => return Some(Err(e.into())),
46                    }
47                }
48                Err(e) => return Some(Err(e)),
49            }
50        }
51    }
52}