flatty_io/blocking/
recv.rs1#[cfg(feature = "io")]
2use crate::IoBuffer;
3use crate::RecvError;
4use core::{marker::PhantomData, mem::forget, ops::Deref};
5use flatty::{error::ErrorKind, Flat};
6#[cfg(feature = "io")]
7use std::io::Read;
8
9pub trait ReadBuffer: Deref<Target = [u8]> {
10 type Error;
11
12 fn read(&mut self) -> Result<usize, Self::Error>;
15
16 fn skip(&mut self, count: usize);
18}
19
20pub struct Receiver<M: Flat + ?Sized, B: ReadBuffer> {
21 pub(crate) buffer: B,
22 _ghost: PhantomData<M>,
23}
24
25impl<M: Flat + ?Sized, B: ReadBuffer> Receiver<M, B> {
26 pub fn new(buffer: B) -> Self {
27 Self {
28 buffer,
29 _ghost: PhantomData,
30 }
31 }
32}
33
34#[cfg(feature = "io")]
35pub type IoReceiver<M, P> = Receiver<M, IoBuffer<P>>;
36
37#[cfg(feature = "io")]
38impl<M: Flat + ?Sized, P: Read> IoReceiver<M, P> {
39 pub fn io(pipe: P, max_msg_len: usize) -> Self {
40 Self::new(IoBuffer::new(pipe, 2 * max_msg_len.max(M::MIN_SIZE), M::ALIGN))
41 }
42}
43
44impl<M: Flat + ?Sized, B: ReadBuffer> Receiver<M, B> {
45 pub fn recv(&mut self) -> Result<RecvGuard<'_, M, B>, RecvError<B::Error>> {
46 while let Err(e) = M::validate(&self.buffer) {
47 match e.kind {
48 ErrorKind::InsufficientSize => (),
49 _ => return Err(RecvError::Parse(e)),
50 }
51 if self.buffer.read().map_err(RecvError::Read)? == 0 {
52 return Err(RecvError::Closed);
53 }
54 }
55 Ok(RecvGuard::new(&mut self.buffer))
56 }
57}
58
59pub struct RecvGuard<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> {
60 pub(crate) buffer: &'a mut B,
61 _ghost: PhantomData<M>,
62}
63
64impl<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> RecvGuard<'a, M, B> {
65 pub(crate) fn new(buffer: &'a mut B) -> Self {
66 Self {
67 buffer,
68 _ghost: PhantomData,
69 }
70 }
71 pub fn retain(self) {
75 forget(self);
76 }
77}
78
79impl<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> Drop for RecvGuard<'a, M, B> {
80 fn drop(&mut self) {
81 let size = self.size();
82 self.buffer.skip(size);
83 }
84}
85
86impl<'a, M: Flat + ?Sized, B: ReadBuffer + 'a> Deref for RecvGuard<'a, M, B> {
87 type Target = M;
88 fn deref(&self) -> &M {
89 unsafe { M::from_bytes_unchecked(self.buffer) }
90 }
91}