#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr,
unreachable_pub
)]
#"
)]
#![cfg_attr(
feature = "gcp",
doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
)]
#![cfg_attr(
feature = "aws",
doc = "* [`aws`]: [Amazon S3](https://aws.amazon.com/s3/). See [`AmazonS3Builder`](aws::AmazonS3Builder)"
)]
#![cfg_attr(
feature = "azure",
doc = "* [`azure`]: [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/). See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
)]
#![cfg_attr(
feature = "http",
doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
)]
#[cfg(feature = "aws")]
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
#[cfg(feature = "tokio")]
pub mod buffered;
#[cfg(not(target_arch = "wasm32"))]
pub mod chunked;
pub mod delimited;
#[cfg(feature = "gcp")]
pub mod gcp;
#[cfg(feature = "http")]
pub mod http;
#[cfg(feature = "tokio")]
pub mod limit;
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
pub mod local;
pub mod memory;
pub mod path;
pub mod prefix;
pub mod registry;
#[cfg(feature = "cloud")]
pub mod signer;
#[cfg(feature = "tokio")]
pub mod throttle;
#[cfg(feature = "cloud")]
pub mod client;
#[cfg(feature = "cloud")]
pub use client::{
ClientConfigKey, ClientOptions, CredentialProvider, StaticCredentialProvider,
backoff::BackoffConfig, retry::RetryConfig,
};
#[cfg(all(feature = "cloud", not(target_arch = "wasm32")))]
pub use client::Certificate;
#[cfg(feature = "cloud")]
mod config;
mod tags;
pub use tags::TagSet;
pub mod list;
pub mod multipart;
mod parse;
mod payload;
mod upload;
mod util;
mod attributes;
#[cfg(any(feature = "integration", test))]
pub mod integration;
pub use attributes::*;
pub use parse::{ObjectStoreScheme, parse_url, parse_url_opts};
pub use payload::*;
pub use upload::*;
pub use util::{GetRange, OBJECT_STORE_COALESCE_DEFAULT, coalesce_ranges, collect_bytes};
pub use ::http::{Extensions, HeaderMap, HeaderValue};
use crate::path::Path;
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
use crate::util::maybe_spawn_blocking;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
/// Shorthand for a dynamically dispatched [`ObjectStore`] trait object.
pub type DynObjectStore = dyn ObjectStore;
/// Identifier for a multipart upload, represented as a plain [`String`].
pub type MultipartId = String;
/// Core storage abstraction: the minimal set of operations a backend must
/// provide, plus default implementations for operations that can be expressed
/// in terms of the required ones.
///
/// Convenience wrappers that fill in default options (`put`, `get`, `head`,
/// `delete`, ...) live on [`ObjectStoreExt`], which is implemented for every
/// `ObjectStore`.
#[async_trait]
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
    /// Writes `payload` to `location`, honouring the supplied [`PutOptions`].
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult>;

    /// Starts a multipart upload targeting `location`, honouring the supplied
    /// [`PutMultipartOptions`], and returns the handle used to drive it.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> Result<Box<dyn MultipartUpload>>;

    /// Fetches the object at `location` according to `options`.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;

    /// Fetches several byte ranges of the object at `location`.
    ///
    /// The default implementation hands the ranges to [`coalesce_ranges`]
    /// with [`OBJECT_STORE_COALESCE_DEFAULT`], issuing the underlying reads
    /// through [`ObjectStoreExt::get_range`].
    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        let fetch_one = |range: Range<u64>| self.get_range(location, range);
        coalesce_ranges(ranges, fetch_one, OBJECT_STORE_COALESCE_DEFAULT).await
    }

    /// Deletes every location produced by `locations`, reporting the outcome
    /// of the deletions as a stream of paths or errors.
    fn delete_stream(
        &self,
        locations: BoxStream<'static, Result<Path>>,
    ) -> BoxStream<'static, Result<Path>>;

    /// Streams the metadata of stored objects, optionally restricted to `prefix`.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;

    /// Like [`ObjectStore::list`], but only yields objects whose location
    /// compares strictly greater than `offset`.
    ///
    /// The default implementation filters the output of `list`; errors from
    /// the underlying stream are passed through untouched.
    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, Result<ObjectMeta>> {
        // The returned stream is `'static`, so it needs its own copy of the offset.
        let offset = offset.clone();
        let is_past_offset =
            move |meta: &ObjectMeta| futures_util::future::ready(meta.location > offset);
        self.list(prefix).try_filter(is_past_offset).boxed()
    }

    /// Lists objects and common prefixes, optionally restricted to `prefix`.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;

    /// Copies the object at `from` to `to` according to `options`.
    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()>;

    /// Moves the object at `from` to `to` according to `options`.
    ///
    /// The default implementation is a copy followed by a delete of the
    /// source, i.e. two separate store operations. The source is only deleted
    /// if the copy succeeded.
    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
        // Exhaustive destructuring: adding a field to `RenameOptions` forces
        // this translation to be revisited.
        let RenameOptions {
            target_mode,
            extensions,
        } = options;
        let mode = match target_mode {
            RenameTargetMode::Create => CopyMode::Create,
            RenameTargetMode::Overwrite => CopyMode::Overwrite,
        };
        self.copy_opts(from, to, CopyOptions { mode, extensions })
            .await?;
        self.delete(from).await
    }
}
// Implements `ObjectStore` for a smart-pointer type (`$type`) wrapping some
// `T: ObjectStore + ?Sized` by forwarding every method to `self.as_ref()`.
macro_rules! as_ref_impl {
($type:ty) => {
#[async_trait]
// Require every trait method to be spelled out here, including those with
// default bodies, so the wrapped store's own overrides are always the ones
// that run rather than the trait defaults.
#[deny(clippy::missing_trait_methods)]
impl<T: ObjectStore + ?Sized> ObjectStore for $type {
// Forwards to the wrapped store.
async fn put_opts(
&self,
location: &Path,
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
self.as_ref().put_opts(location, payload, opts).await
}
// Forwards to the wrapped store.
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
self.as_ref().put_multipart_opts(location, opts).await
}
// Forwards to the wrapped store.
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
self.as_ref().get_opts(location, options).await
}
// Forwards to the wrapped store (which may override the default).
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<u64>],
) -> Result<Vec<Bytes>> {
self.as_ref().get_ranges(location, ranges).await
}
// Forwards to the wrapped store.
fn delete_stream(
&self,
locations: BoxStream<'static, Result<Path>>,
) -> BoxStream<'static, Result<Path>> {
self.as_ref().delete_stream(locations)
}
// Forwards to the wrapped store.
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.as_ref().list(prefix)
}
// Forwards to the wrapped store (which may override the default).
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
self.as_ref().list_with_offset(prefix, offset)
}
// Forwards to the wrapped store.
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.as_ref().list_with_delimiter(prefix).await
}
// Forwards to the wrapped store.
async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
self.as_ref().copy_opts(from, to, options).await
}
// Forwards to the wrapped store (which may override the default).
async fn rename_opts(
&self,
from: &Path,
to: &Path,
options: RenameOptions,
) -> Result<()> {
self.as_ref().rename_opts(from, to, options).await
}
}
};
}
// `Arc<T>` and `Box<T>` are themselves object stores whenever `T` is.
as_ref_impl!(Arc<T>);
as_ref_impl!(Box<T>);
/// Convenience methods layered on top of [`ObjectStore`].
///
/// A blanket implementation in this file provides these for every
/// `T: ObjectStore + ?Sized`; each method delegates to one of the core
/// `ObjectStore` methods with pre-filled options.
pub trait ObjectStoreExt: ObjectStore {
/// Writes `payload` to `location` using default [`PutOptions`].
fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
/// Starts a multipart upload to `location` using default [`PutMultipartOptions`].
fn put_multipart(
&self,
location: &Path,
) -> impl Future<Output = Result<Box<dyn MultipartUpload>>>;
/// Fetches the object at `location` using default [`GetOptions`].
fn get(&self, location: &Path) -> impl Future<Output = Result<GetResult>>;
/// Fetches the bytes of `range` from the object at `location`.
fn get_range(&self, location: &Path, range: Range<u64>) -> impl Future<Output = Result<Bytes>>;
/// Returns the metadata of the object at `location` (a `get_opts` call with `head` set).
fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;
/// Deletes the object at `location` via a single-element `delete_stream` call.
fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;
/// Copies `from` to `to` with [`CopyMode::Overwrite`].
fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
/// Copies `from` to `to` with [`CopyMode::Create`].
fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
/// Renames `from` to `to` with [`RenameTargetMode::Overwrite`].
fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
/// Renames `from` to `to` with [`RenameTargetMode::Create`].
fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
}
impl<T> ObjectStoreExt for T
where
T: ObjectStore + ?Sized,
{
async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
self.put_opts(location, payload, PutOptions::default())
.await
}
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
self.put_multipart_opts(location, PutMultipartOptions::default())
.await
}
async fn get(&self, location: &Path) -> Result<GetResult> {
self.get_opts(location, GetOptions::default()).await
}
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
let options = GetOptions::new().with_range(Some(range));
self.get_opts(location, options).await?.bytes().await
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::new().with_head(true);
Ok(self.get_opts(location, options).await?.meta)
}
async fn delete(&self, location: &Path) -> Result<()> {
let location = location.clone();
let mut stream =
self.delete_stream(futures_util::stream::once(async move { Ok(location) }).boxed());
let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
store: "ext",
source: "`delete_stream` with one location should yield once but didn't".into(),
})?;
if stream.next().await.is_some() {
Err(Error::Generic {
store: "ext",
source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(),
})
} else {
Ok(())
}
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
self.copy_opts(from, to, options).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let options = CopyOptions::new().with_mode(CopyMode::Create);
self.copy_opts(from, to, options).await
}
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
self.rename_opts(from, to, options).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create);
self.rename_opts(from, to, options).await
}
}
/// Return value of [`ObjectStore::list_with_delimiter`].
#[derive(Debug)]
pub struct ListResult {
/// Prefixes found by the listing (presumably the "directory-like" entries
/// below the requested prefix — confirm against the store implementations).
pub common_prefixes: Vec<Path>,
/// Metadata of the objects found by the listing.
pub objects: Vec<ObjectMeta>,
}
/// Metadata describing a stored object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectMeta {
/// Path of the object within the store.
pub location: Path,
/// Time of last modification; compared against the `if_modified_since` /
/// `if_unmodified_since` conditions in [`GetOptions::check_preconditions`].
pub last_modified: DateTime<Utc>,
/// Size of the object (presumably in bytes — confirm).
pub size: u64,
/// Entity tag of the object, if known; compared against the `if_match` /
/// `if_none_match` conditions in [`GetOptions::check_preconditions`].
pub e_tag: Option<String>,
/// Version identifier of the object, if any.
pub version: Option<String>,
}
/// Options for [`ObjectStore::get_opts`].
///
/// The four `if_*` fields are conditional-request preconditions; see
/// [`GetOptions::check_preconditions`] for how they are evaluated.
#[derive(Debug, Default, Clone)]
pub struct GetOptions {
/// Succeed only if the object's ETag is in this comma-separated list, or the
/// value is `*`. When set, `if_unmodified_since` is not evaluated.
pub if_match: Option<String>,
/// Report "not modified" if the object's ETag is in this comma-separated
/// list, or the value is `*`. When set, `if_modified_since` is not evaluated.
pub if_none_match: Option<String>,
/// Report "not modified" unless the object was modified after this instant.
pub if_modified_since: Option<DateTime<Utc>>,
/// Fail the precondition if the object was modified after this instant.
pub if_unmodified_since: Option<DateTime<Utc>>,
/// Byte range to fetch; `None` leaves the range unrestricted.
pub range: Option<GetRange>,
/// Specific object version to fetch, if any.
pub version: Option<String>,
/// Set by [`ObjectStoreExt::head`], which only consumes the metadata of the
/// result (presumably lets stores skip fetching the payload — confirm).
pub head: bool,
/// Opaque, caller-supplied extension values carried along with the request.
pub extensions: Extensions,
}
impl GetOptions {
    /// Evaluates the conditional-request fields against `meta`.
    ///
    /// * `if_match`: unless it is `*` or its comma-separated list contains the
    ///   object's ETag, returns [`Error::Precondition`]. Only when `if_match`
    ///   is unset is `if_unmodified_since` consulted, failing with
    ///   [`Error::Precondition`] if the object is newer than the given instant.
    /// * `if_none_match`: if it is `*` or its list contains the object's ETag,
    ///   returns [`Error::NotModified`]. Only when `if_none_match` is unset is
    ///   `if_modified_since` consulted, returning [`Error::NotModified`] if the
    ///   object is not newer than the given instant.
    ///
    /// An object without an ETag is treated as having the ETag `*`.
    pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
        let etag = meta.e_tag.as_deref().unwrap_or("*");
        let last_modified = meta.last_modified;
        // True when the comma-separated header value names the object's ETag.
        let names_etag =
            |header: &str| header.split(',').map(str::trim).any(|tag| tag == etag);

