1use core::{future::poll_fn, pin::pin};
4
5use futures_core::stream::Stream;
6
7use crate::service::pipeline::PipelineE;
8
9pub async fn collect_body<B, T, E>(body: B) -> Result<Vec<u8>, E>
11where
12 B: Stream<Item = Result<T, E>>,
13 T: AsRef<[u8]>,
14{
15 let mut body = pin!(body);
16
17 let mut res = Vec::new();
18
19 while let Some(chunk) = poll_fn(|cx| body.as_mut().poll_next(cx)).await {
20 res.extend_from_slice(chunk?.as_ref());
21 }
22
23 Ok(res)
24}
25
26pub type CollectStringError<E> = PipelineE<std::string::FromUtf8Error, E>;
27
28pub async fn collect_string_body<B, T, E>(body: B) -> Result<String, CollectStringError<E>>
30where
31 B: Stream<Item = Result<T, E>>,
32 T: AsRef<[u8]>,
33{
34 let body = collect_body(body).await.map_err(CollectStringError::Second)?;
35 String::from_utf8(body).map_err(CollectStringError::First)
36}