stream_download/settings.rs
1use std::num::NonZeroUsize;
2use std::ops::Range;
3use std::time::Duration;
4
5use educe::Educe;
6use tokio_util::sync::CancellationToken;
7
8pub(crate) type ProgressFn<S> = Box<dyn FnMut(&S, StreamState, &CancellationToken) + Send + Sync>;
9
10pub(crate) type ReconnectFn<S> = Box<dyn FnMut(&S, &CancellationToken) + Send + Sync>;
11
/// Phase the download is in, as reported to a progress callback.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamPhase {
    /// The stream is still prefetching.
    #[non_exhaustive]
    Prefetching {
        /// The prefetch target currently in effect.
        target: u64,
        /// Size of the chunk that was downloaded most recently.
        chunk_size: usize,
    },
    /// The stream is downloading.
    #[non_exhaustive]
    Downloading {
        /// Size of the chunk that was downloaded most recently.
        chunk_size: usize,
    },
    /// The download has finished.
    Complete,
}
33
34#[derive(Clone, Debug, PartialEq, Eq)]
35#[non_exhaustive]
36/// State of the stream for use during a progress callback.
37pub struct StreamState {
38 /// Current position of the stream.
39 pub current_position: u64,
40 /// Time elapsed since download start.
41 pub elapsed: Duration,
42 /// Current phase of the download.
43 pub phase: StreamPhase,
44 /// Current chunk of the stream being downloaded.
45 pub current_chunk: Range<u64>,
46}
47
48/// Settings to configure the stream behavior.
49#[derive(Educe)]
50#[educe(Debug, PartialEq, Eq)]
51pub struct Settings<S> {
52 pub(crate) prefetch_bytes: u64,
53 pub(crate) batch_write_size: usize,
54 pub(crate) retry_timeout: Duration,
55 pub(crate) cancel_on_drop: bool,
56 #[educe(Debug = false, PartialEq = false)]
57 pub(crate) on_progress: Option<ProgressFn<S>>,
58 #[educe(Debug = false, PartialEq = false)]
59 pub(crate) on_reconnect: Option<ReconnectFn<S>>,
60}
61
62impl<S> Default for Settings<S> {
63 fn default() -> Self {
64 Self {
65 prefetch_bytes: 256 * 1024,
66 batch_write_size: 4096,
67 retry_timeout: Duration::from_secs(5),
68 cancel_on_drop: true,
69 on_progress: None,
70 on_reconnect: None,
71 }
72 }
73}
74
75impl<S> Settings<S> {
76 /// How many bytes to download from the stream before allowing read requests.
77 /// This is used to create a buffer between the read position and the stream position
78 /// and prevent stuttering.
79 ///
80 /// The default value is 256 kilobytes.
81 #[must_use]
82 pub fn prefetch_bytes(self, prefetch_bytes: u64) -> Self {
83 Self {
84 prefetch_bytes,
85 ..self
86 }
87 }
88
89 /// The maximum number of bytes written to the underlying
90 /// [`StorageWriter`](crate::storage::StorageWriter) before yielding to the async runtime. This
91 /// prevents large writes from performing long blocking operations without giving the scheduler
92 /// a chance to schedule other tasks.
93 ///
94 /// The default value is 4096.
95 pub fn batch_write_size(self, batch_write_size: NonZeroUsize) -> Self {
96 Self {
97 batch_write_size: batch_write_size.get(),
98 ..self
99 }
100 }
101
102 /// If there is no new data for a duration greater than this timeout, we will attempt to
103 /// reconnect to the server.
104 ///
105 /// This timeout is designed to help streams recover during temporary network failures,
106 /// but you may need to increase this if you're seeing warnings about timeouts in the logs
107 /// under normal network conditions.
108 ///
109 /// The default value is 5 seconds.
110 #[must_use]
111 pub fn retry_timeout(self, retry_timeout: Duration) -> Self {
112 Self {
113 retry_timeout,
114 ..self
115 }
116 }
117
118 /// If set to `true`, this will cause the stream download task to automatically cancel when the
119 /// [`StreamDownload`][crate::StreamDownload] instance is dropped. It's useful to disable this
120 /// if you want to ensure the stream shuts down cleanly.
121 ///
122 /// The default value is `true`.
123 #[must_use]
124 pub fn cancel_on_drop(self, cancel_on_drop: bool) -> Self {
125 Self {
126 cancel_on_drop,
127 ..self
128 }
129 }
130
131 /// Attach a callback function that will be called when a new chunk of the stream is processed.
132 /// The provided [`CancellationToken`] can be used to immediately cancel the stream.
133 ///
134 /// # Example
135 ///
136 /// ```
137 /// use reqwest::Client;
138 /// use stream_download::Settings;
139 /// use stream_download::http::HttpStream;
140 /// use stream_download::source::SourceStream;
141 ///
142 /// let settings = Settings::default();
143 /// settings.on_progress(|stream: &HttpStream<Client>, state, _| {
144 /// let progress = state.current_position as f32 / stream.content_length().unwrap() as f32;
145 /// println!("progress: {}%", progress * 100.0);
146 /// });
147 /// ```
148 #[must_use]
149 pub fn on_progress<F>(mut self, f: F) -> Self
150 where
151 F: FnMut(&S, StreamState, &CancellationToken) + Send + Sync + 'static,
152 {
153 self.on_progress = Some(Box::new(f));
154 self
155 }
156
157 /// Attach a callback function that will be called when the stream reconnects after a failure.
158 /// The provided [`CancellationToken`] can be used to immediately cancel the stream.
159 #[must_use]
160 pub fn on_reconnect<F>(mut self, f: F) -> Self
161 where
162 F: FnMut(&S, &CancellationToken) + Send + Sync + 'static,
163 {
164 self.on_reconnect = Some(Box::new(f));
165 self
166 }
167
168 /// Retrieves the configured prefetch bytes
169 pub const fn get_prefetch_bytes(&self) -> u64 {
170 self.prefetch_bytes
171 }
172
173 /// Retrieves the configured batch write size
174 pub const fn get_write_batch_size(&self) -> usize {
175 self.batch_write_size
176 }
177}
178
#[cfg(feature = "reqwest-middleware")]
impl Settings<crate::http::HttpStream<::reqwest_middleware::ClientWithMiddleware>> {
    /// Registers `middleware` as a default [`reqwest_middleware::Middleware`].
    ///
    /// This is an associated function (it takes no `self`); it simply hands the middleware to
    /// `crate::http::reqwest_middleware_client::add_default_middleware`.
    pub fn add_default_middleware<M: reqwest_middleware::Middleware>(middleware: M) {
        use crate::http::reqwest_middleware_client as middleware_client;

        middleware_client::add_default_middleware(middleware);
    }
}
188}