use crate::cancel::{bail_if_cancelled, CancelToken};
use crate::mtp::backend::{BackendListing, ByteRange, MtpBackend, ProgressFn};
use crate::mtp::object::NewObjectInfo;
use crate::mtp::stream::{FileDownload, Progress, WindowedDownload, DEFAULT_DOWNLOAD_WINDOW};
use crate::mtp::{Error, ObjectHandle, ObjectInfo, StorageId, StorageInfo, UploadError};
use bytes::Bytes;
use futures::{Stream, StreamExt};
use std::ops::ControlFlow;
use std::sync::Arc;
/// Incrementally consumed listing of objects, wrapping a [`BackendListing`].
///
/// Items are pulled one at a time with `next`; `total` and `fetched` expose
/// the reported size of the listing and how many items have been yielded so far.
pub struct ObjectListing {
// Listing handed back by the backend; supplies `total` and the `items` source polled by `next`.
inner: BackendListing,
// Number of `Ok` items yielded through `next` so far (errors are not counted).
fetched: usize,
}
impl ObjectListing {
    /// Wraps a backend listing with the fetched-item counter starting at zero.
    fn new(inner: BackendListing) -> Self {
        Self { fetched: 0, inner }
    }

    /// Returns the `total` value the backend reported for this listing.
    #[must_use]
    pub fn total(&self) -> usize {
        self.inner.total
    }

    /// Returns how many items have been successfully yielded by [`next`](Self::next) so far.
    #[must_use]
    pub fn fetched(&self) -> usize {
        self.fetched
    }

    /// Pulls the next entry from the underlying listing.
    ///
    /// Returns `None` once the listing is exhausted. An `Err` entry is passed
    /// through unchanged and does not advance the [`fetched`](Self::fetched) counter.
    pub async fn next(&mut self) -> Option<Result<ObjectInfo, Error>> {
        let item = self.inner.items.next().await;
        // Only successfully delivered objects count as fetched.
        if matches!(item, Some(Ok(_))) {
            self.fetched += 1;
        }
        item
    }
}
/// Handle to one storage, reachable through a shared [`MtpBackend`].
///
/// Holds the storage identifier plus a cached copy of its [`StorageInfo`].
pub struct Storage {
// Backend that carries out every operation; shared via `Arc`.
backend: Arc<dyn MtpBackend>,
// Identifier of this storage, passed to the storage-scoped backend calls.
id: StorageId,
// Cached storage information; replaced by `refresh`.
info: StorageInfo,
}
impl Storage {
    /// Creates a handle for storage `id` on `backend`, seeded with an already-fetched `info`.
    pub(crate) fn new(backend: Arc<dyn MtpBackend>, id: StorageId, info: StorageInfo) -> Self {
        Self { backend, id, info }
    }

    /// Returns the identifier of this storage.
    #[must_use]
    pub fn id(&self) -> StorageId {
        self.id
    }

    /// Returns the cached storage information.
    ///
    /// This is the value supplied at construction or stored by the last
    /// successful [`refresh`](Self::refresh); the backend is not contacted.
    #[must_use]
    pub fn info(&self) -> &StorageInfo {
        &self.info
    }

    /// Re-fetches the storage information from the backend and replaces the cached copy.
    ///
    /// # Errors
    ///
    /// Returns the backend error; the cached information is left untouched in that case.
    pub async fn refresh(&mut self) -> Result<(), Error> {
        self.info = self.backend.storage_info(self.id).await?;
        Ok(())
    }

    /// Lists the objects under `parent` and collects them into a vector.
    ///
    /// Equivalent to [`list_objects_with_cancel`](Self::list_objects_with_cancel)
    /// without a cancellation token.
    ///
    /// `parent` is forwarded to the backend unchanged.
    /// NOTE(review): `None` presumably selects the storage root — confirm against the backend.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while starting or draining the listing.
    pub async fn list_objects(
        &self,
        parent: Option<ObjectHandle>,
    ) -> Result<Vec<ObjectInfo>, Error> {
        self.list_objects_with_cancel(parent, None).await
    }

    /// Lists the objects under `parent` into a vector, forwarding `cancel` to the backend.
    ///
    /// The listing is drained item by item; the vector is pre-sized from the
    /// listing's reported total.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while starting or draining the listing;
    /// objects collected before the error are discarded.
    pub async fn list_objects_with_cancel(
        &self,
        parent: Option<ObjectHandle>,
        cancel: Option<&CancelToken>,
    ) -> Result<Vec<ObjectInfo>, Error> {
        let mut listing = self.list_objects_stream_with_cancel(parent, cancel).await?;
        let mut objects = Vec::with_capacity(listing.total());
        while let Some(result) = listing.next().await {
            objects.push(result?);
        }
        Ok(objects)
    }

