1use bytes::{Buf, BytesMut};
24use log::{debug, trace};
25use serde::de::DeserializeOwned;
26use serde::Serialize;
27use std::fmt;
28use std::io::{Cursor, Read, Write};
29use thiserror::Error;
30
31enum BsonDisplay<'a> {
32 DocRef(&'a bson::Document),
33 BsonRef(&'a bson::Bson),
34}
35
36#[cfg(feature = "debug-condensed")]
37impl<'a> BsonDisplay<'a> {
38 fn as_document(&self) -> Option<&'a bson::Document> {
39 match *self {
40 Self::DocRef(doc) => Some(doc),
41 Self::BsonRef(bson) => bson.as_document(),
42 }
43 }
44
45 fn as_array(&self) -> Option<&'a Vec<bson::Bson>> {
46 match *self {
47 Self::DocRef(_) => None,
48 Self::BsonRef(bson) => bson.as_array(),
49 }
50 }
51}
52
53#[cfg(feature = "debug-condensed")]
54impl<'a> fmt::Display for BsonDisplay<'a> {
55 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
56 if let Some(doc) = self.as_document() {
57 let mut first = true;
58
59 fmt.write_str("{")?;
60
61 for (key, value) in doc {
62 if first {
63 first = false;
64 fmt.write_str(" ")?;
65 } else {
66 fmt.write_str(", ")?;
67 }
68
69 write!(fmt, "\"{}\": {}", key, Self::BsonRef(value))?;
70 }
71
72 write!(fmt, "{}}}", if !first { " " } else { "" })
73 } else if let Some(arr) = self.as_array() {
74 write!(fmt, "[ <{} bytes> ]", arr.len())
75 } else {
76 match self {
77 Self::DocRef(doc) => fmt::Display::fmt(doc, fmt),
78 Self::BsonRef(bson) => fmt::Display::fmt(bson, fmt),
79 }
80 }
81 }
82}
83
84#[cfg(not(feature = "debug-condensed"))]
85impl<'a> fmt::Display for BsonDisplay<'a> {
86 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
87 match self {
88 Self::DocRef(doc) => fmt::Display::fmt(doc, fmt),
89 Self::BsonRef(bson) => fmt::Display::fmt(bson, fmt),
90 }
91 }
92}
93
94#[derive(Error, Debug)]
96pub enum BsonError {
97 #[error(transparent)]
99 Deserialize(#[from] bson::de::Error),
100
101 #[error(transparent)]
103 Serialization(#[from] bson::ser::Error),
104
105 #[error(transparent)]
107 Io(#[from] std::io::Error),
108
109 #[error("connection reset by peer")]
112 ConnectionResetByPeer,
113}
114
115type BsonResult<T> = Result<T, BsonError>;
116
117pub struct BsonReader<R> {
120 source: R,
121 buffer: BytesMut,
122}
123
124impl<R: Read> BsonReader<R> {
125 pub fn new(source: R) -> BsonReader<R> {
127 BsonReader {
128 source,
129 buffer: BytesMut::new(),
130 }
131 }
132
133 pub fn read<T: DeserializeOwned + fmt::Debug>(&mut self) -> BsonResult<Option<T>> {
141 loop {
142 if let Some(frame) = self.parse_frame()? {
143 return Ok(Some(frame));
144 }
145
146 let n = self.read_next_chunk()?;
147
148 if n == 0 {
149 if self.buffer.is_empty() {
150 return Ok(None);
152 } else {
153 return Err(BsonError::ConnectionResetByPeer);
156 }
157 }
158 }
159 }
160
161 fn parse_frame<T: DeserializeOwned + fmt::Debug>(&mut self) -> BsonResult<Option<T>> {
162 if self.is_complete() {
163 let mut cursor = Cursor::new(&self.buffer[..]);
164 let bson = bson::from_reader(&mut cursor)?;
165
166 trace!(
167 "read doc {} bytes: {}",
168 cursor.position(),
169 &BsonDisplay::BsonRef(&bson)
170 );
171
172 let value = bson::from_bson(bson)?;
173
174 let pos = cursor.position();
175 self.buffer.advance(pos as usize);
176
177 debug!("read deserialized: {:?}", value);
178
179 Ok(Some(value))
180 } else {
181 Ok(None)
182 }
183 }
184
185 fn is_complete(&self) -> bool {
186 if self.buffer.remaining() >= 4 {
187 let mut slice = &self.buffer[..];
188 let len = slice.get_u32_le() as usize;
189
190 self.buffer.remaining() >= len
191 } else {
192 false
193 }
194 }
195
196 fn read_next_chunk(&mut self) -> BsonResult<usize> {
197 let mut buf = [0; 1024];
198 let n = self.source.read(&mut buf)?;
199
200 self.buffer.extend_from_slice(&buf[..n]);
201
202 Ok(n)
203 }
204}
205
206pub struct BsonWriter<W> {
209 target: W,
210}
211
212impl<W: Write> BsonWriter<W> {
213 pub fn new(target: W) -> BsonWriter<W> {
215 BsonWriter { target }
216 }
217
218 pub fn write<T: Serialize + fmt::Debug>(&mut self, value: T) -> BsonResult<()> {
221 debug!("write serialized: {:?}", value);
222
223 let doc = bson::to_document(&value)?;
224 trace!("write doc {}", BsonDisplay::DocRef(&doc));
225
226 doc.to_writer(&mut self.target)?;
227 self.target.flush()?;
228
229 Ok(())
230 }
231}