dialectic_tokio_serde/
lib.rs

1//! This crate provides an interface to build a family of backend implementations using the
2//! [`serde`](serde) crate to transport [`Serialize`] and [`Deserialize`] values by reference across
3//! any [`AsyncRead`] and [`AsyncWrite`] transports. In order to use it, you will need to depend on
4//! some other crate which provides the definition of a serialization format; none such are defined
5//! here.
6//!
7//! To use this backend, select:
8//! - a particular serialization format, such as from the [`dialectic_tokio_serde_bincode`] or
9//!   [`dialectic_tokio_serde_json`] crates,
10//! - a particular [`codec`] for encoding and decoding frames, and
11//! - your choice of [`AsyncRead`] and [`AsyncWrite`] reader and writer.
12//!
13//! Then, use [`symmetrical`](symmetrical)([`_with_capacity`](symmetrical_with_capacity)) to
14//! construct a pair of [`Sender`] and [`Receiver`].
15//!
16//! If your outgoing and incoming streams are encoded or serialized differently, or your
17//! serialization or encoding format is not [`Clone`], use [`Sender::new`] and [`Receiver::new`]
18//! directly to construct each end of the connection.
19//!
20//! [`dialectic_tokio_serde_bincode`]: https://docs.rs/dialectic-tokio-serde-bincode
21//! [`dialectic_tokio_serde_json`]: https://docs.rs/dialectic-tokio-serde-json
22
23#![warn(missing_docs)]
24#![warn(missing_copy_implementations, missing_debug_implementations)]
25#![warn(unused_qualifications, unused_results)]
26#![warn(future_incompatible)]
27#![warn(unused)]
28// Documentation configuration
29#![forbid(broken_intra_doc_links)]
30
31use std::{future::Future, pin::Pin};
32
33use dialectic::{
34    backend::{self, By, Choice, Mut, Receive, Ref, Transmit, Val},
35    Chan,
36};
37use futures::sink::SinkExt;
38use futures::stream::StreamExt;
39use serde::{Deserialize, Serialize};
40use std::fmt::{Debug, Display};
41use tokio::io::{AsyncRead, AsyncWrite};
42use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
43
44mod error;
45pub use error::*;
46
47#[doc(no_inline)]
48pub use tokio_util::codec;
49
50/// The serialization end of a serialization format: an object which can serialize any [`Serialize`]
51/// value.
52///
53/// This trait *resembles* [`serde::Serializer`](serde::Serializer), but is not identical to
54/// it. Unlike [`serde::Serializer`](serde::Serializer), it defines the
55/// [`Output`](Serializer::Output) of a serializer, which should be something like `Bytes`,
56/// [`String`], or another output format.
57///
58/// Most [`serde::Serializer`](serde::Serializer)s can be easily given an instance of
59/// [`Serializer`]. When implementing this trait, you should pick the most specific output format
60/// for [`Output`](Serializer::Output). For instance, if you can serialize to a [`String`] or a
61/// [`Vec<u8>`](Vec), pick [`String`], because it implements [`AsRef<[u8]>`](AsRef).
62pub trait Serializer {
63    /// The type of errors during serialization.
64    type Error;
65
66    /// The output format for serialization (e.g. `Bytes`, `String`, etc.).
67    type Output;
68
69    /// Serialize a reference to any [`Serialize`] value.
70    fn serialize<T: Serialize>(&mut self, item: &T) -> Result<Self::Output, Self::Error>;
71}
72
73/// The deserialization end of a serialization format: an object which can deserialize to any
74/// non-lifetime-restricted [`Deserialize`] value.
75///
76/// This trait *resembles* [`serde::Deserializer`](serde::Deserializer), but is not identical
77/// to it. Unlike [`serde::Deserializer`](serde::Deserializer), it is parameterized by the
78/// `Input` to a deserializer, which might be something like `Bytes`, [`String`], or another input
79/// format.
80///
81/// Most [`serde::Deserializer`](serde::Deserializer)s can be easily given an instance of
82/// [`Deserializer`]. When implementing this trait, you should usually be as general as possible for
83/// the `Input` parameter. Consider whether you can implement it for all [`AsRef<[u8]>`](AsRef) or
84/// [`AsRef<str>`](AsRef) rather than a single concrete input format.
85pub trait Deserializer<Input> {
86    /// The type of errors during deserialization.
87    type Error;
88
89    /// Deserialize any [`Deserialize`] value from the input format (e.g. `Bytes`, `String`, etc.),
90    /// provided that the deserialized value can live forever.
91    fn deserialize<T: for<'a> Deserialize<'a>>(&mut self, src: &Input) -> Result<T, Self::Error>;
92}
93
94/// Create a [`Sender`]/[`Receiver`] pair which use the same serialization format and frame encoding
95/// in both directions.
96pub fn symmetrical<F, E, W, R>(
97    format: F,
98    encoding: E,
99    writer: W,
100    reader: R,
101) -> (Sender<F, E, W>, Receiver<F, E, R>)
102where
103    F: Serializer + Deserializer<<E as Decoder>::Item> + Clone,
104    E: Encoder<<F as Serializer>::Output> + Decoder + Clone,
105    W: AsyncWrite,
106    R: AsyncRead,
107{
108    (
109        Sender::new(format.clone(), encoding.clone(), writer),
110        Receiver::new(format, encoding, reader),
111    )
112}
113
114/// A [`Chan`] for the session type `S` and the environment `E`, using a symmetrical
115/// serialization/encoding and the [`AsyncWrite`]/[`AsyncRead`] pair `W`/`R` as transport.
116pub type SymmetricalChan<S, F, E, W, R> = Chan<S, Sender<F, E, W>, Receiver<F, E, R>>;
117
118/// Create a [`Sender`]/[`Receiver`] pair which use the same serialization format and frame encoding
119/// in both directions, allocating an initial capacity for the read buffer on the receiver.
120pub fn symmetrical_with_capacity<F, E, W, R>(
121    format: F,
122    encoding: E,
123    writer: W,
124    reader: R,
125    capacity: usize,
126) -> (Sender<F, E, W>, Receiver<F, E, R>)
127where
128    F: Serializer + Deserializer<<E as Decoder>::Item> + Clone,
129    E: Encoder<<F as Serializer>::Output> + Decoder + Clone,
130    W: AsyncWrite,
131    R: AsyncRead,
132{
133    (
134        Sender::new(format.clone(), encoding.clone(), writer),
135        Receiver::with_capacity(format, encoding, reader, capacity),
136    )
137}
138
139/// A `Sender<F, E, W>` is capable of sending any [`Serialize`] value using the serialization format
140/// `F` and the frame encoding `E` to the asynchronous writer `W`.
141#[derive(Debug)]
142pub struct Sender<F, E, W> {
143    serializer: F,
144    framed_write: FramedWrite<W, E>,
145}
146
147impl<F: Serializer, E: Encoder<F::Output>, W: AsyncWrite> Sender<F, E, W> {
148    /// Construct a new [`Sender`] given a [`Serializer`], [`Encoder`], and [`AsyncWrite`]r.
149    pub fn new(serializer: F, encoder: E, writer: W) -> Self {
150        Sender {
151            serializer,
152            framed_write: FramedWrite::new(writer, encoder),
153        }
154    }
155}
156
157impl<F, E, W> backend::Transmitter for Sender<F, E, W>
158where
159    F: Serializer + Unpin + Send,
160    F::Output: Send,
161    F::Error: Send,
162    E: Encoder<F::Output> + Send,
163    W: AsyncWrite + Unpin + Send,
164{
165    type Error = SendError<F, E>;
166
167    fn send_choice<'async_lifetime, const LENGTH: usize>(
168        &'async_lifetime mut self,
169        choice: Choice<LENGTH>,
170    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>> {
171        Box::pin(async move {
172            let serialized = self
173                .serializer
174                .serialize(&choice)
175                .map_err(SendError::Serialize)?;
176            self.framed_write
177                .send(serialized)
178                .await
179                .map_err(SendError::Encode)?;
180            Ok(())
181        })
182    }
183}
184
185impl<T, F, E, W> Transmit<T, Val> for Sender<F, E, W>
186where
187    T: Serialize + Send + 'static,
188    F: Serializer + Unpin + Send,
189    F::Output: Send,
190    F::Error: Send,
191    E: Encoder<F::Output> + Send,
192    W: AsyncWrite + Unpin + Send,
193{
194    fn send<'a, 'async_lifetime>(
195        &'async_lifetime mut self,
196        message: <T as By<'a, Val>>::Type,
197    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
198    where
199        'a: 'async_lifetime,
200    {
201        Box::pin(async move {
202            let serialized = self
203                .serializer
204                .serialize(&message)
205                .map_err(SendError::Serialize)?;
206            self.framed_write
207                .send(serialized)
208                .await
209                .map_err(SendError::Encode)?;
210            Ok(())
211        })
212    }
213}
214
215impl<T, F, E, W> Transmit<T, Ref> for Sender<F, E, W>
216where
217    T: Serialize + Sync,
218    F: Serializer + Unpin + Send,
219    F::Output: Send,
220    F::Error: Send,
221    E: Encoder<F::Output> + Send,
222    W: AsyncWrite + Unpin + Send,
223{
224    fn send<'a, 'async_lifetime>(
225        &'async_lifetime mut self,
226        message: <T as By<'a, Ref>>::Type,
227    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
228    where
229        'a: 'async_lifetime,
230    {
231        Box::pin(async move {
232            let serialized = self
233                .serializer
234                .serialize(message)
235                .map_err(SendError::Serialize)?;
236            self.framed_write
237                .send(serialized)
238                .await
239                .map_err(SendError::Encode)?;
240            Ok(())
241        })
242    }
243}
244
245impl<T, F, E, W> Transmit<T, Mut> for Sender<F, E, W>
246where
247    T: Serialize + Sync,
248    F: Serializer + Unpin + Send,
249    F::Output: Send,
250    F::Error: Send,
251    E: Encoder<F::Output> + Send,
252    W: AsyncWrite + Unpin + Send,
253{
254    fn send<'a, 'async_lifetime>(
255        &'async_lifetime mut self,
256        message: <T as By<'a, Mut>>::Type,
257    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_lifetime>>
258    where
259        'a: 'async_lifetime,
260    {
261        <Self as Transmit<T, Ref>>::send(self, &*message)
262    }
263}
264
265/// A `Receiver<F, D, R>` is capable of receiving any [`Deserialize`] value using the serialization
266/// format `F` and the frame decoding `D`, from the asynchronous reader `R`.
267#[derive(Debug)]
268pub struct Receiver<F, D, R> {
269    deserializer: F,
270    framed_read: FramedRead<R, D>,
271}
272
273impl<F: Deserializer<D::Item>, D: Decoder, R: AsyncRead> Receiver<F, D, R> {
274    /// Construct a new [`Receiver`] given a [`Deserializer`], [`Decoder`], and [`AsyncRead`]er.
275    pub fn new(deserializer: F, decoder: D, reader: R) -> Self {
276        Receiver {
277            deserializer,
278            framed_read: FramedRead::new(reader, decoder),
279        }
280    }
281
282    /// Construct a new [`Receiver`] given a [`Deserializer`], [`Decoder`], and [`AsyncRead`]er,
283    /// with a given initial buffer capacity.
284    pub fn with_capacity(deserializer: F, decoder: D, reader: R, capacity: usize) -> Self {
285        Receiver {
286            deserializer,
287            framed_read: FramedRead::with_capacity(reader, decoder, capacity),
288        }
289    }
290}
291
292impl<F, D, R> backend::Receiver for Receiver<F, D, R>
293where
294    F: Deserializer<D::Item> + Unpin + Send,
295    D: Decoder + Send,
296    R: AsyncRead + Unpin + Send,
297{
298    type Error = RecvError<F, D>;
299
300    fn recv_choice<'async_lifetime, const LENGTH: usize>(
301        &'async_lifetime mut self,
302    ) -> Pin<Box<dyn Future<Output = Result<Choice<LENGTH>, Self::Error>> + Send + 'async_lifetime>>
303    {
304        <Self as backend::Receive<Choice<LENGTH>>>::recv(self)
305    }
306}
307
308impl<T, F, D, R> Receive<T> for Receiver<F, D, R>
309where
310    T: for<'a> Deserialize<'a>,
311    F: Deserializer<D::Item> + Unpin + Send,
312    D: Decoder + Send,
313    R: AsyncRead + Unpin + Send,
314{
315    fn recv<'async_lifetime>(
316        &'async_lifetime mut self,
317    ) -> Pin<Box<dyn Future<Output = Result<T, Self::Error>> + Send + 'async_lifetime>> {
318        Box::pin(async move {
319            let unframed = self
320                .framed_read
321                .next()
322                .await
323                .ok_or(RecvError::Closed)?
324                .map_err(RecvError::Decode)?;
325            self.deserializer
326                .deserialize(&unframed)
327                .map_err(RecvError::Deserialize)
328        })
329    }
330}