        match (&self.if_match, self.if_unmodified_since) {
            (Some(m), _) => {
                if m != "*" && !names_etag(m) {
                    return Err(Error::Precondition {
                        path: meta.location.to_string(),
                        source: format!("{etag} does not match {m}").into(),
                    });
                }
            }
            (None, Some(date)) => {
                if last_modified > date {
                    return Err(Error::Precondition {
                        path: meta.location.to_string(),
                        source: format!("{date} < {last_modified}").into(),
                    });
                }
            }
            (None, None) => {}
        }

        match (&self.if_none_match, self.if_modified_since) {
            (Some(m), _) => {
                if m == "*" || names_etag(m) {
                    return Err(Error::NotModified {
                        path: meta.location.to_string(),
                        source: format!("{etag} matches {m}").into(),
                    });
                }
            }
            (None, Some(date)) => {
                if last_modified <= date {
                    return Err(Error::NotModified {
                        path: meta.location.to_string(),
                        source: format!("{date} >= {last_modified}").into(),
                    });
                }
            }
            (None, None) => {}
        }

        Ok(())
    }

    /// Creates options with every field at its default.
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the `if_match` condition.
    #[must_use]
    pub fn with_if_match(self, etag: Option<impl Into<String>>) -> Self {
        Self {
            if_match: etag.map(Into::into),
            ..self
        }
    }

    /// Replaces the `if_none_match` condition.
    #[must_use]
    pub fn with_if_none_match(self, etag: Option<impl Into<String>>) -> Self {
        Self {
            if_none_match: etag.map(Into::into),
            ..self
        }
    }

    /// Replaces the `if_modified_since` condition.
    #[must_use]
    pub fn with_if_modified_since(self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
        Self {
            if_modified_since: dt.map(Into::into),
            ..self
        }
    }

    /// Replaces the `if_unmodified_since` condition.
    #[must_use]
    pub fn with_if_unmodified_since(self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
        Self {
            if_unmodified_since: dt.map(Into::into),
            ..self
        }
    }

    /// Replaces the requested byte range.
    #[must_use]
    pub fn with_range(self, range: Option<impl Into<GetRange>>) -> Self {
        Self {
            range: range.map(Into::into),
            ..self
        }
    }

    /// Replaces the requested object version.
    #[must_use]
    pub fn with_version(self, version: Option<impl Into<String>>) -> Self {
        Self {
            version: version.map(Into::into),
            ..self
        }
    }

    /// Sets the `head` flag.
    #[must_use]
    pub fn with_head(self, head: impl Into<bool>) -> Self {
        Self {
            head: head.into(),
            ..self
        }
    }

    /// Replaces the attached extensions.
    #[must_use]
    pub fn with_extensions(self, extensions: Extensions) -> Self {
        Self { extensions, ..self }
    }
}
/// Result of a get request: the payload plus descriptive information.
#[derive(Debug)]
pub struct GetResult {
/// The object data, either as a local file or as a byte stream.
pub payload: GetResultPayload,
/// Metadata of the fetched object.
pub meta: ObjectMeta,
/// Byte range covered by `payload`; its length is used as the size hint in
/// [`GetResult::bytes`] and it bounds the read for file payloads.
pub range: Range<u64>,
/// Attributes associated with the object.
pub attributes: Attributes,
}
/// The data carried by a [`GetResult`].
pub enum GetResultPayload {
/// An open file together with its path. Only available with the `fs`
/// feature on non-wasm32 targets.
#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
File(std::fs::File, std::path::PathBuf),
/// A stream of byte chunks.
Stream(BoxStream<'static, Result<Bytes>>),
}
/// Manual `Debug`: prints only which variant is held, never the contents.
impl Debug for GetResultPayload {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
            Self::File(..) => "GetResultPayload(File)",
            Self::Stream(..) => "GetResultPayload(Stream)",
        };
        f.write_str(label)
    }
}
impl GetResult {
    /// Consumes the result and collects the whole payload into one [`Bytes`].
    ///
    /// File payloads are read via `read_range` inside `maybe_spawn_blocking`;
    /// stream payloads are gathered with [`collect_bytes`], using the length
    /// of `range` as the size hint.
    pub async fn bytes(self) -> Result<Bytes> {
        let Self { payload, range, .. } = self;
        let len = range.end - range.start;
        match payload {
            GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
            GetResultPayload::File(mut file, path) => {
                let read = move || {
                    use crate::local::read_range;
                    let buffer = read_range(&mut file, &path, range)?;
                    Ok(buffer)
                };
                maybe_spawn_blocking(read).await
            }
        }
    }

