1#[cfg(feature = "io")]
2use super::IoBuffer;
3use super::RecvError;
4use core::{
5 future::Future,
6 marker::PhantomData,
7 mem::forget,
8 ops::Deref,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use flatty::{error::ErrorKind, Flat};
13#[cfg(feature = "io")]
14use futures::io::AsyncRead;
15
/// A byte buffer that can be refilled asynchronously.
///
/// The bytes currently held by the buffer are exposed through `Deref<Target = [u8]>`.
pub trait AsyncReadBuffer: Deref<Target = [u8]> + Unpin {
    /// Error reported when refilling the buffer fails.
    type Error;

    /// Attempts to pull more bytes into the buffer.
    ///
    /// On success the result carries a byte count. [`Receiver::recv`] interprets a count of
    /// zero as the source being closed.
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<usize, Self::Error>>;

    /// Returns a future that resolves with the outcome of [`poll_read`](Self::poll_read).
    fn read(&mut self) -> Read<'_, Self> {
        Read(self)
    }

    /// Discards `count` bytes from the buffer.
    ///
    /// [`RecvGuard`] calls this on drop with the size of the message it guarded.
    /// NOTE(review): presumably the bytes are removed from the front of the buffer —
    /// confirm against the implementors.
    fn skip(&mut self, count: usize);
}

/// Future returned by [`AsyncReadBuffer::read`].
///
/// Each poll is forwarded to [`AsyncReadBuffer::poll_read`] on the borrowed buffer.
#[must_use = "futures do nothing unless polled or awaited"]
pub struct Read<'a, B: AsyncReadBuffer + ?Sized>(&'a mut B);

impl<'a, B: AsyncReadBuffer + ?Sized> Future for Read<'a, B> {
    type Output = Result<usize, B::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Read` only holds a `&mut B`, so it is `Unpin` and can be unwrapped freely;
        // `B: Unpin` (a supertrait bound) lets the buffer be re-pinned for the call.
        let this = self.get_mut();
        Pin::new(&mut *this.0).poll_read(cx)
    }
}
39
40pub struct Receiver<M: Flat + ?Sized, B: AsyncReadBuffer> {
41 pub(crate) buffer: B,
42 _ghost: PhantomData<M>,
43}
44
45impl<M: Flat + ?Sized, B: AsyncReadBuffer> Receiver<M, B> {
46 pub fn new(buffer: B) -> Self {
47 Self {
48 buffer,
49 _ghost: PhantomData,
50 }
51 }
52}
53
/// A [`Receiver`] whose buffer is an [`IoBuffer`] wrapped around the pipe `P`.
#[cfg(feature = "io")]
pub type IoReceiver<M, P> = Receiver<M, IoBuffer<P>>;

#[cfg(feature = "io")]
impl<M, P> IoReceiver<M, P>
where
    M: Flat + ?Sized,
    P: AsyncRead + Unpin,
{
    /// Creates a receiver that reads messages of type `M` from `pipe`.
    ///
    /// The `IoBuffer` is constructed with twice the larger of `max_msg_len` and
    /// `M::MIN_SIZE`, together with `M::ALIGN`.
    /// NOTE(review): these are presumably the buffer capacity and alignment — confirm
    /// against `IoBuffer::new`.
    pub fn io(pipe: P, max_msg_len: usize) -> Self {
        // Never plan for less than the smallest possible `M`.
        let msg_len = max_msg_len.max(M::MIN_SIZE);
        let capacity = 2 * msg_len;
        Self::new(IoBuffer::new(pipe, capacity, M::ALIGN))
    }
}
63
64impl<M: Flat + ?Sized, B: AsyncReadBuffer> Receiver<M, B> {
65 pub async fn recv(&mut self) -> Result<RecvGuard<'_, M, B>, RecvError<B::Error>> {
66 while let Err(e) = M::validate(&self.buffer) {
67 match e.kind {
68 ErrorKind::InsufficientSize => (),
69 _ => return Err(RecvError::Parse(e)),
70 }
71 if self.buffer.read().await.map_err(RecvError::Read)? == 0 {
72 return Err(RecvError::Closed);
73 }
74 }
75 Ok(RecvGuard::new(&mut self.buffer))
76 }
77}
78
79pub struct RecvGuard<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> {
80 pub(crate) buffer: &'a mut B,
81 _ghost: PhantomData<M>,
82}
83
84impl<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> RecvGuard<'a, M, B> {
85 pub(crate) fn new(buffer: &'a mut B) -> Self {
86 Self {
87 buffer,
88 _ghost: PhantomData,
89 }
90 }
91 pub fn retain(self) {
95 forget(self);
96 }
97}
98
99impl<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> Drop for RecvGuard<'a, M, B> {
100 fn drop(&mut self) {
101 let size = self.size();
102 self.buffer.skip(size);
103 }
104}
105
106impl<'a, M: Flat + ?Sized, B: AsyncReadBuffer + 'a> Deref for RecvGuard<'a, M, B> {
107 type Target = M;
108 fn deref(&self) -> &M {
109 unsafe { M::from_bytes_unchecked(self.buffer) }
110 }
111}