use bytes::{Buf, BytesMut};
use log::{debug, trace};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt;
use std::io::{Cursor, Read, Write};
use thiserror::Error;
/// Borrowed view of a BSON value that can be rendered through `fmt::Display`.
///
/// This file uses it to format documents and values inside `trace!` log
/// messages without taking ownership of them.
enum BsonDisplay<'a> {
/// A borrowed BSON document.
DocRef(&'a bson::Document),
/// A borrowed BSON value of any kind.
BsonRef(&'a bson::Bson),
}
#[cfg(feature = "debug-condensed")]
impl<'a> BsonDisplay<'a> {
    /// Returns the wrapped document, if this view holds one.
    ///
    /// A `DocRef` always yields its document; a `BsonRef` yields one only when
    /// the wrapped value's own `as_document` accessor does.
    fn as_document(&self) -> Option<&'a bson::Document> {
        match *self {
            Self::BsonRef(value) => value.as_document(),
            Self::DocRef(document) => Some(document),
        }
    }

    /// Returns the wrapped array, if this view holds one.
    ///
    /// Only a `BsonRef` can yield an array; a `DocRef` always gives `None`.
    fn as_array(&self) -> Option<&'a Vec<bson::Bson>> {
        if let Self::BsonRef(value) = *self {
            value.as_array()
        } else {
            None
        }
    }
}
#[cfg(feature = "debug-condensed")]
impl<'a> fmt::Display for BsonDisplay<'a> {
    /// Renders the value in a condensed, single-line form.
    ///
    /// * Documents are written as `{ "key": value, ... }` (or `{}` when empty),
    ///   with every nested value rendered through this same implementation.
    /// * Arrays are abbreviated to `[ <N bytes> ]`, where `N` is the number of
    ///   elements in the array (the output labels that count as `bytes`).
    /// * Everything else is delegated to the wrapped value's own `Display`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        if let Some(doc) = self.as_document() {
            f.write_str("{")?;
            let mut written = 0usize;
            for (key, value) in doc {
                // The first entry is preceded by a space, later ones by a comma.
                let lead = if written == 0 { " " } else { ", " };
                f.write_str(lead)?;
                write!(f, "\"{}\": {}", key, Self::BsonRef(value))?;
                written += 1;
            }
            // Only pad before the closing brace when at least one entry was printed.
            let pad = if written == 0 { "" } else { " " };
            return write!(f, "{}}}", pad);
        }

        if let Some(items) = self.as_array() {
            return write!(f, "[ <{} bytes> ]", items.len());
        }

        match *self {
            Self::BsonRef(value) => fmt::Display::fmt(value, f),
            Self::DocRef(document) => fmt::Display::fmt(document, f),
        }
    }
}
#[cfg(not(feature = "debug-condensed"))]
impl<'a> fmt::Display for BsonDisplay<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::DocRef(doc) => fmt::Display::fmt(doc, fmt),
Self::BsonRef(bson) => fmt::Display::fmt(bson, fmt),
}
}
}
/// Errors produced while reading or writing BSON documents in this file.
///
/// The three wrapping variants are `transparent`: their `Display` output is
/// that of the underlying error, and `#[from]` lets `?` convert into them.
#[derive(Error, Debug)]
pub enum BsonError {
/// A `bson::de::Error` raised while decoding BSON.
#[error(transparent)]
Deserialize(#[from] bson::de::Error),
/// A `bson::ser::Error` raised while encoding BSON.
#[error(transparent)]
Serialization(#[from] bson::ser::Error),
/// An I/O error reported by the underlying source or target.
#[error(transparent)]
Io(#[from] std::io::Error),
/// The source reported end of input while undecoded bytes were still
/// buffered (see `BsonReader::read`).
#[error("connection reset by peer")]
ConnectionResetByPeer,
}
/// Shorthand for a `Result` whose error type is `BsonError`.
type BsonResult<T> = Result<T, BsonError>;
/// Reads length-prefixed BSON documents from a byte source, keeping any
/// partially received data buffered between calls.
pub struct BsonReader<R> {
/// Underlying byte source that chunks are read from.
source: R,
/// Bytes received from `source` that have not yet been consumed as a frame.
buffer: BytesMut,
}
impl<R: Read> BsonReader<R> {
    /// Creates a reader over `source` with an empty receive buffer.
    pub fn new(source: R) -> BsonReader<R> {
        BsonReader {
            source,
            buffer: BytesMut::new(),
        }
    }

    /// Reads the next BSON document from the source and deserializes it into `T`.
    ///
    /// Buffered bytes are tried first; more data is pulled from the source only
    /// while no complete frame is available.
    ///
    /// Returns `Ok(None)` when the source reports end of input and nothing is
    /// left in the buffer.
    ///
    /// # Errors
    ///
    /// * `BsonError::ConnectionResetByPeer` when the source reports end of input
    ///   while undecoded bytes are still buffered.
    /// * `BsonError::Deserialize` when the frame cannot be decoded or converted
    ///   into `T`.
    /// * `BsonError::Io` when reading from the source fails.
    pub fn read<T: DeserializeOwned + fmt::Debug>(&mut self) -> BsonResult<Option<T>> {
        loop {
            if let Some(frame) = self.parse_frame()? {
                return Ok(Some(frame));
            }
            if self.read_next_chunk()? == 0 {
                // End of input: clean only if no partial frame is pending.
                return if self.buffer.is_empty() {
                    Ok(None)
                } else {
                    Err(BsonError::ConnectionResetByPeer)
                };
            }
        }
    }

    /// Decodes one frame from the front of the buffer, if a whole frame is present.
    ///
    /// Returns `Ok(None)` when the buffer does not yet hold a complete frame.
    /// The frame's bytes are dropped from the buffer only after it has been
    /// converted into `T`; if decoding or conversion fails, the bytes stay
    /// buffered and the error is returned.
    fn parse_frame<T: DeserializeOwned + fmt::Debug>(&mut self) -> BsonResult<Option<T>> {
        if !self.is_complete() {
            return Ok(None);
        }

        let mut cursor = Cursor::new(&self.buffer[..]);
        let bson: bson::Bson = bson::from_reader(&mut cursor)?;
        // The cursor position after decoding is the number of bytes the frame used.
        let consumed = cursor.position();
        trace!(
            "read doc {} bytes: {}",
            consumed,
            &BsonDisplay::BsonRef(&bson)
        );
        let value = bson::from_bson(bson)?;
        self.buffer.advance(consumed as usize);
        debug!("read deserialized: {:?}", value);
        Ok(Some(value))
    }

    /// Reports whether the buffer holds at least one whole frame.
    ///
    /// The first four buffered bytes are read as a little-endian `u32` and
    /// compared against the total number of buffered bytes, so the prefix is
    /// treated as the full frame size (prefix included). The length is peeked
    /// through a temporary slice and nothing is consumed from the buffer.
    fn is_complete(&self) -> bool {
        if self.buffer.remaining() < 4 {
            return false;
        }
        let mut prefix = &self.buffer[..];
        let frame_len = prefix.get_u32_le() as usize;
        self.buffer.remaining() >= frame_len
    }

    /// Reads up to 1024 bytes from the source and appends them to the buffer.
    ///
    /// Returns the number of bytes appended; `0` means the source reported end
    /// of input. Reads that fail with `ErrorKind::Interrupted` are retried, as
    /// the `std::io::Read` contract describes that error as non-fatal.
    fn read_next_chunk(&mut self) -> BsonResult<usize> {
        let mut chunk = [0u8; 1024];
        let n = loop {
            match self.source.read(&mut chunk) {
                Ok(n) => break n,
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err.into()),
            }
        };
        self.buffer.extend_from_slice(&chunk[..n]);
        Ok(n)
    }
}
/// Serializes values as BSON documents and writes them to a byte target.
pub struct BsonWriter<W> {
/// Underlying byte target that serialized documents are written to.
target: W,
}
impl<W: Write> BsonWriter<W> {
pub fn new(target: W) -> BsonWriter<W> {
BsonWriter { target }
}
pub fn write<T: Serialize + fmt::Debug>(&mut self, value: T) -> BsonResult<()> {
debug!("write serialized: {:?}", value);
let doc = bson::to_document(&value)?;
trace!("write doc {}", BsonDisplay::DocRef(&doc));
doc.to_writer(&mut self.target)?;
self.target.flush()?;
Ok(())
}
}