vortex_ipc/messages/
reader_sync.rs1use std::io::Read;
2
3use bytes::BytesMut;
4use vortex_array::ArrayRegistry;
5use vortex_error::VortexResult;
6
7use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
8
9pub struct SyncMessageReader<R> {
11 read: R,
12 buffer: BytesMut,
13 decoder: MessageDecoder,
14}
15
16impl<R: Read> SyncMessageReader<R> {
17 pub fn new(read: R, registry: ArrayRegistry) -> Self {
18 SyncMessageReader {
19 read,
20 buffer: BytesMut::new(),
21 decoder: MessageDecoder::new(registry),
22 }
23 }
24}
25
26impl<R: Read> Iterator for SyncMessageReader<R> {
27 type Item = VortexResult<DecoderMessage>;
28
29 fn next(&mut self) -> Option<Self::Item> {
30 loop {
31 match self.decoder.read_next(&mut self.buffer) {
32 Ok(PollRead::Some(msg)) => {
33 return Some(Ok(msg));
34 }
35 Ok(PollRead::NeedMore(nbytes)) => {
36 self.buffer.resize(nbytes, 0x00);
37 match self.read.read(&mut self.buffer) {
38 Ok(0) => {
39 return None;
41 }
42 Ok(_nbytes) => {
43 }
45 Err(e) => return Some(Err(e.into())),
46 }
47 }
48 Err(e) => return Some(Err(e)),
49 }
50 }
51 }
52}