prevayler_rs/
serializer.rs

1//! This module handles serialization and deserialization of the redolog and snapshots
2//!
3//! As a default, it provides a [`JsonSerializer`] that does serialization in json format for anyone that implements the serde serialization traits. But you can disable it and use whaetever format that you want.
4
5use async_std::io::Read;
6use async_std::stream::Stream;
7use std::error;
8use std::marker::Unpin;
9
10#[cfg(feature = "json_serializer")]
11pub use json_serializer::JsonSerializer;
12
13pub type SerializerResult<T> = Result<T, SerializerError>;
14
15/// Serializer trait
16///
17/// This methods defines how to serialize and deserialize transactions and the snapshoted data. You can implement it to have your logs in whaetever format that you like.
18pub trait Serializer<T> {
19    /// Serialize method. It receives a reference to a data and it should return a boxed byte array with the serialized result.
20    ///
21    /// The redolog will consist of appending multiple calls to this method. The snapshot will be a single file with the result of one call to this method.
22    fn serialize(&self, data_to_serialize: &T) -> SerializerResult<Box<[u8]>>;
23
24    /// Deserialize method. It receives a reader to the redolog (or snapshot), and it should return a stream of deserialized data
25    ///
26    /// In the case of the redolog, it will execute all transactions returned in the stream. If it is a snaphot, only the first item will be used.
27    fn deserialize<'a, R: Read + Unpin + 'a>(
28        &self,
29        reader: R,
30    ) -> Box<dyn Stream<Item = SerializerResult<T>> + Unpin + 'a>
31    where
32        T: 'a;
33}
34
35#[derive(Debug)]
36pub enum SerializerError {
37    ErrorSerializing(Box<dyn error::Error + Send>),
38    ErrorDeserializing(Box<dyn error::Error + Send>),
39}
40
41#[cfg(feature = "json_serializer")]
42pub mod json_serializer {
43    //! Json serializer. Only available if using "json_serializer" or "default" features.
44
45    use super::*;
46    use core::pin::Pin;
47    use futures::task::{Context, Poll};
48
49    /// Implementation of [`Serializer`] for the Json format.
50    ///
51    /// This implements the [`Serializer`] trait for every data that also implements serde [Serialize](serde::Serialize) and [Deserialize](serde::de::DeserializeOwned).
52    pub struct JsonSerializer {}
53
54    impl JsonSerializer {
55        pub fn new() -> Self {
56            JsonSerializer {}
57        }
58    }
59
60    impl<T> Serializer<T> for JsonSerializer
61    where
62        T: serde::Serialize + serde::de::DeserializeOwned + Unpin,
63    {
64        fn serialize(&self, data_to_serialize: &T) -> SerializerResult<Box<[u8]>> {
65            let mut ser = serde_json::to_vec(&data_to_serialize)
66                .map_err(|err| SerializerError::ErrorSerializing(Box::new(err)))?;
67            ser.push(b'\n');
68            Ok(ser.into())
69        }
70
71        fn deserialize<'a, R: Read + Unpin + 'a>(
72            &self,
73            reader: R,
74        ) -> Box<dyn Stream<Item = SerializerResult<T>> + Unpin + 'a>
75        where
76            T: 'a,
77        {
78            Box::new(JsonDeserializerStream::new(reader))
79        }
80    }
81
82    struct JsonDeserializerStream<R, T> {
83        reader: R,
84        buf: Vec<u8>,
85        _p: std::marker::PhantomData<T>,
86    }
87
88    impl<R, T> JsonDeserializerStream<R, T>
89    where
90        R: Read,
91    {
92        fn new(reader: R) -> Self {
93            JsonDeserializerStream {
94                reader,
95                buf: Vec::new(),
96                _p: std::marker::PhantomData {},
97            }
98        }
99
100        fn process_buffer(&mut self) -> Option<SerializerResult<T>>
101        where
102            T: serde::de::DeserializeOwned,
103        {
104            for (index, byte) in self.buf.iter().enumerate() {
105                if *byte == b'\n' {
106                    let data = serde_json::from_slice(&self.buf[..index])
107                        .map_err(|err| SerializerError::ErrorDeserializing(Box::new(err)));
108                    self.buf = self.buf.split_off(index + 1);
109                    return Some(data);
110                }
111            }
112            return None;
113        }
114    }
115
116    impl<R, T> futures::stream::Stream for JsonDeserializerStream<R, T>
117    where
118        R: Read + Unpin,
119        T: serde::de::DeserializeOwned + Unpin,
120    {
121        type Item = SerializerResult<T>;
122
123        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124            let this = &mut self.get_mut();
125            if let Some(data) = this.process_buffer() {
126                return Poll::Ready(Some(data));
127            }
128            loop {
129                let mut buf = [0; 1024];
130                return match Pin::new(&mut this.reader).poll_read(cx, &mut buf) {
131                    Poll::Pending => Poll::Pending,
132                    Poll::Ready(read_bytes) => {
133                        let read_bytes = read_bytes.expect("Error");
134                        if read_bytes == 0 {
135                            return Poll::Ready(None);
136                        }
137                        this.buf.append(&mut Vec::from(&buf[..read_bytes]));
138                        if let Some(data) = this.process_buffer() {
139                            return Poll::Ready(Some(data));
140                        }
141                        continue;
142                    }
143                };
144            }
145        }
146    }
147}