// Unless the `no-async-fn-in-trait-feature` cargo feature is enabled, the crate opts into the
// `async_fn_in_trait` language feature, which the `async fn` in the `Reader` trait relies on.
//
// The `incomplete_features` lint is allowed under the same condition.
// NOTE(review): presumably because the compiler flags `async_fn_in_trait` as incomplete on the
// targeted toolchain - confirm this is still needed.
#![cfg_attr(
not(feature = "no-async-fn-in-trait-feature"),
allow(incomplete_features)
)]
#![cfg_attr(
not(feature = "no-async-fn-in-trait-feature"),
feature(async_fn_in_trait)
)]
pub mod reader;
use async_stream::try_stream;
use bytes::Buf;
use futures_core::Stream;
/// The [`Error`] type specialized for a given reader, decoder and buffer.
///
/// Resolves to [`Error`] parameterized with the reader's associated `Error` type, the decoder's
/// associated `Error` type (for the given `Buffer`), and the `Buffer` type itself.
pub type ErrorFor<Reader, Decoder, Buffer> =
Error<<Reader as self::Reader>::Error, <Decoder as streamdata::Decoder<Buffer>>::Error, Buffer>;
/// The item type of the stream returned by [`stream`].
///
/// On success it carries the decoder's associated `Value` type (for the given `Buffer`); on
/// failure it carries [`ErrorFor`] with the same type parameters.
pub type ResultFor<Reader, Decoder, Buffer> =
Result<<Decoder as streamdata::Decoder<Buffer>>::Value, ErrorFor<Reader, Decoder, Buffer>>;
/// An asynchronous source of byte chunks.
///
/// [`stream`] keeps calling [`Reader::next`] until it returns `None`, feeding every
/// successfully read chunk to the decoder state.
///
/// When the `async-trait` cargo feature is enabled, the trait is processed by the
/// `async_trait::async_trait` attribute macro.
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait Reader {
/// The chunk type handed out by [`Reader::next`].
///
/// It is generic over a lifetime, and `next` ties that lifetime to the `&mut self` borrow, so
/// a chunk is allowed to borrow from the reader itself.
type Data<'data>: bytes::Buf;
/// The error returned when reading a chunk fails.
type Error;
/// Read the next chunk of data.
///
/// Returns `Some(Ok(data))` with the next chunk, `Some(Err(error))` when reading failed, and
/// `None` when there is nothing more to read ([`stream`] treats `None` as the end of input).
async fn next(&mut self) -> Option<Result<Self::Data<'_>, Self::Error>>;
}
/// Turn a [`Reader`] and a decoder state into a stream of decoded values.
///
/// Chunks are pulled from `reader` one at a time and fed into `state`; every value the state
/// is able to decode is yielded from the returned stream, in order. Once the reader reports
/// the end of input (returns `None`), the state is finished to check that no undecoded data
/// remains buffered.
///
/// # Errors
///
/// The stream yields an `Err` item when:
///
/// - the reader fails: [`Error::Reading`];
/// - the decoder fails: [`Error::Decoding`];
/// - the reader is exhausted but finishing the state reports leftover data:
///   [`Error::UndecodedDataLeftUponCompletion`], carrying that data.
///
/// Errors are propagated with `?` inside `try_stream!`, so nothing further is produced after
/// the first error.
pub fn stream<Reader, Decoder, Buffer>(
    mut reader: Reader,
    mut state: streamdata::State<Decoder, Buffer>,
) -> impl Stream<Item = ResultFor<Reader, Decoder, Buffer>>
where
    Reader: self::Reader,
    Decoder: streamdata::Decoder<Buffer>,
    Buffer: streamdata::Buffer,
{
    try_stream! {
        while let Some(data) = reader.next().await {
            let mut data = data.map_err(Error::Reading)?;
            // `Buf::chunk` is allowed to return only the first contiguous part of the
            // remaining bytes, so a non-contiguous buffer has to be walked slice by slice;
            // feeding just the first slice would silently drop the rest of the data.
            //
            // The first pass is unconditional so that an empty buffer is still handed to the
            // state exactly once.
            loop {
                let chunk = data.chunk();
                let consumed = chunk.len();
                for result in state.process_next_chunk(chunk) {
                    let value = result.map_err(Error::Decoding)?;
                    yield value;
                }
                // The borrow of `chunk` ends with the loop above; now step past it.
                data.advance(consumed);
                if !data.has_remaining() {
                    break;
                }
            }
        }
        state.finish().map_err(|data| Error::UndecodedDataLeftUponCompletion { data })?;
    }
}
/// The errors produced by the stream returned from [`stream`].
///
/// Generic over the reader's error type, the decoder's error type and the buffer type; see
/// [`ErrorFor`] for the alias that fills these in from concrete reader/decoder types.
#[derive(Debug, thiserror::Error)]
pub enum Error<ReaderError, DecoderError, Buffer> {
/// The reader returned an error while fetching the next chunk.
#[error("reading: {0}")]
Reading(#[source] ReaderError),
/// The decoder returned an error while processing the data read so far.
#[error("decoding: {0}")]
Decoding(#[source] DecoderError),
/// The reader reached the end of input, but finishing the decoder state reported data that
/// was never decoded into a value.
#[error("some data left in the buffer after the data was read completely")]
UndecodedDataLeftUponCompletion {
/// The leftover data, as handed back when the state was finished.
data: Buffer,
},
}