vortex_ipc/messages/
reader_buf.rs1use bytes::Buf;
2use vortex_array::ArrayRegistry;
3use vortex_error::{VortexResult, vortex_err};
4
5use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
6
7pub struct BufMessageReader<B> {
9 buffer: B,
10 decoder: MessageDecoder,
11}
12
13impl<B: Buf> BufMessageReader<B> {
14 pub fn new(buffer: B, registry: ArrayRegistry) -> Self {
15 BufMessageReader {
16 buffer,
17 decoder: MessageDecoder::new(registry),
18 }
19 }
20}
21
22impl<B: Buf> Iterator for BufMessageReader<B> {
23 type Item = VortexResult<DecoderMessage>;
24
25 fn next(&mut self) -> Option<Self::Item> {
26 if !self.buffer.has_remaining() {
27 return None;
29 }
30 match self.decoder.read_next(&mut self.buffer) {
31 Ok(PollRead::Some(msg)) => Some(Ok(msg)),
32 Ok(PollRead::NeedMore(_)) => Some(Err(vortex_err!(
33 "Buffer did not have sufficient bytes for an IPC message"
34 ))),
35 Err(e) => Some(Err(e)),
36 }
37 }
38}