flatty_io/blocking/
recv.rs

1#[cfg(feature = "io")]
2use crate::IoBuffer;
3use crate::RecvError;
4use core::{marker::PhantomData, mem::forget, ops::Deref};
5use flatty::{error::ErrorKind, Flat};
6#[cfg(feature = "io")]
7use std::io::Read;
8
9pub trait ReadBuffer: Deref<Target = [u8]> {
10    type Error;
11
12    /// Receive more bytes and put them in the buffer.
13    /// Returns the number of received bytes, zero means that channel is closed.
14    fn read(&mut self) -> Result<usize, Self::Error>;
15
16    /// Skip first `count` bytes. Remaining bytes *may* be discarded.
17    fn skip(&mut self, count: usize);
18}
19
20pub struct Receiver<M: Flat + ?Sized, B: ReadBuffer> {
21    pub(crate) buffer: B,
22    _ghost: PhantomData<M>,
23}
24
25impl<M: Flat + ?Sized, B: ReadBuffer> Receiver<M, B> {
26    pub fn new(buffer: B) -> Self {
27        Self {
28            buffer,
29            _ghost: PhantomData,
30        }
31    }
32}
33
34#[cfg(feature = "io")]
35pub type IoReceiver<M, P> = Receiver<M, IoBuffer<P>>;
36
37#[cfg(feature = "io")]
38impl<M: Flat + ?Sized, P: Read> IoReceiver<M, P> {
39    pub fn io(pipe: P, max_msg_len: usize) -> Self {
40        Self::new(IoBuffer::new(pipe, 2 * max_msg_len.max(M::MIN_SIZE), M::ALIGN))
41    }
42}
43
44impl<M: Flat + ?Sized, B: ReadBuffer> Receiver<M, B> {
45    pub fn recv(&mut self) -> Result<RecvGuard<'_, M, B>, RecvError<B::Error>> {
46        while let Err(e) = M::validate(&self.buffer) {
47            match e.kind {
48                ErrorKind::InsufficientSize => (),
49                _ => return Err(RecvError::Parse(e)),
50            }
51            if self.buffer.read().map_err(RecvError::Read)? == 0 {
52                return Err(RecvError::Closed);
53            }
54        }
55        Ok(RecvGuard::new(&mut self.buffer))
56    }
57}
58
59pub struct RecvGuard<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> {
60    pub(crate) buffer: &'a mut B,
61    _ghost: PhantomData<M>,
62}
63
64impl<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> RecvGuard<'a, M, B> {
65    pub(crate) fn new(buffer: &'a mut B) -> Self {
66        Self {
67            buffer,
68            _ghost: PhantomData,
69        }
70    }
71    /// Destroy guard but do not remove message from receiver.
72    ///
73    /// Effect of this call is the same as leak of the guard.
74    pub fn retain(self) {
75        forget(self);
76    }
77}
78
79impl<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> Drop for RecvGuard<'a, M, B> {
80    fn drop(&mut self) {
81        let size = self.size();
82        self.buffer.skip(size);
83    }
84}
85
86impl<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> Deref for RecvGuard<'a, M, B> {
87    type Target = M;
88    fn deref(&self) -> &M {
89        unsafe { M::from_bytes_unchecked(self.buffer) }
90    }
91}