    /// Consumes the result and exposes the payload as a stream of byte chunks.
    ///
    /// Stream payloads are returned unchanged; file payloads are read over
    /// `range` in chunks of 8 KiB.
    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
        #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
        const CHUNK_SIZE: usize = 8 * 1024;
        match self.payload {
            GetResultPayload::Stream(s) => s,
            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
            GetResultPayload::File(file, path) => {
                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
            }
        }
    }
}
/// How a put request treats an object that may already exist at the target.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PutMode {
/// The default mode (presumably: write regardless of whether an object
/// already exists — confirm against the store implementations).
#[default]
Overwrite,
/// Presumably: write only if no object exists yet — confirm.
Create,
/// Write conditioned on the given [`UpdateVersion`] (presumably: only if
/// the existing object still matches it — confirm).
Update(UpdateVersion),
}
/// Identifies a specific state of an object for [`PutMode::Update`].
///
/// Can be built directly from a [`PutResult`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateVersion {
/// Entity tag of the object state, if known.
pub e_tag: Option<String>,
/// Version identifier of the object state, if known.
pub version: Option<String>,
}
impl From<PutResult> for UpdateVersion {
fn from(value: PutResult) -> Self {
Self {
e_tag: value.e_tag,
version: value.version,
}
}
}
/// Options for [`ObjectStore::put_opts`].
///
/// Equality (see the manual `PartialEq` impl) ignores `extensions`.
#[derive(Debug, Clone, Default)]
pub struct PutOptions {
/// How to treat an existing object; defaults to [`PutMode::Overwrite`].
pub mode: PutMode,
/// Tags to attach to the object.
pub tags: TagSet,
/// Attributes to attach to the object.
pub attributes: Attributes,
/// Opaque, caller-supplied extension values; not part of equality.
pub extensions: Extensions,
}
/// Equality over `mode`, `tags` and `attributes`; `extensions` is ignored.
impl PartialEq<Self> for PutOptions {
    fn eq(&self, other: &Self) -> bool {
        // Both sides are destructured exhaustively so that adding a field to
        // the struct forces a decision about whether it takes part in equality.
        let (
            Self {
                mode: lhs_mode,
                tags: lhs_tags,
                attributes: lhs_attributes,
                extensions: _,
            },
            Self {
                mode: rhs_mode,
                tags: rhs_tags,
                attributes: rhs_attributes,
                extensions: _,
            },
        ) = (self, other);
        lhs_mode == rhs_mode && lhs_tags == rhs_tags && lhs_attributes == rhs_attributes
    }
}

