Skip to main content

mtp_rs/mtp/
storage.rs

1//! Storage operations.
2
3use crate::mtp::object::NewObjectInfo;
4use crate::mtp::stream::{FileDownload, Progress};
5use crate::ptp::{ObjectHandle, ObjectInfo, StorageId, StorageInfo};
6use crate::Error;
7use bytes::Bytes;
8use futures::Stream;
9use std::ops::ControlFlow;
10use std::sync::Arc;
11
12use super::device::MtpDeviceInner;
13
14/// An in-progress directory listing that yields [`ObjectInfo`] items one at a time.
15///
16/// Created by [`Storage::list_objects_stream()`]. After `GetObjectHandles` completes,
17/// the total count is known immediately. Each call to [`next()`](Self::next) fetches
18/// one `GetObjectInfo` from USB, so the consumer can report progress (e.g.,
19/// "Loading files (42 of 500)...") as items arrive.
20///
21/// # Important
22///
23/// The MTP session is busy while this listing is active. You must consume
24/// all items (or drop the listing) before calling other storage methods.
25///
26/// # Example
27///
28/// ```rust,ignore
29/// let mut listing = storage.list_objects_stream(None).await?;
30/// println!("Loading {} files...", listing.total());
31///
32/// while let Some(result) = listing.next().await {
33///     let info = result?;
34///     println!("[{}/{}] {}", listing.fetched(), listing.total(), info.filename);
35/// }
36/// ```
37pub struct ObjectListing {
38    inner: Arc<MtpDeviceInner>,
39    handles: Vec<ObjectHandle>,
40    /// Index of the next handle to fetch.
41    cursor: usize,
42    /// Parent filter: if Some, only items matching this parent are yielded.
43    parent_filter: Option<ParentFilter>,
44}
45
46/// Describes how to filter objects by parent handle.
47enum ParentFilter {
48    /// Accept objects whose parent matches exactly.
49    Exact(ObjectHandle),
50    /// Android root: accept parent 0 or 0xFFFFFFFF.
51    AndroidRoot,
52}
53
54impl ObjectListing {
55    /// Total number of object handles returned by the device.
56    ///
57    /// When a parent filter is active (e.g., Fuji devices that return all objects
58    /// for root), some items may be skipped, so the actual yielded count can be lower.
59    #[must_use]
60    pub fn total(&self) -> usize {
61        self.handles.len()
62    }
63
64    /// Number of handles processed so far (including filtered-out items).
65    #[must_use]
66    pub fn fetched(&self) -> usize {
67        self.cursor
68    }
69
70    /// Fetch the next object from the device.
71    ///
72    /// Returns `None` when all handles have been processed.
73    /// Items that don't match the parent filter are silently skipped.
74    pub async fn next(&mut self) -> Option<Result<ObjectInfo, Error>> {
75        loop {
76            if self.cursor >= self.handles.len() {
77                return None;
78            }
79
80            let handle = self.handles[self.cursor];
81            self.cursor += 1;
82
83            let mut info = match self.inner.session.get_object_info(handle).await {
84                Ok(info) => info,
85                Err(e) => return Some(Err(e)),
86            };
87            info.handle = handle;
88
89            // Apply parent filter if present
90            if let Some(filter) = &self.parent_filter {
91                let matches = match filter {
92                    ParentFilter::Exact(expected) => info.parent == *expected,
93                    ParentFilter::AndroidRoot => info.parent.0 == 0 || info.parent.0 == 0xFFFFFFFF,
94                };
95                if !matches {
96                    continue;
97                }
98            }
99
100            return Some(Ok(info));
101        }
102    }
103}
104
105/// A storage location on an MTP device.
106///
107/// `Storage` holds an `Arc<MtpDeviceInner>` so it can outlive the original
108/// `MtpDevice` and be used from multiple tasks.
109pub struct Storage {
110    inner: Arc<MtpDeviceInner>,
111    id: StorageId,
112    info: StorageInfo,
113}
114
115impl Storage {
116    /// Create a new Storage (internal).
117    pub(crate) fn new(inner: Arc<MtpDeviceInner>, id: StorageId, info: StorageInfo) -> Self {
118        Self { inner, id, info }
119    }
120
121    #[must_use]
122    pub fn id(&self) -> StorageId {
123        self.id
124    }
125
126    /// Storage information (cached, call refresh() to update).
127    #[must_use]
128    pub fn info(&self) -> &StorageInfo {
129        &self.info
130    }
131
132    /// Refresh storage info from device (updates free space, etc.).
133    pub async fn refresh(&mut self) -> Result<(), Error> {
134        self.info = self.inner.session.get_storage_info(self.id).await?;
135        Ok(())
136    }
137
138    /// List objects in a folder (None = root), returning all results at once.
139    ///
140    /// For progress reporting during large listings, use
141    /// [`list_objects_stream()`](Self::list_objects_stream) instead.
142    ///
143    /// This method handles various device quirks:
144    /// - Android devices: parent=0 returns ALL objects, so we use parent=0xFFFFFFFF instead
145    /// - Samsung devices: return InvalidObjectHandle for parent=0, so we fall back to recursive
146    /// - Fuji devices: return all objects for root, so we filter by parent handle
147    pub async fn list_objects(
148        &self,
149        parent: Option<ObjectHandle>,
150    ) -> Result<Vec<ObjectInfo>, Error> {
151        let mut listing = self.list_objects_stream(parent).await?;
152        let mut objects = Vec::with_capacity(listing.total());
153        while let Some(result) = listing.next().await {
154            objects.push(result?);
155        }
156        Ok(objects)
157    }
158
159    /// List objects in a folder as a streaming [`ObjectListing`].
160    ///
161    /// Returns immediately after `GetObjectHandles` completes (one USB round-trip).
162    /// The total count is then known via [`ObjectListing::total()`], and each call
163    /// to [`ObjectListing::next()`] fetches one object's metadata from USB.
164    ///
165    /// This enables progress reporting (e.g., "Loading 42 of 500...") during
166    /// what would otherwise be a single blocking `list_objects()` call.
167    ///
168    /// Handles the same device quirks as [`list_objects()`](Self::list_objects).
169    ///
170    /// # Example
171    ///
172    /// ```rust,ignore
173    /// let mut listing = storage.list_objects_stream(None).await?;
174    /// println!("Found {} items", listing.total());
175    ///
176    /// while let Some(result) = listing.next().await {
177    ///     let info = result?;
178    ///     println!("[{}/{}] {}", listing.fetched(), listing.total(), info.filename);
179    /// }
180    /// ```
181    pub async fn list_objects_stream(
182        &self,
183        parent: Option<ObjectHandle>,
184    ) -> Result<ObjectListing, Error> {
185        // Android quirk: When listing root (parent=None/0), Android returns ALL objects
186        // on the device instead of just root-level objects. This makes listing extremely slow.
187        // Counter-intuitively, using parent=0xFFFFFFFF (ObjectHandle::ALL) returns the
188        // actual root-level objects on Android devices.
189        let effective_parent = if parent.is_none() && self.inner.is_android() {
190            Some(ObjectHandle::ALL)
191        } else {
192            parent
193        };
194
195        let result = self
196            .inner
197            .session
198            .get_object_handles(self.id, None, effective_parent)
199            .await;
200
201        let handles = match result {
202            Ok(h) => h,
203            Err(Error::Protocol {
204                code: crate::ptp::ResponseCode::InvalidObjectHandle,
205                ..
206            }) if parent.is_none() => {
207                // Samsung fallback: use recursive listing and filter to root items
208                return self.list_objects_stream_samsung_fallback().await;
209            }
210            Err(e) => return Err(e),
211        };
212
213        // Build parent filter for devices that return more objects than requested
214        let parent_filter = if parent.is_none() && self.inner.is_android() {
215            Some(ParentFilter::AndroidRoot)
216        } else {
217            // Filter by exact parent (catches Fuji devices that return all objects for root)
218            Some(ParentFilter::Exact(parent.unwrap_or(ObjectHandle::ROOT)))
219        };
220
221        Ok(ObjectListing {
222            inner: Arc::clone(&self.inner),
223            handles,
224            cursor: 0,
225            parent_filter,
226        })
227    }
228
229    /// Samsung fallback returning a streaming [`ObjectListing`].
230    async fn list_objects_stream_samsung_fallback(&self) -> Result<ObjectListing, Error> {
231        let handles = self
232            .inner
233            .session
234            .get_object_handles(self.id, None, Some(ObjectHandle::ALL))
235            .await?;
236
237        Ok(ObjectListing {
238            inner: Arc::clone(&self.inner),
239            handles,
240            cursor: 0,
241            // Root items have parent 0 or 0xFFFFFFFF (depending on device)
242            parent_filter: Some(ParentFilter::AndroidRoot),
243        })
244    }
245
246    /// List objects recursively.
247    ///
248    /// This method automatically detects Android devices and uses manual traversal
249    /// for them, since Android's MTP implementation doesn't support the native
250    /// `ObjectHandle::ALL` recursive listing.
251    ///
252    /// For non-Android devices, it tries native recursive listing first and falls
253    /// back to manual traversal if the results look incomplete.
254    pub async fn list_objects_recursive(
255        &self,
256        parent: Option<ObjectHandle>,
257    ) -> Result<Vec<ObjectInfo>, Error> {
258        if self.inner.is_android() {
259            return self.list_objects_recursive_manual(parent).await;
260        }
261
262        let native_result = self.list_objects_recursive_native(parent).await?;
263
264        // Heuristic: if we only got folders and no files, native listing
265        // probably didn't work - fall back to manual traversal
266        let has_files = native_result.iter().any(|o| o.is_file());
267        if !native_result.is_empty() && !has_files {
268            return self.list_objects_recursive_manual(parent).await;
269        }
270
271        Ok(native_result)
272    }
273
274    /// List objects recursively using native MTP recursive listing.
275    pub async fn list_objects_recursive_native(
276        &self,
277        parent: Option<ObjectHandle>,
278    ) -> Result<Vec<ObjectInfo>, Error> {
279        let recursive_parent = if parent.is_none() {
280            Some(ObjectHandle::ALL)
281        } else {
282            parent
283        };
284
285        let handles = self
286            .inner
287            .session
288            .get_object_handles(self.id, None, recursive_parent)
289            .await?;
290
291        let mut objects = Vec::with_capacity(handles.len());
292        for handle in handles {
293            let mut info = self.inner.session.get_object_info(handle).await?;
294            info.handle = handle;
295            objects.push(info);
296        }
297        Ok(objects)
298    }
299
300    /// List objects recursively using manual folder traversal.
301    pub async fn list_objects_recursive_manual(
302        &self,
303        parent: Option<ObjectHandle>,
304    ) -> Result<Vec<ObjectInfo>, Error> {
305        let mut result = Vec::new();
306        let mut folders_to_visit = vec![parent];
307
308        while let Some(current_parent) = folders_to_visit.pop() {
309            let objects = self.list_objects(current_parent).await?;
310
311            for obj in objects {
312                if obj.is_folder() {
313                    folders_to_visit.push(Some(obj.handle));
314                }
315                result.push(obj);
316            }
317        }
318
319        Ok(result)
320    }
321
322    /// Get object metadata by handle.
323    pub async fn get_object_info(&self, handle: ObjectHandle) -> Result<ObjectInfo, Error> {
324        let mut info = self.inner.session.get_object_info(handle).await?;
325        info.handle = handle;
326        Ok(info)
327    }
328
329    // =========================================================================
330    // Download operations
331    // =========================================================================
332
333    /// Download a file and return all bytes.
334    ///
335    /// For small to medium files where you want all the data in memory.
336    /// For large files or streaming to disk, use [`download_stream()`](Self::download_stream).
337    pub async fn download(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
338        self.inner.session.get_object(handle).await
339    }
340
341    /// Download a partial file (byte range).
342    pub async fn download_partial(
343        &self,
344        handle: ObjectHandle,
345        offset: u64,
346        size: u32,
347    ) -> Result<Vec<u8>, Error> {
348        self.inner
349            .session
350            .get_partial_object(handle, offset, size)
351            .await
352    }
353
354    pub async fn download_thumbnail(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
355        self.inner.session.get_thumb(handle).await
356    }
357
358    /// Download a file as a stream (true USB streaming).
359    ///
360    /// Unlike [`download()`](Self::download), this method yields data chunks
361    /// directly from USB as they arrive, without buffering the entire file
362    /// in memory. Ideal for large files or when piping data to disk.
363    ///
364    /// # Important
365    ///
366    /// The MTP session is locked while the download is active. You must consume
367    /// the entire download (or drop it) before calling other storage methods.
368    ///
369    /// # Example
370    ///
371    /// ```rust,ignore
372    /// let mut download = storage.download_stream(handle).await?;
373    /// println!("Downloading {} bytes...", download.size());
374    ///
375    /// let mut file = tokio::fs::File::create("output.bin").await?;
376    /// while let Some(chunk) = download.next_chunk().await {
377    ///     let bytes = chunk?;
378    ///     file.write_all(&bytes).await?;
379    ///     println!("Progress: {:.1}%", download.progress() * 100.0);
380    /// }
381    /// ```
382    pub async fn download_stream(&self, handle: ObjectHandle) -> Result<FileDownload, Error> {
383        let info = self.get_object_info(handle).await?;
384        let size = info.size;
385
386        let stream = self
387            .inner
388            .session
389            .execute_with_receive_stream(crate::ptp::OperationCode::GetObject, &[handle.0])
390            .await?;
391
392        Ok(FileDownload::new(size, stream))
393    }
394
395    // =========================================================================
396    // Upload operations
397    // =========================================================================
398
399    /// Upload a file from a stream.
400    ///
401    /// The stream is consumed and all data is buffered before sending
402    /// (MTP protocol requires knowing the total size upfront).
403    ///
404    /// # Arguments
405    ///
406    /// * `parent` - Parent folder handle (None for root)
407    /// * `info` - Object metadata including filename and size
408    /// * `data` - Stream of data chunks to upload
409    pub async fn upload<S>(
410        &self,
411        parent: Option<ObjectHandle>,
412        info: NewObjectInfo,
413        data: S,
414    ) -> Result<ObjectHandle, Error>
415    where
416        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
417    {
418        self.upload_with_progress(parent, info, data, |_| ControlFlow::Continue(()))
419            .await
420    }
421
422    /// Upload a file with progress callback.
423    ///
424    /// Progress is reported as data is read from the stream. Return
425    /// `ControlFlow::Break(())` from the callback to cancel the upload.
426    pub async fn upload_with_progress<S, F>(
427        &self,
428        parent: Option<ObjectHandle>,
429        info: NewObjectInfo,
430        mut data: S,
431        mut on_progress: F,
432    ) -> Result<ObjectHandle, Error>
433    where
434        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
435        F: FnMut(Progress) -> ControlFlow<()>,
436    {
437        use futures::StreamExt;
438
439        let total_size = info.size;
440        let mut buffer = Vec::with_capacity(total_size as usize);
441        let mut bytes_received = 0u64;
442
443        while let Some(chunk) = data.next().await {
444            let chunk = chunk.map_err(Error::Io)?;
445            bytes_received += chunk.len() as u64;
446            buffer.extend_from_slice(&chunk);
447
448            let progress = Progress {
449                bytes_transferred: bytes_received,
450                total_bytes: Some(total_size),
451            };
452
453            if let ControlFlow::Break(()) = on_progress(progress) {
454                return Err(Error::Cancelled);
455            }
456        }
457
458        let object_info = info.to_object_info();
459        let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
460        let (_, _, handle) = self
461            .inner
462            .session
463            .send_object_info(self.id, parent_handle, &object_info)
464            .await?;
465
466        self.inner.session.send_object(&buffer).await?;
467
468        Ok(handle)
469    }
470
471    // =========================================================================
472    // Folder and object management
473    // =========================================================================
474
475    pub async fn create_folder(
476        &self,
477        parent: Option<ObjectHandle>,
478        name: &str,
479    ) -> Result<ObjectHandle, Error> {
480        let info = NewObjectInfo::folder(name);
481        let object_info = info.to_object_info();
482        let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
483
484        let (_, _, handle) = self
485            .inner
486            .session
487            .send_object_info(self.id, parent_handle, &object_info)
488            .await?;
489
490        Ok(handle)
491    }
492
493    pub async fn delete(&self, handle: ObjectHandle) -> Result<(), Error> {
494        self.inner.session.delete_object(handle).await
495    }
496
497    /// Move an object to a different folder.
498    pub async fn move_object(
499        &self,
500        handle: ObjectHandle,
501        new_parent: ObjectHandle,
502        new_storage: Option<StorageId>,
503    ) -> Result<(), Error> {
504        let storage = new_storage.unwrap_or(self.id);
505        self.inner
506            .session
507            .move_object(handle, storage, new_parent)
508            .await
509    }
510
511    pub async fn copy_object(
512        &self,
513        handle: ObjectHandle,
514        new_parent: ObjectHandle,
515        new_storage: Option<StorageId>,
516    ) -> Result<ObjectHandle, Error> {
517        let storage = new_storage.unwrap_or(self.id);
518        self.inner
519            .session
520            .copy_object(handle, storage, new_parent)
521            .await
522    }
523
524    /// Rename an object (file or folder).
525    ///
526    /// Not all devices support renaming. Use `MtpDevice::supports_rename()`
527    /// to check if this operation is available.
528    pub async fn rename(&self, handle: ObjectHandle, new_name: &str) -> Result<(), Error> {
529        self.inner.session.rename_object(handle, new_name).await
530    }
531}
532
533#[cfg(test)]
534mod tests {
535    use super::*;
536    use crate::ptp::{
537        pack_u16, pack_u32, pack_u32_array, ContainerType, DateTime, DeviceInfo, ObjectFormatCode,
538        OperationCode, PtpSession, ResponseCode, StorageInfo,
539    };
540    use crate::transport::mock::MockTransport;
541
542    // -- Test helpers (same protocol-level helpers as session tests) -----------
543
544    fn mock_transport() -> (Arc<dyn crate::transport::Transport>, Arc<MockTransport>) {
545        let mock = Arc::new(MockTransport::new());
546        let transport: Arc<dyn crate::transport::Transport> = Arc::clone(&mock) as _;
547        (transport, mock)
548    }
549
550    fn ok_response(tx_id: u32) -> Vec<u8> {
551        let mut buf = Vec::with_capacity(12);
552        buf.extend_from_slice(&pack_u32(12));
553        buf.extend_from_slice(&pack_u16(ContainerType::Response.to_code()));
554        buf.extend_from_slice(&pack_u16(ResponseCode::Ok.into()));
555        buf.extend_from_slice(&pack_u32(tx_id));
556        buf
557    }
558
559    fn error_response(tx_id: u32, code: ResponseCode) -> Vec<u8> {
560        let mut buf = Vec::with_capacity(12);
561        buf.extend_from_slice(&pack_u32(12));
562        buf.extend_from_slice(&pack_u16(ContainerType::Response.to_code()));
563        buf.extend_from_slice(&pack_u16(code.into()));
564        buf.extend_from_slice(&pack_u32(tx_id));
565        buf
566    }
567
568    fn data_container(tx_id: u32, code: OperationCode, payload: &[u8]) -> Vec<u8> {
569        let len = 12 + payload.len();
570        let mut buf = Vec::with_capacity(len);
571        buf.extend_from_slice(&pack_u32(len as u32));
572        buf.extend_from_slice(&pack_u16(ContainerType::Data.to_code()));
573        buf.extend_from_slice(&pack_u16(code.into()));
574        buf.extend_from_slice(&pack_u32(tx_id));
575        buf.extend_from_slice(payload);
576        buf
577    }
578
579    // -- Storage-level helpers ------------------------------------------------
580
581    /// Build a Storage backed by a mock transport for testing.
582    ///
583    /// Queues the OpenSession response automatically. The caller must queue
584    /// further responses before calling list methods.
585    async fn mock_storage(
586        transport: Arc<dyn crate::transport::Transport>,
587        vendor_extension_desc: &str,
588    ) -> Storage {
589        let session = Arc::new(PtpSession::open(transport, 1).await.unwrap());
590        let inner = Arc::new(MtpDeviceInner {
591            session,
592            device_info: DeviceInfo {
593                vendor_extension_desc: vendor_extension_desc.to_string(),
594                ..DeviceInfo::default()
595            },
596        });
597        Storage::new(inner, StorageId(1), StorageInfo::default())
598    }
599
600    /// Build a minimal ObjectInfo binary payload with a given filename and parent.
601    fn object_info_bytes(filename: &str, parent: u32) -> Vec<u8> {
602        let info = ObjectInfo {
603            storage_id: StorageId(1),
604            format: ObjectFormatCode::Jpeg,
605            parent: ObjectHandle(parent),
606            filename: filename.to_string(),
607            created: Some(DateTime {
608                year: 2024,
609                month: 1,
610                day: 1,
611                hour: 0,
612                minute: 0,
613                second: 0,
614            }),
615            ..ObjectInfo::default()
616        };
617        info.to_bytes().unwrap()
618    }
619
620    /// Queue GetObjectHandles response (data + ok) for a given transaction ID.
621    fn queue_handles(mock: &MockTransport, tx_id: u32, handles: &[u32]) {
622        let data = pack_u32_array(handles);
623        mock.queue_response(data_container(
624            tx_id,
625            OperationCode::GetObjectHandles,
626            &data,
627        ));
628        mock.queue_response(ok_response(tx_id));
629    }
630
631    /// Queue GetObjectInfo response (data + ok) for a given transaction ID.
632    fn queue_object_info(mock: &MockTransport, tx_id: u32, filename: &str, parent: u32) {
633        let data = object_info_bytes(filename, parent);
634        mock.queue_response(data_container(tx_id, OperationCode::GetObjectInfo, &data));
635        mock.queue_response(ok_response(tx_id));
636    }
637
638    // -- Tests ----------------------------------------------------------------
639
640    #[tokio::test]
641    async fn stream_returns_objects_with_correct_counters() {
642        let (transport, mock) = mock_transport();
643        mock.queue_response(ok_response(1)); // OpenSession
644
645        queue_handles(&mock, 2, &[10, 20, 30]);
646        queue_object_info(&mock, 3, "photo.jpg", 0);
647        queue_object_info(&mock, 4, "video.mp4", 0);
648        queue_object_info(&mock, 5, "notes.txt", 0);
649
650        let storage = mock_storage(transport, "").await;
651        let mut listing = storage.list_objects_stream(None).await.unwrap();
652
653        assert_eq!(listing.total(), 3);
654        assert_eq!(listing.fetched(), 0);
655
656        let first = listing.next().await.unwrap().unwrap();
657        assert_eq!(first.filename, "photo.jpg");
658        assert_eq!(first.handle, ObjectHandle(10));
659        assert_eq!(listing.fetched(), 1);
660
661        let second = listing.next().await.unwrap().unwrap();
662        assert_eq!(second.filename, "video.mp4");
663        assert_eq!(second.handle, ObjectHandle(20));
664        assert_eq!(listing.fetched(), 2);
665
666        let third = listing.next().await.unwrap().unwrap();
667        assert_eq!(third.filename, "notes.txt");
668        assert_eq!(third.handle, ObjectHandle(30));
669        assert_eq!(listing.fetched(), 3);
670
671        assert!(listing.next().await.is_none());
672    }
673
674    #[tokio::test]
675    async fn stream_empty_directory() {
676        let (transport, mock) = mock_transport();
677        mock.queue_response(ok_response(1)); // OpenSession
678        queue_handles(&mock, 2, &[]);
679
680        let storage = mock_storage(transport, "").await;
681        let mut listing = storage.list_objects_stream(None).await.unwrap();
682
683        assert_eq!(listing.total(), 0);
684        assert!(listing.next().await.is_none());
685    }
686
687    #[tokio::test]
688    async fn stream_filters_by_parent() {
689        // Simulates Fuji quirk: device returns objects with wrong parent handles
690        let (transport, mock) = mock_transport();
691        mock.queue_response(ok_response(1)); // OpenSession
692
693        queue_handles(&mock, 2, &[10, 20, 30]);
694        queue_object_info(&mock, 3, "root_file.jpg", 0); // parent=ROOT, included
695        queue_object_info(&mock, 4, "nested.jpg", 99); // parent=99, filtered out
696        queue_object_info(&mock, 5, "another_root.txt", 0); // parent=ROOT, included
697
698        let storage = mock_storage(transport, "").await;
699        let mut listing = storage.list_objects_stream(None).await.unwrap();
700
701        assert_eq!(listing.total(), 3); // Total handles from device
702
703        let first = listing.next().await.unwrap().unwrap();
704        assert_eq!(first.filename, "root_file.jpg");
705        assert_eq!(listing.fetched(), 1);
706
707        let second = listing.next().await.unwrap().unwrap();
708        assert_eq!(second.filename, "another_root.txt");
709        assert_eq!(listing.fetched(), 3); // Processed all 3 (including filtered one)
710
711        assert!(listing.next().await.is_none());
712    }
713
714    #[tokio::test]
715    async fn stream_android_root_accepts_both_parents() {
716        // Android quirk: root items may have parent 0 or 0xFFFFFFFF
717        let (transport, mock) = mock_transport();
718        mock.queue_response(ok_response(1)); // OpenSession
719
720        queue_handles(&mock, 2, &[10, 20, 30]);
721        queue_object_info(&mock, 3, "dcim", 0); // parent=0, root
722        queue_object_info(&mock, 4, "download", 0xFFFFFFFF); // parent=ALL, also root on Android
723        queue_object_info(&mock, 5, "nested", 42); // not root
724
725        let storage = mock_storage(transport, "android.com").await;
726        let mut listing = storage.list_objects_stream(None).await.unwrap();
727
728        let first = listing.next().await.unwrap().unwrap();
729        assert_eq!(first.filename, "dcim");
730
731        let second = listing.next().await.unwrap().unwrap();
732        assert_eq!(second.filename, "download");
733
734        assert!(listing.next().await.is_none()); // "nested" filtered out
735    }
736
737    #[tokio::test]
738    async fn stream_subfolder_listing() {
739        let (transport, mock) = mock_transport();
740        mock.queue_response(ok_response(1)); // OpenSession
741
742        let parent_handle = 42u32;
743        queue_handles(&mock, 2, &[100, 101]);
744        queue_object_info(&mock, 3, "IMG_001.jpg", parent_handle);
745        queue_object_info(&mock, 4, "IMG_002.jpg", parent_handle);
746
747        let storage = mock_storage(transport, "").await;
748        let mut listing = storage
749            .list_objects_stream(Some(ObjectHandle(parent_handle)))
750            .await
751            .unwrap();
752
753        assert_eq!(listing.total(), 2);
754        let first = listing.next().await.unwrap().unwrap();
755        assert_eq!(first.filename, "IMG_001.jpg");
756        let second = listing.next().await.unwrap().unwrap();
757        assert_eq!(second.filename, "IMG_002.jpg");
758        assert!(listing.next().await.is_none());
759    }
760
761    #[tokio::test]
762    async fn stream_propagates_mid_listing_error() {
763        let (transport, mock) = mock_transport();
764        mock.queue_response(ok_response(1)); // OpenSession
765
766        queue_handles(&mock, 2, &[10, 20]);
767        queue_object_info(&mock, 3, "good.jpg", 0);
768        // Handle 20: device returns error instead of object info
769        mock.queue_response(error_response(4, ResponseCode::InvalidObjectHandle));
770
771        let storage = mock_storage(transport, "").await;
772        let mut listing = storage.list_objects_stream(None).await.unwrap();
773
774        let first = listing.next().await.unwrap().unwrap();
775        assert_eq!(first.filename, "good.jpg");
776
777        let second = listing.next().await.unwrap();
778        assert!(second.is_err());
779    }
780
781    #[tokio::test]
782    async fn list_objects_matches_stream_collect() {
783        // Verify list_objects() returns identical results to collecting the stream
784        let (transport1, mock1) = mock_transport();
785        let (transport2, mock2) = mock_transport();
786
787        for mock in [&mock1, &mock2] {
788            mock.queue_response(ok_response(1)); // OpenSession
789            queue_handles(mock, 2, &[10, 20]);
790            queue_object_info(mock, 3, "a.jpg", 0);
791            queue_object_info(mock, 4, "b.txt", 0);
792        }
793
794        let storage1 = mock_storage(transport1, "").await;
795        let storage2 = mock_storage(transport2, "").await;
796
797        let all_at_once = storage1.list_objects(None).await.unwrap();
798
799        let mut listing = storage2.list_objects_stream(None).await.unwrap();
800        let mut streamed = Vec::new();
801        while let Some(result) = listing.next().await {
802            streamed.push(result.unwrap());
803        }
804
805        assert_eq!(all_at_once.len(), streamed.len());
806        for (a, b) in all_at_once.iter().zip(streamed.iter()) {
807            assert_eq!(a.filename, b.filename);
808            assert_eq!(a.handle, b.handle);
809        }
810    }
811}