mtp_rs/mtp/stream.rs
1//! Streaming download/upload support (backend-neutral façade types).
2
3use crate::mtp::backend::{DownloadBody, MtpBackend};
4use crate::mtp::{Error, ObjectHandle};
5use bytes::Bytes;
6use std::ops::ControlFlow;
7use std::sync::Arc;
8
9/// Progress information for transfers.
10#[derive(Debug, Clone)]
11pub struct Progress {
12 /// Bytes transferred so far.
13 pub bytes_transferred: u64,
14 /// Total bytes (if known).
15 pub total_bytes: Option<u64>,
16}
17
18impl Progress {
19 /// Progress as a percentage (0.0 to 100.0).
20 #[must_use]
21 pub fn percent(&self) -> f64 {
22 self.fraction() * 100.0
23 }
24
25 /// Progress as a fraction (0.0 to 1.0).
26 #[must_use]
27 pub fn fraction(&self) -> f64 {
28 self.total_bytes.map_or(1.0, |total| {
29 if total == 0 {
30 1.0
31 } else {
32 self.bytes_transferred as f64 / total as f64
33 }
34 })
35 }
36}
37
38/// Default idle timeout for cancel drain operations.
39///
40/// After sending the cancel request, this is how long to wait for additional data on each pipe
41/// before assuming it's clear. Matches the 300ms timeout used by libmtp, which mirrors Windows
42/// behavior.
43pub const DEFAULT_CANCEL_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(300);
44
45/// A file download in progress, streamed from the device.
46///
47/// Wraps the active backend's download body and tracks progress. Data is streamed directly from the
48/// device as chunks arrive, without buffering the entire file in memory.
49///
50/// # Important
51///
52/// On the USB backend the session is held while this download is active. You must either consume
53/// the entire download or call [`cancel()`](Self::cancel) before dropping it; cancelling drains the
54/// pipe and frees the session.
55///
56/// # Example
57///
58/// ```rust,no_run
59/// use mtp_rs::mtp::MtpDevice;
60/// use mtp_rs::{ByteRange, ObjectHandle};
61/// use tokio::io::AsyncWriteExt;
62///
63/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
64/// # let device = MtpDevice::open_first().await?;
65/// # let storages = device.storages().await?;
66/// # let storage = &storages[0];
67/// # let handle = ObjectHandle(1);
68/// let mut download = storage.download(handle, ByteRange::Full).await?;
69/// println!("Downloading {} bytes...", download.size());
70///
71/// # let mut file = tokio::fs::File::create("output.bin").await?;
72/// while let Some(chunk) = download.next_chunk().await {
73/// let bytes = chunk?;
74/// file.write_all(&bytes).await?;
75/// println!("Progress: {:.1}%", download.progress() * 100.0);
76/// }
77/// # Ok(())
78/// # }
79/// ```
80#[must_use = "dropping a FileDownload mid-transfer may corrupt the USB session; \
81 consume it fully or call cancel()"]
82pub struct FileDownload {
83 size: u64,
84 bytes_received: u64,
85 body: Box<dyn DownloadBody>,
86}
87
88impl FileDownload {
89 /// Create a new FileDownload wrapping a backend download body.
90 pub(crate) fn new(size: u64, body: Box<dyn DownloadBody>) -> Self {
91 Self {
92 size,
93 bytes_received: 0,
94 body,
95 }
96 }
97
98 /// Total file size in bytes (always the whole file, even for a ranged download).
99 #[must_use]
100 pub fn size(&self) -> u64 {
101 self.size
102 }
103
104 /// Bytes received so far in this stream.
105 #[must_use]
106 pub fn bytes_received(&self) -> u64 {
107 self.bytes_received
108 }
109
110 /// Progress as a fraction (0.0 to 1.0).
111 #[must_use]
112 pub fn progress(&self) -> f64 {
113 if self.size == 0 {
114 1.0
115 } else {
116 self.bytes_received as f64 / self.size as f64
117 }
118 }
119
120 /// Cancel the in-progress download.
121 ///
122 /// On the USB backend this uses the Still Image Class cancel mechanism to stop the transfer and
123 /// drain remaining data, leaving the session clean for the next operation. The `idle_timeout`
124 /// controls how long to wait during the pipe drain. If the download is already complete, this
125 /// is a no-op.
126 pub async fn cancel(&mut self, idle_timeout: std::time::Duration) -> Result<(), Error> {
127 self.body.cancel(idle_timeout).await
128 }
129
130 /// Get the next chunk of data. Returns `None` when the download is complete.
131 pub async fn next_chunk(&mut self) -> Option<Result<Bytes, Error>> {
132 match self.body.next_chunk().await {
133 Some(Ok(bytes)) => {
134 self.bytes_received += bytes.len() as u64;
135 Some(Ok(bytes))
136 }
137 other => other,
138 }
139 }
140
141 /// Consume the download and iterate with a progress callback.
142 ///
143 /// Calls `on_progress` after each chunk. Return `ControlFlow::Break(())` to cancel the download.
144 pub async fn collect_with_progress<F>(mut self, mut on_progress: F) -> Result<Vec<u8>, Error>
145 where
146 F: FnMut(Progress) -> ControlFlow<()>,
147 {
148 let mut data = Vec::with_capacity(self.size as usize);
149
150 while let Some(result) = self.next_chunk().await {
151 let chunk = result?;
152 data.extend_from_slice(&chunk);
153
154 let progress = Progress {
155 bytes_transferred: self.bytes_received,
156 total_bytes: Some(self.size),
157 };
158
159 if let ControlFlow::Break(()) = on_progress(progress) {
160 self.body.cancel(DEFAULT_CANCEL_TIMEOUT).await?;
161 return Err(Error::Cancelled);
162 }
163 }
164
165 Ok(data)
166 }
167
168 /// Collect all remaining data into a `Vec<u8>`. Consumes the download.
169 pub async fn collect(mut self) -> Result<Vec<u8>, Error> {
170 let mut data = Vec::with_capacity(self.size as usize);
171 while let Some(result) = self.next_chunk().await {
172 data.extend_from_slice(&result?);
173 }
174 Ok(data)
175 }
176}
177
178/// Default window size for [`Storage::download_windowed`](crate::mtp::Storage::download_windowed):
179/// 8 MiB.
180///
181/// Each window is one bounded transaction that **releases** the one-per-device session the moment it
182/// returns. On a Pixel 9 Pro XL an 8 MiB window completes in roughly 80 ms: small enough that a
183/// concurrent folder listing or navigation slips in between windows at its natural cost, yet large
184/// enough to keep throughput high.
185///
186/// This is a documented suggestion, not a baked-in policy: pass your own `window_size` to
187/// [`download_windowed`](crate::mtp::Storage::download_windowed) to tune the
188/// responsiveness/throughput tradeoff for your device and workload.
189pub const DEFAULT_DOWNLOAD_WINDOW: u32 = 8 * 1024 * 1024;
190
191/// A large-file reader that fetches the file as a sequence of bounded windows, **releasing the
192/// session between every window**.
193///
194/// A streaming [`FileDownload`] holds the device's single session open for the *entire* file, so
195/// while a big download is in flight no other operation (a folder listing, navigation) can touch
196/// the device until the read finishes or is cancelled. `WindowedDownload` solves that: each
197/// [`next_window()`](Self::next_window) is a single, short bounded read that completes and frees the
198/// session, so a listing issued between two windows just works.
199///
200/// # The consumer owns the policy
201///
202/// `WindowedDownload` owns the *bookkeeping* (total size, current offset, window sizing, EOF
203/// detection). It deliberately owns **no** policy: there's no pause, debounce, rate-limit, or
204/// priority gate baked in. Whatever a consumer wants to do "while the session is free" it does
205/// *between* `next_window()` calls.
206///
207/// # Stopping early needs no `cancel()`
208///
209/// Unlike [`FileDownload`], a `WindowedDownload` holds nothing between windows. To stop early, just
210/// stop calling `next_window()` and drop it.
211pub struct WindowedDownload {
212 backend: Arc<dyn MtpBackend>,
213 handle: ObjectHandle,
214 /// Full object size, cached at construction so progress/ETA stays anchored.
215 total_size: u64,
216 /// Byte offset of the next window to fetch.
217 offset: u64,
218 /// Max bytes requested per window.
219 window_size: u32,
220}
221
222impl WindowedDownload {
223 /// Create a windowed download starting at `start_offset`.
224 ///
225 /// `window_size` is clamped to at least 1 byte: a zero window can't make progress.
226 pub(crate) fn new(
227 backend: Arc<dyn MtpBackend>,
228 handle: ObjectHandle,
229 total_size: u64,
230 start_offset: u64,
231 window_size: u32,
232 ) -> Self {
233 Self {
234 backend,
235 handle,
236 total_size,
237 offset: start_offset,
238 window_size: window_size.max(1),
239 }
240 }
241
242 /// Full object size in bytes (always the whole file, even for a windowed download started at a
243 /// non-zero offset, so progress/ETA stays anchored to the complete file).
244 #[must_use]
245 pub fn size(&self) -> u64 {
246 self.total_size
247 }
248
249 /// Byte offset of the next window to be read.
250 #[must_use]
251 pub fn offset(&self) -> u64 {
252 self.offset
253 }
254
255 /// Read the next window via one bounded read, **releasing the session on return**.
256 ///
257 /// Returns:
258 /// - `Some(Ok(bytes))`: the next window, clamped to the bytes remaining. A device may legally
259 /// return *fewer* bytes than requested mid-file; the offset advances by what actually came
260 /// back.
261 /// - `None`: clean end of file (also the first result for an empty file, or for a download
262 /// started at `offset == size`). No transaction is issued in that case.
263 /// - `Some(Err(_))`: a transfer error, **or** a device stall (a 0-byte read while bytes still
264 /// remain), reported as [`Error::InvalidData`] rather than silently treated as EOF.
265 pub async fn next_window(&mut self) -> Option<Result<Vec<u8>, Error>> {
266 // EOF: nothing left. Covers the empty-file and offset==size cases too, and issues no
267 // transaction.
268 if self.offset >= self.total_size {
269 return None;
270 }
271
272 // Clamp the request to the bytes that remain and to the configured window size.
273 let remaining = self.total_size - self.offset;
274 let want = u32::try_from(remaining.min(u64::from(self.window_size))).unwrap_or(u32::MAX);
275
276 match self
277 .backend
278 .read_range(self.handle, self.offset, Some(want))
279 .await
280 {
281 Ok(bytes) => {
282 if bytes.is_empty() {
283 // We asked for >0 bytes but the device returned none: a stall, not EOF.
284 return Some(Err(Error::invalid_data(format!(
285 "device returned 0 bytes at offset {} of {} (expected up to {want}); \
286 treating as a stall rather than end-of-file",
287 self.offset, self.total_size
288 ))));
289 }
290 // A short non-zero read is legal: advance by what actually came back.
291 self.offset += bytes.len() as u64;
292 Some(Ok(bytes))
293 }
294 Err(e) => Some(Err(e)),
295 }
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302
303 #[test]
304 fn progress_calculations() {
305 let cases = [
306 (50, Some(100), 50.0, 0.5),
307 (100, Some(100), 100.0, 1.0),
308 (25, Some(100), 25.0, 0.25),
309 (0, Some(0), 100.0, 1.0), // Empty file
310 (50, None, 100.0, 1.0), // Unknown total defaults to complete
311 ];
312 for (transferred, total, expected_pct, expected_frac) in cases {
313 let p = Progress {
314 bytes_transferred: transferred,
315 total_bytes: total,
316 };
317 assert_eq!(
318 p.percent(),
319 expected_pct,
320 "percent failed for {transferred}/{total:?}"
321 );
322 assert_eq!(
323 p.fraction(),
324 expected_frac,
325 "fraction failed for {transferred}/{total:?}"
326 );
327 }
328
329 // Large numbers
330 let large = Progress {
331 bytes_transferred: u64::MAX / 2,
332 total_bytes: Some(u64::MAX),
333 };
334 let frac = large.fraction();
335 assert!(frac > 0.49 && frac < 0.51);
336 }
337}