impl Eq for PutOptions {}
/// Default options with only the put mode overridden.
impl From<PutMode> for PutOptions {
    fn from(mode: PutMode) -> Self {
        let defaults = Self::default();
        Self { mode, ..defaults }
    }
}
/// Default options with only the tag set overridden.
impl From<TagSet> for PutOptions {
    fn from(tags: TagSet) -> Self {
        let defaults = Self::default();
        Self { tags, ..defaults }
    }
}
/// Default options with only the attributes overridden.
impl From<Attributes> for PutOptions {
    fn from(attributes: Attributes) -> Self {
        let defaults = Self::default();
        Self {
            attributes,
            ..defaults
        }
    }
}
// Former name of `PutMultipartOptions`, kept as a deprecated, doc-hidden alias
// so existing callers continue to compile.
#[doc(hidden)]
#[deprecated(note = "Use PutMultipartOptions", since = "0.12.3")]
pub type PutMultipartOpts = PutMultipartOptions;
/// Options for [`ObjectStore::put_multipart_opts`].
///
/// Equality (see the manual `PartialEq` impl) ignores `extensions`.
#[derive(Debug, Clone, Default)]
pub struct PutMultipartOptions {
/// Tags to attach to the object.
pub tags: TagSet,
/// Attributes to attach to the object.
pub attributes: Attributes,
/// Opaque, caller-supplied extension values; not part of equality.
pub extensions: Extensions,
}
/// Equality over `tags` and `attributes`; `extensions` is ignored.
impl PartialEq<Self> for PutMultipartOptions {
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive patterns: a new field must be consciously included in or
        // excluded from the comparison.
        let (
            Self {
                tags: lhs_tags,
                attributes: lhs_attributes,
                extensions: _,
            },
            Self {
                tags: rhs_tags,
                attributes: rhs_attributes,
                extensions: _,
            },
        ) = (self, other);
        lhs_tags == rhs_tags && lhs_attributes == rhs_attributes
    }
}

