use std::io;
use base64::{Engine, prelude::BASE64_STANDARD};
use bytes::{Buf, BytesMut};
use dencode::{Decoder, FramedRead};
use serde::de::DeserializeOwned;
use serde_json::Deserializer;
use crate::{
BUFLEN,
json::{Json2JsonInput, JsonError, JsonInput},
messages::Sbp,
};
pub fn iter_messages<R: io::Read>(input: R) -> impl Iterator<Item = Result<Sbp, JsonError>> {
JsonDecoder::framed(input)
}
/// Deserializes SBP messages by handing each JSON value read from `input` directly to serde.
///
/// Each value in the stream is deserialized as an [`Sbp`] through a `serde_json` stream
/// deserializer. Any `serde_json` failure is wrapped in [`JsonError::SerdeJsonError`].
pub fn iter_messages_from_fields<R>(input: R) -> impl Iterator<Item = Result<Sbp, JsonError>>
where
    R: io::Read,
{
    let stream = Deserializer::from_reader(input).into_iter::<Sbp>();
    stream.map(|parsed| match parsed {
        Ok(msg) => Ok(msg),
        Err(err) => Err(JsonError::SerdeJsonError(err)),
    })
}
pub fn iter_json2json_messages<R: io::Read>(
input: R,
) -> impl Iterator<Item = Result<Json2JsonInput, JsonError>> {
Json2JsonDecoder::framed(input)
}
/// Asynchronous counterpart of [`iter_messages`]: decodes JSON-encoded SBP messages from an
/// `AsyncRead` source and exposes them as a `Stream`.
///
/// Only compiled when the `async` feature is enabled.
#[cfg(feature = "async")]
pub fn stream_messages<R>(input: R) -> impl futures::Stream<Item = Result<Sbp, JsonError>>
where
    R: futures::AsyncRead + Unpin,
{
    FramedRead::new(input, JsonDecoder::new())
}
/// Decoder state used to turn a stream of JSON values into [`Sbp`] messages.
#[derive(Debug, Default)]
struct JsonDecoder {
/// Scratch buffer receiving the base64-decoded payload; cleared and reused for each message.
payload_buf: Vec<u8>,
/// Tracks whether the most recent decode attempt stopped on an incomplete JSON value.
/// The flag is maintained by `decode_one`.
eof_seen: bool,
}
impl JsonDecoder {
    /// Builds a decoder whose payload scratch buffer is pre-allocated with `BUFLEN` bytes of
    /// capacity.
    fn new() -> Self {
        Self {
            eof_seen: false,
            payload_buf: Vec::with_capacity(BUFLEN),
        }
    }

    /// Wraps `input` in a `FramedRead` that decodes with a freshly created `JsonDecoder`.
    fn framed<R>(input: R) -> FramedRead<R, Self> {
        let decoder = Self::new();
        FramedRead::new(input, decoder)
    }

    /// Converts one parsed [`JsonInput`] into an [`Sbp`] message.
    ///
    /// The input is unwrapped with `into_inner`, its `payload` is base64-decoded (standard
    /// alphabet) into the reusable scratch buffer, and the message is assembled from the
    /// message type, sender and decoded payload via `Sbp::from_parts`.
    ///
    /// # Errors
    ///
    /// Returns a [`JsonError`] when unwrapping the input, decoding the base64 payload, or
    /// building the message fails. In the last case the offending input is also printed to
    /// stderr.
    fn parse_json(&mut self, input: JsonInput) -> Result<Sbp, JsonError> {
        let data = input.into_inner()?;

        // Reuse the scratch buffer rather than allocating a new one per message.
        self.payload_buf.clear();
        BASE64_STANDARD.decode_vec(&data.payload, &mut self.payload_buf)?;

        let payload = BytesMut::from(self.payload_buf.as_slice());
        match Sbp::from_parts(data.msg_type, data.sender, payload) {
            Ok(msg) => Ok(msg),
            Err(err) => {
                eprintln!("unable to decode JsonInput {data:?}");
                Err(err.into())
            }
        }
    }
}
impl Decoder for JsonDecoder {
    type Item = Sbp;
    type Error = JsonError;

