1use crate::mtp::object::NewObjectInfo;
4use crate::mtp::stream::{FileDownload, Progress};
5use crate::ptp::{ObjectHandle, ObjectInfo, StorageId, StorageInfo};
6use crate::Error;
7use bytes::Bytes;
8use futures::Stream;
9use std::ops::ControlFlow;
10use std::sync::Arc;
11
12use super::device::MtpDeviceInner;
13
14pub struct ObjectListing {
38 inner: Arc<MtpDeviceInner>,
39 handles: Vec<ObjectHandle>,
40 cursor: usize,
42 parent_filter: Option<ParentFilter>,
44}
45
46enum ParentFilter {
48 Exact(ObjectHandle),
50 AndroidRoot,
52}
53
54impl ObjectListing {
55 #[must_use]
60 pub fn total(&self) -> usize {
61 self.handles.len()
62 }
63
64 #[must_use]
66 pub fn fetched(&self) -> usize {
67 self.cursor
68 }
69
70 pub async fn next(&mut self) -> Option<Result<ObjectInfo, Error>> {
75 loop {
76 if self.cursor >= self.handles.len() {
77 return None;
78 }
79
80 let handle = self.handles[self.cursor];
81 self.cursor += 1;
82
83 let mut info = match self.inner.session.get_object_info(handle).await {
84 Ok(info) => info,
85 Err(e) => return Some(Err(e)),
86 };
87 info.handle = handle;
88
89 if let Some(filter) = &self.parent_filter {
91 let matches = match filter {
92 ParentFilter::Exact(expected) => info.parent == *expected,
93 ParentFilter::AndroidRoot => info.parent.0 == 0 || info.parent.0 == 0xFFFFFFFF,
94 };
95 if !matches {
96 continue;
97 }
98 }
99
100 return Some(Ok(info));
101 }
102 }
103}
104
105pub struct Storage {
110 inner: Arc<MtpDeviceInner>,
111 id: StorageId,
112 info: StorageInfo,
113}
114
115impl Storage {
116 pub(crate) fn new(inner: Arc<MtpDeviceInner>, id: StorageId, info: StorageInfo) -> Self {
118 Self { inner, id, info }
119 }
120
121 #[must_use]
122 pub fn id(&self) -> StorageId {
123 self.id
124 }
125
126 #[must_use]
128 pub fn info(&self) -> &StorageInfo {
129 &self.info
130 }
131
132 pub async fn refresh(&mut self) -> Result<(), Error> {
134 self.info = self.inner.session.get_storage_info(self.id).await?;
135 Ok(())
136 }
137
138 pub async fn list_objects(
148 &self,
149 parent: Option<ObjectHandle>,
150 ) -> Result<Vec<ObjectInfo>, Error> {
151 let mut listing = self.list_objects_stream(parent).await?;
152 let mut objects = Vec::with_capacity(listing.total());
153 while let Some(result) = listing.next().await {
154 objects.push(result?);
155 }
156 Ok(objects)
157 }
158
159 pub async fn list_objects_stream(
182 &self,
183 parent: Option<ObjectHandle>,
184 ) -> Result<ObjectListing, Error> {
185 let effective_parent = if parent.is_none() && self.inner.is_android() {
190 Some(ObjectHandle::ALL)
191 } else {
192 parent
193 };
194
195 let result = self
196 .inner
197 .session
198 .get_object_handles(self.id, None, effective_parent)
199 .await;
200
201 let handles = match result {
202 Ok(h) => h,
203 Err(Error::Protocol {
204 code: crate::ptp::ResponseCode::InvalidObjectHandle,
205 ..
206 }) if parent.is_none() => {
207 return self.list_objects_stream_samsung_fallback().await;
209 }
210 Err(e) => return Err(e),
211 };
212
213 let parent_filter = if parent.is_none() && self.inner.is_android() {
215 Some(ParentFilter::AndroidRoot)
216 } else {
217 Some(ParentFilter::Exact(parent.unwrap_or(ObjectHandle::ROOT)))
219 };
220
221 Ok(ObjectListing {
222 inner: Arc::clone(&self.inner),
223 handles,
224 cursor: 0,
225 parent_filter,
226 })
227 }
228
229 async fn list_objects_stream_samsung_fallback(&self) -> Result<ObjectListing, Error> {
231 let handles = self
232 .inner
233 .session
234 .get_object_handles(self.id, None, Some(ObjectHandle::ALL))
235 .await?;
236
237 Ok(ObjectListing {
238 inner: Arc::clone(&self.inner),
239 handles,
240 cursor: 0,
241 parent_filter: Some(ParentFilter::AndroidRoot),
243 })
244 }
245
246 pub async fn list_objects_recursive(
255 &self,
256 parent: Option<ObjectHandle>,
257 ) -> Result<Vec<ObjectInfo>, Error> {
258 if self.inner.is_android() {
259 return self.list_objects_recursive_manual(parent).await;
260 }
261
262 let native_result = self.list_objects_recursive_native(parent).await?;
263
264 let has_files = native_result.iter().any(|o| o.is_file());
267 if !native_result.is_empty() && !has_files {
268 return self.list_objects_recursive_manual(parent).await;
269 }
270
271 Ok(native_result)
272 }
273
274 pub async fn list_objects_recursive_native(
276 &self,
277 parent: Option<ObjectHandle>,
278 ) -> Result<Vec<ObjectInfo>, Error> {
279 let recursive_parent = if parent.is_none() {
280 Some(ObjectHandle::ALL)
281 } else {
282 parent
283 };
284
285 let handles = self
286 .inner
287 .session
288 .get_object_handles(self.id, None, recursive_parent)
289 .await?;
290
291 let mut objects = Vec::with_capacity(handles.len());
292 for handle in handles {
293 let mut info = self.inner.session.get_object_info(handle).await?;
294 info.handle = handle;
295 objects.push(info);
296 }
297 Ok(objects)
298 }
299
300 pub async fn list_objects_recursive_manual(
302 &self,
303 parent: Option<ObjectHandle>,
304 ) -> Result<Vec<ObjectInfo>, Error> {
305 let mut result = Vec::new();
306 let mut folders_to_visit = vec![parent];
307
308 while let Some(current_parent) = folders_to_visit.pop() {
309 let objects = self.list_objects(current_parent).await?;
310
311 for obj in objects {
312 if obj.is_folder() {
313 folders_to_visit.push(Some(obj.handle));
314 }
315 result.push(obj);
316 }
317 }
318
319 Ok(result)
320 }
321
322 pub async fn get_object_info(&self, handle: ObjectHandle) -> Result<ObjectInfo, Error> {
324 let mut info = self.inner.session.get_object_info(handle).await?;
325 info.handle = handle;
326 Ok(info)
327 }
328
329 pub async fn download(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
338 self.inner.session.get_object(handle).await
339 }
340
341 pub async fn download_partial(
343 &self,
344 handle: ObjectHandle,
345 offset: u64,
346 size: u32,
347 ) -> Result<Vec<u8>, Error> {
348 self.inner
349 .session
350 .get_partial_object(handle, offset, size)
351 .await
352 }
353
354 pub async fn download_thumbnail(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
355 self.inner.session.get_thumb(handle).await
356 }
357
358 pub async fn download_stream(&self, handle: ObjectHandle) -> Result<FileDownload, Error> {
383 let info = self.get_object_info(handle).await?;
384 let size = info.size;
385
386 let stream = self
387 .inner
388 .session
389 .execute_with_receive_stream(crate::ptp::OperationCode::GetObject, &[handle.0])
390 .await?;
391
392 Ok(FileDownload::new(size, stream))
393 }
394
395 pub async fn upload<S>(
410 &self,
411 parent: Option<ObjectHandle>,
412 info: NewObjectInfo,
413 data: S,
414 ) -> Result<ObjectHandle, Error>
415 where
416 S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
417 {
418 self.upload_with_progress(parent, info, data, |_| ControlFlow::Continue(()))
419 .await
420 }
421
422 pub async fn upload_with_progress<S, F>(
427 &self,
428 parent: Option<ObjectHandle>,
429 info: NewObjectInfo,
430 mut data: S,
431 mut on_progress: F,
432 ) -> Result<ObjectHandle, Error>
433 where
434 S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
435 F: FnMut(Progress) -> ControlFlow<()>,
436 {
437 use futures::StreamExt;
438
439 let total_size = info.size;
440 let mut buffer = Vec::with_capacity(total_size as usize);
441 let mut bytes_received = 0u64;
442
443 while let Some(chunk) = data.next().await {
444 let chunk = chunk.map_err(Error::Io)?;
445 bytes_received += chunk.len() as u64;
446 buffer.extend_from_slice(&chunk);
447
448 let progress = Progress {
449 bytes_transferred: bytes_received,
450 total_bytes: Some(total_size),
451 };
452
453 if let ControlFlow::Break(()) = on_progress(progress) {
454 return Err(Error::Cancelled);
455 }
456 }
457
458 let object_info = info.to_object_info();
459 let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
460 let (_, _, handle) = self
461 .inner
462 .session
463 .send_object_info(self.id, parent_handle, &object_info)
464 .await?;
465
466 self.inner.session.send_object(&buffer).await?;
467
468 Ok(handle)
469 }
470
471 pub async fn create_folder(
476 &self,
477 parent: Option<ObjectHandle>,
478 name: &str,
479 ) -> Result<ObjectHandle, Error> {
480 let info = NewObjectInfo::folder(name);
481 let object_info = info.to_object_info();
482 let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
483
484 let (_, _, handle) = self
485 .inner
486 .session
487 .send_object_info(self.id, parent_handle, &object_info)
488 .await?;
489
490 Ok(handle)
491 }
492
493 pub async fn delete(&self, handle: ObjectHandle) -> Result<(), Error> {
494 self.inner.session.delete_object(handle).await
495 }
496
497 pub async fn move_object(
499 &self,
500 handle: ObjectHandle,
501 new_parent: ObjectHandle,
502 new_storage: Option<StorageId>,
503 ) -> Result<(), Error> {
504 let storage = new_storage.unwrap_or(self.id);
505 self.inner
506 .session
507 .move_object(handle, storage, new_parent)
508 .await
509 }
510
511 pub async fn copy_object(
512 &self,
513 handle: ObjectHandle,
514 new_parent: ObjectHandle,
515 new_storage: Option<StorageId>,
516 ) -> Result<ObjectHandle, Error> {
517 let storage = new_storage.unwrap_or(self.id);
518 self.inner
519 .session
520 .copy_object(handle, storage, new_parent)
521 .await
522 }
523
524 pub async fn rename(&self, handle: ObjectHandle, new_name: &str) -> Result<(), Error> {
529 self.inner.session.rename_object(handle, new_name).await
530 }
531}
532
533#[cfg(test)]
534mod tests {
535 use super::*;
536 use crate::ptp::{
537 pack_u16, pack_u32, pack_u32_array, ContainerType, DateTime, DeviceInfo, ObjectFormatCode,
538 OperationCode, PtpSession, ResponseCode, StorageInfo,
539 };
540 use crate::transport::mock::MockTransport;
541
542 fn mock_transport() -> (Arc<dyn crate::transport::Transport>, Arc<MockTransport>) {
545 let mock = Arc::new(MockTransport::new());
546 let transport: Arc<dyn crate::transport::Transport> = Arc::clone(&mock) as _;
547 (transport, mock)
548 }
549
550 fn ok_response(tx_id: u32) -> Vec<u8> {
551 let mut buf = Vec::with_capacity(12);
552 buf.extend_from_slice(&pack_u32(12));
553 buf.extend_from_slice(&pack_u16(ContainerType::Response.to_code()));
554 buf.extend_from_slice(&pack_u16(ResponseCode::Ok.into()));
555 buf.extend_from_slice(&pack_u32(tx_id));
556 buf
557 }
558
559 fn error_response(tx_id: u32, code: ResponseCode) -> Vec<u8> {
560 let mut buf = Vec::with_capacity(12);
561 buf.extend_from_slice(&pack_u32(12));
562 buf.extend_from_slice(&pack_u16(ContainerType::Response.to_code()));
563 buf.extend_from_slice(&pack_u16(code.into()));
564 buf.extend_from_slice(&pack_u32(tx_id));
565 buf
566 }
567
568 fn data_container(tx_id: u32, code: OperationCode, payload: &[u8]) -> Vec<u8> {
569 let len = 12 + payload.len();
570 let mut buf = Vec::with_capacity(len);
571 buf.extend_from_slice(&pack_u32(len as u32));
572 buf.extend_from_slice(&pack_u16(ContainerType::Data.to_code()));
573 buf.extend_from_slice(&pack_u16(code.into()));
574 buf.extend_from_slice(&pack_u32(tx_id));
575 buf.extend_from_slice(payload);
576 buf
577 }
578
579 async fn mock_storage(
586 transport: Arc<dyn crate::transport::Transport>,
587 vendor_extension_desc: &str,
588 ) -> Storage {
589 let session = Arc::new(PtpSession::open(transport, 1).await.unwrap());
590 let inner = Arc::new(MtpDeviceInner {
591 session,
592 device_info: DeviceInfo {
593 vendor_extension_desc: vendor_extension_desc.to_string(),
594 ..DeviceInfo::default()
595 },
596 });
597 Storage::new(inner, StorageId(1), StorageInfo::default())
598 }
599
600 fn object_info_bytes(filename: &str, parent: u32) -> Vec<u8> {
602 let info = ObjectInfo {
603 storage_id: StorageId(1),
604 format: ObjectFormatCode::Jpeg,
605 parent: ObjectHandle(parent),
606 filename: filename.to_string(),
607 created: Some(DateTime {
608 year: 2024,
609 month: 1,
610 day: 1,
611 hour: 0,
612 minute: 0,
613 second: 0,
614 }),
615 ..ObjectInfo::default()
616 };
617 info.to_bytes().unwrap()
618 }
619
620 fn queue_handles(mock: &MockTransport, tx_id: u32, handles: &[u32]) {
622 let data = pack_u32_array(handles);
623 mock.queue_response(data_container(
624 tx_id,
625 OperationCode::GetObjectHandles,
626 &data,
627 ));
628 mock.queue_response(ok_response(tx_id));
629 }
630
631 fn queue_object_info(mock: &MockTransport, tx_id: u32, filename: &str, parent: u32) {
633 let data = object_info_bytes(filename, parent);
634 mock.queue_response(data_container(tx_id, OperationCode::GetObjectInfo, &data));
635 mock.queue_response(ok_response(tx_id));
636 }
637
638 #[tokio::test]
641 async fn stream_returns_objects_with_correct_counters() {
642 let (transport, mock) = mock_transport();
643 mock.queue_response(ok_response(1)); queue_handles(&mock, 2, &[10, 20, 30]);
646 queue_object_info(&mock, 3, "photo.jpg", 0);
647 queue_object_info(&mock, 4, "video.mp4", 0);
648 queue_object_info(&mock, 5, "notes.txt", 0);
649
650 let storage = mock_storage(transport, "").await;
651 let mut listing = storage.list_objects_stream(None).await.unwrap();
652
653 assert_eq!(listing.total(), 3);
654 assert_eq!(listing.fetched(), 0);
655
656 let first = listing.next().await.unwrap().unwrap();
657 assert_eq!(first.filename, "photo.jpg");
658 assert_eq!(first.handle, ObjectHandle(10));
659 assert_eq!(listing.fetched(), 1);
660
661 let second = listing.next().await.unwrap().unwrap();
662 assert_eq!(second.filename, "video.mp4");
663 assert_eq!(second.handle, ObjectHandle(20));
664 assert_eq!(listing.fetched(), 2);
665
666 let third = listing.next().await.unwrap().unwrap();
667 assert_eq!(third.filename, "notes.txt");
668 assert_eq!(third.handle, ObjectHandle(30));
669 assert_eq!(listing.fetched(), 3);
670
671 assert!(listing.next().await.is_none());
672 }
673
674 #[tokio::test]
675 async fn stream_empty_directory() {
676 let (transport, mock) = mock_transport();
677 mock.queue_response(ok_response(1)); queue_handles(&mock, 2, &[]);
679
680 let storage = mock_storage(transport, "").await;
681 let mut listing = storage.list_objects_stream(None).await.unwrap();
682
683 assert_eq!(listing.total(), 0);
684 assert!(listing.next().await.is_none());
685 }
686
687 #[tokio::test]
688 async fn stream_filters_by_parent() {
689 let (transport, mock) = mock_transport();
691 mock.queue_response(ok_response(1)); queue_handles(&mock, 2, &[10, 20, 30]);
694 queue_object_info(&mock, 3, "root_file.jpg", 0); queue_object_info(&mock, 4, "nested.jpg", 99); queue_object_info(&mock, 5, "another_root.txt", 0); let storage = mock_storage(transport, "").await;
699 let mut listing = storage.list_objects_stream(None).await.unwrap();
700
701 assert_eq!(listing.total(), 3); let first = listing.next().await.unwrap().unwrap();
704 assert_eq!(first.filename, "root_file.jpg");
705 assert_eq!(listing.fetched(), 1);
706
707 let second = listing.next().await.unwrap().unwrap();
708 assert_eq!(second.filename, "another_root.txt");
709 assert_eq!(listing.fetched(), 3); assert!(listing.next().await.is_none());
712 }
713
714 #[tokio::test]
715 async fn stream_android_root_accepts_both_parents() {
716 let (transport, mock) = mock_transport();
718 mock.queue_response(ok_response(1)); queue_handles(&mock, 2, &[10, 20, 30]);
721 queue_object_info(&mock, 3, "dcim", 0); queue_object_info(&mock, 4, "download", 0xFFFFFFFF); queue_object_info(&mock, 5, "nested", 42); let storage = mock_storage(transport, "android.com").await;
726 let mut listing = storage.list_objects_stream(None).await.unwrap();
727
728 let first = listing.next().await.unwrap().unwrap();
729 assert_eq!(first.filename, "dcim");
730
731 let second = listing.next().await.unwrap().unwrap();
732 assert_eq!(second.filename, "download");
733
734 assert!(listing.next().await.is_none()); }
736
737 #[tokio::test]
738 async fn stream_subfolder_listing() {
739 let (transport, mock) = mock_transport();
740 mock.queue_response(ok_response(1)); let parent_handle = 42u32;
743 queue_handles(&mock, 2, &[100, 101]);
744 queue_object_info(&mock, 3, "IMG_001.jpg", parent_handle);
745 queue_object_info(&mock, 4, "IMG_002.jpg", parent_handle);
746
747 let storage = mock_storage(transport, "").await;
748 let mut listing = storage
749 .list_objects_stream(Some(ObjectHandle(parent_handle)))
750 .await
751 .unwrap();
752
753 assert_eq!(listing.total(), 2);
754 let first = listing.next().await.unwrap().unwrap();
755 assert_eq!(first.filename, "IMG_001.jpg");
756 let second = listing.next().await.unwrap().unwrap();
757 assert_eq!(second.filename, "IMG_002.jpg");
758 assert!(listing.next().await.is_none());
759 }
760
761 #[tokio::test]
762 async fn stream_propagates_mid_listing_error() {
763 let (transport, mock) = mock_transport();
764 mock.queue_response(ok_response(1)); queue_handles(&mock, 2, &[10, 20]);
767 queue_object_info(&mock, 3, "good.jpg", 0);
768 mock.queue_response(error_response(4, ResponseCode::InvalidObjectHandle));
770
771 let storage = mock_storage(transport, "").await;
772 let mut listing = storage.list_objects_stream(None).await.unwrap();
773
774 let first = listing.next().await.unwrap().unwrap();
775 assert_eq!(first.filename, "good.jpg");
776
777 let second = listing.next().await.unwrap();
778 assert!(second.is_err());
779 }
780
781 #[tokio::test]
782 async fn list_objects_matches_stream_collect() {
783 let (transport1, mock1) = mock_transport();
785 let (transport2, mock2) = mock_transport();
786
787 for mock in [&mock1, &mock2] {
788 mock.queue_response(ok_response(1)); queue_handles(mock, 2, &[10, 20]);
790 queue_object_info(mock, 3, "a.jpg", 0);
791 queue_object_info(mock, 4, "b.txt", 0);
792 }
793
794 let storage1 = mock_storage(transport1, "").await;
795 let storage2 = mock_storage(transport2, "").await;
796
797 let all_at_once = storage1.list_objects(None).await.unwrap();
798
799 let mut listing = storage2.list_objects_stream(None).await.unwrap();
800 let mut streamed = Vec::new();
801 while let Some(result) = listing.next().await {
802 streamed.push(result.unwrap());
803 }
804
805 assert_eq!(all_at_once.len(), streamed.len());
806 for (a, b) in all_at_once.iter().zip(streamed.iter()) {
807 assert_eq!(a.filename, b.filename);
808 assert_eq!(a.handle, b.handle);
809 }
810 }
811}