Skip to main content

mtp_rs/mtp/
storage.rs

1//! Storage operations.
2
3use crate::cancel::{bail_if_cancelled, CancelToken};
4use crate::mtp::object::NewObjectInfo;
5use crate::mtp::stream::{FileDownload, Progress};
6use crate::ptp::{ObjectHandle, ObjectInfo, StorageId, StorageInfo};
7use crate::{Error, UploadError};
8use bytes::Bytes;
9use futures::Stream;
10use std::ops::ControlFlow;
11use std::sync::Arc;
12
13use super::device::MtpDeviceInner;
14
15/// An in-progress directory listing that yields [`ObjectInfo`] items one at a time.
16///
17/// Created by [`Storage::list_objects_stream()`]. After `GetObjectHandles` completes,
18/// the total count is known immediately. Each call to [`next()`](Self::next) fetches
19/// one `GetObjectInfo` from USB, so the consumer can report progress (e.g.,
20/// "Loading files (42 of 500)...") as items arrive.
21///
22/// # Important
23///
24/// The MTP session is busy while this listing is active. You must consume
25/// all items (or drop the listing) before calling other storage methods.
26///
27/// # Example
28///
29/// ```rust,no_run
30/// use mtp_rs::mtp::MtpDevice;
31///
32/// # async fn example() -> Result<(), mtp_rs::Error> {
33/// # let device = MtpDevice::open_first().await?;
34/// # let storages = device.storages().await?;
35/// # let storage = &storages[0];
36/// let mut listing = storage.list_objects_stream(None).await?;
37/// println!("Loading {} files...", listing.total());
38///
39/// while let Some(result) = listing.next().await {
40///     let info = result?;
41///     println!("[{}/{}] {}", listing.fetched(), listing.total(), info.filename);
42/// }
43/// # Ok(())
44/// # }
45/// ```
46pub struct ObjectListing {
47    inner: Arc<MtpDeviceInner>,
48    handles: Vec<ObjectHandle>,
49    /// Index of the next handle to fetch.
50    cursor: usize,
51    /// Parent filter: if Some, only items matching this parent are yielded.
52    parent_filter: Option<ParentFilter>,
53    /// Optional cancellation handle. When set and flipped, [`next`](Self::next)
54    /// returns `Some(Err(Error::Cancelled))` at the next per-handle boundary.
55    cancel: Option<CancelToken>,
56}
57
58/// Describes how to filter objects by parent handle.
59enum ParentFilter {
60    /// Accept objects whose parent matches exactly.
61    Exact(ObjectHandle),
62    /// Android root: accept parent 0 or 0xFFFFFFFF.
63    AndroidRoot,
64}
65
66impl ObjectListing {
67    /// Total number of object handles returned by the device.
68    ///
69    /// When a parent filter is active (e.g., Fuji devices that return all objects
70    /// for root), some items may be skipped, so the actual yielded count can be lower.
71    #[must_use]
72    pub fn total(&self) -> usize {
73        self.handles.len()
74    }
75
76    /// Number of handles processed so far (including filtered-out items).
77    #[must_use]
78    pub fn fetched(&self) -> usize {
79        self.cursor
80    }
81
82    /// Fetch the next object from the device.
83    ///
84    /// Returns `None` when all handles have been processed.
85    /// Items that don't match the parent filter are silently skipped.
86    ///
87    /// If a [`CancelToken`] was passed via
88    /// [`Storage::list_objects_stream_with_cancel`] and it's been cancelled,
89    /// this returns `Some(Err(Error::Cancelled))` at the next per-handle
90    /// boundary (typically within one `GetObjectInfo` USB roundtrip).
91    pub async fn next(&mut self) -> Option<Result<ObjectInfo, Error>> {
92        loop {
93            if self.cursor >= self.handles.len() {
94                return None;
95            }
96
97            // Cooperative cancel check before issuing the per-handle USB
98            // roundtrip. On a 1k-photo listing this is the actual stop point.
99            if let Err(e) = bail_if_cancelled(self.cancel.as_ref()) {
100                return Some(Err(e));
101            }
102
103            let handle = self.handles[self.cursor];
104            self.cursor += 1;
105
106            let mut info = match self.inner.session.get_object_info_full(handle).await {
107                Ok(info) => info,
108                Err(e) => return Some(Err(e)),
109            };
110            info.handle = handle;
111
112            // Apply parent filter if present
113            if let Some(filter) = &self.parent_filter {
114                let matches = match filter {
115                    ParentFilter::Exact(expected) => info.parent == *expected,
116                    ParentFilter::AndroidRoot => info.parent.0 == 0 || info.parent.0 == 0xFFFFFFFF,
117                };
118                if !matches {
119                    continue;
120                }
121            }
122
123            return Some(Ok(info));
124        }
125    }
126}
127
128/// A storage location on an MTP device.
129///
130/// `Storage` holds an `Arc<MtpDeviceInner>` so it can outlive the original
131/// `MtpDevice` and be used from multiple tasks.
132pub struct Storage {
133    inner: Arc<MtpDeviceInner>,
134    id: StorageId,
135    info: StorageInfo,
136}
137
138impl Storage {
139    /// Create a new Storage (internal).
140    pub(crate) fn new(inner: Arc<MtpDeviceInner>, id: StorageId, info: StorageInfo) -> Self {
141        Self { inner, id, info }
142    }
143
144    #[must_use]
145    pub fn id(&self) -> StorageId {
146        self.id
147    }
148
149    /// Storage information (cached, call refresh() to update).
150    #[must_use]
151    pub fn info(&self) -> &StorageInfo {
152        &self.info
153    }
154
155    /// Refresh storage info from device (updates free space, etc.).
156    pub async fn refresh(&mut self) -> Result<(), Error> {
157        self.info = self.inner.session.get_storage_info(self.id).await?;
158        Ok(())
159    }
160
161    /// List objects in a folder (None = root), returning all results at once.
162    ///
163    /// For progress reporting during large listings, use
164    /// [`list_objects_stream()`](Self::list_objects_stream) instead.
165    ///
166    /// This method handles various device quirks:
167    /// - Root listing tries parent=0xFFFFFFFF first (fast path for Android, Kindle, etc.)
168    /// - Falls back to parent=0 only when the device rejects 0xFFFFFFFF
169    /// - Samsung devices: return InvalidObjectHandle for parent=0, so we fall back to recursive
170    /// - Fuji devices: return all objects for root, so we filter by parent handle
171    pub async fn list_objects(
172        &self,
173        parent: Option<ObjectHandle>,
174    ) -> Result<Vec<ObjectInfo>, Error> {
175        self.list_objects_with_cancel(parent, None).await
176    }
177
178    /// Like [`list_objects`](Self::list_objects), but takes a cooperative
179    /// cancellation token.
180    ///
181    /// When `cancel` is `Some(&token)` and the token has been cancelled, the
182    /// call bails between per-handle fetches with `Err(Error::Cancelled)`.
183    /// Useful for large folders (1k+ entries on Android), where the
184    /// `GetObjectInfo` per-handle loop dominates wall-clock time.
185    ///
186    /// Pass `None` for backwards-compatible behavior identical to
187    /// `list_objects`.
188    pub async fn list_objects_with_cancel(
189        &self,
190        parent: Option<ObjectHandle>,
191        cancel: Option<&CancelToken>,
192    ) -> Result<Vec<ObjectInfo>, Error> {
193        let mut listing = self.list_objects_stream_with_cancel(parent, cancel).await?;
194        let mut objects = Vec::with_capacity(listing.total());
195        while let Some(result) = listing.next().await {
196            objects.push(result?);
197        }
198        Ok(objects)
199    }
200
201    /// List objects in a folder as a streaming [`ObjectListing`].
202    ///
203    /// Returns immediately after `GetObjectHandles` completes. The total count
204    /// is then known via [`ObjectListing::total()`], and each call to
205    /// [`ObjectListing::next()`] fetches one object's metadata from USB.
206    ///
207    /// For root listings (`parent=None`), tries `parent=0xFFFFFFFF` first — this
208    /// returns only root-level handles on Android, Kindle, and many other devices.
209    /// Falls back to `parent=0` only when the device rejects `0xFFFFFFFF` with an
210    /// error. An empty result from `0xFFFFFFFF` is treated as an empty storage,
211    /// not as a reason to fall back.
212    ///
213    /// This enables progress reporting (e.g., "Loading 42 of 500...") during
214    /// what would otherwise be a single blocking `list_objects()` call.
215    ///
216    /// Handles the same device quirks as [`list_objects()`](Self::list_objects).
217    ///
218    /// # Example
219    ///
220    /// ```rust,no_run
221    /// use mtp_rs::mtp::MtpDevice;
222    ///
223    /// # async fn example() -> Result<(), mtp_rs::Error> {
224    /// # let device = MtpDevice::open_first().await?;
225    /// # let storages = device.storages().await?;
226    /// # let storage = &storages[0];
227    /// let mut listing = storage.list_objects_stream(None).await?;
228    /// println!("Found {} items", listing.total());
229    ///
230    /// while let Some(result) = listing.next().await {
231    ///     let info = result?;
232    ///     println!("[{}/{}] {}", listing.fetched(), listing.total(), info.filename);
233    /// }
234    /// # Ok(())
235    /// # }
236    /// ```
237    pub async fn list_objects_stream(
238        &self,
239        parent: Option<ObjectHandle>,
240    ) -> Result<ObjectListing, Error> {
241        self.list_objects_stream_with_cancel(parent, None).await
242    }
243
244    /// Like [`list_objects_stream`](Self::list_objects_stream), but the returned
245    /// [`ObjectListing`] carries an optional [`CancelToken`]. Every call to
246    /// [`ObjectListing::next`] checks the token before issuing the next
247    /// `GetObjectInfo` USB roundtrip, so a flipped token bails within one
248    /// roundtrip's worth of latency instead of running to completion.
249    pub async fn list_objects_stream_with_cancel(
250        &self,
251        parent: Option<ObjectHandle>,
252        cancel: Option<&CancelToken>,
253    ) -> Result<ObjectListing, Error> {
254        bail_if_cancelled(cancel)?;
255
256        // For root listings, try parent=0xFFFFFFFF first. Many devices (Android,
257        // Kindle, others) return only root-level handles for this value, while
258        // parent=0 returns every object on the storage. Fall back to parent=0
259        // only when the device rejects 0xFFFFFFFF with an error.
260        if parent.is_none() {
261            let fast = self
262                .inner
263                .session
264                .get_object_handles(self.id, None, Some(ObjectHandle::ALL))
265                .await;
266
267            match fast {
268                Ok(handles) => {
269                    return Ok(ObjectListing {
270                        inner: Arc::clone(&self.inner),
271                        handles,
272                        cursor: 0,
273                        parent_filter: Some(ParentFilter::AndroidRoot),
274                        cancel: cancel.cloned(),
275                    });
276                }
277                Err(_) => {
278                    // 0xFFFFFFFF rejected; fall through to parent=0 path
279                }
280            }
281        }
282
283        bail_if_cancelled(cancel)?;
284
285        let result = self
286            .inner
287            .session
288            .get_object_handles(self.id, None, parent)
289            .await;
290
291        let handles = match result {
292            Ok(h) => h,
293            Err(Error::Protocol {
294                code: crate::ptp::ResponseCode::InvalidObjectHandle,
295                ..
296            }) if parent.is_none() => {
297                // Samsung fallback: use recursive listing and filter to root items
298                return self.list_objects_stream_samsung_fallback(cancel).await;
299            }
300            Err(e) => return Err(e),
301        };
302
303        let parent_filter = Some(ParentFilter::Exact(parent.unwrap_or(ObjectHandle::ROOT)));
304
305        Ok(ObjectListing {
306            inner: Arc::clone(&self.inner),
307            handles,
308            cursor: 0,
309            parent_filter,
310            cancel: cancel.cloned(),
311        })
312    }
313
314    /// Samsung fallback returning a streaming [`ObjectListing`].
315    async fn list_objects_stream_samsung_fallback(
316        &self,
317        cancel: Option<&CancelToken>,
318    ) -> Result<ObjectListing, Error> {
319        let handles = self
320            .inner
321            .session
322            .get_object_handles(self.id, None, Some(ObjectHandle::ALL))
323            .await?;
324
325        Ok(ObjectListing {
326            inner: Arc::clone(&self.inner),
327            handles,
328            cursor: 0,
329            // Root items have parent 0 or 0xFFFFFFFF (depending on device)
330            parent_filter: Some(ParentFilter::AndroidRoot),
331            cancel: cancel.cloned(),
332        })
333    }
334
335    /// List objects recursively.
336    ///
337    /// This method automatically detects Android devices and uses manual traversal
338    /// for them, since Android's MTP implementation doesn't support the native
339    /// `ObjectHandle::ALL` recursive listing.
340    ///
341    /// For non-Android devices, it tries native recursive listing first and falls
342    /// back to manual traversal if the results look incomplete.
343    pub async fn list_objects_recursive(
344        &self,
345        parent: Option<ObjectHandle>,
346    ) -> Result<Vec<ObjectInfo>, Error> {
347        if self.inner.is_android() {
348            return self.list_objects_recursive_manual(parent).await;
349        }
350
351        let native_result = self.list_objects_recursive_native(parent).await?;
352
353        // Heuristic: if we only got folders and no files, native listing
354        // probably didn't work - fall back to manual traversal
355        let has_files = native_result.iter().any(|o| o.is_file());
356        if !native_result.is_empty() && !has_files {
357            return self.list_objects_recursive_manual(parent).await;
358        }
359
360        Ok(native_result)
361    }
362
363    /// List objects recursively using native MTP recursive listing.
364    pub async fn list_objects_recursive_native(
365        &self,
366        parent: Option<ObjectHandle>,
367    ) -> Result<Vec<ObjectInfo>, Error> {
368        let recursive_parent = if parent.is_none() {
369            Some(ObjectHandle::ALL)
370        } else {
371            parent
372        };
373
374        let handles = self
375            .inner
376            .session
377            .get_object_handles(self.id, None, recursive_parent)
378            .await?;
379
380        let mut objects = Vec::with_capacity(handles.len());
381        for handle in handles {
382            let mut info = self.inner.session.get_object_info_full(handle).await?;
383            info.handle = handle;
384            objects.push(info);
385        }
386        Ok(objects)
387    }
388
389    /// List objects recursively using manual folder traversal.
390    pub async fn list_objects_recursive_manual(
391        &self,
392        parent: Option<ObjectHandle>,
393    ) -> Result<Vec<ObjectInfo>, Error> {
394        let mut result = Vec::new();
395        let mut folders_to_visit = vec![parent];
396
397        while let Some(current_parent) = folders_to_visit.pop() {
398            let objects = self.list_objects(current_parent).await?;
399
400            for obj in objects {
401                if obj.is_folder() {
402                    folders_to_visit.push(Some(obj.handle));
403                }
404                result.push(obj);
405            }
406        }
407
408        Ok(result)
409    }
410
411    /// Get object metadata by handle.
412    ///
413    /// Files larger than 4 GB have their u64 size auto-resolved via
414    /// `GetObjectPropValue(ObjectSize)`; the standard `ObjectInfo` dataset
415    /// only encodes a u32 size which saturates at 4 GB - 1.
416    pub async fn get_object_info(&self, handle: ObjectHandle) -> Result<ObjectInfo, Error> {
417        let mut info = self.inner.session.get_object_info_full(handle).await?;
418        info.handle = handle;
419        Ok(info)
420    }
421
422    // =========================================================================
423    // Download operations
424    // =========================================================================
425
426    /// Download a file and return all bytes.
427    ///
428    /// For small to medium files where you want all the data in memory.
429    /// For large files or streaming to disk, use [`download_stream()`](Self::download_stream).
430    pub async fn download(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
431        self.inner.session.get_object(handle).await
432    }
433
434    /// Download a partial file (byte range).
435    ///
436    /// Uses the standard `GetPartialObject` operation, which has a 32-bit offset.
437    /// Offsets beyond 4 GB will be silently truncated — for files larger than 4 GB,
438    /// use [`download_partial_64()`](Self::download_partial_64) instead.
439    pub async fn download_partial(
440        &self,
441        handle: ObjectHandle,
442        offset: u64,
443        size: u32,
444    ) -> Result<Vec<u8>, Error> {
445        self.inner
446            .session
447            .get_partial_object(handle, offset, size)
448            .await
449    }
450
451    /// Download a partial file (byte range) with 64-bit offset support.
452    ///
453    /// Uses the Android/MTP extension `GetPartialObject64`, which supports offsets
454    /// beyond 4 GB. Only works on devices that advertise support for it (most modern
455    /// Android devices do); others return `OperationNotSupported`.
456    pub async fn download_partial_64(
457        &self,
458        handle: ObjectHandle,
459        offset: u64,
460        size: u32,
461    ) -> Result<Vec<u8>, Error> {
462        self.inner
463            .session
464            .get_partial_object_64(handle, offset, size)
465            .await
466    }
467
468    pub async fn download_thumbnail(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
469        self.inner.session.get_thumb(handle).await
470    }
471
472    /// Download a file as a stream (true USB streaming).
473    ///
474    /// Unlike [`download()`](Self::download), this method yields data chunks
475    /// directly from USB as they arrive, without buffering the entire file
476    /// in memory. Ideal for large files or when piping data to disk.
477    ///
478    /// # Important
479    ///
480    /// The MTP session is locked while the download is active. You must either
481    /// consume the entire download or call [`FileDownload::cancel()`] before
482    /// dropping it.
483    ///
484    /// # Example
485    ///
486    /// ```rust,no_run
487    /// use mtp_rs::mtp::MtpDevice;
488    /// use mtp_rs::ObjectHandle;
489    /// use tokio::io::AsyncWriteExt;
490    ///
491    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
492    /// # let device = MtpDevice::open_first().await?;
493    /// # let storages = device.storages().await?;
494    /// # let storage = &storages[0];
495    /// # let handle = ObjectHandle(1);
496    /// let mut download = storage.download_stream(handle).await?;
497    /// println!("Downloading {} bytes...", download.size());
498    ///
499    /// let mut file = tokio::fs::File::create("output.bin").await?;
500    /// while let Some(chunk) = download.next_chunk().await {
501    ///     let bytes = chunk?;
502    ///     file.write_all(&bytes).await?;
503    ///     println!("Progress: {:.1}%", download.progress() * 100.0);
504    /// }
505    /// # Ok(())
506    /// # }
507    /// ```
508    pub async fn download_stream(&self, handle: ObjectHandle) -> Result<FileDownload, Error> {
509        let info = self.get_object_info(handle).await?;
510        let size = info.size;
511
512        let stream = self
513            .inner
514            .session
515            .execute_with_receive_stream(crate::ptp::OperationCode::GetObject, &[handle.0])
516            .await?;
517
518        Ok(FileDownload::new(size, stream))
519    }
520
521    // =========================================================================
522    // Upload operations
523    // =========================================================================
524
525    /// Upload a file from a stream.
526    ///
527    /// The data streams directly to USB in chunks; the protocol only needs the
528    /// total size upfront (provided via `info`), not the whole file in memory.
529    ///
530    /// # Arguments
531    ///
532    /// * `parent` - Parent folder handle (None for root)
533    /// * `info` - Object metadata including filename and size
534    /// * `data` - Stream of data chunks to upload
535    ///
536    /// # Errors
537    ///
538    /// Returns [`UploadError`] on failure. PTP uploads are two-phase:
539    /// `SendObjectInfo` creates the object on the device (yielding a handle),
540    /// then `SendObject` streams the bytes. If the data phase fails, the device
541    /// is left holding a partial (possibly empty or truncated) object, and
542    /// [`UploadError::partial`] carries its handle so you can
543    /// [`delete`](Self::delete) it or retry the data phase to resume. The library
544    /// does **not** auto-delete it. See [`UploadError`] for the full contract.
545    pub async fn upload<S>(
546        &self,
547        parent: Option<ObjectHandle>,
548        info: NewObjectInfo,
549        data: S,
550    ) -> Result<ObjectHandle, UploadError>
551    where
552        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send,
553    {
554        self.upload_with_progress(parent, info, data, |_| ControlFlow::Continue(()))
555            .await
556    }
557
558    /// Upload a file with progress callback.
559    ///
560    /// Progress is reported as data is read from the stream. Return
561    /// `ControlFlow::Break(())` from the callback to cancel the upload.
562    ///
563    /// # Errors
564    ///
565    /// Returns [`UploadError`] on failure or cancellation. When the failure
566    /// happens after the object was created (any error or cancellation during
567    /// the data phase), [`UploadError::partial`] carries the handle of the
568    /// possibly-partial object the device still holds, so you can
569    /// [`delete`](Self::delete) it or retry the data phase to resume. The library
570    /// does **not** auto-delete it. Cancellation surfaces as
571    /// [`Error::Cancelled`](crate::Error::Cancelled) in
572    /// [`UploadError::source`]. See [`UploadError`] for the full contract.
573    pub async fn upload_with_progress<S, F>(
574        &self,
575        parent: Option<ObjectHandle>,
576        info: NewObjectInfo,
577        data: S,
578        mut on_progress: F,
579    ) -> Result<ObjectHandle, UploadError>
580    where
581        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send,
582        F: FnMut(Progress) -> ControlFlow<()> + Send,
583    {
584        use futures::StreamExt;
585
586        let total_size = info.size;
587        let object_info = info.to_object_info();
588        let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
589
590        // Phase 1: SendObjectInfo. No object exists yet, so a failure here has no
591        // partial to surface.
592        let (_, _, handle) = self
593            .inner
594            .session
595            .send_object_info(self.id, parent_handle, &object_info)
596            .await
597            .map_err(|source| UploadError {
598                source,
599                partial: None,
600            })?;
601
602        // Wrap the stream to report progress and support cancellation.
603        let mut bytes_sent = 0u64;
604        let progress_stream = data.map(move |chunk_result| {
605            let chunk = chunk_result?;
606            bytes_sent += chunk.len() as u64;
607            let progress = Progress {
608                bytes_transferred: bytes_sent,
609                total_bytes: Some(total_size),
610            };
611            if let ControlFlow::Break(()) = on_progress(progress) {
612                return Err(std::io::Error::new(
613                    std::io::ErrorKind::Interrupted,
614                    "cancelled",
615                ));
616            }
617            Ok(chunk)
618        });
619
620        // Phase 2: SendObject. The object already exists on the device, so any
621        // failure (genuine error or cancellation) surfaces the handle as
622        // `partial` for the caller to delete or resume. We do NOT delete it here.
623        self.inner
624            .session
625            .send_object_stream(total_size, progress_stream)
626            .await
627            .map_err(|e| match &e {
628                Error::Io(io_err) if io_err.kind() == std::io::ErrorKind::Interrupted => {
629                    Error::Cancelled
630                }
631                _ => e,
632            })
633            .map_err(|source| UploadError {
634                source,
635                partial: Some(handle),
636            })?;
637
638        Ok(handle)
639    }
640
641    // =========================================================================
642    // Folder and object management
643    // =========================================================================
644
645    pub async fn create_folder(
646        &self,
647        parent: Option<ObjectHandle>,
648        name: &str,
649    ) -> Result<ObjectHandle, Error> {
650        let info = NewObjectInfo::folder(name);
651        let object_info = info.to_object_info();
652        let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
653
654        let (_, _, handle) = self
655            .inner
656            .session
657            .send_object_info(self.id, parent_handle, &object_info)
658            .await?;
659
660        Ok(handle)
661    }
662
663    pub async fn delete(&self, handle: ObjectHandle) -> Result<(), Error> {
664        self.inner.session.delete_object(handle).await
665    }
666
667    /// Like [`delete`](Self::delete), but bails with `Err(Error::Cancelled)`
668    /// before issuing the PTP `DeleteObject` request when the token is set.
669    ///
670    /// The PTP `DeleteObject` is a single fast transaction (no internal loop),
671    /// so the meaningful cancel point is _between_ recursive iterations on the
672    /// caller side. This entry point lets callers thread the same token through
673    /// list and delete without juggling two API shapes.
674    pub async fn delete_with_cancel(
675        &self,
676        handle: ObjectHandle,
677        cancel: Option<&CancelToken>,
678    ) -> Result<(), Error> {
679        bail_if_cancelled(cancel)?;
680        self.inner.session.delete_object(handle).await
681    }
682
683    /// Move an object to a different folder.
684    pub async fn move_object(
685        &self,
686        handle: ObjectHandle,
687        new_parent: ObjectHandle,
688        new_storage: Option<StorageId>,
689    ) -> Result<(), Error> {
690        let storage = new_storage.unwrap_or(self.id);
691        self.inner
692            .session
693            .move_object(handle, storage, new_parent)
694            .await
695    }
696
697    pub async fn copy_object(
698        &self,
699        handle: ObjectHandle,
700        new_parent: ObjectHandle,
701        new_storage: Option<StorageId>,
702    ) -> Result<ObjectHandle, Error> {
703        let storage = new_storage.unwrap_or(self.id);
704        self.inner
705            .session
706            .copy_object(handle, storage, new_parent)
707            .await
708    }
709
710    /// Rename an object (file or folder).
711    ///
712    /// Not all devices support renaming. Use `MtpDevice::supports_rename()`
713    /// to check if this operation is available.
714    pub async fn rename(&self, handle: ObjectHandle, new_name: &str) -> Result<(), Error> {
715        self.inner.session.rename_object(handle, new_name).await
716    }
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722    use crate::ptp::{
723        pack_u16, pack_u32, pack_u32_array, ContainerType, DateTime, DeviceInfo, ObjectFormatCode,
724        OperationCode, PtpSession, ResponseCode, StorageInfo,
725    };
726    use crate::transport::mock::MockTransport;
727
728    // -- Test helpers (same protocol-level helpers as session tests) -----------
729
730    fn mock_transport() -> (Arc<dyn crate::transport::Transport>, Arc<MockTransport>) {
731        let mock = Arc::new(MockTransport::new());
732        let transport: Arc<dyn crate::transport::Transport> = Arc::clone(&mock) as _;
733        (transport, mock)
734    }
735
736    fn ok_response(tx_id: u32) -> Vec<u8> {
737        let mut buf = Vec::with_capacity(12);
738        buf.extend_from_slice(&pack_u32(12));
739        buf.extend_from_slice(&pack_u16(ContainerType::Response.to_code()));
740        buf.extend_from_slice(&pack_u16(ResponseCode::Ok.into()));
741        buf.extend_from_slice(&pack_u32(tx_id));
742        buf
743    }
744
745    fn error_response(tx_id: u32, code: ResponseCode) -> Vec<u8> {
746        let mut buf = Vec::with_capacity(12);
747        buf.extend_from_slice(&pack_u32(12));
748        buf.extend_from_slice(&pack_u16(ContainerType::Response.to_code()));
749        buf.extend_from_slice(&pack_u16(code.into()));
750        buf.extend_from_slice(&pack_u32(tx_id));
751        buf
752    }
753
754    fn data_container(tx_id: u32, code: OperationCode, payload: &[u8]) -> Vec<u8> {
755        let len = 12 + payload.len();
756        let mut buf = Vec::with_capacity(len);
757        buf.extend_from_slice(&pack_u32(len as u32));
758        buf.extend_from_slice(&pack_u16(ContainerType::Data.to_code()));
759        buf.extend_from_slice(&pack_u16(code.into()));
760        buf.extend_from_slice(&pack_u32(tx_id));
761        buf.extend_from_slice(payload);
762        buf
763    }
764
765    // -- Storage-level helpers ------------------------------------------------
766
767    /// Build a Storage backed by a mock transport for testing.
768    ///
769    /// Queues the OpenSession response automatically. The caller must queue
770    /// further responses before calling list methods.
771    async fn mock_storage(
772        transport: Arc<dyn crate::transport::Transport>,
773        vendor_extension_desc: &str,
774    ) -> Storage {
775        let session = Arc::new(PtpSession::open(transport, 1).await.unwrap());
776        let inner = Arc::new(MtpDeviceInner {
777            session,
778            device_info: DeviceInfo {
779                vendor_extension_desc: vendor_extension_desc.to_string(),
780                ..DeviceInfo::default()
781            },
782        });
783        Storage::new(inner, StorageId(1), StorageInfo::default())
784    }
785
786    /// Build a minimal ObjectInfo binary payload with a given filename and parent.
787    fn object_info_bytes(filename: &str, parent: u32) -> Vec<u8> {
788        let info = ObjectInfo {
789            storage_id: StorageId(1),
790            format: ObjectFormatCode::Jpeg,
791            parent: ObjectHandle(parent),
792            filename: filename.to_string(),
793            created: Some(DateTime {
794                year: 2024,
795                month: 1,
796                day: 1,
797                hour: 0,
798                minute: 0,
799                second: 0,
800            }),
801            ..ObjectInfo::default()
802        };
803        info.to_bytes().unwrap()
804    }
805
806    /// Queue GetObjectHandles response (data + ok) for a given transaction ID.
807    fn queue_handles(mock: &MockTransport, tx_id: u32, handles: &[u32]) {
808        let data = pack_u32_array(handles);
809        mock.queue_response(data_container(
810            tx_id,
811            OperationCode::GetObjectHandles,
812            &data,
813        ));
814        mock.queue_response(ok_response(tx_id));
815    }
816
817    /// Queue GetObjectInfo response (data + ok) for a given transaction ID.
818    fn queue_object_info(mock: &MockTransport, tx_id: u32, filename: &str, parent: u32) {
819        let data = object_info_bytes(filename, parent);
820        mock.queue_response(data_container(tx_id, OperationCode::GetObjectInfo, &data));
821        mock.queue_response(ok_response(tx_id));
822    }
823
824    // -- Tests ----------------------------------------------------------------
825
826    #[tokio::test]
827    async fn stream_returns_objects_with_correct_counters() {
828        let (transport, mock) = mock_transport();
829        mock.queue_response(ok_response(0)); // OpenSession
830
831        queue_handles(&mock, 1, &[10, 20, 30]);
832        queue_object_info(&mock, 2, "photo.jpg", 0);
833        queue_object_info(&mock, 3, "video.mp4", 0);
834        queue_object_info(&mock, 4, "notes.txt", 0);
835
836        let storage = mock_storage(transport, "").await;
837        let mut listing = storage.list_objects_stream(None).await.unwrap();
838
839        assert_eq!(listing.total(), 3);
840        assert_eq!(listing.fetched(), 0);
841
842        let first = listing.next().await.unwrap().unwrap();
843        assert_eq!(first.filename, "photo.jpg");
844        assert_eq!(first.handle, ObjectHandle(10));
845        assert_eq!(listing.fetched(), 1);
846
847        let second = listing.next().await.unwrap().unwrap();
848        assert_eq!(second.filename, "video.mp4");
849        assert_eq!(second.handle, ObjectHandle(20));
850        assert_eq!(listing.fetched(), 2);
851
852        let third = listing.next().await.unwrap().unwrap();
853        assert_eq!(third.filename, "notes.txt");
854        assert_eq!(third.handle, ObjectHandle(30));
855        assert_eq!(listing.fetched(), 3);
856
857        assert!(listing.next().await.is_none());
858    }
859
860    #[tokio::test]
861    async fn stream_empty_directory() {
862        let (transport, mock) = mock_transport();
863        mock.queue_response(ok_response(0)); // OpenSession
864        queue_handles(&mock, 1, &[]);
865
866        let storage = mock_storage(transport, "").await;
867        let mut listing = storage.list_objects_stream(None).await.unwrap();
868
869        assert_eq!(listing.total(), 0);
870        assert!(listing.next().await.is_none());
871    }
872
873    #[tokio::test]
874    async fn stream_filters_by_parent() {
875        // Root fast path (0xFFFFFFFF) uses AndroidRoot filter, which rejects
876        // objects whose parent is neither 0 nor 0xFFFFFFFF.
877        let (transport, mock) = mock_transport();
878        mock.queue_response(ok_response(0)); // OpenSession
879
880        queue_handles(&mock, 1, &[10, 20, 30]);
881        queue_object_info(&mock, 2, "root_file.jpg", 0); // parent=ROOT, included
882        queue_object_info(&mock, 3, "nested.jpg", 99); // parent=99, filtered out
883        queue_object_info(&mock, 4, "another_root.txt", 0); // parent=ROOT, included
884
885        let storage = mock_storage(transport, "").await;
886        let mut listing = storage.list_objects_stream(None).await.unwrap();
887
888        assert_eq!(listing.total(), 3); // Total handles from device
889
890        let first = listing.next().await.unwrap().unwrap();
891        assert_eq!(first.filename, "root_file.jpg");
892        assert_eq!(listing.fetched(), 1);
893
894        let second = listing.next().await.unwrap().unwrap();
895        assert_eq!(second.filename, "another_root.txt");
896        assert_eq!(listing.fetched(), 3); // Processed all 3 (including filtered one)
897
898        assert!(listing.next().await.is_none());
899    }
900
901    #[tokio::test]
902    async fn stream_root_accepts_both_parent_values() {
903        // Root fast path uses AndroidRoot filter: accepts parent 0 or 0xFFFFFFFF
904        let (transport, mock) = mock_transport();
905        mock.queue_response(ok_response(0)); // OpenSession
906
907        queue_handles(&mock, 1, &[10, 20, 30]);
908        queue_object_info(&mock, 2, "dcim", 0); // parent=0, root
909        queue_object_info(&mock, 3, "download", 0xFFFFFFFF); // parent=ALL, also root
910        queue_object_info(&mock, 4, "nested", 42); // not root
911
912        let storage = mock_storage(transport, "").await;
913        let mut listing = storage.list_objects_stream(None).await.unwrap();
914
915        let first = listing.next().await.unwrap().unwrap();
916        assert_eq!(first.filename, "dcim");
917
918        let second = listing.next().await.unwrap().unwrap();
919        assert_eq!(second.filename, "download");
920
921        assert!(listing.next().await.is_none()); // "nested" filtered out
922    }
923
924    #[tokio::test]
925    async fn stream_subfolder_listing() {
926        let (transport, mock) = mock_transport();
927        mock.queue_response(ok_response(0)); // OpenSession
928
929        let parent_handle = 42u32;
930        queue_handles(&mock, 1, &[100, 101]);
931        queue_object_info(&mock, 2, "IMG_001.jpg", parent_handle);
932        queue_object_info(&mock, 3, "IMG_002.jpg", parent_handle);
933
934        let storage = mock_storage(transport, "").await;
935        let mut listing = storage
936            .list_objects_stream(Some(ObjectHandle(parent_handle)))
937            .await
938            .unwrap();
939
940        assert_eq!(listing.total(), 2);
941        let first = listing.next().await.unwrap().unwrap();
942        assert_eq!(first.filename, "IMG_001.jpg");
943        let second = listing.next().await.unwrap().unwrap();
944        assert_eq!(second.filename, "IMG_002.jpg");
945        assert!(listing.next().await.is_none());
946    }
947
948    #[tokio::test]
949    async fn stream_propagates_mid_listing_error() {
950        let (transport, mock) = mock_transport();
951        mock.queue_response(ok_response(0)); // OpenSession
952
953        queue_handles(&mock, 1, &[10, 20]);
954        queue_object_info(&mock, 2, "good.jpg", 0);
955        // Handle 20: device returns error instead of object info
956        mock.queue_response(error_response(3, ResponseCode::InvalidObjectHandle));
957
958        let storage = mock_storage(transport, "").await;
959        let mut listing = storage.list_objects_stream(None).await.unwrap();
960
961        let first = listing.next().await.unwrap().unwrap();
962        assert_eq!(first.filename, "good.jpg");
963
964        let second = listing.next().await.unwrap();
965        assert!(second.is_err());
966    }
967
968    #[tokio::test]
969    async fn list_objects_matches_stream_collect() {
970        // Verify list_objects() returns identical results to collecting the stream
971        let (transport1, mock1) = mock_transport();
972        let (transport2, mock2) = mock_transport();
973
974        for mock in [&mock1, &mock2] {
975            mock.queue_response(ok_response(0)); // OpenSession
976            queue_handles(mock, 1, &[10, 20]);
977            queue_object_info(mock, 2, "a.jpg", 0);
978            queue_object_info(mock, 3, "b.txt", 0);
979        }
980
981        let storage1 = mock_storage(transport1, "").await;
982        let storage2 = mock_storage(transport2, "").await;
983
984        let all_at_once = storage1.list_objects(None).await.unwrap();
985
986        let mut listing = storage2.list_objects_stream(None).await.unwrap();
987        let mut streamed = Vec::new();
988        while let Some(result) = listing.next().await {
989            streamed.push(result.unwrap());
990        }
991
992        assert_eq!(all_at_once.len(), streamed.len());
993        for (a, b) in all_at_once.iter().zip(streamed.iter()) {
994            assert_eq!(a.filename, b.filename);
995            assert_eq!(a.handle, b.handle);
996        }
997    }
998
999    // -- Root listing fast-path / fallback tests --------------------------------
1000
1001    #[tokio::test]
1002    async fn stream_root_falls_back_on_error() {
1003        // When 0xFFFFFFFF is rejected, falls through to parent=0 with Exact(ROOT)
1004        let (transport, mock) = mock_transport();
1005        mock.queue_response(ok_response(0)); // OpenSession
1006
1007        // Fast path (0xFFFFFFFF): device rejects with InvalidObjectHandle
1008        mock.queue_response(error_response(1, ResponseCode::InvalidObjectHandle));
1009
1010        // Fallback path (parent=0): returns root objects
1011        queue_handles(&mock, 2, &[10, 20]);
1012        queue_object_info(&mock, 3, "root.jpg", 0);
1013        queue_object_info(&mock, 4, "nested.jpg", 99); // filtered by Exact(ROOT)
1014
1015        let storage = mock_storage(transport, "").await;
1016        let mut listing = storage.list_objects_stream(None).await.unwrap();
1017
1018        assert_eq!(listing.total(), 2);
1019
1020        let first = listing.next().await.unwrap().unwrap();
1021        assert_eq!(first.filename, "root.jpg");
1022
1023        // nested.jpg has parent=99, filtered out by Exact(ROOT)
1024        assert!(listing.next().await.is_none());
1025    }
1026
1027    #[tokio::test]
1028    async fn stream_root_empty_is_not_fallback() {
1029        // Empty Ok([]) from 0xFFFFFFFF is a valid empty storage, not a fallback trigger
1030        let (transport, mock) = mock_transport();
1031        mock.queue_response(ok_response(0)); // OpenSession
1032
1033        queue_handles(&mock, 1, &[]); // fast path returns empty
1034
1035        let storage = mock_storage(transport, "").await;
1036        let mut listing = storage.list_objects_stream(None).await.unwrap();
1037
1038        assert_eq!(listing.total(), 0);
1039        assert!(listing.next().await.is_none());
1040    }
1041
1042    #[tokio::test]
1043    async fn stream_kindle_root_uses_fast_path() {
1044        // Non-Android device (Kindle) benefits from 0xFFFFFFFF fast path
1045        let (transport, mock) = mock_transport();
1046        mock.queue_response(ok_response(0)); // OpenSession
1047
1048        queue_handles(&mock, 1, &[100, 101, 102]);
1049        queue_object_info(&mock, 2, "documents", 0);
1050        queue_object_info(&mock, 3, "system", 0);
1051        queue_object_info(&mock, 4, "fonts", 0);
1052
1053        let storage = mock_storage(transport, "microsoft.com/WMDRMPD:10.1").await;
1054        let mut listing = storage.list_objects_stream(None).await.unwrap();
1055
1056        assert_eq!(listing.total(), 3);
1057
1058        let first = listing.next().await.unwrap().unwrap();
1059        assert_eq!(first.filename, "documents");
1060        let second = listing.next().await.unwrap().unwrap();
1061        assert_eq!(second.filename, "system");
1062        let third = listing.next().await.unwrap().unwrap();
1063        assert_eq!(third.filename, "fonts");
1064
1065        assert!(listing.next().await.is_none());
1066    }
1067
1068    #[tokio::test]
1069    async fn stream_subfolder_skips_fast_path() {
1070        // Subfolder listing should not attempt 0xFFFFFFFF; only one get_object_handles call
1071        let (transport, mock) = mock_transport();
1072        mock.queue_response(ok_response(0)); // OpenSession
1073
1074        let parent_handle = 50u32;
1075        queue_handles(&mock, 1, &[200, 201]);
1076        queue_object_info(&mock, 2, "child_a.txt", parent_handle);
1077        queue_object_info(&mock, 3, "child_b.txt", parent_handle);
1078
1079        let storage = mock_storage(transport, "").await;
1080        let mut listing = storage
1081            .list_objects_stream(Some(ObjectHandle(parent_handle)))
1082            .await
1083            .unwrap();
1084
1085        assert_eq!(listing.total(), 2);
1086        let first = listing.next().await.unwrap().unwrap();
1087        assert_eq!(first.filename, "child_a.txt");
1088        let second = listing.next().await.unwrap().unwrap();
1089        assert_eq!(second.filename, "child_b.txt");
1090        assert!(listing.next().await.is_none());
1091    }
1092
1093    // -- Full-size (>4 GB) resolution via GetObjectPropValue ------------------
1094
1095    /// Build an ObjectInfo payload with a specific `size`. Sizes > u32::MAX are
1096    /// serialized as u32::MAX by `ObjectInfo::to_bytes`, matching real-device behavior.
1097    fn object_info_bytes_with_size(filename: &str, parent: u32, size: u64) -> Vec<u8> {
1098        let info = ObjectInfo {
1099            storage_id: StorageId(1),
1100            format: ObjectFormatCode::Jpeg,
1101            parent: ObjectHandle(parent),
1102            filename: filename.to_string(),
1103            size,
1104            ..ObjectInfo::default()
1105        };
1106        info.to_bytes().unwrap()
1107    }
1108
1109    fn queue_object_info_with_size(
1110        mock: &MockTransport,
1111        tx_id: u32,
1112        filename: &str,
1113        parent: u32,
1114        size: u64,
1115    ) {
1116        let data = object_info_bytes_with_size(filename, parent, size);
1117        mock.queue_response(data_container(tx_id, OperationCode::GetObjectInfo, &data));
1118        mock.queue_response(ok_response(tx_id));
1119    }
1120
1121    fn queue_object_size_prop(mock: &MockTransport, tx_id: u32, size: u64) {
1122        let payload = crate::ptp::pack_u64(size);
1123        mock.queue_response(data_container(
1124            tx_id,
1125            OperationCode::GetObjectPropValue,
1126            &payload,
1127        ));
1128        mock.queue_response(ok_response(tx_id));
1129    }
1130
1131    #[tokio::test]
1132    async fn get_object_info_resolves_saturated_size() {
1133        // Simulate a 5 GB file: ObjectInfo reports u32::MAX, GetObjectPropValue returns real u64.
1134        const REAL_SIZE: u64 = 5 * 1024 * 1024 * 1024;
1135
1136        let (transport, mock) = mock_transport();
1137        mock.queue_response(ok_response(0)); // OpenSession
1138        queue_object_info_with_size(&mock, 1, "big.mkv", 0, REAL_SIZE);
1139        queue_object_size_prop(&mock, 2, REAL_SIZE);
1140
1141        let storage = mock_storage(transport, "").await;
1142        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();
1143
1144        assert_eq!(info.filename, "big.mkv");
1145        assert_eq!(info.size, REAL_SIZE, "size should be resolved to full u64");
1146    }
1147
1148    #[tokio::test]
1149    async fn get_object_info_skips_lookup_when_size_fits_u32() {
1150        // Under u32::MAX: GetObjectPropValue must NOT be called. If it were, the test
1151        // would hang or fail because we only queue one response.
1152        let (transport, mock) = mock_transport();
1153        mock.queue_response(ok_response(0)); // OpenSession
1154        queue_object_info_with_size(&mock, 1, "small.jpg", 0, 1_000_000);
1155
1156        let storage = mock_storage(transport, "").await;
1157        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();
1158
1159        assert_eq!(info.size, 1_000_000);
1160    }
1161
1162    #[tokio::test]
1163    async fn get_object_info_falls_back_when_prop_lookup_fails() {
1164        // Device reports saturated size but doesn't support GetObjectPropValue.
1165        // Caller should receive the saturated value rather than an error.
1166        let (transport, mock) = mock_transport();
1167        mock.queue_response(ok_response(0)); // OpenSession
1168        queue_object_info_with_size(&mock, 1, "big.mkv", 0, 8 * 1024 * 1024 * 1024);
1169        mock.queue_response(error_response(2, ResponseCode::OperationNotSupported));
1170
1171        let storage = mock_storage(transport, "").await;
1172        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();
1173
1174        assert_eq!(
1175            info.size,
1176            u64::from(u32::MAX),
1177            "should keep saturated u32::MAX when prop lookup fails"
1178        );
1179    }
1180
1181    #[tokio::test]
1182    async fn get_object_info_exactly_u32_max_triggers_lookup() {
1183        // A file whose real size happens to equal u32::MAX is ambiguous: we can't
1184        // distinguish it from a saturated >4GB file. The lookup runs and returns the
1185        // true size, which in this case happens to match. Verify we handle it correctly.
1186        const REAL_SIZE: u64 = u32::MAX as u64;
1187
1188        let (transport, mock) = mock_transport();
1189        mock.queue_response(ok_response(0)); // OpenSession
1190        queue_object_info_with_size(&mock, 1, "edge.bin", 0, REAL_SIZE);
1191        queue_object_size_prop(&mock, 2, REAL_SIZE);
1192
1193        let storage = mock_storage(transport, "").await;
1194        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();
1195
1196        assert_eq!(info.size, REAL_SIZE);
1197    }
1198
1199    // -- CancelToken support --------------------------------------------------
1200
1201    #[tokio::test]
1202    async fn cancel_before_first_handle_bails_immediately() {
1203        // Cancel set before the first GetObjectInfo is issued: no per-handle
1204        // USB calls happen at all. We only queue the OpenSession and the
1205        // GetObjectHandles responses; if `next()` issued a GetObjectInfo,
1206        // the mock would block waiting for an unqueued response.
1207        let (transport, mock) = mock_transport();
1208        mock.queue_response(ok_response(0)); // OpenSession
1209        queue_handles(&mock, 1, &[10, 20, 30]);
1210
1211        let storage = mock_storage(transport, "").await;
1212        let cancel = CancelToken::new();
1213        let mut listing = storage
1214            .list_objects_stream_with_cancel(None, Some(&cancel))
1215            .await
1216            .unwrap();
1217
1218        assert_eq!(listing.total(), 3);
1219
1220        // Flip cancel before pulling any handles.
1221        cancel.cancel();
1222
1223        let first = listing.next().await.expect("expected Some(Err(Cancelled))");
1224        assert!(matches!(first, Err(Error::Cancelled)));
1225    }
1226
1227    #[tokio::test]
1228    async fn cancel_mid_listing_bails_at_next_boundary() {
1229        // Queue two GetObjectInfo responses; pull one successfully, then cancel
1230        // and assert the next call returns Cancelled instead of fetching the
1231        // third (which we never queue).
1232        let (transport, mock) = mock_transport();
1233        mock.queue_response(ok_response(0)); // OpenSession
1234        queue_handles(&mock, 1, &[10, 20, 30]);
1235        queue_object_info(&mock, 2, "first.jpg", 0);
1236        // Only one GetObjectInfo is queued; if cancel didn't short-circuit,
1237        // the test would block on the unqueued second response.
1238
1239        let storage = mock_storage(transport, "").await;
1240        let cancel = CancelToken::new();
1241        let mut listing = storage
1242            .list_objects_stream_with_cancel(None, Some(&cancel))
1243            .await
1244            .unwrap();
1245
1246        let first = listing.next().await.unwrap().unwrap();
1247        assert_eq!(first.filename, "first.jpg");
1248
1249        // Cancel between iterations; the next() call should see it before the
1250        // second GetObjectInfo USB roundtrip.
1251        cancel.cancel();
1252
1253        let second = listing.next().await.expect("expected Some(Err(Cancelled))");
1254        assert!(matches!(second, Err(Error::Cancelled)));
1255    }
1256
1257    #[tokio::test]
1258    async fn cancel_via_list_objects_returns_cancelled_error() {
1259        // list_objects_with_cancel surfaces the inner Cancelled error to the
1260        // caller, not a Vec.
1261        let (transport, mock) = mock_transport();
1262        mock.queue_response(ok_response(0)); // OpenSession
1263        queue_handles(&mock, 1, &[10, 20, 30]);
1264
1265        let storage = mock_storage(transport, "").await;
1266        let cancel = CancelToken::new();
1267        cancel.cancel();
1268
1269        let result = storage.list_objects_with_cancel(None, Some(&cancel)).await;
1270        assert!(matches!(result, Err(Error::Cancelled)));
1271    }
1272
1273    #[tokio::test]
1274    async fn delete_with_cancel_bails_before_request() {
1275        // delete_with_cancel must NOT issue the DeleteObject request when the
1276        // token is set. We don't queue any response after the OpenSession; if
1277        // the implementation issued the request, the call would block waiting
1278        // for an unqueued response.
1279        let (transport, mock) = mock_transport();
1280        mock.queue_response(ok_response(0)); // OpenSession
1281
1282        let storage = mock_storage(transport, "").await;
1283        let cancel = CancelToken::new();
1284        cancel.cancel();
1285
1286        let result = storage
1287            .delete_with_cancel(ObjectHandle(1), Some(&cancel))
1288            .await;
1289        assert!(matches!(result, Err(Error::Cancelled)));
1290    }
1291
1292    #[tokio::test]
1293    async fn delete_with_cancel_no_token_runs_normally() {
1294        // None token: behaves identically to delete().
1295        let (transport, mock) = mock_transport();
1296        mock.queue_response(ok_response(0)); // OpenSession
1297        mock.queue_response(ok_response(1)); // DeleteObject
1298
1299        let storage = mock_storage(transport, "").await;
1300        let result = storage.delete_with_cancel(ObjectHandle(1), None).await;
1301        assert!(result.is_ok());
1302    }
1303
1304    #[tokio::test]
1305    async fn cancel_does_not_affect_unset_token() {
1306        // Sanity: the new code path with `cancel: None` must produce identical
1307        // results to the legacy `list_objects` API.
1308        let (transport, mock) = mock_transport();
1309        mock.queue_response(ok_response(0)); // OpenSession
1310        queue_handles(&mock, 1, &[10, 20]);
1311        queue_object_info(&mock, 2, "a.jpg", 0);
1312        queue_object_info(&mock, 3, "b.jpg", 0);
1313
1314        let storage = mock_storage(transport, "").await;
1315        let objects = storage.list_objects_with_cancel(None, None).await.unwrap();
1316        assert_eq!(objects.len(), 2);
1317        assert_eq!(objects[0].filename, "a.jpg");
1318        assert_eq!(objects[1].filename, "b.jpg");
1319    }
1320
1321    #[tokio::test]
1322    async fn list_objects_stream_resolves_saturated_size() {
1323        // Verify the fix also applies to the streaming listing path.
1324        const REAL_SIZE: u64 = 6 * 1024 * 1024 * 1024;
1325
1326        let (transport, mock) = mock_transport();
1327        mock.queue_response(ok_response(0)); // OpenSession
1328        queue_handles(&mock, 1, &[10, 20]);
1329        queue_object_info_with_size(&mock, 2, "small.jpg", 0, 500_000);
1330        queue_object_info_with_size(&mock, 3, "huge.mkv", 0, REAL_SIZE);
1331        queue_object_size_prop(&mock, 4, REAL_SIZE);
1332
1333        let storage = mock_storage(transport, "").await;
1334        let objects = storage.list_objects(None).await.unwrap();
1335
1336        assert_eq!(objects.len(), 2);
1337        assert_eq!(objects[0].size, 500_000);
1338        assert_eq!(objects[1].size, REAL_SIZE);
1339    }
1340}