    /// Attempts to decode one [`Sbp`] message from the front of `src`.
    ///
    /// Returns `Ok(None)` when `decode_one` produced no [`JsonInput`]; otherwise the parsed
    /// input is converted with `parse_json`. Errors from either step are propagated.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        decode_one::<JsonInput>(&mut self.eof_seen, src)?
            .map(|raw| self.parse_json(raw))
            .transpose()
    }
}
/// Decoder state used to turn a stream of JSON values into [`Json2JsonInput`] items.
#[derive(Debug)]
struct Json2JsonDecoder {
/// Tracks whether the most recent decode attempt stopped on an incomplete JSON value.
/// The flag is maintained by `decode_one`.
eof_seen: bool,
}
impl Json2JsonDecoder {
fn framed<R>(input: R) -> FramedRead<R, Self> {
FramedRead::new(input, Self { eof_seen: false })
}
}
impl Decoder for Json2JsonDecoder {
    type Item = Json2JsonInput;
    type Error = JsonError;

    /// Attempts to decode one [`Json2JsonInput`] from the front of `src`, converting any
    /// `serde_json` error into a [`JsonError`].
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let decoded = decode_one::<Json2JsonInput>(&mut self.eof_seen, src)?;
        Ok(decoded)
    }
}
/// Tries to pull a single JSON value of type `T` off the front of `buf`.
///
/// `buf` is always advanced by the byte offset reported by the stream deserializer, whatever
/// the outcome of the parse. What happens next depends on the result:
///
/// * **Success** (including "no value in the buffer"): `eof_seen` is cleared and the value is
///   returned.
/// * **EOF error** (the buffer ends in the middle of a value): the first time, `eof_seen` is
///   set and `Ok(None)` is returned so the caller can supply more bytes. If the very next
///   attempt runs out of input again, the buffer is emptied and the error is returned.
/// * **Data error** (well-formed JSON that does not describe a `T`): the offending value is
///   skipped by re-parsing it as a generic [`serde_json::Value`], then the error is returned.
/// * **Syntax error**: the whole buffer is discarded and the error is returned.
/// * Any other error category is returned without further buffer changes.
///
/// `eof_seen` means "the immediately preceding attempt ran out of input", so it is also cleared
/// on the data and syntax paths. Otherwise a stale flag left over from before the failing value
/// would make the next merely incomplete value look like a truncated stream, wiping the buffer
/// while more input may still be on its way.
///
/// NOTE(review): when the value hit by a data error is itself incomplete, the skip parse cannot
/// advance and the same error is reported again on the next call — confirm callers tolerate
/// this.
fn decode_one<T>(eof_seen: &mut bool, buf: &mut BytesMut) -> Result<Option<T>, serde_json::Error>
where
    T: DeserializeOwned,
{
    let mut de = Deserializer::from_slice(buf).into_iter::<T>();
    let value = de.next();
    let bytes_read = de.byte_offset();
    buf.advance(bytes_read);

    match value.transpose() {
        Ok(v) => {
            *eof_seen = false;
            Ok(v)
        }
        Err(e) if e.is_eof() => {
            if *eof_seen {
                // Second consecutive attempt that ran out of input: give up on what is left.
                buf.clear();
                Err(e)
            } else {
                // Keep the partial value and ask for more bytes.
                *eof_seen = true;
                Ok(None)
            }
        }
        Err(e) if e.is_data() => {
            // This attempt did not run out of input, so a flag set earlier no longer applies.
            *eof_seen = false;
            // Step over the rejected value so the next call starts at the following one.
            let mut skip = Deserializer::from_slice(buf).into_iter::<serde_json::Value>();
            let _ = skip.next();
            let skipped = skip.byte_offset();
            buf.advance(skipped);
            Err(e)
        }
        Err(e) if e.is_syntax() => {
            *eof_seen = false;
            // No reliable way to find the next value boundary: drop everything buffered.
            buf.clear();
            Err(e)
        }
        Err(e) => Err(e),
    }
}