nuts_tool_api/
bson.rs

1// MIT License
2//
3// Copyright (c) 2024 Robin Doer
4//
5// Permission is hereby granted, free of charge, to any person obtaining a copy
6// of this software and associated documentation files (the "Software"), to
7// deal in the Software without restriction, including without limitation the
8// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9// sell copies of the Software, and to permit persons to whom the Software is
10// furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in
13// all copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21// IN THE SOFTWARE.
22
23use bytes::{Buf, BytesMut};
24use log::{debug, trace};
25use serde::de::DeserializeOwned;
26use serde::Serialize;
27use std::fmt;
28use std::io::{Cursor, Read, Write};
29use thiserror::Error;
30
31enum BsonDisplay<'a> {
32    DocRef(&'a bson::Document),
33    BsonRef(&'a bson::Bson),
34}
35
#[cfg(feature = "debug-condensed")]
impl<'a> BsonDisplay<'a> {
    /// Returns the wrapped data as a document.
    ///
    /// A `DocRef` always yields its document. A `BsonRef` yields whatever
    /// `Bson::as_document` returns for the wrapped value.
    fn as_document(&self) -> Option<&'a bson::Document> {
        let value = match *self {
            Self::DocRef(doc) => return Some(doc),
            Self::BsonRef(value) => value,
        };

        value.as_document()
    }

    /// Returns the wrapped data as an array.
    ///
    /// A `DocRef` never is an array, so `None` is returned for it. A `BsonRef`
    /// yields whatever `Bson::as_array` returns for the wrapped value.
    fn as_array(&self) -> Option<&'a Vec<bson::Bson>> {
        if let Self::BsonRef(value) = *self {
            value.as_array()
        } else {
            None
        }
    }
}
52
#[cfg(feature = "debug-condensed")]
impl<'a> fmt::Display for BsonDisplay<'a> {
    /// Renders the wrapped data in a condensed, single-line form.
    ///
    /// * A document is printed as `{ "key": value, ... }`, where each value is
    ///   rendered recursively with the same rules. An empty document is
    ///   printed as `{}`.
    /// * An array is not printed element by element. It is summarized as
    ///   `[ <N bytes> ]`, where `N` is the number of elements of the array.
    /// * Everything else is passed to the `Display` implementation of the
    ///   wrapped data.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        if let Some(doc) = self.as_document() {
            fmt.write_str("{")?;

            for (index, (key, value)) in doc.iter().enumerate() {
                // The first entry is separated from the opening brace by a
                // blank, all further entries are separated by a comma.
                fmt.write_str(if index == 0 { " " } else { ", " })?;
                write!(fmt, "\"{}\": {}", key, Self::BsonRef(value))?;
            }

