use bytes::Bytes;
use futures::{
channel::mpsc::{channel, Sender},
StreamExt,
};
use http_body_util::{BodyDataStream, BodyExt};
use wasip3::{
http::types::ErrorCode,
http_compat::{IncomingBody, IncomingMessage},
};
#[allow(async_fn_in_trait)]
pub trait IncomingBodyExt {
fn stream(self) -> BodyDataStream<Self>
where
Self: Sized;
async fn bytes(self) -> Result<Bytes, ErrorCode>;
}
impl<T: IncomingMessage> IncomingBodyExt for IncomingBody<T> {
fn stream(self) -> BodyDataStream<Self>
where
Self: Sized,
{
BodyDataStream::new(self)
}
async fn bytes(self) -> Result<Bytes, ErrorCode> {
self.collect().await.map(|c| c.to_bytes())
}
}
pub fn stream<T: Into<Bytes>>() -> (
Sender<T>,
impl http_body::Body<Data = Bytes, Error = anyhow::Error>,
) {
stream_any::<T>(|t| t.into())
}
pub fn stream_any<T>(
f: impl Fn(T) -> Bytes,
) -> (
Sender<T>,
impl http_body::Body<Data = Bytes, Error = anyhow::Error>,
) {
let (tx, rx) = channel::<T>(1024);
let stm = rx.map(move |value| Ok(http_body::Frame::data(f(value))));
(tx, http_body_util::StreamBody::new(stm))
}