kithara_stream/dl/
response.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::{Stream, StreamExt, stream};
8use kithara_net::{ByteStream, Headers, NetError};
9use kithara_platform::{
10 CancelGroup,
11 time::{Duration, sleep},
12 tokio,
13};
14
15#[cfg(not(target_arch = "wasm32"))]
21type InnerStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>> + Send>>;
22#[cfg(target_arch = "wasm32")]
23type InnerStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>>>>;
24
25pub struct FetchResponse {
28 pub body: BodyStream,
30 pub headers: Headers,
32}
33
34impl std::fmt::Debug for FetchResponse {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 f.debug_struct("FetchResponse")
37 .field("headers", &self.headers)
38 .finish_non_exhaustive()
39 }
40}
41
42pub struct BodyStream {
48 inner: InnerStream,
49}
50
51impl std::fmt::Debug for BodyStream {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 f.debug_struct("BodyStream").finish_non_exhaustive()
54 }
55}
56
57impl BodyStream {
58 pub async fn collect(mut self) -> Result<Bytes, NetError> {
66 let mut buf = Vec::new();
67 while let Some(chunk) = self.next().await {
68 buf.extend_from_slice(&chunk?);
69 }
70 Ok(Bytes::from(buf))
71 }
72
73 pub(super) fn empty() -> Self {
75 Self {
76 inner: Box::pin(stream::empty()),
77 }
78 }
79
80 pub(super) fn wrap_http(
82 byte_stream: ByteStream,
83 cancel: CancelGroup,
84 chunk_timeout: Duration,
85 ) -> Self {
86 Self {
87 inner: wrap_with_cancel(byte_stream, cancel, chunk_timeout),
88 }
89 }
90
91 #[must_use]
93 pub fn wrap_raw(inner: InnerStream) -> Self {
94 Self { inner }
95 }
96
97 pub async fn write_all<W>(mut self, mut writer: W) -> Result<u64, NetError>
106 where
107 W: FnMut(&[u8]) -> std::io::Result<()>,
108 {
109 let mut total: u64 = 0;
110 while let Some(chunk) = self.next().await {
111 let data = chunk?;
112 writer(data.as_ref()).map_err(|e| NetError::Http(e.to_string()))?;
113 total += data.len() as u64;
114 }
115 Ok(total)
116 }
117}
118
119impl Stream for BodyStream {
120 type Item = Result<Bytes, NetError>;
121
122 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123 self.get_mut().inner.as_mut().poll_next(cx)
124 }
125}
126
127struct WrapState {
129 stream: ByteStream,
130 cancel: CancelGroup,
131 timeout: Duration,
132 done: bool,
133}
134
135fn wrap_with_cancel(
137 byte_stream: ByteStream,
138 cancel: CancelGroup,
139 chunk_timeout: Duration,
140) -> InnerStream {
141 Box::pin(stream::unfold(
142 WrapState {
143 cancel,
144 stream: byte_stream,
145 timeout: chunk_timeout,
146 done: false,
147 },
148 |mut state| async {
149 if state.done {
150 return None;
151 }
152 let chunk = tokio::select! {
153 () = state.cancel.cancelled() => {
154 state.done = true;
155 return Some((Err(NetError::Cancelled), state));
156 }
157 c = state.stream.next() => c,
158 () = sleep(state.timeout) => None,
159 };
160 chunk.map(|item| (item, state))
161 },
162 ))
163}