    /// Starts a listing of the objects under `parent` that is consumed incrementally.
    ///
    /// Equivalent to
    /// [`list_objects_stream_with_cancel`](Self::list_objects_stream_with_cancel)
    /// without a cancellation token.
    ///
    /// # Errors
    ///
    /// Returns the backend error if the listing cannot be started.
    pub async fn list_objects_stream(
        &self,
        parent: Option<ObjectHandle>,
    ) -> Result<ObjectListing, Error> {
        self.list_objects_stream_with_cancel(parent, None).await
    }

    /// Starts an incrementally consumed listing under `parent`, forwarding `cancel` to the backend.
    ///
    /// # Errors
    ///
    /// Returns the backend error if the listing cannot be started.
    pub async fn list_objects_stream_with_cancel(
        &self,
        parent: Option<ObjectHandle>,
        cancel: Option<&CancelToken>,
    ) -> Result<ObjectListing, Error> {
        let listing = self.backend.list(self.id, parent, cancel).await?;
        Ok(ObjectListing::new(listing))
    }

    /// Lists every object below `parent`, descending into folders.
    ///
    /// Equivalent to
    /// [`list_objects_recursive_with_cancel`](Self::list_objects_recursive_with_cancel)
    /// without a cancellation token.
    ///
    /// # Errors
    ///
    /// Returns the first error from any per-folder listing.
    pub async fn list_objects_recursive(
        &self,
        parent: Option<ObjectHandle>,
    ) -> Result<Vec<ObjectInfo>, Error> {
        self.list_objects_recursive_with_cancel(parent, None).await
    }

    /// Lists every object below `parent`, descending into folders, with cancellation support.
    ///
    /// One listing request is issued per folder and `cancel` is forwarded to each
    /// of them, so a long walk can be interrupted the same way a single listing can.
    ///
    /// Folders are taken from an explicit stack (last discovered, first visited).
    /// The children of one folder are contiguous in the result, but the overall
    /// order is not sorted by depth or name. Folder objects themselves are
    /// included in the result.
    ///
    /// # Errors
    ///
    /// Returns the first error from any per-folder listing; objects gathered
    /// before the error are discarded.
    pub async fn list_objects_recursive_with_cancel(
        &self,
        parent: Option<ObjectHandle>,
        cancel: Option<&CancelToken>,
    ) -> Result<Vec<ObjectInfo>, Error> {
        let mut result = Vec::new();
        let mut folders_to_visit = vec![parent];
        while let Some(current_parent) = folders_to_visit.pop() {
            let objects = self
                .list_objects_with_cancel(current_parent, cancel)
                .await?;
            // Queue sub-folders first (by handle), then move the whole batch into the result.
            folders_to_visit.extend(
                objects
                    .iter()
                    .filter(|obj| obj.is_folder())
                    .map(|obj| Some(obj.handle)),
            );
            result.extend(objects);
        }
        Ok(result)
    }

