use std::time::Duration;
use tokio::runtime::Handle;
use crate::Operator as AsyncOperator;
use crate::raw::PresignedRequest;
use crate::types::IntoOperatorUri;
use crate::*;
#[derive(Clone, Debug)]
pub struct Operator {
handle: tokio::runtime::Handle,
op: AsyncOperator,
}
impl Operator {
pub fn new(op: AsyncOperator) -> Result<Self> {
Ok(Self {
handle: Handle::try_current()
.map_err(|_| Error::new(ErrorKind::Unexpected, "failed to get current handle"))?,
op,
})
}
fn spawn_block<F>(&self, f: F) -> Result<F::Output>
where
F: std::future::Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle.block_on(self.handle.spawn(f)).map_err(|err| {
Error::new(ErrorKind::Unexpected, "blocking task failed").set_source(err)
})
}
pub fn from_uri(uri: impl IntoOperatorUri) -> Result<Self> {
let op = AsyncOperator::from_uri(uri)?;
Self::new(op)
}
pub fn info(&self) -> OperatorInfo {
self.op.info()
}
}
impl Operator {
pub fn presign_stat(&self, path: &str, expire: Duration) -> Result<PresignedRequest> {
self.handle.block_on(self.op.presign_stat(path, expire))
}
pub fn presign_read(&self, path: &str, expire: Duration) -> Result<PresignedRequest> {
self.handle.block_on(self.op.presign_read(path, expire))
}
pub fn presign_write(&self, path: &str, expire: Duration) -> Result<PresignedRequest> {
self.handle.block_on(self.op.presign_write(path, expire))
}
pub fn presign_delete(&self, path: &str, expire: Duration) -> Result<PresignedRequest> {
self.handle.block_on(self.op.presign_delete(path, expire))
}
pub fn stat(&self, path: &str) -> Result<Metadata> {
self.stat_options(path, options::StatOptions::default())
}
pub fn stat_options(&self, path: &str, opts: options::StatOptions) -> Result<Metadata> {
let op = self.op.clone();
let path = path.to_string();
self.spawn_block(async move { op.stat_options(&path, opts).await })?
}
pub fn exists(&self, path: &str) -> Result<bool> {
let r = self.stat(path);
match r {
Ok(_) => Ok(true),
Err(err) => match err.kind() {
ErrorKind::NotFound => Ok(false),
_ => Err(err),
},
}
}
pub fn create_dir(&self, path: &str) -> Result<()> {
let op = self.op.clone();
let path = path.to_string();
self.spawn_block(async move { op.create_dir(&path).await })?
}
pub fn read(&self, path: &str) -> Result<Buffer> {
self.read_options(path, options::ReadOptions::default())
}
pub fn read_options(&self, path: &str, opts: options::ReadOptions) -> Result<Buffer> {
let op = self.op.clone();
let path = path.to_string();
self.spawn_block(async move { op.read_options(&path, opts).await })?
}
pub fn reader(&self, path: &str) -> Result<blocking::Reader> {
self.reader_options(path, options::ReaderOptions::default())
}
pub fn reader_options(
&self,
path: &str,
opts: options::ReaderOptions,
) -> Result<blocking::Reader> {
let r = self.handle.block_on(self.op.reader_options(path, opts))?;
Ok(blocking::Reader::new(self.handle.clone(), r))
}
pub fn write(&self, path: &str, bs: impl Into<Buffer>) -> Result<Metadata> {
self.write_options(path, bs, options::WriteOptions::default())
}
pub fn write_options(
&self,
path: &str,
bs: impl Into<Buffer>,
opts: options::WriteOptions,
) -> Result<Metadata> {
let op = self.op.clone();
let path = path.to_string();
let bs = bs.into();
self.spawn_block(async move { op.write_options(&path, bs, opts).await })?
}
pub fn writer(&self, path: &str) -> Result<blocking::Writer> {
self.writer_options(path, options::WriteOptions::default())
}
pub fn writer_options(
&self,
path: &str,
opts: options::WriteOptions,
) -> Result<blocking::Writer> {
let w = self.handle.block_on(self.op.writer_options(path, opts))?;
Ok(blocking::Writer::new(self.handle.clone(), w))
}
pub fn copy(&self, from: &str, to: &str) -> Result<()> {
let op = self.op.clone();
let from = from.to_string();
let to = to.to_string();
self.spawn_block(async move { op.copy(&from, &to).await })?
}
pub fn rename(&self, from: &str, to: &str) -> Result<()> {
let op = self.op.clone();
let from = from.to_string();
let to = to.to_string();
self.spawn_block(async move { op.rename(&from, &to).await })?
}
pub fn delete(&self, path: &str) -> Result<()> {
self.delete_options(path, options::DeleteOptions::default())
}
pub fn delete_options(&self, path: &str, opts: options::DeleteOptions) -> Result<()> {
let op = self.op.clone();
let path = path.to_string();
self.spawn_block(async move { op.delete_options(&path, opts).await })?
}
pub fn delete_iter<I, D>(&self, iter: I) -> Result<()>
where
I: IntoIterator<Item = D>,
D: IntoDeleteInput,
{
self.handle.block_on(self.op.delete_iter(iter))
}
pub fn delete_try_iter<I, D>(&self, try_iter: I) -> Result<()>
where
I: IntoIterator<Item = Result<D>>,
D: IntoDeleteInput,
{
self.handle.block_on(self.op.delete_try_iter(try_iter))
}
pub fn deleter(&self) -> Result<blocking::Deleter> {
blocking::Deleter::create(
self.handle.clone(),
self.handle.block_on(self.op.deleter())?,
)
}
#[deprecated(
since = "0.55.0",
note = "Use `delete_options` with `recursive: true` instead"
)]
#[allow(deprecated)]
pub fn remove_all(&self, path: &str) -> Result<()> {
self.delete_options(
path,
options::DeleteOptions {
recursive: true,
..Default::default()
},
)
}
pub fn list(&self, path: &str) -> Result<Vec<Entry>> {
self.list_options(path, options::ListOptions::default())
}
pub fn list_options(&self, path: &str, opts: options::ListOptions) -> Result<Vec<Entry>> {
let op = self.op.clone();
let path = path.to_string();
self.spawn_block(async move { op.list_options(&path, opts).await })?
}
pub fn lister(&self, path: &str) -> Result<blocking::Lister> {
self.lister_options(path, options::ListOptions::default())
}
pub fn lister_options(
&self,
path: &str,
opts: options::ListOptions,
) -> Result<blocking::Lister> {
let l = self.handle.block_on(self.op.lister_options(path, opts))?;
Ok(blocking::Lister::new(self.handle.clone(), l))
}
pub fn check(&self) -> Result<()> {
let mut ds = self.lister("/")?;
match ds.next() {
Some(Err(e)) if e.kind() != ErrorKind::NotFound => Err(e),
_ => Ok(()),
}
}
}
impl From<Operator> for AsyncOperator {
fn from(val: Operator) -> Self {
val.op
}
}