use std::cmp::Ordering;
use futures::Stream;
use futures::StreamExt;
use oio::Read;
use crate::raw::*;
use crate::*;
/// Streaming HTTP response body that keeps track of how many bytes were read.
///
/// Bundles a boxed stream of `Result<Buffer>` chunks with an optional expected
/// total size and a running count of the bytes consumed so far.
pub struct HttpBody {
/// Underlying chunk stream; on non-wasm32 targets it must be `Send + Sync`.
#[cfg(not(target_arch = "wasm32"))]
stream: Box<dyn Stream<Item = Result<Buffer>> + Send + Sync + Unpin + 'static>,
/// Underlying chunk stream; the wasm32 variant carries no `Send + Sync` bounds.
#[cfg(target_arch = "wasm32")]
stream: Box<dyn Stream<Item = Result<Buffer>> + Unpin + 'static>,
/// Expected total number of bytes; `None` disables the length check.
size: Option<u64>,
/// Sum of the lengths of all buffers yielded by the stream so far.
consumed: u64,
}
// SAFETY: on non-wasm32 targets every field is already `Send`: the boxed
// stream is declared `Send + Sync` and the remaining fields are plain integers.
// NOTE(review): on wasm32 the boxed stream carries no `Send` bound, so this
// impl presumably relies on wasm32 running single-threaded — confirm.
unsafe impl Send for HttpBody {}
// SAFETY: on non-wasm32 targets every field is already `Sync`, for the same
// reason as above.
// NOTE(review): on wasm32 the boxed stream carries no `Sync` bound, so this
// impl presumably relies on wasm32 running single-threaded — confirm.
unsafe impl Sync for HttpBody {}
impl HttpBody {
#[cfg(not(target_arch = "wasm32"))]
pub fn new<S>(stream: S, size: Option<u64>) -> Self
where
S: Stream<Item = Result<Buffer>> + Send + Sync + Unpin + 'static,
{
HttpBody {
stream: Box::new(stream),
size,
consumed: 0,
}
}
#[cfg(target_arch = "wasm32")]
pub fn new<S>(stream: S, size: Option<u64>) -> Self
where
S: Stream<Item = Result<Buffer>> + Unpin + 'static,
{
HttpBody {
stream: Box::new(stream),
size,
consumed: 0,
}
}
#[cfg(not(target_arch = "wasm32"))]
pub fn map_inner(
mut self,
f: impl FnOnce(
Box<dyn Stream<Item = Result<Buffer>> + Send + Sync + Unpin + 'static>,
)
-> Box<dyn Stream<Item = Result<Buffer>> + Send + Sync + Unpin + 'static>,
) -> Self {
self.stream = f(self.stream);
self
}
#[cfg(target_arch = "wasm32")]
pub fn map_inner(
mut self,
f: impl FnOnce(
Box<dyn Stream<Item = Result<Buffer>> + Unpin + 'static>,
) -> Box<dyn Stream<Item = Result<Buffer>> + Unpin + 'static>,
) -> Self {
self.stream = f(self.stream);
self
}
#[inline]
fn check(&self) -> Result<()> {
let Some(expect) = self.size else {
return Ok(());
};
let actual = self.consumed;
match actual.cmp(&expect) {
Ordering::Equal => Ok(()),
Ordering::Less => Err(Error::new(
ErrorKind::Unexpected,
format!("http response got too little data, expect: {expect}, actual: {actual}"),
)
.set_temporary()),
Ordering::Greater => Err(Error::new(
ErrorKind::Unexpected,
format!("http response got too much data, expect: {expect}, actual: {actual}"),
)
.set_temporary()),
}
}
pub async fn to_buffer(&mut self) -> Result<Buffer> {
self.read_all().await
}
}
impl oio::Read for HttpBody {
    /// Yields the next chunk of the body.
    ///
    /// Each chunk's length is added to the consumed-byte counter before it is
    /// returned. Once the stream is exhausted, the consumed total is checked
    /// against the expected size and an empty [`Buffer`] is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the stream, and the error from the
    /// length check when the totals do not match.
    async fn read(&mut self) -> Result<Buffer> {
        let Some(chunk) = self.stream.next().await.transpose()? else {
            // End of stream: validate the total before signalling EOF.
            return self.check().map(|()| Buffer::new());
        };

        self.consumed += chunk.len() as u64;
        Ok(chunk)
    }
}