vortex_ipc/messages/
reader_async.rs1use std::pin::Pin;
5use std::task::{Context, Poll, ready};
6
7use bytes::BytesMut;
8use futures::{AsyncRead, Stream};
9use pin_project_lite::pin_project;
10use vortex_array::ArrayRegistry;
11use vortex_error::VortexResult;
12
13use crate::messages::{DecoderMessage, MessageDecoder, PollRead};
14
15pin_project! {
16 pub struct AsyncMessageReader<R> {
18 #[pin]
19 read: R,
20 buffer: BytesMut,
21 decoder: MessageDecoder,
22 bytes_read: usize,
23 }
24}
25
26impl<R> AsyncMessageReader<R> {
27 pub fn new(read: R, registry: ArrayRegistry) -> Self {
28 AsyncMessageReader {
29 read,
30 buffer: BytesMut::new(),
31 decoder: MessageDecoder::new(registry),
32 bytes_read: 0,
33 }
34 }
35}
36
37impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
38 type Item = VortexResult<DecoderMessage>;
39
40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41 let mut this = self.project();
42 loop {
43 match this.decoder.read_next(this.buffer)? {
44 PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
45 PollRead::NeedMore(nbytes) => {
46 this.buffer.resize(nbytes, 0x00);
47
48 match ready!(
49 this.read
50 .as_mut()
51 .poll_read(cx, &mut this.buffer.as_mut()[*this.bytes_read..])
52 ) {
53 Ok(0) => {
54 return Poll::Ready(None);
56 }
57 Ok(nbytes) => {
58 *this.bytes_read += nbytes;
59 if *this.bytes_read == nbytes {
62 *this.bytes_read = 0;
63 }
64 }
65 Err(e) => return Poll::Ready(Some(Err(e.into()))),
66 }
67 }
68 }
69 }
70 }
71}