1use crate::cancel::{bail_if_cancelled, CancelToken};
4use crate::mtp::object::NewObjectInfo;
5use crate::mtp::stream::{FileDownload, Progress};
6use crate::ptp::{ObjectHandle, ObjectInfo, StorageId, StorageInfo};
7use crate::{Error, UploadError};
8use bytes::Bytes;
9use futures::Stream;
10use std::ops::ControlFlow;
11use std::sync::Arc;
12
13use super::device::MtpDeviceInner;
14
15pub struct ObjectListing {
47 inner: Arc<MtpDeviceInner>,
48 handles: Vec<ObjectHandle>,
49 cursor: usize,
51 parent_filter: Option<ParentFilter>,
53 cancel: Option<CancelToken>,
56}
57
58enum ParentFilter {
60 Exact(ObjectHandle),
62 AndroidRoot,
64}
65
66impl ObjectListing {
67 #[must_use]
72 pub fn total(&self) -> usize {
73 self.handles.len()
74 }
75
76 #[must_use]
78 pub fn fetched(&self) -> usize {
79 self.cursor
80 }
81
82 pub async fn next(&mut self) -> Option<Result<ObjectInfo, Error>> {
92 loop {
93 if self.cursor >= self.handles.len() {
94 return None;
95 }
96
97 if let Err(e) = bail_if_cancelled(self.cancel.as_ref()) {
100 return Some(Err(e));
101 }
102
103 let handle = self.handles[self.cursor];
104 self.cursor += 1;
105
106 let mut info = match self.inner.session.get_object_info_full(handle).await {
107 Ok(info) => info,
108 Err(e) => return Some(Err(e)),
109 };
110 info.handle = handle;
111
112 if let Some(filter) = &self.parent_filter {
114 let matches = match filter {
115 ParentFilter::Exact(expected) => info.parent == *expected,
116 ParentFilter::AndroidRoot => info.parent.0 == 0 || info.parent.0 == 0xFFFFFFFF,
117 };
118 if !matches {
119 continue;
120 }
121 }
122
123 return Some(Ok(info));
124 }
125 }
126}
127
128pub struct Storage {
133 inner: Arc<MtpDeviceInner>,
134 id: StorageId,
135 info: StorageInfo,
136}
137
138impl Storage {
139 pub(crate) fn new(inner: Arc<MtpDeviceInner>, id: StorageId, info: StorageInfo) -> Self {
141 Self { inner, id, info }
142 }
143
144 #[must_use]
145 pub fn id(&self) -> StorageId {
146 self.id
147 }
148
149 #[must_use]
151 pub fn info(&self) -> &StorageInfo {
152 &self.info
153 }
154
155 pub async fn refresh(&mut self) -> Result<(), Error> {
157 self.info = self.inner.session.get_storage_info(self.id).await?;
158 Ok(())
159 }
160
161 pub async fn list_objects(
172 &self,
173 parent: Option<ObjectHandle>,
174 ) -> Result<Vec<ObjectInfo>, Error> {
175 self.list_objects_with_cancel(parent, None).await
176 }
177
178 pub async fn list_objects_with_cancel(
189 &self,
190 parent: Option<ObjectHandle>,
191 cancel: Option<&CancelToken>,
192 ) -> Result<Vec<ObjectInfo>, Error> {
193 let mut listing = self.list_objects_stream_with_cancel(parent, cancel).await?;
194 let mut objects = Vec::with_capacity(listing.total());
195 while let Some(result) = listing.next().await {
196 objects.push(result?);
197 }
198 Ok(objects)
199 }
200
201 pub async fn list_objects_stream(
238 &self,
239 parent: Option<ObjectHandle>,
240 ) -> Result<ObjectListing, Error> {
241 self.list_objects_stream_with_cancel(parent, None).await
242 }
243
244 pub async fn list_objects_stream_with_cancel(
250 &self,
251 parent: Option<ObjectHandle>,
252 cancel: Option<&CancelToken>,
253 ) -> Result<ObjectListing, Error> {
254 bail_if_cancelled(cancel)?;
255
256 if parent.is_none() {
261 let fast = self
262 .inner
263 .session
264 .get_object_handles(self.id, None, Some(ObjectHandle::ALL))
265 .await;
266
267 match fast {
268 Ok(handles) => {
269 return Ok(ObjectListing {
270 inner: Arc::clone(&self.inner),
271 handles,
272 cursor: 0,
273 parent_filter: Some(ParentFilter::AndroidRoot),
274 cancel: cancel.cloned(),
275 });
276 }
277 Err(_) => {
278 }
280 }
281 }
282
283 bail_if_cancelled(cancel)?;
284
285 let result = self
286 .inner
287 .session
288 .get_object_handles(self.id, None, parent)
289 .await;
290
291 let handles = match result {
292 Ok(h) => h,
293 Err(Error::Protocol {
294 code: crate::ptp::ResponseCode::InvalidObjectHandle,
295 ..
296 }) if parent.is_none() => {
297 return self.list_objects_stream_samsung_fallback(cancel).await;
299 }
300 Err(e) => return Err(e),
301 };
302
303 let parent_filter = Some(ParentFilter::Exact(parent.unwrap_or(ObjectHandle::ROOT)));
304
305 Ok(ObjectListing {
306 inner: Arc::clone(&self.inner),
307 handles,
308 cursor: 0,
309 parent_filter,
310 cancel: cancel.cloned(),
311 })
312 }
313
314 async fn list_objects_stream_samsung_fallback(
316 &self,
317 cancel: Option<&CancelToken>,
318 ) -> Result<ObjectListing, Error> {
319 let handles = self
320 .inner
321 .session
322 .get_object_handles(self.id, None, Some(ObjectHandle::ALL))
323 .await?;
324
325 Ok(ObjectListing {
326 inner: Arc::clone(&self.inner),
327 handles,
328 cursor: 0,
329 parent_filter: Some(ParentFilter::AndroidRoot),
331 cancel: cancel.cloned(),
332 })
333 }
334
335 pub async fn list_objects_recursive(
344 &self,
345 parent: Option<ObjectHandle>,
346 ) -> Result<Vec<ObjectInfo>, Error> {
347 if self.inner.is_android() {
348 return self.list_objects_recursive_manual(parent).await;
349 }
350
351 let native_result = self.list_objects_recursive_native(parent).await?;
352
353 let has_files = native_result.iter().any(|o| o.is_file());
356 if !native_result.is_empty() && !has_files {
357 return self.list_objects_recursive_manual(parent).await;
358 }
359
360 Ok(native_result)
361 }
362
363 pub async fn list_objects_recursive_native(
365 &self,
366 parent: Option<ObjectHandle>,
367 ) -> Result<Vec<ObjectInfo>, Error> {
368 let recursive_parent = if parent.is_none() {
369 Some(ObjectHandle::ALL)
370 } else {
371 parent
372 };
373
374 let handles = self
375 .inner
376 .session
377 .get_object_handles(self.id, None, recursive_parent)
378 .await?;
379
380 let mut objects = Vec::with_capacity(handles.len());
381 for handle in handles {
382 let mut info = self.inner.session.get_object_info_full(handle).await?;
383 info.handle = handle;
384 objects.push(info);
385 }
386 Ok(objects)
387 }
388
389 pub async fn list_objects_recursive_manual(
391 &self,
392 parent: Option<ObjectHandle>,
393 ) -> Result<Vec<ObjectInfo>, Error> {
394 let mut result = Vec::new();
395 let mut folders_to_visit = vec![parent];
396
397 while let Some(current_parent) = folders_to_visit.pop() {
398 let objects = self.list_objects(current_parent).await?;
399
400 for obj in objects {
401 if obj.is_folder() {
402 folders_to_visit.push(Some(obj.handle));
403 }
404 result.push(obj);
405 }
406 }
407
408 Ok(result)
409 }
410
411 pub async fn get_object_info(&self, handle: ObjectHandle) -> Result<ObjectInfo, Error> {
417 let mut info = self.inner.session.get_object_info_full(handle).await?;
418 info.handle = handle;
419 Ok(info)
420 }
421
422 pub async fn download(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
431 self.inner.session.get_object(handle).await
432 }
433
434 pub async fn download_partial(
440 &self,
441 handle: ObjectHandle,
442 offset: u64,
443 size: u32,
444 ) -> Result<Vec<u8>, Error> {
445 self.inner
446 .session
447 .get_partial_object(handle, offset, size)
448 .await
449 }
450
451 pub async fn download_partial_64(
457 &self,
458 handle: ObjectHandle,
459 offset: u64,
460 size: u32,
461 ) -> Result<Vec<u8>, Error> {
462 self.inner
463 .session
464 .get_partial_object_64(handle, offset, size)
465 .await
466 }
467
468 pub async fn download_thumbnail(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
469 self.inner.session.get_thumb(handle).await
470 }
471
472 pub async fn download_stream(&self, handle: ObjectHandle) -> Result<FileDownload, Error> {
509 let info = self.get_object_info(handle).await?;
510 let size = info.size;
511
512 let stream = self
513 .inner
514 .session
515 .execute_with_receive_stream(crate::ptp::OperationCode::GetObject, &[handle.0])
516 .await?;
517
518 Ok(FileDownload::new(size, stream))
519 }
520
521 pub async fn upload<S>(
546 &self,
547 parent: Option<ObjectHandle>,
548 info: NewObjectInfo,
549 data: S,
550 ) -> Result<ObjectHandle, UploadError>
551 where
552 S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send,
553 {
554 self.upload_with_progress(parent, info, data, |_| ControlFlow::Continue(()))
555 .await
556 }
557
558 pub async fn upload_with_progress<S, F>(
574 &self,
575 parent: Option<ObjectHandle>,
576 info: NewObjectInfo,
577 data: S,
578 mut on_progress: F,
579 ) -> Result<ObjectHandle, UploadError>
580 where
581 S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send,
582 F: FnMut(Progress) -> ControlFlow<()> + Send,
583 {
584 use futures::StreamExt;
585
586 let total_size = info.size;
587 let object_info = info.to_object_info();
588 let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
589
590 let (_, _, handle) = self
593 .inner
594 .session
595 .send_object_info(self.id, parent_handle, &object_info)
596 .await
597 .map_err(|source| UploadError {
598 source,
599 partial: None,
600 })?;
601
602 let mut bytes_sent = 0u64;
604 let progress_stream = data.map(move |chunk_result| {
605 let chunk = chunk_result?;
606 bytes_sent += chunk.len() as u64;
607 let progress = Progress {
608 bytes_transferred: bytes_sent,
609 total_bytes: Some(total_size),
610 };
611 if let ControlFlow::Break(()) = on_progress(progress) {
612 return Err(std::io::Error::new(
613 std::io::ErrorKind::Interrupted,
614 "cancelled",
615 ));
616 }
617 Ok(chunk)
618 });
619
620 self.inner
624 .session
625 .send_object_stream(total_size, progress_stream)
626 .await
627 .map_err(|e| match &e {
628 Error::Io(io_err) if io_err.kind() == std::io::ErrorKind::Interrupted => {
629 Error::Cancelled
630 }
631 _ => e,
632 })
633 .map_err(|source| UploadError {
634 source,
635 partial: Some(handle),
636 })?;
637
638 Ok(handle)
639 }
640
641 pub async fn create_folder(
646 &self,
647 parent: Option<ObjectHandle>,
648 name: &str,
649 ) -> Result<ObjectHandle, Error> {
650 let info = NewObjectInfo::folder(name);
651 let object_info = info.to_object_info();
652 let parent_handle = parent.unwrap_or(ObjectHandle::ROOT);
653
654 let (_, _, handle) = self
655 .inner
656 .session
657 .send_object_info(self.id, parent_handle, &object_info)
658 .await?;
659
660 Ok(handle)
661 }
662
663 pub async fn delete(&self, handle: ObjectHandle) -> Result<(), Error> {
664 self.inner.session.delete_object(handle).await
665 }
666
667 pub async fn delete_with_cancel(
675 &self,
676 handle: ObjectHandle,
677 cancel: Option<&CancelToken>,
678 ) -> Result<(), Error> {
679 bail_if_cancelled(cancel)?;
680 self.inner.session.delete_object(handle).await
681 }
682
683 pub async fn move_object(
685 &self,
686 handle: ObjectHandle,
687 new_parent: ObjectHandle,
688 new_storage: Option<StorageId>,
689 ) -> Result<(), Error> {
690 let storage = new_storage.unwrap_or(self.id);
691 self.inner
692 .session
693 .move_object(handle, storage, new_parent)
694 .await
695 }
696
697 pub async fn copy_object(
698 &self,
699 handle: ObjectHandle,
700 new_parent: ObjectHandle,
701 new_storage: Option<StorageId>,
702 ) -> Result<ObjectHandle, Error> {
703 let storage = new_storage.unwrap_or(self.id);
704 self.inner
705 .session
706 .copy_object(handle, storage, new_parent)
707 .await
708 }
709
710 pub async fn rename(&self, handle: ObjectHandle, new_name: &str) -> Result<(), Error> {
715 self.inner.session.rename_object(handle, new_name).await
716 }
717}
718
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ptp::{
        pack_u16, pack_u32, pack_u32_array, ContainerType, DateTime, DeviceInfo, ObjectFormatCode,
        OperationCode, PtpSession, ResponseCode, StorageInfo,
    };
    use crate::transport::mock::MockTransport;

    // ------------------------------------------------------------------
    // Transport and wire-format helpers
    // ------------------------------------------------------------------

    /// Returns the same mock twice: once as a trait object for the session and
    /// once as the concrete type so tests can queue responses on it.
    fn mock_transport() -> (Arc<dyn crate::transport::Transport>, Arc<MockTransport>) {
        let mock = Arc::new(MockTransport::new());
        let transport: Arc<dyn crate::transport::Transport> = mock.clone();
        (transport, mock)
    }

    /// A mock transport that already has the OK response (transaction 0)
    /// queued for the session opened inside `mock_storage`.
    fn session_mock() -> (Arc<dyn crate::transport::Transport>, Arc<MockTransport>) {
        let (transport, mock) = mock_transport();
        mock.queue_response(ok_response(0));
        (transport, mock)
    }

    /// Serialises one container: 12-byte header (length, type, code,
    /// transaction id) followed by `payload`.
    fn container(kind: ContainerType, code: u16, tx_id: u32, payload: &[u8]) -> Vec<u8> {
        let len = 12 + payload.len();
        let mut bytes = Vec::with_capacity(len);
        bytes.extend_from_slice(&pack_u32(len as u32));
        bytes.extend_from_slice(&pack_u16(kind.to_code()));
        bytes.extend_from_slice(&pack_u16(code));
        bytes.extend_from_slice(&pack_u32(tx_id));
        bytes.extend_from_slice(payload);
        bytes
    }

    /// Header-only response container carrying `ResponseCode::Ok`.
    fn ok_response(tx_id: u32) -> Vec<u8> {
        error_response(tx_id, ResponseCode::Ok)
    }

    /// Header-only response container carrying `code`.
    fn error_response(tx_id: u32, code: ResponseCode) -> Vec<u8> {
        container(ContainerType::Response, code.into(), tx_id, &[])
    }

    /// Data container for operation `code` carrying `payload`.
    fn data_container(tx_id: u32, code: OperationCode, payload: &[u8]) -> Vec<u8> {
        container(ContainerType::Data, code.into(), tx_id, payload)
    }

    /// Opens a session (id 1) on `transport` and wraps it in a `Storage` with
    /// id 1 whose device info only has `vendor_extension_desc` set.
    async fn mock_storage(
        transport: Arc<dyn crate::transport::Transport>,
        vendor_extension_desc: &str,
    ) -> Storage {
        let session = Arc::new(PtpSession::open(transport, 1).await.unwrap());
        let device_info = DeviceInfo {
            vendor_extension_desc: vendor_extension_desc.to_string(),
            ..DeviceInfo::default()
        };
        let inner = Arc::new(MtpDeviceInner {
            session,
            device_info,
        });
        Storage::new(inner, StorageId(1), StorageInfo::default())
    }

    // ------------------------------------------------------------------
    // Payload builders and response queueing
    // ------------------------------------------------------------------

    /// Serialised JPEG object info with a fixed creation date and default size.
    fn object_info_bytes(filename: &str, parent: u32) -> Vec<u8> {
        let created = DateTime {
            year: 2024,
            month: 1,
            day: 1,
            hour: 0,
            minute: 0,
            second: 0,
        };
        ObjectInfo {
            storage_id: StorageId(1),
            format: ObjectFormatCode::Jpeg,
            parent: ObjectHandle(parent),
            filename: filename.to_string(),
            created: Some(created),
            ..ObjectInfo::default()
        }
        .to_bytes()
        .unwrap()
    }

    /// Serialised JPEG object info with an explicit size and no creation date.
    fn object_info_bytes_with_size(filename: &str, parent: u32, size: u64) -> Vec<u8> {
        ObjectInfo {
            storage_id: StorageId(1),
            format: ObjectFormatCode::Jpeg,
            parent: ObjectHandle(parent),
            filename: filename.to_string(),
            size,
            ..ObjectInfo::default()
        }
        .to_bytes()
        .unwrap()
    }

    /// Queues a data container followed by an OK response for one transaction.
    fn queue_data_then_ok(mock: &MockTransport, tx_id: u32, code: OperationCode, payload: &[u8]) {
        mock.queue_response(data_container(tx_id, code, payload));
        mock.queue_response(ok_response(tx_id));
    }

    fn queue_handles(mock: &MockTransport, tx_id: u32, handles: &[u32]) {
        let payload = pack_u32_array(handles);
        queue_data_then_ok(mock, tx_id, OperationCode::GetObjectHandles, &payload);
    }

    fn queue_object_info(mock: &MockTransport, tx_id: u32, filename: &str, parent: u32) {
        let payload = object_info_bytes(filename, parent);
        queue_data_then_ok(mock, tx_id, OperationCode::GetObjectInfo, &payload);
    }

    fn queue_object_info_with_size(
        mock: &MockTransport,
        tx_id: u32,
        filename: &str,
        parent: u32,
        size: u64,
    ) {
        let payload = object_info_bytes_with_size(filename, parent, size);
        queue_data_then_ok(mock, tx_id, OperationCode::GetObjectInfo, &payload);
    }

    fn queue_object_size_prop(mock: &MockTransport, tx_id: u32, size: u64) {
        let payload = crate::ptp::pack_u64(size);
        queue_data_then_ok(mock, tx_id, OperationCode::GetObjectPropValue, &payload);
    }

    // ------------------------------------------------------------------
    // Listing helpers
    // ------------------------------------------------------------------

    /// Pulls the next item and panics unless it is `Some(Ok(_))`.
    async fn next_ok(listing: &mut ObjectListing) -> ObjectInfo {
        listing.next().await.unwrap().unwrap()
    }

    /// Asserts that the listing yields exactly these filenames next, in order.
    async fn assert_yields(listing: &mut ObjectListing, names: &[&str]) {
        for name in names {
            let object = next_ok(listing).await;
            assert_eq!(object.filename, *name);
        }
    }

    // ------------------------------------------------------------------
    // Streaming listing
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn stream_returns_objects_with_correct_counters() {
        let expected = [("photo.jpg", 10u32), ("video.mp4", 20), ("notes.txt", 30)];

        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20, 30]);
        for (tx_id, &(name, _)) in (2u32..).zip(expected.iter()) {
            queue_object_info(&mock, tx_id, name, 0);
        }

        let storage = mock_storage(transport, "").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        assert_eq!(listing.total(), 3);
        assert_eq!(listing.fetched(), 0);

        for (index, &(name, handle)) in expected.iter().enumerate() {
            let object = next_ok(&mut listing).await;
            assert_eq!(object.filename, name);
            assert_eq!(object.handle, ObjectHandle(handle));
            assert_eq!(listing.fetched(), index + 1);
        }

        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_empty_directory() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[]);

        let storage = mock_storage(transport, "").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        assert_eq!(listing.total(), 0);
        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_filters_by_parent() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20, 30]);
        queue_object_info(&mock, 2, "root_file.jpg", 0);
        // Parent 99 is not a root value, so this one must be skipped.
        queue_object_info(&mock, 3, "nested.jpg", 99);
        queue_object_info(&mock, 4, "another_root.txt", 0);

        let storage = mock_storage(transport, "").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        // `total` counts handles, not filtered results.
        assert_eq!(listing.total(), 3);

        assert_yields(&mut listing, &["root_file.jpg"]).await;
        assert_eq!(listing.fetched(), 1);

        assert_yields(&mut listing, &["another_root.txt"]).await;
        // The skipped object still counts as fetched.
        assert_eq!(listing.fetched(), 3);

        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_root_accepts_both_parent_values() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20, 30]);
        queue_object_info(&mock, 2, "dcim", 0);
        queue_object_info(&mock, 3, "download", 0xFFFFFFFF);
        queue_object_info(&mock, 4, "nested", 42);

        let storage = mock_storage(transport, "").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        assert_yields(&mut listing, &["dcim", "download"]).await;

        // "nested" is fetched but filtered out, leaving nothing to yield.
        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_subfolder_listing() {
        let parent_handle = 42u32;

        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[100, 101]);
        queue_object_info(&mock, 2, "IMG_001.jpg", parent_handle);
        queue_object_info(&mock, 3, "IMG_002.jpg", parent_handle);

        let storage = mock_storage(transport, "").await;
        let mut listing = storage
            .list_objects_stream(Some(ObjectHandle(parent_handle)))
            .await
            .unwrap();

        assert_eq!(listing.total(), 2);
        assert_yields(&mut listing, &["IMG_001.jpg", "IMG_002.jpg"]).await;
        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_propagates_mid_listing_error() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20]);
        queue_object_info(&mock, 2, "good.jpg", 0);
        mock.queue_response(error_response(3, ResponseCode::InvalidObjectHandle));

        let storage = mock_storage(transport, "").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        assert_yields(&mut listing, &["good.jpg"]).await;

        let failed = listing.next().await.unwrap();
        assert!(failed.is_err());
    }

    #[tokio::test]
    async fn list_objects_matches_stream_collect() {
        let (eager_transport, eager_mock) = session_mock();
        let (lazy_transport, lazy_mock) = session_mock();

        // Both devices answer identically.
        for mock in [&eager_mock, &lazy_mock] {
            queue_handles(mock, 1, &[10, 20]);
            queue_object_info(mock, 2, "a.jpg", 0);
            queue_object_info(mock, 3, "b.txt", 0);
        }

        let eager_storage = mock_storage(eager_transport, "").await;
        let lazy_storage = mock_storage(lazy_transport, "").await;

        let all_at_once = eager_storage.list_objects(None).await.unwrap();

        let mut listing = lazy_storage.list_objects_stream(None).await.unwrap();
        let mut streamed = Vec::new();
        while let Some(item) = listing.next().await {
            streamed.push(item.unwrap());
        }

        assert_eq!(all_at_once.len(), streamed.len());
        for (eager, lazy) in all_at_once.iter().zip(streamed.iter()) {
            assert_eq!(eager.filename, lazy.filename);
            assert_eq!(eager.handle, lazy.handle);
        }
    }

    #[tokio::test]
    async fn stream_root_falls_back_on_error() {
        let (transport, mock) = session_mock();
        // First handle request is rejected...
        mock.queue_response(error_response(1, ResponseCode::InvalidObjectHandle));
        // ...and the follow-up request succeeds.
        queue_handles(&mock, 2, &[10, 20]);
        queue_object_info(&mock, 3, "root.jpg", 0);
        queue_object_info(&mock, 4, "nested.jpg", 99);

        let storage = mock_storage(transport, "").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        assert_eq!(listing.total(), 2);
        assert_yields(&mut listing, &["root.jpg"]).await;

        // "nested.jpg" has a different parent and is filtered out.
        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_root_empty_is_not_fallback() {
        let (transport, mock) = session_mock();
        // Only one handle response is queued: an empty success must not
        // trigger a second request.
        queue_handles(&mock, 1, &[]);

        let storage = mock_storage(transport, "").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        assert_eq!(listing.total(), 0);
        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_kindle_root_uses_fast_path() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[100, 101, 102]);
        queue_object_info(&mock, 2, "documents", 0);
        queue_object_info(&mock, 3, "system", 0);
        queue_object_info(&mock, 4, "fonts", 0);

        let storage = mock_storage(transport, "microsoft.com/WMDRMPD:10.1").await;
        let mut listing = storage.list_objects_stream(None).await.unwrap();

        assert_eq!(listing.total(), 3);
        assert_yields(&mut listing, &["documents", "system", "fonts"]).await;
        assert!(listing.next().await.is_none());
    }

    #[tokio::test]
    async fn stream_subfolder_skips_fast_path() {
        let parent_handle = 50u32;

        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[200, 201]);
        queue_object_info(&mock, 2, "child_a.txt", parent_handle);
        queue_object_info(&mock, 3, "child_b.txt", parent_handle);

        let storage = mock_storage(transport, "").await;
        let mut listing = storage
            .list_objects_stream(Some(ObjectHandle(parent_handle)))
            .await
            .unwrap();

        assert_eq!(listing.total(), 2);
        assert_yields(&mut listing, &["child_a.txt", "child_b.txt"]).await;
        assert!(listing.next().await.is_none());
    }

    // ------------------------------------------------------------------
    // Object size resolution
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn get_object_info_resolves_saturated_size() {
        const REAL_SIZE: u64 = 5 * 1024 * 1024 * 1024;

        let (transport, mock) = session_mock();
        queue_object_info_with_size(&mock, 1, "big.mkv", 0, REAL_SIZE);
        queue_object_size_prop(&mock, 2, REAL_SIZE);

        let storage = mock_storage(transport, "").await;
        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();

        assert_eq!(info.filename, "big.mkv");
        assert_eq!(info.size, REAL_SIZE, "size should be resolved to full u64");
    }

    #[tokio::test]
    async fn get_object_info_skips_lookup_when_size_fits_u32() {
        let (transport, mock) = session_mock();
        // No property response is queued: a size that fits must not need one.
        queue_object_info_with_size(&mock, 1, "small.jpg", 0, 1_000_000);

        let storage = mock_storage(transport, "").await;
        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();

        assert_eq!(info.size, 1_000_000);
    }

    #[tokio::test]
    async fn get_object_info_falls_back_when_prop_lookup_fails() {
        const REPORTED_SIZE: u64 = 8 * 1024 * 1024 * 1024;

        let (transport, mock) = session_mock();
        queue_object_info_with_size(&mock, 1, "big.mkv", 0, REPORTED_SIZE);
        mock.queue_response(error_response(2, ResponseCode::OperationNotSupported));

        let storage = mock_storage(transport, "").await;
        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();

        assert_eq!(
            info.size,
            u64::from(u32::MAX),
            "should keep saturated u32::MAX when prop lookup fails"
        );
    }

    #[tokio::test]
    async fn get_object_info_exactly_u32_max_triggers_lookup() {
        const REAL_SIZE: u64 = u32::MAX as u64;

        let (transport, mock) = session_mock();
        queue_object_info_with_size(&mock, 1, "edge.bin", 0, REAL_SIZE);
        queue_object_size_prop(&mock, 2, REAL_SIZE);

        let storage = mock_storage(transport, "").await;
        let info = storage.get_object_info(ObjectHandle(42)).await.unwrap();

        assert_eq!(info.size, REAL_SIZE);
    }

    #[tokio::test]
    async fn list_objects_stream_resolves_saturated_size() {
        const REAL_SIZE: u64 = 6 * 1024 * 1024 * 1024;

        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20]);
        queue_object_info_with_size(&mock, 2, "small.jpg", 0, 500_000);
        queue_object_info_with_size(&mock, 3, "huge.mkv", 0, REAL_SIZE);
        queue_object_size_prop(&mock, 4, REAL_SIZE);

        let storage = mock_storage(transport, "").await;
        let objects = storage.list_objects(None).await.unwrap();

        assert_eq!(objects.len(), 2);
        assert_eq!(objects[0].size, 500_000);
        assert_eq!(objects[1].size, REAL_SIZE);
    }

    // ------------------------------------------------------------------
    // Cancellation
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn cancel_before_first_handle_bails_immediately() {
        let (transport, mock) = session_mock();
        // No object-info responses are queued: none may be requested.
        queue_handles(&mock, 1, &[10, 20, 30]);

        let storage = mock_storage(transport, "").await;
        let token = CancelToken::new();
        let mut listing = storage
            .list_objects_stream_with_cancel(None, Some(&token))
            .await
            .unwrap();

        assert_eq!(listing.total(), 3);

        token.cancel();

        let outcome = listing.next().await.expect("expected Some(Err(Cancelled))");
        assert!(matches!(outcome, Err(Error::Cancelled)));
    }

    #[tokio::test]
    async fn cancel_mid_listing_bails_at_next_boundary() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20, 30]);
        // Only the first object info is queued; the rest must never be asked for.
        queue_object_info(&mock, 2, "first.jpg", 0);

        let storage = mock_storage(transport, "").await;
        let token = CancelToken::new();
        let mut listing = storage
            .list_objects_stream_with_cancel(None, Some(&token))
            .await
            .unwrap();

        assert_yields(&mut listing, &["first.jpg"]).await;

        token.cancel();

        let outcome = listing.next().await.expect("expected Some(Err(Cancelled))");
        assert!(matches!(outcome, Err(Error::Cancelled)));
    }

    #[tokio::test]
    async fn cancel_via_list_objects_returns_cancelled_error() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20, 30]);

        let storage = mock_storage(transport, "").await;
        let token = CancelToken::new();
        token.cancel();

        let outcome = storage.list_objects_with_cancel(None, Some(&token)).await;
        assert!(matches!(outcome, Err(Error::Cancelled)));
    }

    #[tokio::test]
    async fn cancel_does_not_affect_unset_token() {
        let (transport, mock) = session_mock();
        queue_handles(&mock, 1, &[10, 20]);
        queue_object_info(&mock, 2, "a.jpg", 0);
        queue_object_info(&mock, 3, "b.jpg", 0);

        let storage = mock_storage(transport, "").await;
        let objects = storage.list_objects_with_cancel(None, None).await.unwrap();

        assert_eq!(objects.len(), 2);
        assert_eq!(objects[0].filename, "a.jpg");
        assert_eq!(objects[1].filename, "b.jpg");
    }

    #[tokio::test]
    async fn delete_with_cancel_bails_before_request() {
        // Nothing beyond the session-open response is queued: no delete
        // request may reach the transport.
        let (transport, _mock) = session_mock();

        let storage = mock_storage(transport, "").await;
        let token = CancelToken::new();
        token.cancel();

        let outcome = storage
            .delete_with_cancel(ObjectHandle(1), Some(&token))
            .await;
        assert!(matches!(outcome, Err(Error::Cancelled)));
    }

    #[tokio::test]
    async fn delete_with_cancel_no_token_runs_normally() {
        let (transport, mock) = session_mock();
        mock.queue_response(ok_response(1));

        let storage = mock_storage(transport, "").await;
        let outcome = storage.delete_with_cancel(ObjectHandle(1), None).await;
        assert!(outcome.is_ok());
    }
}