impl Eq for PutMultipartOptions {}
/// Default multipart options with only the tag set overridden.
impl From<TagSet> for PutMultipartOptions {
    fn from(tags: TagSet) -> Self {
        let defaults = Self::default();
        Self { tags, ..defaults }
    }
}
/// Default multipart options with only the attributes overridden.
impl From<Attributes> for PutMultipartOptions {
    fn from(attributes: Attributes) -> Self {
        let defaults = Self::default();
        Self {
            attributes,
            ..defaults
        }
    }
}
/// Outcome of a successful put request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutResult {
/// Entity tag of the written object, if the store reported one.
pub e_tag: Option<String>,
/// Version identifier of the written object, if the store reported one.
pub version: Option<String>,
}
/// How a copy treats an object that may already exist at the destination.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CopyMode {
/// The default mode; used by [`ObjectStoreExt::copy`].
#[default]
Overwrite,
/// Used by [`ObjectStoreExt::copy_if_not_exists`].
Create,
}
/// Options for [`ObjectStore::copy_opts`].
///
/// Equality (see the manual `PartialEq` impl) ignores `extensions`.
#[derive(Debug, Clone, Default)]
pub struct CopyOptions {
/// How to treat an existing destination; defaults to [`CopyMode::Overwrite`].
pub mode: CopyMode,
/// Opaque, caller-supplied extension values; not part of equality.
pub extensions: Extensions,
}
impl CopyOptions {
    /// Creates options with every field at its default.
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the copy mode.
    #[must_use]
    pub fn with_mode(self, mode: CopyMode) -> Self {
        Self { mode, ..self }
    }

    /// Replaces the attached extensions.
    #[must_use]
    pub fn with_extensions(self, extensions: Extensions) -> Self {
        Self { extensions, ..self }
    }
}
/// Equality over `mode` only; `extensions` is ignored.
impl PartialEq<Self> for CopyOptions {
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive patterns: a new field must be consciously included in or
        // excluded from the comparison.
        let (
            Self {
                mode: lhs_mode,
                extensions: _,
            },
            Self {
                mode: rhs_mode,
                extensions: _,
            },
        ) = (self, other);
        lhs_mode == rhs_mode
    }
}

