1use super::SendError;
2#[cfg(feature = "io")]
3use crate::IoBuffer;
4use core::{
5 future::Future,
6 marker::PhantomData,
7 ops::{Deref, DerefMut},
8 pin::Pin,
9 task::{Context, Poll},
10};
11use flatty::{self, prelude::*, Emplacer};
12#[cfg(feature = "io")]
13use futures::io::AsyncWrite;
14
15pub trait AsyncWriteBuffer: DerefMut<Target = [u8]> + Unpin {
16 type Error;
17
18 fn poll_alloc(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
20
21 fn alloc(&mut self) -> Alloc<'_, Self> {
23 Alloc(self)
24 }
25
26 type WriteAll<'a>: Future<Output = Result<(), Self::Error>>
27 where
28 Self: 'a;
29
30 fn write_all(&mut self, count: usize) -> Self::WriteAll<'_>;
33}
34
35pub struct Alloc<'a, B: AsyncWriteBuffer + ?Sized>(&'a mut B);
36
37impl<'a, B: AsyncWriteBuffer + ?Sized> Future for Alloc<'a, B> {
38 type Output = Result<(), B::Error>;
39 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40 Pin::new(&mut *self.0).poll_alloc(cx)
41 }
42}
43
44pub struct Sender<M: Flat + ?Sized, B: AsyncWriteBuffer> {
45 pub(crate) buffer: B,
46 _ghost: PhantomData<M>,
47}
48
49impl<M: Flat + ?Sized, B: AsyncWriteBuffer> Sender<M, B> {
50 pub fn new(buf_send: B) -> Self {
51 Self {
52 buffer: buf_send,
53 _ghost: PhantomData,
54 }
55 }
56}
57
58#[cfg(feature = "io")]
59pub type IoSender<M, P> = Sender<M, IoBuffer<P>>;
60
61#[cfg(feature = "io")]
62impl<M: Flat + ?Sized, P: AsyncWrite + Unpin> IoSender<M, P> {
63 pub fn io(pipe: P, max_msg_len: usize) -> Self {
64 Self::new(IoBuffer::new(pipe, 2 * max_msg_len.max(M::MIN_SIZE), M::ALIGN))
65 }
66}
67
68impl<M: Flat + ?Sized, B: AsyncWriteBuffer> Sender<M, B> {
69 pub async fn alloc(&mut self) -> Result<UninitSendGuard<'_, M, B>, SendError<B::Error>> {
70 self.buffer.alloc().await?;
71 Ok(UninitSendGuard::new(&mut self.buffer))
72 }
73}
74
75impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer> SendGuard<'a, M, B> {
76 pub fn send(self) -> B::WriteAll<'a> {
77 let size = self.size();
78 self.buffer.write_all(size)
79 }
80}
81
82impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer, const INIT: bool> Unpin for SendGuard<'a, M, B, INIT> {}
83
84pub struct SendGuard<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a, const INIT: bool = true> {
85 pub(crate) buffer: &'a mut B,
86 _ghost: PhantomData<M>,
87}
88
89pub type UninitSendGuard<'a, M, B> = SendGuard<'a, M, B, false>;
90
91impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a> UninitSendGuard<'a, M, B> {
92 pub(crate) fn new(buffer: &'a mut B) -> Self {
93 Self {
94 buffer,
95 _ghost: PhantomData,
96 }
97 }
98
99 pub fn as_bytes(&self) -> &[u8] {
100 self.buffer
101 }
102 pub fn as_mut_bytes(&mut self) -> &mut [u8] {
103 self.buffer
104 }
105
106 pub unsafe fn assume_init(self) -> SendGuard<'a, M, B> {
110 SendGuard {
111 buffer: self.buffer,
112 _ghost: PhantomData,
113 }
114 }
115
116 pub fn new_in_place(self, emplacer: impl Emplacer<M>) -> Result<SendGuard<'a, M, B>, flatty::Error> {
117 M::new_in_place(self.buffer, emplacer)?;
118 Ok(unsafe { self.assume_init() })
119 }
120}
121
122impl<'a, M: Flat + FlatDefault + ?Sized, B: AsyncWriteBuffer + 'a> UninitSendGuard<'a, M, B> {
123 pub fn default_in_place(self) -> Result<SendGuard<'a, M, B>, flatty::Error> {
124 M::default_in_place(self.buffer)?;
125 Ok(unsafe { self.assume_init() })
126 }
127}
128
129impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a> Deref for SendGuard<'a, M, B> {
130 type Target = M;
131 fn deref(&self) -> &M {
132 unsafe { M::from_bytes_unchecked(self.buffer) }
133 }
134}
135
136impl<'a, M: Flat + ?Sized, B: AsyncWriteBuffer + 'a> DerefMut for SendGuard<'a, M, B> {
137 fn deref_mut(&mut self) -> &mut M {
138 unsafe { M::from_mut_bytes_unchecked(self.buffer) }
139 }
140}