mod actor;
mod com;
mod consts;
mod events;
mod ids;
mod props;
use crate::cancel::CancelToken;
use crate::mtp::backend::{
BackendDownload, BackendListing, ByteRange, DownloadBody, MtpBackend, ProgressFn, UploadStream,
};
use crate::mtp::object::NewObjectInfo;
use crate::mtp::stream::Progress;
use crate::mtp::{
Capabilities, DeviceEvent, DeviceInfo, Error, ObjectHandle, ObjectInfo, StorageId, StorageInfo,
UploadError,
};
use actor::{OpenSpec, WpdHandle};
use async_trait::async_trait;
use bytes::Bytes;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use std::ops::ControlFlow;
use std::time::Duration;
/// MTP backend whose operations are forwarded as `actor::Request` messages
/// through a [`WpdHandle`].
///
/// NOTE(review): "WPD" presumably stands for Windows Portable Devices — confirm
/// against the `actor`/`com` modules.
pub(crate) struct WpdBackend {
/// Handle used to submit requests to the actor and to shut it down.
handle: WpdHandle,
/// Device description returned by `WpdHandle::spawn`; served by `device_info()`.
device_info: DeviceInfo,
/// Capability set returned by `WpdHandle::spawn`; served by `capabilities()`.
capabilities: Capabilities,
/// Receiving end of the device-event channel. It sits behind an async mutex so
/// that `next_event(&self)` can poll it through a shared reference.
events: futures::lock::Mutex<mpsc::UnboundedReceiver<DeviceEvent>>,
}
impl WpdBackend {
    /// Opens a backend with [`OpenSpec::First`].
    ///
    /// # Errors
    /// Propagates whatever error `WpdHandle::spawn` reports.
    pub(crate) async fn open_first() -> Result<Self, Error> {
        let spec = OpenSpec::First;
        Self::spawn(spec).await
    }

    /// Opens a backend with [`OpenSpec::Serial`], handing the actor an owned
    /// copy of `serial`.
    ///
    /// # Errors
    /// Propagates whatever error `WpdHandle::spawn` reports.
    pub(crate) async fn open_by_serial(serial: &str) -> Result<Self, Error> {
        let spec = OpenSpec::Serial(serial.to_string());
        Self::spawn(spec).await
    }

    /// Opens a backend with [`OpenSpec::UsbDevice`], built from the optional
    /// serial and the `vid`/`pid` pair.
    ///
    /// # Errors
    /// Propagates whatever error `WpdHandle::spawn` reports.
    pub(crate) async fn open_for_usb(
        serial: Option<String>,
        vid: u16,
        pid: u16,
    ) -> Result<Self, Error> {
        let spec = OpenSpec::UsbDevice { serial, vid, pid };
        Self::spawn(spec).await
    }

