vortex_ipc/messages/
reader_buf.rs1use bytes::Buf;
5use vortex_error::VortexResult;
6use vortex_error::vortex_err;
7
8use crate::messages::DecoderMessage;
9use crate::messages::MessageDecoder;
10use crate::messages::PollRead;
11
12pub struct BufMessageReader<B> {
14 buffer: B,
15 decoder: MessageDecoder,
16}
17
18impl<B: Buf> BufMessageReader<B> {
19 pub fn new(buffer: B) -> Self {
20 BufMessageReader {
21 buffer,
22 decoder: MessageDecoder::default(),
23 }
24 }
25}
26
27impl<B: Buf> Iterator for BufMessageReader<B> {
28 type Item = VortexResult<DecoderMessage>;
29
30 fn next(&mut self) -> Option<Self::Item> {
31 if !self.buffer.has_remaining() {
32 return None;
34 }
35 match self.decoder.read_next(&mut self.buffer) {
36 Ok(PollRead::Some(msg)) => Some(Ok(msg)),
37 Ok(PollRead::NeedMore(_)) => Some(Err(vortex_err!(
38 "Buffer did not have sufficient bytes for an IPC message"
39 ))),
40 Err(e) => Some(Err(e)),
41 }
42 }
43}