vortex_ipc/messages/
reader_sync.rs1use std::io::Read;
5
6use bytes::BytesMut;
7use vortex_array::session::ArrayRegistry;
8use vortex_error::VortexResult;
9
10use crate::messages::DecoderMessage;
11use crate::messages::MessageDecoder;
12use crate::messages::PollRead;
13
14pub struct SyncMessageReader<R> {
16 read: R,
17 buffer: BytesMut,
18 decoder: MessageDecoder,
19}
20
21impl<R: Read> SyncMessageReader<R> {
22 pub fn new(read: R, registry: ArrayRegistry) -> Self {
23 SyncMessageReader {
24 read,
25 buffer: BytesMut::new(),
26 decoder: MessageDecoder::new(registry),
27 }
28 }
29}
30
31impl<R: Read> Iterator for SyncMessageReader<R> {
32 type Item = VortexResult<DecoderMessage>;
33
34 fn next(&mut self) -> Option<Self::Item> {
35 loop {
36 match self.decoder.read_next(&mut self.buffer) {
37 Ok(PollRead::Some(msg)) => {
38 return Some(Ok(msg));
39 }
40 Ok(PollRead::NeedMore(nbytes)) => {
41 self.buffer.resize(nbytes, 0x00);
42 match self.read.read(&mut self.buffer) {
43 Ok(0) => {
44 return None;
46 }
47 Ok(_nbytes) => {
48 }
50 Err(e) => return Some(Err(e.into())),
51 }
52 }
53 Err(e) => return Some(Err(e)),
54 }
55 }
56 }
57}