mtp-rs 0.23.0

Pure-Rust MTP (Media Transfer Protocol) library for modern Android devices
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
//! Storage operations (a thin façade over the active backend).

use crate::cancel::{bail_if_cancelled, CancelToken};
use crate::mtp::backend::{BackendListing, ByteRange, MtpBackend, ProgressFn};
use crate::mtp::object::NewObjectInfo;
use crate::mtp::stream::{FileDownload, Progress, WindowedDownload, DEFAULT_DOWNLOAD_WINDOW};
use crate::mtp::{Error, ObjectHandle, ObjectInfo, StorageId, StorageInfo, UploadError};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use std::ops::ControlFlow;
use std::sync::Arc;

/// An in-progress directory listing that yields [`ObjectInfo`] items one at a time.
///
/// Created by [`Storage::list_objects_stream()`]. After the device returns the handle list, the
/// total count is known immediately ([`total()`](Self::total)). Each call to [`next()`](Self::next)
/// fetches one object's metadata, so the consumer can report progress (e.g.,
/// "Loading files (42 of 500)...") as items arrive.
///
/// # Important
///
/// The device is busy while this listing is active. You must consume all items (or drop the
/// listing) before calling other storage methods.
///
/// # Example
///
/// ```rust,no_run
/// use mtp_rs::mtp::MtpDevice;
///
/// # async fn example() -> Result<(), mtp_rs::Error> {
/// # let device = MtpDevice::open_first().await?;
/// # let storages = device.storages().await?;
/// # let storage = &storages[0];
/// let mut listing = storage.list_objects_stream(None).await?;
/// println!("Loading {} files...", listing.total());
///
/// while let Some(result) = listing.next().await {
///     let info = result?;
///     println!("[{}/{}] {}", listing.fetched(), listing.total(), info.filename);
/// }
/// # Ok(())
/// # }
/// ```
pub struct ObjectListing {
    /// Backend-provided listing: its `total` backs [`total()`](Self::total) and its `items`
    /// stream is what [`next()`](Self::next) polls.
    inner: BackendListing,
    /// Number of `Ok` items handed out by [`next()`](Self::next) so far (post-filter). Errors and
    /// the end of the listing do not advance it.
    fetched: usize,
}

impl ObjectListing {
    /// Wrap a backend listing; nothing has been fetched yet, so the counter starts at zero.
    fn new(inner: BackendListing) -> Self {
        Self { fetched: 0, inner }
    }

    /// Number of object handles the device reported for this listing.
    ///
    /// Treat this as an upper bound: when a parent filter is active (e.g. devices that return all
    /// objects for root), some handles are skipped, so fewer items may actually be yielded.
    #[must_use]
    pub fn total(&self) -> usize {
        self.inner.total
    }

    /// How many items [`next()`](Self::next) has successfully yielded so far.
    #[must_use]
    pub fn fetched(&self) -> usize {
        self.fetched
    }

    /// Pull the next object's metadata from the device.
    ///
    /// Yields `None` once the listing is exhausted. Objects that fail the parent filter are
    /// dropped by the backend and never surface here.
    ///
    /// If a [`CancelToken`] was passed via [`Storage::list_objects_stream_with_cancel`] and it's
    /// been cancelled, this returns `Some(Err(Error::Cancelled))` at the next per-handle boundary.
    pub async fn next(&mut self) -> Option<Result<ObjectInfo, Error>> {
        let item = self.inner.items.next().await;
        // Only successfully delivered objects count towards `fetched`; an error or the end of
        // the stream is passed through without touching the counter.
        if matches!(item, Some(Ok(_))) {
            self.fetched += 1;
        }
        item
    }
}

/// A storage location on an MTP device.
///
/// `Storage` holds a shared reference to the active backend so it can outlive the original
/// `MtpDevice` and be used from multiple tasks.
pub struct Storage {
    /// Shared handle to the active backend; every operation on this type delegates to it.
    backend: Arc<dyn MtpBackend>,
    /// Identifier of this storage, passed to the backend for storage-scoped calls.
    id: StorageId,
    /// Cached storage info; only replaced by [`refresh()`](Self::refresh).
    info: StorageInfo,
}