    /// Shared constructor: spawns the actor for `spec` and assembles the
    /// backend from the handle, device description, capabilities and event
    /// receiver that `WpdHandle::spawn` returns.
    async fn spawn(spec: OpenSpec) -> Result<Self, Error> {
        let (handle, device_info, capabilities, event_rx) = WpdHandle::spawn(spec).await?;
        // The receiver is wrapped in an async mutex because events are later
        // polled through `&self`.
        let events = futures::lock::Mutex::new(event_rx);
        Ok(Self {
            handle,
            device_info,
            capabilities,
            events,
        })
    }
}
/// Download body that yields the chunks arriving on a bounded channel.
struct WpdDownloadBody {
/// Receiving end of the chunk channel; `WpdBackend::download` fills it from the
/// `data` field of the reply to `actor::Request::Download`.
data: mpsc::Receiver<Result<Bytes, Error>>,
}
#[async_trait]
impl DownloadBody for WpdDownloadBody {
async fn next_chunk(&mut self) -> Option<Result<Bytes, Error>> {
self.data.next().await
}
async fn cancel(&mut self, _idle_timeout: Duration) -> Result<(), Error> {
self.data.close();
while self.data.next().await.is_some() {}
Ok(())
}
}
#[async_trait]
impl MtpBackend for WpdBackend {
fn device_info(&self) -> &DeviceInfo {
&self.device_info
}
fn capabilities(&self) -> &Capabilities {
&self.capabilities
}
async fn storages(&self) -> Result<Vec<StorageInfo>, Error> {
self.handle.call(actor::Request::Storages).await
}
async fn storage_info(&self, storage: StorageId) -> Result<StorageInfo, Error> {
self.handle
.call(|reply| actor::Request::StorageInfo(storage, reply))
.await
}
async fn list(
&self,
storage: StorageId,
parent: Option<ObjectHandle>,
cancel: Option<&CancelToken>,
) -> Result<BackendListing, Error> {
if cancel.is_some_and(CancelToken::is_cancelled) {
return Err(Error::Cancelled);
}
let objs = self
.handle
.call(|reply| actor::Request::List {
storage,
parent,
cancel: cancel.cloned(),
reply,
})
.await?;
let total = objs.len();
let items = futures::stream::iter(objs.into_iter().map(Ok::<ObjectInfo, Error>)).boxed();
Ok(BackendListing { total, items })
}
async fn object_info(&self, obj: ObjectHandle) -> Result<ObjectInfo, Error> {
self.handle
.call(|reply| actor::Request::ObjectInfo(obj, reply))
.await
}
async fn download(
&self,
obj: ObjectHandle,
range: ByteRange,
) -> Result<BackendDownload, Error> {
let start = self
.handle
.call(|reply| actor::Request::Download { obj, range, reply })
.await?;
Ok(BackendDownload {
size: start.size,
body: Box::new(WpdDownloadBody { data: start.data }),
})
}
async fn read_range(
&self,
obj: ObjectHandle,
offset: u64,
len: Option<u32>,
) -> Result<Vec<u8>, Error> {
self.handle
.call(|reply| actor::Request::ReadRange {
obj,
offset,
len,
reply,
})
.await
}
async fn thumbnail(&self, obj: ObjectHandle) -> Result<Vec<u8>, Error> {
self.handle
.call(|reply| actor::Request::Thumbnail { obj, reply })
.await
}
async fn upload(
&self,
storage: StorageId,
parent: Option<ObjectHandle>,
info: NewObjectInfo,
mut data: UploadStream<'_>,
mut progress: Option<ProgressFn<'_>>,
) -> Result<ObjectHandle, UploadError> {
let total = info.size;
let (mut tx, rx) = mpsc::channel::<Bytes>(actor::DATA_BOUND);
let (reply_tx, reply_rx) = futures::channel::oneshot::channel::<actor::UploadReply>();
self.handle
.send(actor::Request::Upload {
storage,
parent,
info,
data: rx,
reply: reply_tx,
})
.map_err(|source| UploadError {
source,
partial: None,
})?;
enum Stop {
Clean,
Cancelled,
Source(std::io::Error),
}
let mut forwarded: u64 = 0;
let stop = loop {
match data.next().await {
None => break Stop::Clean,
Some(Err(e)) => break Stop::Source(e),
Some(Ok(chunk)) => {
let next = forwarded + chunk.len() as u64;
if let Some(cb) = progress.as_mut() {
let p = Progress {
bytes_transferred: next,
total_bytes: Some(total),
};
if let ControlFlow::Break(()) = cb(p) {
break Stop::Cancelled;
}
}
if tx.send(chunk).await.is_err() {
break Stop::Clean;
}
forwarded = next;
}
}
};
drop(tx);
let reply = reply_rx.await.map_err(|_| UploadError {
source: Error::Disconnected,
partial: None,
})?;
match (stop, reply) {
(_, actor::UploadReply::Error(source)) => Err(UploadError {
source,
partial: None,
}),
(_, actor::UploadReply::Committed(handle)) => Ok(handle),
(Stop::Cancelled, actor::UploadReply::ShortClosed { partial }) => Err(UploadError {
source: Error::Cancelled,
partial,
}),
(Stop::Source(e), actor::UploadReply::ShortClosed { partial }) => Err(UploadError {
source: Error::Io {
message: e.to_string(),
},
partial,
}),
(Stop::Clean, actor::UploadReply::ShortClosed { partial }) => Err(UploadError {
source: Error::invalid_data(
"upload stream ended before the declared size was reached",
),
partial,
}),
}
}
async fn create_folder(
&self,
storage: StorageId,
parent: Option<ObjectHandle>,
name: &str,
) -> Result<ObjectHandle, Error> {
let name = name.to_string();
self.handle
.call(|reply| actor::Request::CreateFolder {
storage,
parent,
name,
reply,
})
.await
}
async fn delete(&self, obj: ObjectHandle, cancel: Option<&CancelToken>) -> Result<(), Error> {
if cancel.is_some_and(CancelToken::is_cancelled) {
return Err(Error::Cancelled);
}
self.handle
.call(|reply| actor::Request::Delete { obj, reply })
.await
}
async fn move_object(
&self,
obj: ObjectHandle,
new_parent: ObjectHandle,
new_storage: StorageId,
) -> Result<(), Error> {
self.handle
.call(|reply| actor::Request::MoveObject {
obj,
new_parent,
new_storage,
reply,
})
.await
}
async fn copy_object(
&self,
obj: ObjectHandle,
new_parent: ObjectHandle,
new_storage: StorageId,
) -> Result<ObjectHandle, Error> {
self.handle
.call(|reply| actor::Request::CopyObject {
obj,
new_parent,
new_storage,
reply,
})
.await
}
async fn rename(&self, obj: ObjectHandle, new_name: &str) -> Result<(), Error> {
let name = new_name.to_string();
self.handle
.call(|reply| actor::Request::Rename { obj, name, reply })
.await
}
async fn next_event(&self) -> Result<DeviceEvent, Error> {
let mut events = self.events.lock().await;
events.next().await.ok_or(Error::Disconnected)
}
async fn close(&self) -> Result<(), Error> {
self.handle.shutdown();
Ok(())
}
}