vortex_ipc/messages/
reader_sync.rs1use std::io::Read;
5
6use bytes::BytesMut;
7use vortex_array::ArrayRegistry;
8use vortex_error::VortexResult;
9
10use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
11
12pub struct SyncMessageReader<R> {
14 read: R,
15 buffer: BytesMut,
16 decoder: MessageDecoder,
17}
18
19impl<R: Read> SyncMessageReader<R> {
20 pub fn new(read: R, registry: ArrayRegistry) -> Self {
21 SyncMessageReader {
22 read,
23 buffer: BytesMut::new(),
24 decoder: MessageDecoder::new(registry),
25 }
26 }
27}
28
29impl<R: Read> Iterator for SyncMessageReader<R> {
30 type Item = VortexResult<DecoderMessage>;
31
32 fn next(&mut self) -> Option<Self::Item> {
33 loop {
34 match self.decoder.read_next(&mut self.buffer) {
35 Ok(PollRead::Some(msg)) => {
36 return Some(Ok(msg));
37 }
38 Ok(PollRead::NeedMore(nbytes)) => {
39 self.buffer.resize(nbytes, 0x00);
40 match self.read.read(&mut self.buffer) {
41 Ok(0) => {
42 return None;
44 }
45 Ok(_nbytes) => {
46 }
48 Err(e) => return Some(Err(e.into())),
49 }
50 }
51 Err(e) => return Some(Err(e)),
52 }
53 }
54 }
55}