impl Storage {
    /// Create a new Storage (internal).
    ///
    /// Stores the shared `backend`, the storage `id`, and `info` as the initially cached
    /// [`StorageInfo`]. No device call is made here.
    pub(crate) fn new(backend: Arc<dyn MtpBackend>, id: StorageId, info: StorageInfo) -> Self {
        Self { backend, id, info }
    }

    /// Identifier of this storage (the value handed to the backend for storage-scoped calls).
    #[must_use]
    pub fn id(&self) -> StorageId {
        self.id
    }

    /// Storage information (cached, call refresh() to update).
    ///
    /// This is a plain read of the cached value; it never contacts the device.
    #[must_use]
    pub fn info(&self) -> &StorageInfo {
        &self.info
    }

    /// Re-query the device for this storage's info (free space, etc.) and replace the cached copy.
    ///
    /// # Errors
    ///
    /// Propagates the backend's error; in that case the cached [`info`](Self::info) is left as
    /// it was.
    pub async fn refresh(&mut self) -> Result<(), Error> {
        let latest = self.backend.storage_info(self.id).await?;
        self.info = latest;
        Ok(())
    }

    // =========================================================================
    // Listing
    // =========================================================================

    /// List objects in a folder (None = root), returning all results at once.
    ///
    /// For progress reporting during large listings, use
    /// [`list_objects_stream()`](Self::list_objects_stream) instead.
    ///
    /// The backend handles device quirks (root-listing fast path and Android/Samsung/Fuji
    /// fallbacks).
    ///
    /// Equivalent to [`list_objects_with_cancel`](Self::list_objects_with_cancel) with no cancel
    /// token.
    pub async fn list_objects(
        &self,
        parent: Option<ObjectHandle>,
    ) -> Result<Vec<ObjectInfo>, Error> {
        self.list_objects_with_cancel(parent, None).await
    }

    /// Cancellable variant of [`list_objects`](Self::list_objects).
    ///
    /// Drains a streaming listing into a `Vec`. When `cancel` is `Some(&token)` and the token has
    /// been cancelled, the call bails between per-handle fetches with `Err(Error::Cancelled)`.
    /// Useful for large folders (1k+ entries on Android), where the per-handle loop dominates
    /// wall-clock time.
    ///
    /// # Errors
    ///
    /// Stops at the first error from opening the listing or from any item; objects collected up
    /// to that point are discarded.
    pub async fn list_objects_with_cancel(
        &self,
        parent: Option<ObjectHandle>,
        cancel: Option<&CancelToken>,
    ) -> Result<Vec<ObjectInfo>, Error> {
        let mut stream = self.list_objects_stream_with_cancel(parent, cancel).await?;
        // `total()` is the device's handle count, i.e. an upper bound on what will be yielded.
        let mut collected = Vec::with_capacity(stream.total());
        loop {
            match stream.next().await {
                Some(Ok(info)) => collected.push(info),
                Some(Err(err)) => return Err(err),
                None => return Ok(collected),
            }
        }
    }

