use alloc::boxed::Box;
use alloc::string::String;
use core::error::Error;
use core::fmt;
use core::pin::Pin;
pub type BoxError = Box<dyn Error + Send + Sync + 'static>;
#[derive(Debug, thiserror::Error, miette::Diagnostic)]
pub struct StreamError {
kind: StreamErrorKind,
#[source]
source: Option<BoxError>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamErrorKind {
Transport,
Closed,
Protocol,
Decode,
Encode,
WrongMessageFormat,
}
impl StreamError {
pub fn new(kind: StreamErrorKind, source: Option<BoxError>) -> Self {
Self { kind, source }
}
pub fn kind(&self) -> &StreamErrorKind {
&self.kind
}
pub fn source(&self) -> Option<&BoxError> {
self.source.as_ref()
}
pub fn closed() -> Self {
Self {
kind: StreamErrorKind::Closed,
source: None,
}
}
pub fn transport(source: impl Error + Send + Sync + 'static) -> Self {
Self {
kind: StreamErrorKind::Transport,
source: Some(Box::new(source)),
}
}
pub fn protocol(msg: impl Into<String>) -> Self {
Self {
kind: StreamErrorKind::Protocol,
source: Some(msg.into().into()),
}
}
pub fn decode(source: impl Error + Send + Sync + 'static) -> Self {
Self {
kind: StreamErrorKind::Decode,
source: Some(Box::new(source)),
}
}
pub fn encode(source: impl Error + Send + Sync + 'static) -> Self {
Self {
kind: StreamErrorKind::Encode,
source: Some(Box::new(source)),
}
}
pub fn wrong_message_format(msg: impl Into<String>) -> Self {
Self {
kind: StreamErrorKind::WrongMessageFormat,
source: Some(msg.into().into()),
}
}
}
impl fmt::Display for StreamError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.kind {
StreamErrorKind::Transport => write!(f, "Transport error"),
StreamErrorKind::Closed => write!(f, "Stream closed"),
StreamErrorKind::Protocol => write!(f, "Protocol error"),
StreamErrorKind::Decode => write!(f, "Decode error"),
StreamErrorKind::Encode => write!(f, "Encode error"),
StreamErrorKind::WrongMessageFormat => write!(f, "Wrong message format"),
}?;
if let Some(source) = &self.source {
write!(f, ": {}", source)?;
}
Ok(())
}
}
use bytes::Bytes;
#[cfg(not(target_arch = "wasm32"))]
type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T> + Send>>;
#[cfg(target_arch = "wasm32")]
type Boxed<T> = Pin<Box<dyn n0_future::Stream<Item = T>>>;
pub struct ByteStream {
inner: Boxed<Result<Bytes, StreamError>>,
}
impl ByteStream {
#[cfg(not(target_arch = "wasm32"))]
pub fn new<S>(stream: S) -> Self
where
S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + Send + 'static,
{
Self {
inner: Box::pin(stream),
}
}
#[cfg(target_arch = "wasm32")]
pub fn new<S>(stream: S) -> Self
where
S: n0_future::Stream<Item = Result<Bytes, StreamError>> + Unpin + 'static,
{
Self {
inner: Box::pin(stream),
}
}
pub fn is_empty(&self) -> bool {
false
}
pub fn into_inner(self) -> Boxed<Result<Bytes, StreamError>> {
self.inner
}
pub fn tee(self) -> (ByteStream, ByteStream) {
use futures::channel::mpsc;
use n0_future::StreamExt as _;
let (tx1, rx1) = mpsc::unbounded();
let (tx2, rx2) = mpsc::unbounded();
n0_future::task::spawn(async move {
let mut stream = self.inner;
while let Some(result) = stream.next().await {
match result {
Ok(chunk) => {
let chunk2 = chunk.clone();
let send1 = tx1.unbounded_send(Ok(chunk));
let send2 = tx2.unbounded_send(Ok(chunk2));
if send1.is_err() && send2.is_err() {
break;
}
}
Err(_e) => {
break;
}
}
}
});
(ByteStream::new(rx1), ByteStream::new(rx2))
}
}
impl fmt::Debug for ByteStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ByteStream").finish_non_exhaustive()
}
}
pub struct ByteSink {
inner: Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>>,
}
impl ByteSink {
pub fn new<S>(sink: S) -> Self
where
S: n0_future::Sink<Bytes, Error = StreamError> + 'static,
{
Self {
inner: Box::pin(sink),
}
}
pub fn into_inner(self) -> Pin<Box<dyn n0_future::Sink<Bytes, Error = StreamError>>> {
self.inner
}
}
impl fmt::Debug for ByteSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ByteSink").finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
#[test]
fn stream_error_carries_kind_and_source() {
let source = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe closed");
let err = StreamError::new(StreamErrorKind::Transport, Some(Box::new(source)));
assert_eq!(err.kind(), &StreamErrorKind::Transport);
assert!(err.source().is_some());
assert_eq!(format!("{}", err), "Transport error: pipe closed");
}
#[test]
fn stream_error_without_source() {
let err = StreamError::closed();
assert_eq!(err.kind(), &StreamErrorKind::Closed);
assert!(err.source().is_none());
}
#[tokio::test]
async fn byte_stream_can_be_created() {
use futures::stream;
let data = vec![Ok(Bytes::from("hello")), Ok(Bytes::from(" world"))];
let stream = stream::iter(data);
let byte_stream = ByteStream::new(stream);
assert!(!byte_stream.is_empty());
}
}