vortex_ipc/messages/
reader_buf.rs1use bytes::Buf;
5use vortex_array::ArrayRegistry;
6use vortex_error::{VortexResult, vortex_err};
7
8use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
9
10pub struct BufMessageReader<B> {
12 buffer: B,
13 decoder: MessageDecoder,
14}
15
16impl<B: Buf> BufMessageReader<B> {
17 pub fn new(buffer: B, registry: ArrayRegistry) -> Self {
18 BufMessageReader {
19 buffer,
20 decoder: MessageDecoder::new(registry),
21 }
22 }
23}
24
25impl<B: Buf> Iterator for BufMessageReader<B> {
26 type Item = VortexResult<DecoderMessage>;
27
28 fn next(&mut self) -> Option<Self::Item> {
29 if !self.buffer.has_remaining() {
30 return None;
32 }
33 match self.decoder.read_next(&mut self.buffer) {
34 Ok(PollRead::Some(msg)) => Some(Ok(msg)),
35 Ok(PollRead::NeedMore(_)) => Some(Err(vortex_err!(
36 "Buffer did not have sufficient bytes for an IPC message"
37 ))),
38 Err(e) => Some(Err(e)),
39 }
40 }
41}