    /// List objects in a folder as a streaming [`ObjectListing`].
    ///
    /// Returns immediately after the device returns the handle list. The total count is then known
    /// via [`ObjectListing::total()`], and each call to [`ObjectListing::next()`] fetches one
    /// object's metadata.
    ///
    /// Equivalent to
    /// [`list_objects_stream_with_cancel`](Self::list_objects_stream_with_cancel) with no cancel
    /// token.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use mtp_rs::mtp::MtpDevice;
    ///
    /// # async fn example() -> Result<(), mtp_rs::Error> {
    /// # let device = MtpDevice::open_first().await?;
    /// # let storages = device.storages().await?;
    /// # let storage = &storages[0];
    /// let mut listing = storage.list_objects_stream(None).await?;
    /// println!("Found {} items", listing.total());
    ///
    /// while let Some(result) = listing.next().await {
    ///     let info = result?;
    ///     println!("[{}/{}] {}", listing.fetched(), listing.total(), info.filename);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn list_objects_stream(
        &self,
        parent: Option<ObjectHandle>,
    ) -> Result<ObjectListing, Error> {
        self.list_objects_stream_with_cancel(parent, None).await
    }

    /// Cancellable variant of [`list_objects_stream`](Self::list_objects_stream).
    ///
    /// The returned [`ObjectListing`] carries the optional [`CancelToken`]. Every call to
    /// [`ObjectListing::next`] checks the token before issuing the next metadata roundtrip, so a
    /// flipped token bails within one roundtrip's worth of latency instead of running to
    /// completion.
    ///
    /// # Errors
    ///
    /// Propagates any error the backend reports while opening the listing.
    pub async fn list_objects_stream_with_cancel(
        &self,
        parent: Option<ObjectHandle>,
        cancel: Option<&CancelToken>,
    ) -> Result<ObjectListing, Error> {
        // The backend opens the listing for this storage; the wrapper only adds the counter.
        Ok(ObjectListing::new(
            self.backend.list(self.id, parent, cancel).await?,
        ))
    }

    /// List a folder and everything beneath it.
    ///
    /// The tree is walked manually, one [`list_objects`](Self::list_objects) call per folder, so
    /// the backend's root/quirk handling applies at every level. Works the same across all
    /// devices, including Android (whose native `GetObjectHandles` recursion is broken).
    ///
    /// Folders are visited from an explicit stack (last discovered, first visited). Each folder's
    /// direct children appear contiguously in the result, folders included.
    ///
    /// # Errors
    ///
    /// Stops at the first listing error; objects gathered so far are discarded.
    pub async fn list_objects_recursive(
        &self,
        parent: Option<ObjectHandle>,
    ) -> Result<Vec<ObjectInfo>, Error> {
        let mut collected = Vec::new();
        let mut pending = vec![parent];

        while let Some(folder) = pending.pop() {
            let children = self.list_objects(folder).await?;
            // Queue sub-folders first (by handle only), then move the children into the output.
            pending.extend(
                children
                    .iter()
                    .filter(|child| child.is_folder())
                    .map(|child| Some(child.handle)),
            );
            collected.extend(children);
        }
        Ok(collected)
    }

    /// Get object metadata by handle.
    ///
    /// Files larger than 4 GB have their u64 size auto-resolved by the backend.
    ///
    /// Thin delegation to the backend's `object_info`; the result is returned unchanged.
    pub async fn get_object_info(&self, handle: ObjectHandle) -> Result<ObjectInfo, Error> {
        self.backend.object_info(handle).await
    }

    // =========================================================================
    // Download operations
    // =========================================================================

    /// Download a whole file and return all bytes.
    ///
    /// For small to medium files where you want all the data in memory. For large files or
    /// streaming to disk, use [`download`](Self::download).
    ///
    /// Implemented as a backend `read_range` from offset 0 with no length bound (`None`).
    pub async fn download_to_vec(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
        self.backend.read_range(handle, 0, None).await
    }

    /// Read a bounded byte range into a `Vec<u8>` (single shot, buffered).
    ///
    /// Uses the device's 64-bit partial-read operation, so offsets beyond 4 GB work on devices that
    /// advertise it. `len` is capped at `u32::MAX` per call.
    ///
    /// `offset` is the starting byte and `len` the requested length; both are forwarded to the
    /// backend's `read_range` as-is (`len` wrapped in `Some`).
    pub async fn read_range(
        &self,
        handle: ObjectHandle,
        offset: u64,
        len: u32,
    ) -> Result<Vec<u8>, Error> {
        self.backend.read_range(handle, offset, Some(len)).await
    }

    /// Fetch the thumbnail image bytes for an object.
    ///
    /// Thin delegation to the backend's `thumbnail`; the bytes are returned unchanged.
    pub async fn thumbnail(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
        self.backend.thumbnail(handle).await
    }

    /// Stream a file from the device (true streaming), holding the session for the whole file.
    ///
    /// Chunks are yielded as they arrive, so the file is never buffered in memory as a whole.
    /// This is the raw-speed path: it holds the device's one session open for the whole file.
    /// For a long read where the device must stay responsive to other work, use
    /// [`download_windowed`](Self::download_windowed) instead.
    ///
    /// # Resume on forward-only-seek devices
    ///
    /// A [`ByteRange::From`]/[`ByteRange::Range`] resume assumes the device can seek to the offset
    /// cheaply. The Windows WPD backend's Pixel-class devices return `E_NOTIMPL` from
    /// `IStream::Seek`, so the backend reaches the offset by reading and discarding the prefix: a
    /// resume is O(offset) and re-streams every byte before the offset. Resuming near the end of a
    /// large file re-reads almost the whole file, so prefer a single in-order pass over many small
    /// offset resumes there.
    ///
    /// # Errors
    ///
    /// Propagates the backend's error if the download cannot be started.
    pub async fn download(
        &self,
        handle: ObjectHandle,
        range: ByteRange,
    ) -> Result<FileDownload, Error> {
        let transfer = self.backend.download(handle, range).await?;
        // Repackage the backend's size + body pair into the public download type.
        let (size, body) = (transfer.size, transfer.body);
        Ok(FileDownload::new(size, body))
    }

    /// Read a large file as a sequence of bounded windows, **freeing the session between every
    /// window** so the device stays responsive.
    ///
    /// Each [`next_window()`](WindowedDownload::next_window) is a single bounded read that completes
    /// and releases the device. Between two `next_window()` calls a consumer can interleave other
    /// device work (service a pending folder listing, navigate, check a cancel flag) without
    /// aborting the read.
    ///
    /// `window_size` is the maximum bytes per window. [`DEFAULT_DOWNLOAD_WINDOW`] (8 MiB) is a
    /// documented suggestion; a `window_size` of 0 is clamped to 1.
    ///
    /// # Resume on forward-only-seek devices
    ///
    /// A windowed *resume* from an offset (`ByteRange::From`/`Range`) re-reads the skipped prefix on
    /// devices whose `IStream::Seek` is `E_NOTIMPL` (the Windows WPD backend's Pixel-class devices),
    /// making the first window after the offset O(offset). The session-freeing benefit between windows
    /// still holds, but starting deep into a large file pays a full re-read of the prefix first;
    /// prefer covering the file from the start (`ByteRange::Full`) where possible.
    pub async fn download_windowed(
        &self,
        handle: ObjectHandle,
        range: ByteRange,
        window_size: u32,
    ) -> Result<WindowedDownload, Error> {
        let size = self.backend.object_info(handle).await?.size;
        let offset = range.offset();
        if offset > size {
            return Err(Error::invalid_data(format!(
                "windowed download offset {offset} is past the object size {size}"
            )));
        }
        Ok(WindowedDownload::new(
            Arc::clone(&self.backend),
            handle,
            size,
            offset,
            window_size,
        ))
    }

    /// Read a large file in windows using the default window size
    /// ([`DEFAULT_DOWNLOAD_WINDOW`], 8 MiB), covering the whole file.
    ///
    /// Equivalent to [`download_windowed`](Self::download_windowed) with [`ByteRange::Full`] and
    /// [`DEFAULT_DOWNLOAD_WINDOW`].
    pub async fn download_windowed_default(
        &self,
        handle: ObjectHandle,
    ) -> Result<WindowedDownload, Error> {
        self.download_windowed(handle, ByteRange::Full, DEFAULT_DOWNLOAD_WINDOW)
            .await
    }

    // =========================================================================
    // Upload operations
    // =========================================================================

    /// Upload a file whose contents come from a stream.
    ///
    /// The data streams directly to the device in chunks; the protocol only needs the total size
    /// upfront (provided via `info`), not the whole file in memory. No progress callback is
    /// installed; use [`upload_with_progress`](Self::upload_with_progress) for that.
    ///
    /// # Errors
    ///
    /// Returns [`UploadError`] on failure. Uploads are two-phase: the object is created (yielding
    /// a handle), then the bytes are streamed. If the data phase fails, the device may keep a
    /// partial object, and [`UploadError::partial`] carries its handle so you can
    /// [`delete`](Self::delete) it or retry the data phase to resume. The library does **not**
    /// auto-delete it.
    pub async fn upload<'a, S>(
        &'a self,
        parent: Option<ObjectHandle>,
        info: NewObjectInfo,
        data: S,
    ) -> Result<ObjectHandle, UploadError>
    where
        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send + 'a,
    {
        // Pin the caller's stream on the heap before handing it to the backend.
        let body = Box::pin(data);
        self.backend
            .upload(self.id, parent, info, body, None)
            .await
    }

    /// Upload a file from a stream, reporting progress through a callback.
    ///
    /// Progress is reported as data is read from the stream. Return `ControlFlow::Break(())` from
    /// the callback to cancel the upload (which surfaces as [`Error::Cancelled`] in
    /// [`UploadError::source`]).
    ///
    /// Otherwise identical to [`upload`](Self::upload): same storage, parent, and `info`
    /// handling, with the callback passed to the backend alongside the data.
    pub async fn upload_with_progress<'a, S, F>(
        &'a self,
        parent: Option<ObjectHandle>,
        info: NewObjectInfo,
        data: S,
        on_progress: F,
    ) -> Result<ObjectHandle, UploadError>
    where
        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send + 'a,
        F: FnMut(Progress) -> ControlFlow<()> + Send + 'a,
    {
        // Erase the closure type so the backend can take it as a `ProgressFn`.
        let callback: ProgressFn<'a> = Box::new(on_progress);
        let body = Box::pin(data);
        self.backend
            .upload(self.id, parent, info, body, Some(callback))
            .await
    }

    // =========================================================================
    // Folder and object management
    // =========================================================================

    /// Create a folder called `name` under `parent` on this storage.
    ///
    /// Delegates to the backend's `create_folder` with this storage's ID and returns the handle
    /// it reports (presumably the new folder's — confirm against the backend).
    ///
    /// NOTE(review): `parent = None` presumably means the storage root, as in
    /// [`list_objects`](Self::list_objects) — confirm against the backend.
    pub async fn create_folder(
        &self,
        parent: Option<ObjectHandle>,
        name: &str,
    ) -> Result<ObjectHandle, Error> {
        self.backend.create_folder(self.id, parent, name).await
    }

    /// Delete an object by handle.
    ///
    /// Delegates to the backend's `delete` with no cancel token. For a cancellable variant see
    /// [`delete_with_cancel`](Self::delete_with_cancel).
    pub async fn delete(&self, handle: ObjectHandle) -> Result<(), Error> {
        self.backend.delete(handle, None).await
    }

    /// Like [`delete`](Self::delete), but bails with `Err(Error::Cancelled)` before issuing the
    /// delete request when the token is set.
    ///
    /// The token is checked here first and is also forwarded to the backend's `delete`.
    pub async fn delete_with_cancel(
        &self,
        handle: ObjectHandle,
        cancel: Option<&CancelToken>,
    ) -> Result<(), Error> {
        // Check up front so that a failed check returns before the backend is called at all.
        bail_if_cancelled(cancel)?;
        self.backend.delete(handle, cancel).await
    }

    /// Move an object into another folder, optionally on another storage.
    ///
    /// `new_storage = None` keeps the object on this storage (this storage's ID is passed to the
    /// backend as the destination).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the backend's `move_object` reports.
    pub async fn move_object(
        &self,
        handle: ObjectHandle,
        new_parent: ObjectHandle,
        new_storage: Option<StorageId>,
    ) -> Result<(), Error> {
        self.backend
            .move_object(handle, new_parent, new_storage.unwrap_or(self.id))
            .await
    }

    /// Copy an object into another folder, optionally on another storage.
    ///
    /// `new_storage = None` targets this storage (this storage's ID is passed to the backend as
    /// the destination). Returns the handle reported by the backend (presumably the copy's —
    /// confirm against the backend).
    ///
    /// # Errors
    ///
    /// Propagates whatever error the backend's `copy_object` reports.
    pub async fn copy_object(
        &self,
        handle: ObjectHandle,
        new_parent: ObjectHandle,
        new_storage: Option<StorageId>,
    ) -> Result<ObjectHandle, Error> {
        self.backend
            .copy_object(handle, new_parent, new_storage.unwrap_or(self.id))
            .await
    }

    /// Rename an object (file or folder).
    ///
    /// Not all devices support renaming. Use `MtpDevice::supports_rename()` to check.
    ///
    /// Thin delegation to the backend's `rename`; no capability check is done here.
    pub async fn rename(&self, handle: ObjectHandle, new_name: &str) -> Result<(), Error> {
        self.backend.rename(handle, new_name).await
    }
}