stream_download/
settings.rs

1use std::num::NonZeroUsize;
2use std::ops::Range;
3use std::time::Duration;
4
5use educe::Educe;
6use tokio_util::sync::CancellationToken;
7
8pub(crate) type ProgressFn<S> = Box<dyn FnMut(&S, StreamState, &CancellationToken) + Send + Sync>;
9
10pub(crate) type ReconnectFn<S> = Box<dyn FnMut(&S, &CancellationToken) + Send + Sync>;
11
/// The stage a download is in, as reported to a progress callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamPhase {
    /// The stream is in its prefetch stage.
    #[non_exhaustive]
    Prefetching {
        /// The prefetch target currently in effect.
        target: u64,
        /// Size of the chunk that was downloaded most recently.
        chunk_size: usize,
    },
    /// The stream is in its downloading stage.
    #[non_exhaustive]
    Downloading {
        /// Size of the chunk that was downloaded most recently.
        chunk_size: usize,
    },
    /// The download has finished.
    Complete,
}
33
34#[derive(Clone, Debug, PartialEq, Eq)]
35#[non_exhaustive]
36/// State of the stream for use during a progress callback.
37pub struct StreamState {
38    /// Current position of the stream.
39    pub current_position: u64,
40    /// Time elapsed since download start.
41    pub elapsed: Duration,
42    /// Current phase of the download.
43    pub phase: StreamPhase,
44    /// Current chunk of the stream being downloaded.
45    pub current_chunk: Range<u64>,
46}
47
48/// Settings to configure the stream behavior.
49#[derive(Educe)]
50#[educe(Debug, PartialEq, Eq)]
51pub struct Settings<S> {
52    pub(crate) prefetch_bytes: u64,
53    pub(crate) batch_write_size: usize,
54    pub(crate) retry_timeout: Duration,
55    pub(crate) cancel_on_drop: bool,
56    #[educe(Debug = false, PartialEq = false)]
57    pub(crate) on_progress: Option<ProgressFn<S>>,
58    #[educe(Debug = false, PartialEq = false)]
59    pub(crate) on_reconnect: Option<ReconnectFn<S>>,
60}
61
62impl<S> Default for Settings<S> {
63    fn default() -> Self {
64        Self {
65            prefetch_bytes: 256 * 1024,
66            batch_write_size: 4096,
67            retry_timeout: Duration::from_secs(5),
68            cancel_on_drop: true,
69            on_progress: None,
70            on_reconnect: None,
71        }
72    }
73}
74
75impl<S> Settings<S> {
76    /// How many bytes to download from the stream before allowing read requests.
77    /// This is used to create a buffer between the read position and the stream position
78    /// and prevent stuttering.
79    ///
80    /// The default value is 256 kilobytes.
81    #[must_use]
82    pub fn prefetch_bytes(self, prefetch_bytes: u64) -> Self {
83        Self {
84            prefetch_bytes,
85            ..self
86        }
87    }
88
89    /// The maximum number of bytes written to the underlying
90    /// [`StorageWriter`](crate::storage::StorageWriter) before yielding to the async runtime. This
91    /// prevents large writes from performing long blocking operations without giving the scheduler
92    /// a chance to schedule other tasks.
93    ///
94    /// The default value is 4096.
95    pub fn batch_write_size(self, batch_write_size: NonZeroUsize) -> Self {
96        Self {
97            batch_write_size: batch_write_size.get(),
98            ..self
99        }
100    }
101
102    /// If there is no new data for a duration greater than this timeout, we will attempt to
103    /// reconnect to the server.
104    ///  
105    /// This timeout is designed to help streams recover during temporary network failures,
106    /// but you may need to increase this if you're seeing warnings about timeouts in the logs
107    /// under normal network conditions.
108    ///
109    /// The default value is 5 seconds.
110    #[must_use]
111    pub fn retry_timeout(self, retry_timeout: Duration) -> Self {
112        Self {
113            retry_timeout,
114            ..self
115        }
116    }
117
118    /// If set to `true`, this will cause the stream download task to automatically cancel when the
119    /// [`StreamDownload`][crate::StreamDownload] instance is dropped. It's useful to disable this
120    /// if you want to ensure the stream shuts down cleanly.
121    ///
122    /// The default value is `true`.
123    #[must_use]
124    pub fn cancel_on_drop(self, cancel_on_drop: bool) -> Self {
125        Self {
126            cancel_on_drop,
127            ..self
128        }
129    }
130
131    /// Attach a callback function that will be called when a new chunk of the stream is processed.
132    /// The provided [`CancellationToken`] can be used to immediately cancel the stream.
133    ///
134    /// # Example
135    ///
136    /// ```
137    /// use reqwest::Client;
138    /// use stream_download::Settings;
139    /// use stream_download::http::HttpStream;
140    /// use stream_download::source::SourceStream;
141    ///
142    /// let settings = Settings::default();
143    /// settings.on_progress(|stream: &HttpStream<Client>, state, _| {
144    ///     let progress = state.current_position as f32 / stream.content_length().unwrap() as f32;
145    ///     println!("progress: {}%", progress * 100.0);
146    /// });
147    /// ```
148    #[must_use]
149    pub fn on_progress<F>(mut self, f: F) -> Self
150    where
151        F: FnMut(&S, StreamState, &CancellationToken) + Send + Sync + 'static,
152    {
153        self.on_progress = Some(Box::new(f));
154        self
155    }
156
157    /// Attach a callback function that will be called when the stream reconnects after a failure.
158    /// The provided [`CancellationToken`] can be used to immediately cancel the stream.
159    #[must_use]
160    pub fn on_reconnect<F>(mut self, f: F) -> Self
161    where
162        F: FnMut(&S, &CancellationToken) + Send + Sync + 'static,
163    {
164        self.on_reconnect = Some(Box::new(f));
165        self
166    }
167
168    /// Retrieves the configured prefetch bytes
169    pub const fn get_prefetch_bytes(&self) -> u64 {
170        self.prefetch_bytes
171    }
172
173    /// Retrieves the configured batch write size
174    pub const fn get_write_batch_size(&self) -> usize {
175        self.batch_write_size
176    }
177}
178
#[cfg(feature = "reqwest-middleware")]
impl Settings<crate::http::HttpStream<::reqwest_middleware::ClientWithMiddleware>> {
    /// Adds a new [`reqwest_middleware::Middleware`].
    ///
    /// This is an associated function (it takes no `self`) that hands `middleware` straight to
    /// `crate::http::reqwest_middleware_client::add_default_middleware`.
    // NOTE(review): presumably the middleware is registered for the default client shared by
    // later streams; confirm against `reqwest_middleware_client`.
    pub fn add_default_middleware<M: reqwest_middleware::Middleware>(middleware: M) {
        crate::http::reqwest_middleware_client::add_default_middleware(middleware);
    }
}