use std::{
pin::Pin,
task::{Context, Poll},
};
use bytes::Bytes;
use futures::{Stream, StreamExt, stream};
use kithara_net::{ByteStream, Headers, NetError};
use kithara_platform::{
CancelGroup,
time::{Duration, sleep},
tokio,
};
/// Boxed, pinned stream of body chunks that backs [`BodyStream`].
///
/// On non-`wasm32` targets the trait object is additionally required to be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type InnerStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>> + Send>>;
/// Boxed, pinned stream of body chunks that backs [`BodyStream`].
///
/// On `wasm32` the `Send` bound is omitted.
#[cfg(target_arch = "wasm32")]
type InnerStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>>>>;
/// A fetched response: a streaming body together with its headers.
pub struct FetchResponse {
/// Body of the response, consumed chunk by chunk as a stream.
pub body: BodyStream,
/// Headers that accompany the body.
pub headers: Headers,
}
impl std::fmt::Debug for FetchResponse {
    /// Formats the response for diagnostics.
    ///
    /// Only `headers` is printed; the body is left out and the output is
    /// marked as non-exhaustive to signal the omission.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("FetchResponse");
        builder.field("headers", &self.headers);
        builder.finish_non_exhaustive()
    }
}
/// Stream of body chunks, each item being `Result<Bytes, NetError>`.
///
/// Thin wrapper around a boxed stream; see the `Stream` impl, which forwards
/// polling to `inner`.
pub struct BodyStream {
// Type-erased stream that actually produces the chunks.
inner: InnerStream,
}
impl std::fmt::Debug for BodyStream {
    /// Formats the stream for diagnostics.
    ///
    /// No fields are printed; the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("BodyStream");
        builder.finish_non_exhaustive()
    }
}
impl BodyStream {
pub async fn collect(mut self) -> Result<Bytes, NetError> {
let mut buf = Vec::new();
while let Some(chunk) = self.next().await {
buf.extend_from_slice(&chunk?);
}
Ok(Bytes::from(buf))
}
pub(super) fn empty() -> Self {
Self {
inner: Box::pin(stream::empty()),
}
}
pub(super) fn wrap_http(
byte_stream: ByteStream,
cancel: CancelGroup,
chunk_timeout: Duration,
) -> Self {
Self {
inner: wrap_with_cancel(byte_stream, cancel, chunk_timeout),
}
}
#[must_use]
pub fn wrap_raw(inner: InnerStream) -> Self {
Self { inner }
}
pub async fn write_all<W>(mut self, mut writer: W) -> Result<u64, NetError>
where
W: FnMut(&[u8]) -> std::io::Result<()>,
{
let mut total: u64 = 0;
while let Some(chunk) = self.next().await {
let data = chunk?;
writer(data.as_ref()).map_err(|e| NetError::Http(e.to_string()))?;
total += data.len() as u64;
}
Ok(total)
}
}
impl Stream for BodyStream {
    type Item = Result<Bytes, NetError>;

    /// Forwards the poll to the wrapped inner stream unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // `inner` is a `Pin<Box<_>>`, which is `Unpin`, so it can be polled
        // through a plain mutable reference.
        this.inner.poll_next_unpin(cx)
    }
}
/// State threaded through the `stream::unfold` in `wrap_with_cancel`.
struct WrapState {
// Upstream source of body chunks.
stream: ByteStream,
// Awaited via `cancelled()` while waiting for each chunk.
cancel: CancelGroup,
// Duration slept while waiting for each chunk; a fresh sleep is started per step.
timeout: Duration,
// Set once a terminal item has been yielded; checked at the start of every
// step so the stream then ends with `None`.
done: bool,
}
/// Wraps `byte_stream` so that every wait for the next chunk is raced against
/// cancellation and a per-chunk timeout.
///
/// For each step the returned stream yields:
/// - the upstream item unchanged if the upstream stream produces one first;
/// - `Err(NetError::Cancelled)` if `cancel` fires first, after which the
///   stream ends;
/// - `Err(NetError::Http(..))` if `chunk_timeout` elapses first, after which
///   the stream ends.
///
/// The stream ends without an error only when the upstream stream itself ends.
/// A timeout is reported as an error rather than as end-of-stream so that
/// consumers cannot mistake a stalled, truncated body for a complete one.
fn wrap_with_cancel(
    byte_stream: ByteStream,
    cancel: CancelGroup,
    chunk_timeout: Duration,
) -> InnerStream {
    Box::pin(stream::unfold(
        WrapState {
            cancel,
            stream: byte_stream,
            timeout: chunk_timeout,
            done: false,
        },
        |mut state| async {
            // A terminal error was already yielded on the previous step.
            if state.done {
                return None;
            }
            let chunk = tokio::select! {
                () = state.cancel.cancelled() => {
                    state.done = true;
                    return Some((Err(NetError::Cancelled), state));
                }
                c = state.stream.next() => c,
                // The timer is recreated on every step, so this bounds the
                // wait for a single chunk, not the whole body.
                () = sleep(state.timeout) => {
                    state.done = true;
                    // NOTE(review): if `NetError` has a dedicated timeout
                    // variant, prefer it over `Http` here.
                    return Some((
                        Err(NetError::Http(
                            "timed out waiting for next body chunk".to_string(),
                        )),
                        state,
                    ));
                }
            };
            // `None` from upstream is a genuine end of stream.
            chunk.map(|item| (item, state))
        },
    ))
}