impl Eq for CopyOptions {}
/// How a rename treats an object that may already exist at the destination.
///
/// The default `rename_opts` implementation maps each variant to the
/// [`CopyMode`] of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RenameTargetMode {
/// The default mode; used by [`ObjectStoreExt::rename`].
#[default]
Overwrite,
/// Used by [`ObjectStoreExt::rename_if_not_exists`].
Create,
}
/// Options for [`ObjectStore::rename_opts`].
///
/// Equality (see the manual `PartialEq` impl) ignores `extensions`.
#[derive(Debug, Clone, Default)]
pub struct RenameOptions {
/// How to treat an existing destination; defaults to
/// [`RenameTargetMode::Overwrite`].
pub target_mode: RenameTargetMode,
/// Opaque, caller-supplied extension values; not part of equality.
pub extensions: Extensions,
}
impl RenameOptions {
    /// Creates options with every field at its default.
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the target mode.
    #[must_use]
    pub fn with_target_mode(self, target_mode: RenameTargetMode) -> Self {
        Self {
            target_mode,
            ..self
        }
    }

    /// Replaces the attached extensions.
    #[must_use]
    pub fn with_extensions(self, extensions: Extensions) -> Self {
        Self { extensions, ..self }
    }
}
/// Equality over `target_mode` only; `extensions` is ignored.
impl PartialEq<Self> for RenameOptions {
    fn eq(&self, other: &Self) -> bool {
        // Exhaustive patterns: a new field must be consciously included in or
        // excluded from the comparison.
        let (
            Self {
                target_mode: lhs_mode,
                extensions: _,
            },
            Self {
                target_mode: rhs_mode,
                extensions: _,
            },
        ) = (self, other);
        lhs_mode == rhs_mode
    }
}

