1#![warn(missing_docs)]
24#![warn(missing_copy_implementations, missing_debug_implementations)]
25#![warn(unused_qualifications, unused_results)]
26#![warn(future_incompatible)]
27#![warn(unused)]
28#![forbid(broken_intra_doc_links)]
30
31use std::{future::Future, pin::Pin};
32
33use dialectic::{
34 backend::{self, By, Choice, Mut, Receive, Ref, Transmit, Val},
35 Chan,
36};
37use futures::sink::SinkExt;
38use futures::stream::StreamExt;
39use serde::{Deserialize, Serialize};
40use std::fmt::{Debug, Display};
41use tokio::io::{AsyncRead, AsyncWrite};
42use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
43
44mod error;
45pub use error::*;
46
47#[doc(no_inline)]
48pub use tokio_util::codec;
49
50pub trait Serializer {
63 type Error;
65
66 type Output;
68
69 fn serialize<T: Serialize>(&mut self, item: &T) -> Result<Self::Output, Self::Error>;
71}
72
73pub trait Deserializer<Input> {
86 type Error;
88
89 fn deserialize<T: for<'a> Deserialize<'a>>(&mut self, src: &Input) -> Result<T, Self::Error>;
92}
93
94pub fn symmetrical<F, E, W, R>(
97 format: F,
98 encoding: E,
99 writer: W,
100 reader: R,
101) -> (Sender<F, E, W>, Receiver<F, E, R>)
102where
103 F: Serializer + Deserializer<<E as Decoder>::Item> + Clone,
104 E: Encoder<<F as Serializer>::Output> + Decoder + Clone,
105 W: AsyncWrite,
106 R: AsyncRead,
107{
108 (
109 Sender::new(format.clone(), encoding.clone(), writer),
110 Receiver::new(format, encoding, reader),
111 )
112}
113
/// A `Chan` parameterized by `S` whose transmitting half is a `Sender` and whose receiving half
/// is a `Receiver`, both using the same format `F` and the same encoding `E`.
pub type SymmetricalChan<S, F, E, W, R> = Chan<S, Sender<F, E, W>, Receiver<F, E, R>>;
117
118pub fn symmetrical_with_capacity<F, E, W, R>(
121 format: F,
122 encoding: E,
123 writer: W,
124 reader: R,
125 capacity: usize,
126) -> (Sender<F, E, W>, Receiver<F, E, R>)
127where
128 F: Serializer + Deserializer<<E as Decoder>::Item> + Clone,
129 E: Encoder<<F as Serializer>::Output> + Decoder + Clone,
130 W: AsyncWrite,
131 R: AsyncRead,
132{
133 (
134 Sender::new(format.clone(), encoding.clone(), writer),
135 Receiver::with_capacity(format, encoding, reader, capacity),
136 )
137}
138
#[derive(Debug)]
/// The transmitting half of a connection: pairs a serializer `F` with a `FramedWrite` that
/// applies the encoder `E` on top of the writer `W`.
pub struct Sender<F, E, W> {
    // Turns outgoing values into the item type accepted by the encoder.
    serializer: F,
    // Framed sink that serialized items are sent into.
    framed_write: FramedWrite<W, E>,
}
146
147impl<F: Serializer, E: Encoder<F::Output>, W: AsyncWrite> Sender<F, E, W> {
148 pub fn new(serializer: F, encoder: E, writer: W) -> Self {
150 Sender {
151 serializer,
152 framed_write: FramedWrite::new(writer, encoder),
153 }
154 }
155}
156
157impl<F, E, W> backend::Transmitter for Sender<F, E, W>
158where
159 F: Serializer + Unpin + Send,
160 F::Output: Send,
161 F::Error: Send,
162 E: Encoder<F::Output> + Send,
163 W: AsyncWrite + Unpin + Send,
164{
165 type Error = SendError<F, E>;
166
167 fn send_choice<'async_lifetime, const LENGTH: usize>(
168 &'async_lifetime mut self,
169 choice: Choice<LENGTH>,
170 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>> {
171 Box::pin(async move {
172 let serialized = self
173 .serializer
174 .serialize(&choice)
175 .map_err(SendError::Serialize)?;
176 self.framed_write
177 .send(serialized)
178 .await
179 .map_err(SendError::Encode)?;
180 Ok(())
181 })
182 }
183}
184
185impl<T, F, E, W> Transmit<T, Val> for Sender<F, E, W>
186where
187 T: Serialize + Send + 'static,
188 F: Serializer + Unpin + Send,
189 F::Output: Send,
190 F::Error: Send,
191 E: Encoder<F::Output> + Send,
192 W: AsyncWrite + Unpin + Send,
193{
194 fn send<'a, 'async_lifetime>(
195 &'async_lifetime mut self,
196 message: <T as By<'a, Val>>::Type,
197 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
198 where
199 'a: 'async_lifetime,
200 {
201 Box::pin(async move {
202 let serialized = self
203 .serializer
204 .serialize(&message)
205 .map_err(SendError::Serialize)?;
206 self.framed_write
207 .send(serialized)
208 .await
209 .map_err(SendError::Encode)?;
210 Ok(())
211 })
212 }
213}
214
215impl<T, F, E, W> Transmit<T, Ref> for Sender<F, E, W>
216where
217 T: Serialize + Sync,
218 F: Serializer + Unpin + Send,
219 F::Output: Send,
220 F::Error: Send,
221 E: Encoder<F::Output> + Send,
222 W: AsyncWrite + Unpin + Send,
223{
224 fn send<'a, 'async_lifetime>(
225 &'async_lifetime mut self,
226 message: <T as By<'a, Ref>>::Type,
227 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
228 where
229 'a: 'async_lifetime,
230 {
231 Box::pin(async move {
232 let serialized = self
233 .serializer
234 .serialize(message)
235 .map_err(SendError::Serialize)?;
236 self.framed_write
237 .send(serialized)
238 .await
239 .map_err(SendError::Encode)?;
240 Ok(())
241 })
242 }
243}
244
245impl<T, F, E, W> Transmit<T, Mut> for Sender<F, E, W>
246where
247 T: Serialize + Sync,
248 F: Serializer + Unpin + Send,
249 F::Output: Send,
250 F::Error: Send,
251 E: Encoder<F::Output> + Send,
252 W: AsyncWrite + Unpin + Send,
253{
254 fn send<'a, 'async_lifetime>(
255 &'async_lifetime mut self,
256 message: <T as By<'a, Mut>>::Type,
257 ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
258 where
259 'a: 'async_lifetime,
260 {
261 <Self as Transmit<T, Ref>>::send(self, &*message)
262 }
263}
264
#[derive(Debug)]
/// The receiving half of a connection: pairs a deserializer `F` with a `FramedRead` that
/// applies the decoder `D` on top of the reader `R`.
pub struct Receiver<F, D, R> {
    // Turns decoded frames back into typed values.
    deserializer: F,
    // Framed stream that decoded frames are read from.
    framed_read: FramedRead<R, D>,
}
272
273impl<F: Deserializer<D::Item>, D: Decoder, R: AsyncRead> Receiver<F, D, R> {
274 pub fn new(deserializer: F, decoder: D, reader: R) -> Self {
276 Receiver {
277 deserializer,
278 framed_read: FramedRead::new(reader, decoder),
279 }
280 }
281
282 pub fn with_capacity(deserializer: F, decoder: D, reader: R, capacity: usize) -> Self {
285 Receiver {
286 deserializer,
287 framed_read: FramedRead::with_capacity(reader, decoder, capacity),
288 }
289 }
290}
291
292impl<F, D, R> backend::Receiver for Receiver<F, D, R>
293where
294 F: Deserializer<D::Item> + Unpin + Send,
295 D: Decoder + Send,
296 R: AsyncRead + Unpin + Send,
297{
298 type Error = RecvError<F, D>;
299
300 fn recv_choice<'async_lifetime, const LENGTH: usize>(
301 &'async_lifetime mut self,
302 ) -> Pin<Box<dyn Future<Output = Result<Choice<LENGTH>, Self::Error>> + Send + 'async_lifetime>>
303 {
304 <Self as backend::Receive<Choice<LENGTH>>>::recv(self)
305 }
306}
307
308impl<T, F, D, R> Receive<T> for Receiver<F, D, R>
309where
310 T: for<'a> Deserialize<'a>,
311 F: Deserializer<D::Item> + Unpin + Send,
312 D: Decoder + Send,
313 R: AsyncRead + Unpin + Send,
314{
315 fn recv<'async_lifetime>(
316 &'async_lifetime mut self,
317 ) -> Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send + 'async_lifetime>> {
318 Box::pin(async move {
319 let unframed = self
320 .framed_read
321 .next()
322 .await
323 .ok_or(RecvError::Closed)?
324 .map_err(RecvError::Decode)?;
325 self.deserializer
326 .deserialize(&unframed)
327 .map_err(RecvError::Deserialize)
328 })
329 }
330}