flatty_io/async_/
send.rs

1use super::SendError;
2#[cfg(feature = "io")]
3use crate::IoBuffer;
4use core::{
5    future::Future,
6    marker::PhantomData,
7    ops::{Deref, DerefMut},
8    pin::Pin,
9    task::{Context, Poll},
10};
11use flatty::{self, prelude::*, Emplacer};
12#[cfg(feature = "io")]
13use futures::io::AsyncWrite;
14
15pub trait AsyncWriteBuffer: DerefMut<Target = [u8]> + Unpin {
16    type Error;
17
18    /// Allocate some fixed amount of bytes in the buffer.
19    fn poll_alloc(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
20
21    /// Allocate some fixed amount of bytes in the buffer.
22    fn alloc(&mut self) -> Alloc<'_, Self> {
23        Alloc(self)
24    }
25
26    type WriteAll<'a>: Future<Output = Result<(), Self::Error>>
27    where
28        Self: 'a;
29
30    /// Send exactly `count` bytes from buffer.
31    /// Remaining bytes are discarded.
32    fn write_all(&mut self, count: usize) -> Self::WriteAll<'_>;
33}
34
35pub struct Alloc<'a, B: AsyncWriteBuffer + ?Sized>(&'a mut B);
36
37impl<'a, B: AsyncWriteBuffer + ?Sized> Future for Alloc<'a, B> {
38    type Output = Result<(), B::Error>;
39    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40        Pin::new(&mut *self.0).poll_alloc(cx)
41    }
42}
43
44pub struct Sender<M: Flat + ?Sized, B: AsyncWriteBuffer> {
45    pub(crate) buffer: B,
46    _ghost: PhantomData<M>,
47}
48
49impl<M: Flat + ?Sized, B: AsyncWriteBuffer> Sender<M, B> {
50    pub fn new(buf_send: B) -> Self {
51        Self {
52            buffer: buf_send,
53            _ghost: PhantomData,
54        }
55    }
56}
57
58#[cfg(feature = "io")]
59pub type IoSender<M, P> = Sender<M, IoBuffer<P>>;
60
61#[cfg(feature = "io")]
62impl<M: Flat + ?Sized, P: AsyncWrite + Unpin> IoSender<M, P> {
63    pub fn io(pipe: P, max_msg_len: usize) -> Self {
64        Self::new(IoBuffer::new(pipe, 2 * max_msg_len.max(M::MIN_SIZE), M::ALIGN))
65    }
66}
67
68impl<M: Flat + ?Sized, B: AsyncWriteBuffer> Sender<M, B> {
69    pub async fn alloc(&mut self) -> Result<UninitSendGuard<'_, M, B>, SendError<B::Error>> {
70        self.buffer.alloc().await?;
71        Ok(UninitSendGuard::new(&mut self.buffer))
72    }
73}
74
75impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer> SendGuard<'a, M, B> {
76    pub fn send(self) -> B::WriteAll<'a> {
77        let size = self.size();
78        self.buffer.write_all(size)
79    }
80}
81
82impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer, const INIT: bool> Unpin for SendGuard<'a, M, B, INIT> {}
83
84pub struct SendGuard<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a, const INIT: bool = true> {
85    pub(crate) buffer: &'a mut B,
86    _ghost: PhantomData<M>,
87}
88
89pub type UninitSendGuard<'a, M, B> = SendGuard<'a, M, B, false>;
90
91impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a> UninitSendGuard<'a, M, B> {
92    pub(crate) fn new(buffer: &'a mut B) -> Self {
93        Self {
94            buffer,
95            _ghost: PhantomData,
96        }
97    }
98
99    pub fn as_bytes(&self) -> &[u8] {
100        self.buffer
101    }
102    pub fn as_mut_bytes(&mut self) -> &mut [u8] {
103        self.buffer
104    }
105
106    /// # Safety
107    ///
108    /// Underlying message data must be initialized.
109    pub unsafe fn assume_init(self) -> SendGuard<'a, M, B> {
110        SendGuard {
111            buffer: self.buffer,
112            _ghost: PhantomData,
113        }
114    }
115
116    pub fn new_in_place(self, emplacer: impl Emplacer<M>) -> Result<SendGuard<'a, M, B>, flatty::Error> {
117        M::new_in_place(self.buffer, emplacer)?;
118        Ok(unsafe { self.assume_init() })
119    }
120}
121
122impl<'a, M: Flat + FlatDefault + ?Sized, B: AsyncWriteBuffer + 'a> UninitSendGuard<'a, M, B> {
123    pub fn default_in_place(self) -> Result<SendGuard<'a, M, B>, flatty::Error> {
124        M::default_in_place(self.buffer)?;
125        Ok(unsafe { self.assume_init() })
126    }
127}
128
129impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a> Deref for SendGuard<'a, M, B> {
130    type Target = M;
131    fn deref(&self) -> &M {
132        unsafe { M::from_bytes_unchecked(self.buffer) }
133    }
134}
135
136impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a> DerefMut for SendGuard<'a, M, B> {
137    fn deref_mut(&mut self) -> &mut M {
138        unsafe { M::from_mut_bytes_unchecked(self.buffer) }
139    }
140}