prevayler_rs/
serializer.rs1use async_std::io::Read;
6use async_std::stream::Stream;
7use std::error;
8use std::marker::Unpin;
9
10#[cfg(feature = "json_serializer")]
11pub use json_serializer::JsonSerializer;
12
13pub type SerializerResult<T> = Result<T, SerializerError>;
14
15pub trait Serializer<T> {
19 fn serialize(&self, data_to_serialize: &T) -> SerializerResult<Box<[u8]>>;
23
24 fn deserialize<'a, R: Read + Unpin + 'a>(
28 &self,
29 reader: R,
30 ) -> Box<dyn Stream<Item = SerializerResult<T>> + Unpin + 'a>
31 where
32 T: 'a;
33}
34
35#[derive(Debug)]
36pub enum SerializerError {
37 ErrorSerializing(Box<dyn error::Error + Send>),
38 ErrorDeserializing(Box<dyn error::Error + Send>),
39}
40
41#[cfg(feature = "json_serializer")]
42pub mod json_serializer {
43 use super::*;
46 use core::pin::Pin;
47 use futures::task::{Context, Poll};
48
49 pub struct JsonSerializer {}
53
54 impl JsonSerializer {
55 pub fn new() -> Self {
56 JsonSerializer {}
57 }
58 }
59
60 impl<T> Serializer<T> for JsonSerializer
61 where
62 T: serde::Serialize + serde::de::DeserializeOwned + Unpin,
63 {
64 fn serialize(&self, data_to_serialize: &T) -> SerializerResult<Box<[u8]>> {
65 let mut ser = serde_json::to_vec(&data_to_serialize)
66 .map_err(|err| SerializerError::ErrorSerializing(Box::new(err)))?;
67 ser.push(b'\n');
68 Ok(ser.into())
69 }
70
71 fn deserialize<'a, R: Read + Unpin + 'a>(
72 &self,
73 reader: R,
74 ) -> Box<dyn Stream<Item = SerializerResult<T>> + Unpin + 'a>
75 where
76 T: 'a,
77 {
78 Box::new(JsonDeserializerStream::new(reader))
79 }
80 }
81
82 struct JsonDeserializerStream<R, T> {
83 reader: R,
84 buf: Vec<u8>,
85 _p: std::marker::PhantomData<T>,
86 }
87
88 impl<R, T> JsonDeserializerStream<R, T>
89 where
90 R: Read,
91 {
92 fn new(reader: R) -> Self {
93 JsonDeserializerStream {
94 reader,
95 buf: Vec::new(),
96 _p: std::marker::PhantomData {},
97 }
98 }
99
100 fn process_buffer(&mut self) -> Option<SerializerResult<T>>
101 where
102 T: serde::de::DeserializeOwned,
103 {
104 for (index, byte) in self.buf.iter().enumerate() {
105 if *byte == b'\n' {
106 let data = serde_json::from_slice(&self.buf[..index])
107 .map_err(|err| SerializerError::ErrorDeserializing(Box::new(err)));
108 self.buf = self.buf.split_off(index + 1);
109 return Some(data);
110 }
111 }
112 return None;
113 }
114 }
115
116 impl<R, T> futures::stream::Stream for JsonDeserializerStream<R, T>
117 where
118 R: Read + Unpin,
119 T: serde::de::DeserializeOwned + Unpin,
120 {
121 type Item = SerializerResult<T>;
122
123 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124 let this = &mut self.get_mut();
125 if let Some(data) = this.process_buffer() {
126 return Poll::Ready(Some(data));
127 }
128 loop {
129 let mut buf = [0; 1024];
130 return match Pin::new(&mut this.reader).poll_read(cx, &mut buf) {
131 Poll::Pending => Poll::Pending,
132 Poll::Ready(read_bytes) => {
133 let read_bytes = read_bytes.expect("Error");
134 if read_bytes == 0 {
135 return Poll::Ready(None);
136 }
137 this.buf.append(&mut Vec::from(&buf[..read_bytes]));
138 if let Some(data) = this.process_buffer() {
139 return Poll::Ready(Some(data));
140 }
141 continue;
142 }
143 };
144 }
145 }
146 }
147}