vortex_ipc/messages/
reader_buf.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use bytes::Buf;
5use vortex_array::ArrayRegistry;
6use vortex_error::{VortexResult, vortex_err};
7
8use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
9
10/// An IPC message reader backed by a `Read` stream.
11pub struct BufMessageReader<B> {
12    buffer: B,
13    decoder: MessageDecoder,
14}
15
16impl<B: Buf> BufMessageReader<B> {
17    pub fn new(buffer: B, registry: ArrayRegistry) -> Self {
18        BufMessageReader {
19            buffer,
20            decoder: MessageDecoder::new(registry),
21        }
22    }
23}
24
25impl<B: Buf> Iterator for BufMessageReader<B> {
26    type Item = VortexResult<DecoderMessage>;
27
28    fn next(&mut self) -> Option<Self::Item> {
29        if !self.buffer.has_remaining() {
30            // End-of-buffer reached
31            return None;
32        }
33        match self.decoder.read_next(&mut self.buffer) {
34            Ok(PollRead::Some(msg)) => Some(Ok(msg)),
35            Ok(PollRead::NeedMore(_)) => Some(Err(vortex_err!(
36                "Buffer did not have sufficient bytes for an IPC message"
37            ))),
38            Err(e) => Some(Err(e)),
39        }
40    }
41}