    /// Fetches the metadata of the object `handle` from the backend.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn get_object_info(&self, handle: ObjectHandle) -> Result<ObjectInfo, Error> {
        self.backend.object_info(handle).await
    }

    /// Reads the whole object `handle` into memory.
    ///
    /// Implemented as a backend range read from offset 0 with no length limit.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn download_to_vec(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
        self.backend.read_range(handle, 0, None).await
    }

    /// Reads up to `len` bytes of the object `handle`, starting at byte `offset`.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn read_range(
        &self,
        handle: ObjectHandle,
        offset: u64,
        len: u32,
    ) -> Result<Vec<u8>, Error> {
        self.backend.read_range(handle, offset, Some(len)).await
    }

    /// Fetches the thumbnail bytes of the object `handle` from the backend.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn thumbnail(&self, handle: ObjectHandle) -> Result<Vec<u8>, Error> {
        self.backend.thumbnail(handle).await
    }

    /// Starts a download of `range` of the object `handle`.
    ///
    /// The size and body returned by the backend are wrapped in a [`FileDownload`].
    ///
    /// # Errors
    ///
    /// Returns the backend error if the download cannot be started.
    pub async fn download(
        &self,
        handle: ObjectHandle,
        range: ByteRange,
    ) -> Result<FileDownload, Error> {
        let dl = self.backend.download(handle, range).await?;
        Ok(FileDownload::new(dl.size, dl.body))
    }

    /// Prepares a [`WindowedDownload`] of the object `handle` that starts at the offset of `range`.
    ///
    /// The object's size is looked up first so the start offset can be validated.
    /// An offset equal to the size is accepted.
    ///
    /// NOTE(review): only `range.offset()` is consulted here; any end or length
    /// carried by `range` is not passed on — confirm that this is intended.
    ///
    /// # Errors
    ///
    /// Returns the backend error if the object info cannot be fetched, or an
    /// invalid-data error when the offset lies past the object's size.
    pub async fn download_windowed(
        &self,
        handle: ObjectHandle,
        range: ByteRange,
        window_size: u32,
    ) -> Result<WindowedDownload, Error> {
        let size = self.backend.object_info(handle).await?.size;
        let offset = range.offset();
        if offset > size {
            return Err(Error::invalid_data(format!(
                "windowed download offset {offset} is past the object size {size}"
            )));
        }
        Ok(WindowedDownload::new(
            Arc::clone(&self.backend),
            handle,
            size,
            offset,
            window_size,
        ))
    }

    /// Prepares a windowed download of the full object using [`DEFAULT_DOWNLOAD_WINDOW`].
    ///
    /// # Errors
    ///
    /// See [`download_windowed`](Self::download_windowed).
    pub async fn download_windowed_default(
        &self,
        handle: ObjectHandle,
    ) -> Result<WindowedDownload, Error> {
        self.download_windowed(handle, ByteRange::Full, DEFAULT_DOWNLOAD_WINDOW)
            .await
    }

    /// Uploads a new object described by `info` under `parent`, taking its bytes from `data`.
    ///
    /// The stream is pinned on the heap and handed to the backend without a
    /// progress callback.
    ///
    /// # Errors
    ///
    /// Returns the backend's [`UploadError`].
    pub async fn upload<'a, S>(
        &'a self,
        parent: Option<ObjectHandle>,
        info: NewObjectInfo,
        data: S,
    ) -> Result<ObjectHandle, UploadError>
    where
        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send + 'a,
    {
        self.backend
            .upload(self.id, parent, info, Box::pin(data), None)
            .await
    }

    /// Uploads a new object like [`upload`](Self::upload), passing `on_progress` to the backend.
    ///
    /// The callback is boxed as a [`ProgressFn`] and receives [`Progress`] values.
    /// NOTE(review): returning `ControlFlow::Break(())` presumably aborts the
    /// upload — confirm against the backend implementation.
    ///
    /// # Errors
    ///
    /// Returns the backend's [`UploadError`].
    pub async fn upload_with_progress<'a, S, F>(
        &'a self,
        parent: Option<ObjectHandle>,
        info: NewObjectInfo,
        data: S,
        on_progress: F,
    ) -> Result<ObjectHandle, UploadError>
    where
        S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin + Send + 'a,
        F: FnMut(Progress) -> ControlFlow<()> + Send + 'a,
    {
        let progress: ProgressFn<'a> = Box::new(on_progress);
        self.backend
            .upload(self.id, parent, info, Box::pin(data), Some(progress))
            .await
    }

    /// Creates a folder called `name` under `parent` on this storage and returns its handle.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn create_folder(
        &self,
        parent: Option<ObjectHandle>,
        name: &str,
    ) -> Result<ObjectHandle, Error> {
        self.backend.create_folder(self.id, parent, name).await
    }

    /// Deletes the object `handle`, without a cancellation token.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn delete(&self, handle: ObjectHandle) -> Result<(), Error> {
        self.backend.delete(handle, None).await
    }

    /// Deletes the object `handle`, honouring `cancel`.
    ///
    /// The token is checked once up front, before the backend is contacted, and
    /// is then forwarded to the backend.
    ///
    /// # Errors
    ///
    /// Returns the error from the up-front cancellation check, or the backend error.
    pub async fn delete_with_cancel(
        &self,
        handle: ObjectHandle,
        cancel: Option<&CancelToken>,
    ) -> Result<(), Error> {
        bail_if_cancelled(cancel)?;
        self.backend.delete(handle, cancel).await
    }

    /// Moves the object `handle` under `new_parent`.
    ///
    /// The destination storage is `new_storage`, or this storage when it is `None`.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn move_object(
        &self,
        handle: ObjectHandle,
        new_parent: ObjectHandle,
        new_storage: Option<StorageId>,
    ) -> Result<(), Error> {
        let storage = new_storage.unwrap_or(self.id);
        self.backend.move_object(handle, new_parent, storage).await
    }

    /// Copies the object `handle` under `new_parent` and returns the handle of the copy.
    ///
    /// The destination storage is `new_storage`, or this storage when it is `None`.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn copy_object(
        &self,
        handle: ObjectHandle,
        new_parent: ObjectHandle,
        new_storage: Option<StorageId>,
    ) -> Result<ObjectHandle, Error> {
        let storage = new_storage.unwrap_or(self.id);
        self.backend.copy_object(handle, new_parent, storage).await
    }

    /// Renames the object `handle` to `new_name`.
    ///
    /// # Errors
    ///
    /// Returns the backend error.
    pub async fn rename(&self, handle: ObjectHandle, new_name: &str) -> Result<(), Error> {
        self.backend.rename(handle, new_name).await
    }
}