use std::borrow::Cow;
use std::io::{Error, ErrorKind, Result};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use futures::Stream;
use pcloud::file::FileIdentifier;
use pcloud::folder::{FolderIdentifier, ROOT};
use reqwest::header;
use tokio::io::DuplexStream;
use tokio::task::JoinHandle;
use tokio_util::io::ReaderStream;
use crate::WriteMode;
use crate::http::{HttpStoreFileReader, RangeHeader};
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(untagged))]
pub enum PCloudStoreConfigOrigin {
Region { region: pcloud::Region },
Url { url: Cow<'static, str> },
}
impl Default for PCloudStoreConfigOrigin {
fn default() -> Self {
Self::Region {
region: pcloud::Region::Eu,
}
}
}
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub struct PCloudStoreConfig {
#[cfg_attr(feature = "serde", serde(default, flatten))]
pub origin: PCloudStoreConfigOrigin,
pub credentials: pcloud::Credentials,
#[cfg_attr(feature = "serde", serde(default))]
pub root: PathBuf,
}
impl PCloudStoreConfig {
pub fn build(&self) -> Result<PCloudStore> {
let mut builder = pcloud::Client::builder();
match self.origin {
PCloudStoreConfigOrigin::Region { region } => {
builder.set_region(region);
}
PCloudStoreConfigOrigin::Url { ref url } => {
builder.set_base_url(url.clone());
}
};
builder.set_credentials(self.credentials.clone());
let client = builder
.build()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::InvalidInput, err))?;
Ok(PCloudStore(Arc::new(InnerStore {
client,
root: self.root.clone(),
})))
}
}
struct InnerStore {
client: pcloud::Client,
root: PathBuf,
}
impl InnerStore {
fn real_path(&self, path: &Path) -> Result<PathBuf> {
crate::util::merge_path(&self.root, path, false)
}
}
#[derive(Clone)]
pub struct PCloudStore(Arc<InnerStore>);
impl std::fmt::Debug for PCloudStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(PCloudStore))
.finish_non_exhaustive()
}
}
static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
impl PCloudStore {
pub fn new(
base_url: impl Into<Cow<'static, str>>,
credentials: pcloud::Credentials,
) -> Result<Self> {
let client = pcloud::Client::builder()
.with_base_url(base_url)
.with_credentials(credentials)
.build()
.unwrap();
Ok(Self(Arc::new(InnerStore {
client,
root: PathBuf::new(),
})))
}
}
impl crate::Store for PCloudStore {
type Directory = PCloudStoreDirectory;
type File = PCloudStoreFile;
async fn get_file<P: Into<PathBuf>>(&self, path: P) -> Result<Self::File> {
Ok(PCloudStoreFile {
store: self.0.clone(),
path: path.into(),
})
}
async fn get_dir<P: Into<PathBuf>>(&self, path: P) -> Result<Self::Directory> {
Ok(PCloudStoreDirectory {
store: self.0.clone(),
path: path.into(),
})
}
}
pub struct PCloudStoreDirectory {
store: Arc<InnerStore>,
path: PathBuf,
}
impl std::fmt::Debug for PCloudStoreDirectory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(PCloudStoreDirectory))
.field("path", &self.path)
.finish_non_exhaustive()
}
}
impl crate::StoreDirectory for PCloudStoreDirectory {
type Entry = PCloudStoreEntry;
type Reader = PCloudStoreDirectoryReader;
fn path(&self) -> &std::path::Path {
&self.path
}
async fn exists(&self) -> Result<bool> {
let path = self.store.real_path(&self.path)?;
let identifier = FolderIdentifier::path(path.to_string_lossy());
match self.store.client.list_folder(identifier).await {
Ok(_) => Ok(true),
Err(pcloud::Error::Protocol(2005, _)) => Ok(false),
Err(other) => Err(Error::other(other)),
}
}
async fn read(&self) -> Result<Self::Reader> {
let path = self.store.real_path(&self.path)?;
let identifier = FolderIdentifier::path(path.to_string_lossy());
match self.store.client.list_folder(identifier).await {
Ok(folder) => Ok(PCloudStoreDirectoryReader {
store: self.store.clone(),
path: self.path.clone(),
entries: folder.contents.unwrap_or_default(),
}),
Err(pcloud::Error::Protocol(2005, _)) => {
Err(Error::new(ErrorKind::NotFound, "directory not found"))
}
Err(other) => Err(Error::other(other)),
}
}
async fn delete(&self) -> Result<()> {
let path = self.store.real_path(&self.path)?;
let identifier = FolderIdentifier::path(path.to_string_lossy());
match self.store.client.delete_folder(identifier).await {
Ok(_) => Ok(()),
Err(pcloud::Error::Protocol(2005, _)) => {
Err(Error::new(ErrorKind::NotFound, "directory not found"))
}
Err(pcloud::Error::Protocol(2006, _)) => Err(Error::new(
ErrorKind::DirectoryNotEmpty,
"directory not empty",
)),
Err(other) => Err(Error::other(other)),
}
}
async fn delete_recursive(&self) -> Result<()> {
let path = self.store.real_path(&self.path)?;
let identifier = FolderIdentifier::path(path.to_string_lossy());
match self.store.client.delete_folder_recursive(identifier).await {
Ok(_) => Ok(()),
Err(pcloud::Error::Protocol(2005, _)) => {
Err(Error::new(ErrorKind::NotFound, "directory not found"))
}
Err(pcloud::Error::Protocol(2006, _)) => Err(Error::new(
ErrorKind::DirectoryNotEmpty,
"directory not empty",
)),
Err(other) => Err(Error::other(other)),
}
}
}
pub struct PCloudStoreDirectoryReader {
store: Arc<InnerStore>,
path: PathBuf,
entries: Vec<pcloud::entry::Entry>,
}
impl std::fmt::Debug for PCloudStoreDirectoryReader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(PCloudStoreDirectoryReader))
.field("path", &self.path)
.field("entries", &self.entries)
.finish_non_exhaustive()
}
}
impl Stream for PCloudStoreDirectoryReader {
type Item = Result<PCloudStoreEntry>;
fn poll_next(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut();
if let Some(entry) = this.entries.pop() {
Poll::Ready(Some(PCloudStoreEntry::new(
self.store.clone(),
self.path.clone(),
entry,
)))
} else {
Poll::Ready(None)
}
}
}
impl crate::StoreDirectoryReader<PCloudStoreEntry> for PCloudStoreDirectoryReader {}
pub struct PCloudStoreFile {
store: Arc<InnerStore>,
path: PathBuf,
}
impl std::fmt::Debug for PCloudStoreFile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(PCloudStoreFile))
.field("path", &self.path)
.finish_non_exhaustive()
}
}
impl crate::StoreFile for PCloudStoreFile {
type FileReader = PCloudStoreFileReader;
type FileWriter = PCloudStoreFileWriter;
type Metadata = PCloudStoreFileMetadata;
fn path(&self) -> &std::path::Path {
&self.path
}
async fn exists(&self) -> Result<bool> {
let path = self.store.real_path(&self.path)?;
let identifier = FileIdentifier::path(path.to_string_lossy());
match self.store.client.get_file_checksum(identifier).await {
Ok(_) => Ok(true),
Err(pcloud::Error::Protocol(2009, _)) => Ok(false),
Err(other) => Err(Error::other(other)),
}
}
async fn metadata(&self) -> Result<Self::Metadata> {
let path = self.store.real_path(&self.path)?;
let identifier = FileIdentifier::path(path.to_string_lossy());
match self.store.client.get_file_checksum(identifier).await {
Ok(file) => Ok(PCloudStoreFileMetadata {
size: file.metadata.size.unwrap_or(0) as u64,
created: file.metadata.base.created.timestamp() as u64,
modified: file.metadata.base.modified.timestamp() as u64,
content_type: file.metadata.content_type,
}),
Err(pcloud::Error::Protocol(2009, _)) => {
Err(Error::new(ErrorKind::NotFound, "file not found"))
}
Err(other) => Err(Error::other(other)),
}
}
async fn read<R: std::ops::RangeBounds<u64>>(&self, range: R) -> Result<Self::FileReader> {
let path = self.store.real_path(&self.path)?;
let identifier = FileIdentifier::path(path.to_string_lossy());
let links = self
.store
.client
.get_file_link(identifier)
.await
.map_err(|err| match err {
pcloud::Error::Protocol(2009, _) => {
Error::new(ErrorKind::NotFound, "file not found")
}
other => Error::other(other),
})?;
let link = links
.first_link()
.ok_or_else(|| Error::other("unable to fetch file link"))?;
let url = link.to_string();
let res = reqwest::Client::new()
.get(url)
.header(header::RANGE, RangeHeader(range).to_string())
.header(header::USER_AGENT, APP_USER_AGENT)
.send()
.await
.map_err(Error::other)?;
PCloudStoreFileReader::from_response(res)
}
async fn write(&self, options: crate::WriteOptions) -> Result<Self::FileWriter> {
match options.mode {
WriteMode::Append => {
return Err(Error::new(
ErrorKind::Unsupported,
"pcloud store doesn't support append write",
));
}
WriteMode::Truncate { offset } if offset != 0 => {
return Err(Error::new(
ErrorKind::Unsupported,
"pcloud store doesn't support truncated write",
));
}
_ => {}
};
let path = self.store.real_path(&self.path)?;
let parent: FolderIdentifier<'static> = path
.parent()
.map(|parent| parent.to_path_buf())
.map(|parent| {
let parent = if parent.is_absolute() {
parent.to_string_lossy().to_string()
} else {
format!("/{}", parent.to_string_lossy())
};
FolderIdentifier::path(parent)
})
.unwrap_or_else(|| FolderIdentifier::FolderId(ROOT));
let filename = path
.file_name()
.ok_or_else(|| Error::new(ErrorKind::InvalidData, "unable to get file name"))?;
let filename = filename.to_string_lossy().to_string();
let (write_buffer, read_buffer) = tokio::io::duplex(8192);
let client = self.store.clone();
let stream = ReaderStream::new(read_buffer);
let files = pcloud::file::upload::MultiFileUpload::default()
.with_stream_entry(filename, None, stream);
let upload_task: JoinHandle<Result<()>> = tokio::spawn(async move {
client
.client
.upload_files(parent, files)
.await
.map(|_| ())
.map_err(Error::other)
});
Ok(PCloudStoreFileWriter {
write_buffer,
upload_task,
})
}
async fn delete(&self) -> Result<()> {
let path = self.store.real_path(&self.path)?;
let identifier = FileIdentifier::path(path.to_string_lossy());
self.store
.client
.delete_file(identifier)
.await
.map(|_| ())
.map_err(|err| match err {
pcloud::Error::Protocol(2009, _) => {
Error::new(ErrorKind::NotFound, "file not found")
}
other => Error::other(other),
})
}
}
#[derive(Debug)]
pub struct PCloudStoreFileWriter {
write_buffer: DuplexStream,
upload_task: JoinHandle<Result<()>>,
}
impl tokio::io::AsyncWrite for PCloudStoreFileWriter {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
if self.upload_task.is_finished() {
Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "request closed")))
} else {
Pin::new(&mut self.write_buffer).poll_write(cx, buf)
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> {
if self.upload_task.is_finished() {
Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, "request closed")))
} else {
Pin::new(&mut self.write_buffer).poll_flush(cx)
}
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Result<()>> {
let shutdown = Pin::new(&mut self.write_buffer).poll_shutdown(cx);
if shutdown.is_ready() {
let poll = Pin::new(&mut self.upload_task).poll(cx);
match poll {
Poll::Ready(Ok(res)) => Poll::Ready(res),
Poll::Ready(Err(err)) => Poll::Ready(Err(Error::other(err))),
Poll::Pending => Poll::Pending,
}
} else {
Poll::Pending
}
}
}
impl crate::StoreFileWriter for PCloudStoreFileWriter {}
#[derive(Clone, Debug)]
pub struct PCloudStoreFileMetadata {
size: u64,
created: u64,
modified: u64,
content_type: Option<String>,
}
impl super::StoreMetadata for PCloudStoreFileMetadata {
fn size(&self) -> u64 {
self.size
}
fn created(&self) -> u64 {
self.created
}
fn modified(&self) -> u64 {
self.modified
}
fn content_type(&self) -> Option<&str> {
self.content_type.as_deref()
}
}
pub type PCloudStoreFileReader = HttpStoreFileReader;
pub type PCloudStoreEntry = crate::Entry<PCloudStoreFile, PCloudStoreDirectory>;
impl PCloudStoreEntry {
fn new(store: Arc<InnerStore>, parent: PathBuf, entry: pcloud::entry::Entry) -> Result<Self> {
let path = parent.join(&entry.base().name);
Ok(match entry {
pcloud::entry::Entry::File(_) => Self::File(PCloudStoreFile { store, path }),
pcloud::entry::Entry::Folder(_) => {
Self::Directory(PCloudStoreDirectory { store, path })
}
})
}
}
#[cfg(test)]
mod tests {
use mockito::Matcher;
use tokio::io::AsyncWriteExt;
use super::*;
use crate::{Store, StoreFile, WriteOptions};
#[test]
#[cfg(feature = "serde")]
fn should_parse_config() {
let _config: super::PCloudStoreConfig = toml::from_str(
r#"
region = "EU"
credentials = { username = "username", password = "password" }
root = "/"
"#,
)
.unwrap();
}
#[tokio::test]
async fn should_write_file() {
crate::enable_tracing();
let content = include_bytes!("lib.rs");
let mut srv = mockito::Server::new_async().await;
let mock = srv
.mock("POST", "/uploadfile")
.match_query(Matcher::AllOf(vec![
Matcher::UrlEncoded("username".into(), "username".into()),
Matcher::UrlEncoded("password".into(), "password".into()),
Matcher::UrlEncoded("path".into(), "/foo".into()),
]))
.match_header(
"content-type",
Matcher::Regex("multipart/form-data; boundary=.*".to_string()),
)
.match_body(Matcher::Any)
.with_status(200)
.with_body(r#"{"result": 0, "metadata": [], "checksums": [], "fileids": []}"#)
.create_async()
.await;
let store = PCloudStore::new(
srv.url(),
pcloud::Credentials::UsernamePassword {
username: "username".into(),
password: "password".into(),
},
)
.unwrap();
let file = store.get_file("/foo/bar.txt").await.unwrap();
let mut writer = file.write(WriteOptions::create()).await.unwrap();
writer.write_all(content).await.unwrap();
writer.shutdown().await.unwrap();
mock.assert_async().await;
}
}