flatty_io/async_/
recv.rs

1#[cfg(feature = "io")]
2use super::IoBuffer;
3use super::RecvError;
4use core::{
5    future::Future,
6    marker::PhantomData,
7    mem::forget,
8    ops::Deref,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use flatty::{error::ErrorKind, Flat};
13#[cfg(feature = "io")]
14use futures::io::AsyncRead;
15
16pub trait AsyncReadBuffer: Deref<Target = [u8]> + Unpin {
17    type Error;
18
19    /// Receive more bytes and put them in the buffer.
20    /// Returns the number of received bytes, zero means that channel is closed.
21    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize, Self::Error>>;
22
23    fn read(&mut self) -> Read<'_, Self> {
24        Read(self)
25    }
26
27    /// Skip first `count` bytes. Remaining bytes *may* be discarded.
28    fn skip(&mut self, count: usize);
29}
30
31pub struct Read<'a, B: AsyncReadBuffer + ?Sized>(&'a mut B);
32
33impl<'a, B: AsyncReadBuffer + ?Sized> Future for Read<'a, B> {
34    type Output = Result<usize, B::Error>;
35    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36        Pin::new(&mut *self.0).poll_read(cx)
37    }
38}
39
40pub struct Receiver<M: Flat + ?Sized, B: AsyncReadBuffer> {
41    pub(crate) buffer: B,
42    _ghost: PhantomData<M>,
43}
44
45impl<M: Flat + ?Sized, B: AsyncReadBuffer> Receiver<M, B> {
46    pub fn new(buffer: B) -> Self {
47        Self {
48            buffer,
49            _ghost: PhantomData,
50        }
51    }
52}
53
/// [`Receiver`] whose buffer is an [`IoBuffer`] wrapped around the pipe type `P`.
///
/// Only available with the `io` feature enabled.
#[cfg(feature = "io")]
pub type IoReceiver<M, P> = Receiver<M, IoBuffer<P>>;
56
#[cfg(feature = "io")]
impl<M, P> IoReceiver<M, P>
where
    M: Flat + ?Sized,
    P: AsyncRead + Unpin,
{
    /// Build a receiver that reads messages from `pipe`.
    ///
    /// The value handed to `IoBuffer::new` as its second argument (presumably the
    /// buffer capacity; confirm against `IoBuffer`) is twice the larger of
    /// `max_msg_len` and `M::MIN_SIZE`; the third argument is `M::ALIGN`.
    pub fn io(pipe: P, max_msg_len: usize) -> Self {
        // Never size for less than the smallest possible message.
        let msg_len = max_msg_len.max(M::MIN_SIZE);
        let buffer = IoBuffer::new(pipe, 2 * msg_len, M::ALIGN);
        Self::new(buffer)
    }
}
63
64impl<M: Flat + ?Sized, B: AsyncReadBuffer> Receiver<M, B> {
65    pub async fn recv(&mut self) -> Result<RecvGuard<'_, M, B>, RecvError<B::Error>> {
66        while let Err(e) = M::validate(&self.buffer) {
67            match e.kind {
68                ErrorKind::InsufficientSize => (),
69                _ => return Err(RecvError::Parse(e)),
70            }
71            if self.buffer.read().await.map_err(RecvError::Read)? == 0 {
72                return Err(RecvError::Closed);
73            }
74        }
75        Ok(RecvGuard::new(&mut self.buffer))
76    }
77}
78
79pub struct RecvGuard<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> {
80    pub(crate) buffer: &'a mut B,
81    _ghost: PhantomData<M>,
82}
83
84impl<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> RecvGuard<'a, M, B> {
85    pub(crate) fn new(buffer: &'a mut B) -> Self {
86        Self {
87            buffer,
88            _ghost: PhantomData,
89        }
90    }
91    /// Destroy guard but do not remove message from receiver.
92    ///
93    /// Effect of this call is the same as leak of the guard.
94    pub fn retain(self) {
95        forget(self);
96    }
97}
98
99impl<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> Drop for RecvGuard<'a, M, B> {
100    fn drop(&mut self) {
101        let size = self.size();
102        self.buffer.skip(size);
103    }
104}
105
106impl<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> Deref for RecvGuard<'a, M, B> {
107    type Target = M;
108    fn deref(&self) -> &M {
109        unsafe { M::from_bytes_unchecked(self.buffer) }
110    }
111}