            // Only a non-empty document gets a blank before the closing brace.
            return write!(fmt, "{}}}", if doc.is_empty() { "" } else { " " });
        }

        if let Some(arr) = self.as_array() {
            return write!(fmt, "[ <{} bytes> ]", arr.len());
        }

        match *self {
            Self::DocRef(doc) => fmt::Display::fmt(doc, fmt),
            Self::BsonRef(bson) => fmt::Display::fmt(bson, fmt),
        }
    }
}
83
84#[cfg(not(feature = "debug-condensed"))]
85impl<'a> fmt::Display for BsonDisplay<'a> {
86    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
87        match self {
88            Self::DocRef(doc) => fmt::Display::fmt(doc, fmt),
89            Self::BsonRef(bson) => fmt::Display::fmt(bson, fmt),
90        }
91    }
92}
93
94/// Error type used by the [`BsonReader`] and [`BsonWriter`] utilities.
95#[derive(Error, Debug)]
96pub enum BsonError {
97    /// An error occured while deserializing BSON data.
98    #[error(transparent)]
99    Deserialize(#[from] bson::de::Error),
100
101    /// An error occured while serializing BSON data.
102    #[error(transparent)]
103    Serialization(#[from] bson::ser::Error),
104
105    /// An IO-error occured.
106    #[error(transparent)]
107    Io(#[from] std::io::Error),
108
109    /// The connection to the peer is closed but there are still buffered data.
110    /// This means that the peer closed the connection while sending a frame.
111    #[error("connection reset by peer")]
112    ConnectionResetByPeer,
113}
114
115type BsonResult<T> = Result<T, BsonError>;
116
117/// A utility to read [BSON](bson) encoded data from an arbitrary
118/// [source](Read).
119pub struct BsonReader<R> {
120    source: R,
121    buffer: BytesMut,
122}
123
124impl<R: Read> BsonReader<R> {
125    /// Creates a new `BsonReader` instance, which reads from the given`source`.
126    pub fn new(source: R) -> BsonReader<R> {
127        BsonReader {
128            source,
129            buffer: BytesMut::new(),
130        }
131    }
132
133    /// Reads a [BSON](bson) document from the attached source and deserialize
134    /// it into the given type `T`.
135    ///
136    /// It reads data from the attached source until another BSON document is
137    /// available. Next, the document is serialized into `T` and returned into
138    /// a wrapped [`Some`] value. Returns [`None`] if the peer closes the
139    /// connection.
140    pub fn read<T: DeserializeOwned + fmt::Debug>(&mut self) -> BsonResult<Option<T>> {
141        loop {
142            if let Some(frame) = self.parse_frame()? {
143                return Ok(Some(frame));
144            }
145
146            let n = self.read_next_chunk()?;
147
148            if n == 0 {
149                if self.buffer.is_empty() {
150                    // No buffered data, clean shutdown.
151                    return Ok(None);
152                } else {
153                    // There are still buffered data. This means that the peer
154                    // closed the connection while sending a frame.
155                    return Err(BsonError::ConnectionResetByPeer);
156                }
157            }
158        }
159    }
160
161    fn parse_frame<T: DeserializeOwned + fmt::Debug>(&mut self) -> BsonResult<Option<T>> {
162        if self.is_complete() {
163            let mut cursor = Cursor::new(&self.buffer[..]);
164            let bson = bson::from_reader(&mut cursor)?;
165
166            trace!(
167                "read doc {} bytes: {}",
168                cursor.position(),
169                &BsonDisplay::BsonRef(&bson)
170            );
171
172            let value = bson::from_bson(bson)?;
173
174            let pos = cursor.position();
175            self.buffer.advance(pos as usize);
176
177            debug!("read deserialized: {:?}", value);
178
179            Ok(Some(value))
180        } else {
181            Ok(None)
182        }
183    }
184
185    fn is_complete(&self) -> bool {
186        if self.buffer.remaining() >= 4 {
187            let mut slice = &self.buffer[..];
188            let len = slice.get_u32_le() as usize;
189
190            self.buffer.remaining() >= len
191        } else {
192            false
193        }
194    }
195
196    fn read_next_chunk(&mut self) -> BsonResult<usize> {
197        let mut buf = [0; 1024];
198        let n = self.source.read(&mut buf)?;
199
200        self.buffer.extend_from_slice(&buf[..n]);
201
202        Ok(n)
203    }
204}
205
/// Writes [BSON](bson) encoded documents into an arbitrary [target](Write).
pub struct BsonWriter<W> {
    /// The target the encoded documents are written into.
    target: W,
}
211
212impl<W: Write> BsonWriter<W> {
213    /// Creates a new `BsonWriter` instance which writes into the given `target`.
214    pub fn new(target: W) -> BsonWriter<W> {
215        BsonWriter { target }
216    }
217
218    /// Deserializes the given `value` into a [BSON](bson) document and writes
219    /// it into the attached target.
220    pub fn write<T: Serialize + fmt::Debug>(&mut self, value: T) -> BsonResult<()> {
221        debug!("write serialized: {:?}", value);
222
223        let doc = bson::to_document(&value)?;
224        trace!("write doc {}", BsonDisplay::DocRef(&doc));
225
226        doc.to_writer(&mut self.target)?;
227        self.target.flush()?;
228
229        Ok(())
230    }
231}