// kithara_stream/dl/response.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::{Stream, StreamExt, stream};
8use kithara_net::{ByteStream, Headers, NetError};
9use kithara_platform::{
10    CancelGroup,
11    time::{Duration, sleep},
12    tokio,
13};
14
15/// Boxed inner stream used inside [`BodyStream`].
16///
17/// On native: requires `Send` (multi-threaded tokio runtime).
18/// On wasm32: no `Send` bound (JsValue-backed streams are `!Send` and
19/// the browser tokio runtime is single-threaded — `Send` is vacuous).
20#[cfg(not(target_arch = "wasm32"))]
21type InnerStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>> + Send>>;
22#[cfg(target_arch = "wasm32")]
23type InnerStream = Pin<Box<dyn Stream<Item = Result<Bytes, NetError>>>>;
24
25/// Response from a fetch — headers available immediately, body as
26/// async stream.
27pub struct FetchResponse {
28    /// Body as an async byte stream.
29    pub body: BodyStream,
30    /// HTTP response headers.
31    pub headers: Headers,
32}
33
34impl std::fmt::Debug for FetchResponse {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        f.debug_struct("FetchResponse")
37            .field("headers", &self.headers)
38            .finish_non_exhaustive()
39    }
40}
41
42/// Async byte stream with cancel + timeout.
43///
44/// Wraps the raw HTTP body stream. Consumer pulls chunks at own pace,
45/// providing natural backpressure. I/O happens on the consumer's task,
46/// not on the downloader's worker threads.
47pub struct BodyStream {
48    inner: InnerStream,
49}
50
51impl std::fmt::Debug for BodyStream {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        f.debug_struct("BodyStream").finish_non_exhaustive()
54    }
55}
56
57impl BodyStream {
58    /// Collect entire body into bytes.
59    ///
60    /// Use for small control-plane responses (playlists, DRM keys).
61    ///
62    /// # Errors
63    /// Returns an error when the underlying stream yields a network
64    /// error or the cancel token fires.
65    pub async fn collect(mut self) -> Result<Bytes, NetError> {
66        let mut buf = Vec::new();
67        while let Some(chunk) = self.next().await {
68            buf.extend_from_slice(&chunk?);
69        }
70        Ok(Bytes::from(buf))
71    }
72
73    /// Empty body (for HEAD responses).
74    pub(super) fn empty() -> Self {
75        Self {
76            inner: Box::pin(stream::empty()),
77        }
78    }
79
80    /// Wrap an HTTP [`ByteStream`] with per-chunk cancel + timeout.
81    pub(super) fn wrap_http(
82        byte_stream: ByteStream,
83        cancel: CancelGroup,
84        chunk_timeout: Duration,
85    ) -> Self {
86        Self {
87            inner: wrap_with_cancel(byte_stream, cancel, chunk_timeout),
88        }
89    }
90
91    /// Wrap a raw stream (for testing or non-HTTP sources).
92    #[must_use]
93    pub fn wrap_raw(inner: InnerStream) -> Self {
94        Self { inner }
95    }
96
97    /// Stream chunks through a writer, return total bytes written.
98    ///
99    /// The writer runs on the consumer's task — not on the downloader's
100    /// worker threads.
101    ///
102    /// # Errors
103    /// Returns an error when the stream yields a network error, the
104    /// writer returns an I/O error, or the cancel token fires.
105    pub async fn write_all<W>(mut self, mut writer: W) -> Result<u64, NetError>
106    where
107        W: FnMut(&[u8]) -> std::io::Result<()>,
108    {
109        let mut total: u64 = 0;
110        while let Some(chunk) = self.next().await {
111            let data = chunk?;
112            writer(data.as_ref()).map_err(|e| NetError::Http(e.to_string()))?;
113            total += data.len() as u64;
114        }
115        Ok(total)
116    }
117}
118
119impl Stream for BodyStream {
120    type Item = Result<Bytes, NetError>;
121
122    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123        self.get_mut().inner.as_mut().poll_next(cx)
124    }
125}
126
127/// State for the cancel+timeout body stream wrapper.
128struct WrapState {
129    stream: ByteStream,
130    cancel: CancelGroup,
131    timeout: Duration,
132    done: bool,
133}
134
135/// Wrap a [`ByteStream`] with per-chunk cancellation and idle timeout.
136fn wrap_with_cancel(
137    byte_stream: ByteStream,
138    cancel: CancelGroup,
139    chunk_timeout: Duration,
140) -> InnerStream {
141    Box::pin(stream::unfold(
142        WrapState {
143            cancel,
144            stream: byte_stream,
145            timeout: chunk_timeout,
146            done: false,
147        },
148        |mut state| async {
149            if state.done {
150                return None;
151            }
152            let chunk = tokio::select! {
153                () = state.cancel.cancelled() => {
154                    state.done = true;
155                    return Some((Err(NetError::Cancelled), state));
156                }
157                c = state.stream.next() => c,
158                () = sleep(state.timeout) => None,
159            };
160            chunk.map(|item| (item, state))
161        },
162    ))
163}