impl Eq for RenameOptions {}
/// Crate-wide result type whose error defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors produced by this crate.
///
/// Marked `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
/// An error attributed to a named store that fits no more specific variant.
#[error("Generic {} error: {}", store, source)]
Generic {
/// Name of the store (or component) reporting the error.
store: &'static str,
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// No object exists at the given path.
#[error("Object at location {} not found: {}", path, source)]
NotFound {
/// Path that was looked up.
path: String,
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// A path could not be parsed or was otherwise invalid.
#[error("Encountered object with invalid path: {}", source)]
InvalidPath {
/// The underlying path error.
#[from]
source: path::Error,
},
/// A spawned tokio task could not be joined (requires the `tokio` feature).
#[cfg(feature = "tokio")]
#[error("Error joining spawned task: {}", source)]
JoinError {
/// The underlying join error.
#[from]
source: tokio::task::JoinError,
},
/// The requested operation is not supported.
#[error("Operation not supported: {}", source)]
NotSupported {
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// An object already exists at the given path.
#[error("Object at location {} already exists: {}", path, source)]
AlreadyExists {
/// Path of the existing object.
path: String,
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// A request precondition (e.g. `if_match` / `if_unmodified_since`) failed.
#[error("Request precondition failure for path {}: {}", path, source)]
Precondition {
/// Path the request was for.
path: String,
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// The object was not modified (e.g. `if_none_match` / `if_modified_since`).
#[error("Object at location {} not modified: {}", path, source)]
NotModified {
/// Path the request was for.
path: String,
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// The named implementer has not implemented the operation yet.
#[error("Operation {operation} not yet implemented by {implementer}.")]
NotImplemented {
/// Name of the operation.
operation: String,
/// Name of the implementation lacking the operation.
implementer: String,
},
/// The caller lacks the privileges needed for the operation.
#[error(
"The operation lacked the necessary privileges to complete for path {}: {}",
path,
source
)]
PermissionDenied {
/// Path the request was for.
path: String,
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// The request did not carry valid authentication credentials.
#[error(
"The operation lacked valid authentication credentials for path {}: {}",
path,
source
)]
Unauthenticated {
/// Path the request was for.
path: String,
/// The underlying error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// A configuration key was not recognised by the named store.
#[error("Configuration key: '{}' is not valid for store '{}'.", key, store)]
UnknownConfigurationKey {
/// Name of the store rejecting the key.
store: &'static str,
/// The unrecognised key.
key: String,
},
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> Self {
let kind = match &e {
Error::NotFound { .. } => std::io::ErrorKind::NotFound,
_ => std::io::ErrorKind::Other,
};
Self::new(kind, e)
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
// Returns early from the calling test (after printing a notice to stderr)
// unless the `TEST_INTEGRATION` environment variable is set.
macro_rules! maybe_skip_integration {
() => {
if std::env::var("TEST_INTEGRATION").is_err() {
eprintln!("Skipping integration test - set TEST_INTEGRATION");
return;
}
};
}
// Re-export so other test modules in the crate can use the macro by path.
pub(crate) use maybe_skip_integration;
/// Lists `store` under the prefix `path_str`, returning a stream tied to the
/// lifetime of the store borrow.
fn list_store<'a>(
    store: &'a dyn ObjectStore,
    path_str: &str,
) -> BoxStream<'a, Result<ObjectMeta>> {
    let prefix = Path::from(path_str);
    let listing = store.list(Some(&prefix));
    listing
}
/// Shared integration check for stores that can sign URLs: uploads a small
/// object, obtains a signed GET URL valid for 60 seconds, downloads through
/// it with `reqwest` and verifies the bytes round-trip.
#[cfg(any(feature = "azure", feature = "aws"))]
pub(crate) async fn signing<T>(integration: &T)
where
    T: ObjectStore + signer::Signer,
{
    use reqwest::Method;
    use std::time::Duration;

    let path = Path::from("file.txt");
    let data = Bytes::from("hello world");
    integration.put(&path, data.clone().into()).await.unwrap();

    let validity = Duration::from_secs(60);
    let signed = integration
        .signed_url(Method::GET, &path, validity)
        .await
        .unwrap();

    let loaded = reqwest::get(signed).await.unwrap().bytes().await.unwrap();
    assert_eq!(data, loaded);
}
// Shared integration check for object tagging.
//
// Writes three objects carrying the same tag set — one via `put_opts`, one via
// a multipart upload, one via `BufWriter` — and, when `validate` is true,
// fetches each object's tags through `get_tags` and compares them with what
// was written. `get_tags` is expected to return an HTTP response whose body is
// a `Tagging` XML document.
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
where
F: Fn(Path) -> Fut + Send + Sync,
Fut: std::future::Future<Output = Result<client::HttpResponse>> + Send,
{
use bytes::Buf;
use serde::Deserialize;
use tokio::io::AsyncWriteExt;
use crate::buffered::BufWriter;
// Minimal model of the XML tagging response: Tagging > TagSet > Tag*.
#[derive(Deserialize)]
struct Tagging {
#[serde(rename = "TagSet")]
list: TagList,
}
#[derive(Debug, Deserialize)]
struct TagList {
#[serde(rename = "Tag")]
tags: Vec<Tag>,
}
#[derive(Debug, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "PascalCase")]
struct Tag {
key: String,
value: String,
}
// Expected tags, listed in ascending key order so they can be compared
// directly with the key-sorted response below.
let tags = vec![
Tag {
key: "foo.com=bar/s".to_string(),
value: "bananas/foo.com-_".to_string(),
},
Tag {
key: "namespace/key.foo".to_string(),
value: "value with a space".to_string(),
},
];
let mut tag_set = TagSet::default();
for t in &tags {
tag_set.push(&t.key, &t.value)
}
// 1. Single-request put with tags.
let path = Path::from("tag_test");
storage
.put_opts(&path, "test".into(), tag_set.clone().into())
.await
.unwrap();
// 2. Multipart upload with tags.
let multi_path = Path::from("tag_test_multi");
let mut write = storage
.put_multipart_opts(&multi_path, tag_set.clone().into())
.await
.unwrap();
write.put_part("foo".into()).await.unwrap();
write.complete().await.unwrap();
// 3. Buffered writer with tags.
let buf_path = Path::from("tag_test_buf");
let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
buf.write_all(b"foo").await.unwrap();
buf.shutdown().await.unwrap();
// The writes above are still exercised when validation is disabled.
if !validate {
return;
}
for path in [path, multi_path, buf_path] {
let resp = get_tags(path.clone()).await.unwrap();
let body = resp.into_body().bytes().await.unwrap();
let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
// Sort by key so the comparison does not depend on response order.
resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
assert_eq!(resp.list.tags, tags);
}
}
/// Listing an empty in-memory store through a borrowed trait object compiles
/// (lifetime check) and yields nothing.
#[tokio::test]
async fn test_list_lifetimes() {
    let store = memory::InMemory::new();
    let mut listing = list_store(&store, "path");
    let first = listing.next().await;
    assert!(first.is_none());
}
// Exercises `GetOptions::check_preconditions` against an object with
// last_modified = 100ns and ETag "123". The `options` value is mutated
// step by step, so the order of statements matters.
#[test]
fn test_preconditions() {
let mut meta = ObjectMeta {
location: Path::from("test"),
last_modified: Utc.timestamp_nanos(100),
size: 100,
e_tag: Some("123".to_string()),
version: None,
};
// No conditions: always passes.
let mut options = GetOptions::default();
options.check_preconditions(&meta).unwrap();
// if_modified_since: passes only when the object is strictly newer.
options.if_modified_since = Some(Utc.timestamp_nanos(50));
options.check_preconditions(&meta).unwrap();
options.if_modified_since = Some(Utc.timestamp_nanos(100));
options.check_preconditions(&meta).unwrap_err();
options.if_modified_since = Some(Utc.timestamp_nanos(101));
options.check_preconditions(&meta).unwrap_err();
// if_unmodified_since: fails only when the object is strictly newer.
options = GetOptions::default();
options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
options.check_preconditions(&meta).unwrap_err();
options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
options.check_preconditions(&meta).unwrap();
options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
options.check_preconditions(&meta).unwrap();
// if_match: exact tag, comma-separated lists (with whitespace and a
// trailing comma), a non-matching tag, and the `*` wildcard.
options = GetOptions::default();
options.if_match = Some("123".to_string());
options.check_preconditions(&meta).unwrap();
options.if_match = Some("123,354".to_string());
options.check_preconditions(&meta).unwrap();
options.if_match = Some("354, 123,".to_string());
options.check_preconditions(&meta).unwrap();
options.if_match = Some("354".to_string());
options.check_preconditions(&meta).unwrap_err();
options.if_match = Some("*".to_string());
options.check_preconditions(&meta).unwrap();
// if_match (still "*") combined with a satisfiable if_unmodified_since.
options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
options.check_preconditions(&meta).unwrap();
// if_none_match: matching tag, wildcard, non-matching tag, and list.
options = GetOptions::default();
options.if_none_match = Some("123".to_string());
options.check_preconditions(&meta).unwrap_err();
options.if_none_match = Some("*".to_string());
options.check_preconditions(&meta).unwrap_err();
options.if_none_match = Some("1232".to_string());
options.check_preconditions(&meta).unwrap();
options.if_none_match = Some("23, 123".to_string());
options.check_preconditions(&meta).unwrap_err();
// if_none_match takes precedence: a satisfiable if_modified_since does
// not rescue the failing if_none_match.
options.if_modified_since = Some(Utc.timestamp_nanos(10));
options.check_preconditions(&meta).unwrap_err();
// Object without an ETag: treated as `*`, so `if_none_match: *` fails and
// `if_match: *` passes.
meta.e_tag = None;
options = GetOptions::default();
options.if_none_match = Some("*".to_string()); options.check_preconditions(&meta).unwrap_err();
options = GetOptions::default();
options.if_match = Some("*".to_string()); options.check_preconditions(&meta).unwrap();
}
/// The `http` types re-exported from the crate root (`HeaderMap`,
/// `HeaderValue`, `Extensions`) are usable without depending on `http` directly.
#[test]
#[cfg(feature = "http")]
fn test_reexported_types() {
    let content_type = HeaderValue::from_static("text/plain");
    let mut header_map = HeaderMap::new();
    header_map.insert("content-type", content_type);
    assert_eq!(header_map.len(), 1);

    let header_value = HeaderValue::from_static("test-value");
    assert_eq!(header_value.as_bytes(), b"test-value");

    let mut ext = Extensions::new();
    ext.insert("test-key");
    let stored = ext.get::<&str>();
    assert!(stored.is_some());
}
/// `GetOptions::new` starts out empty, and each `with_*` builder sets exactly
/// the field it is named after.
#[test]
fn test_get_options_builder() {
    let timestamp = Utc::now();
    let extensions = Extensions::new();

    // Freshly constructed options carry no conditions at all.
    let defaults = GetOptions::new();
    assert_eq!(defaults.if_match, None);
    assert_eq!(defaults.if_none_match, None);
    assert_eq!(defaults.if_modified_since, None);
    assert_eq!(defaults.if_unmodified_since, None);
    assert_eq!(defaults.range, None);
    assert_eq!(defaults.version, None);
    assert!(!defaults.head);
    assert!(defaults.extensions.get::<&str>().is_none());

    // Chain every builder once and check each field individually.
    let configured = defaults
        .with_if_match(Some("etag-match"))
        .with_if_none_match(Some("etag-none-match"))
        .with_if_modified_since(Some(timestamp))
        .with_if_unmodified_since(Some(timestamp))
        .with_range(Some(0..100))
        .with_version(Some("version-1"))
        .with_head(true)
        .with_extensions(extensions.clone());
    assert_eq!(configured.if_match, Some("etag-match".to_string()));
    assert_eq!(
        configured.if_none_match,
        Some("etag-none-match".to_string())
    );
    assert_eq!(configured.if_modified_since, Some(timestamp));
    assert_eq!(configured.if_unmodified_since, Some(timestamp));
    assert_eq!(configured.range, Some(GetRange::Bounded(0..100)));
    assert_eq!(configured.version, Some("version-1".to_string()));
    assert!(configured.head);
    assert_eq!(
        configured.extensions.get::<&str>(),
        extensions.get::<&str>()
    );
}
/// Compile-time probe: accepts any value whose type implements `ObjectStore`.
fn takes_generic_object_store<T>(store: T)
where
    T: ObjectStore,
{
    let _ = store;
}
/// `Arc<dyn ObjectStore>` and `Box<dyn ObjectStore>` themselves implement
/// `ObjectStore`.
#[test]
fn test_dyn_impl() {
    let shared: Arc<dyn ObjectStore> = Arc::new(memory::InMemory::new());
    takes_generic_object_store(shared);

    let boxed: Box<dyn ObjectStore> = Box::new(memory::InMemory::new());
    takes_generic_object_store(boxed);
}
/// `Arc<S>` and `Box<S>` implement `ObjectStore` for a concrete store `S`.
#[test]
fn test_generic_impl() {
    let shared = Arc::new(memory::InMemory::new());
    takes_generic_object_store(shared);

    let boxed = Box::new(memory::InMemory::new());
    takes_generic_object_store(boxed);
}
}