Skip to main content

vortex_ipc/messages/
reader_buf.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use bytes::Buf;
5use vortex_error::VortexResult;
6use vortex_error::vortex_err;
7
8use crate::messages::DecoderMessage;
9use crate::messages::MessageDecoder;
10use crate::messages::PollRead;
11
12/// An IPC message reader backed by a `Read` stream.
13pub struct BufMessageReader<B> {
14    buffer: B,
15    decoder: MessageDecoder,
16}
17
18impl<B: Buf> BufMessageReader<B> {
19    pub fn new(buffer: B) -> Self {
20        BufMessageReader {
21            buffer,
22            decoder: MessageDecoder::default(),
23        }
24    }
25}
26
27impl<B: Buf> Iterator for BufMessageReader<B> {
28    type Item = VortexResult<DecoderMessage>;
29
30    fn next(&mut self) -> Option<Self::Item> {
31        if !self.buffer.has_remaining() {
32            // End-of-buffer reached
33            return None;
34        }
35        match self.decoder.read_next(&mut self.buffer) {
36            Ok(PollRead::Some(msg)) => Some(Ok(msg)),
37            Ok(PollRead::NeedMore(_)) => Some(Err(vortex_err!(
38                "Buffer did not have sufficient bytes for an IPC message"
39            ))),
40            Err(e) => Some(Err(e)),
41        }
42    }
43}