vortex_ipc/messages/
reader_buf.rs

1use bytes::Buf;
2use vortex_array::ArrayRegistry;
3use vortex_error::{VortexResult, vortex_err};
4
5use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
6
7/// An IPC message reader backed by a `Read` stream.
8pub struct BufMessageReader<B> {
9    buffer: B,
10    decoder: MessageDecoder,
11}
12
13impl<B: Buf> BufMessageReader<B> {
14    pub fn new(buffer: B, registry: ArrayRegistry) -> Self {
15        BufMessageReader {
16            buffer,
17            decoder: MessageDecoder::new(registry),
18        }
19    }
20}
21
22impl<B: Buf> Iterator for BufMessageReader<B> {
23    type Item = VortexResult<DecoderMessage>;
24
25    fn next(&mut self) -> Option<Self::Item> {
26        if !self.buffer.has_remaining() {
27            // End-of-buffer reached
28            return None;
29        }
30        match self.decoder.read_next(&mut self.buffer) {
31            Ok(PollRead::Some(msg)) => Some(Ok(msg)),
32            Ok(PollRead::NeedMore(_)) => Some(Err(vortex_err!(
33                "Buffer did not have sufficient bytes for an IPC message"
34            ))),
35            Err(e) => Some(Err(e)),
36        }
37    }
38}