#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
#[cfg(feature = "aws")]
pub mod aws;
#[cfg(feature = "azure")]
pub mod azure;
#[cfg(feature = "gcp")]
pub mod gcp;
pub mod limit;
pub mod local;
pub mod memory;
pub mod path;
pub mod throttle;
#[cfg(feature = "gcp")]
mod client;
#[cfg(feature = "gcp")]
pub use client::{backoff::BackoffConfig, retry::RetryConfig};
#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
mod multipart;
mod util;
use crate::path::Path;
use crate::util::{
coalesce_ranges, collect_bytes, maybe_spawn_blocking, OBJECT_STORE_COALESCE_DEFAULT,
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use snafu::Snafu;
use std::fmt::{Debug, Formatter};
use std::io::{Read, Seek, SeekFrom};
use std::ops::Range;
use tokio::io::AsyncWrite;
/// An alias for a dynamically dispatched [`ObjectStore`] implementation.
pub type DynObjectStore = dyn ObjectStore;
/// Identifier of an in-progress multipart upload: returned by
/// `ObjectStore::put_multipart` and passed back to `ObjectStore::abort_multipart`.
pub type MultipartId = String;
/// Universal API over multiple object store services.
///
/// The supertrait bounds (`Display + Send + Sync + Debug + 'static`) allow an
/// implementation to be used behind a [`DynObjectStore`] trait object.
#[async_trait]
pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// Save the provided bytes to the specified location.
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;
/// Start a multipart upload to `location`.
///
/// Returns the id of the upload (usable with `abort_multipart`) and an
/// [`AsyncWrite`] that receives the object's contents. The object is
/// expected to become visible only after the writer has been shut down.
async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)>;
/// Abort the multipart upload identified by `multipart_id` for `location`.
///
/// Afterwards the object is expected not to exist, i.e. `get` reports
/// [`Error::NotFound`].
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()>;
/// Return the object stored at `location` as a [`GetResult`].
///
/// A missing object is expected to produce [`Error::NotFound`].
async fn get(&self, location: &Path) -> Result<GetResult>;
/// Return the bytes of the object at `location` within the byte `range`.
///
/// A range lying beyond the end of the object is expected to be an error.
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes>;
/// Return the bytes for each of `ranges` of the object at `location`, one
/// entry per requested range, in the same order as `ranges`.
///
/// The default implementation delegates to `coalesce_ranges` with
/// `OBJECT_STORE_COALESCE_DEFAULT`, fetching data through `get_range`
/// (presumably merging nearby ranges into fewer requests — see `util`).
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
coalesce_ranges(
ranges,
|range| self.get_range(location, range),
OBJECT_STORE_COALESCE_DEFAULT,
)
.await
}
/// Return the metadata of the object at `location`.
///
/// A missing object is expected to produce [`Error::NotFound`].
async fn head(&self, location: &Path) -> Result<ObjectMeta>;
/// Delete the object at `location`.
async fn delete(&self, location: &Path) -> Result<()>;
/// List all objects under `prefix`, or every object when `prefix` is `None`.
///
/// Prefixes match whole path segments: prefix `foo` matches `foo/x.json`
/// but not `foo.bar/y.json`, and prefix `foo/x` does not match `foo/x.json`.
async fn list(
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>>;
/// List the entries directly under `prefix`, using `/` as delimiter.
///
/// Objects located immediately under the prefix are returned in
/// [`ListResult::objects`]; deeper objects are grouped into their first
/// path segment below the prefix in [`ListResult::common_prefixes`].
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
/// Copy the object at `from` to `to`, overwriting `to` if it exists.
async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
/// Move the object at `from` to `to`, overwriting `to` if it exists.
///
/// The default implementation performs `copy` and then `delete` as two
/// separate calls, so it is not atomic: if the delete fails, the copy at
/// `to` remains in place.
async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
self.copy(from, to).await?;
self.delete(from).await
}
/// Copy the object at `from` to `to` only if `to` does not exist yet.
///
/// If `to` already exists, this is expected to fail with [`Error::AlreadyExists`].
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
/// Move the object at `from` to `to` only if `to` does not exist yet.
///
/// The default implementation calls `copy_if_not_exists` and then `delete`
/// as two separate calls, so it is not atomic.
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
self.copy_if_not_exists(from, to).await?;
self.delete(from).await
}
}
/// Result of [`ObjectStore::list_with_delimiter`].
#[derive(Debug)]
pub struct ListResult {
/// Prefixes ("directories") found directly below the listed prefix.
pub common_prefixes: Vec<Path>,
/// Metadata of the objects located directly below the listed prefix.
pub objects: Vec<ObjectMeta>,
}
/// Metadata describing a single stored object.
#[derive(Debug, Clone, PartialEq)]
pub struct ObjectMeta {
/// Full path of the object.
pub location: Path,
/// Time the object was last modified, in UTC.
pub last_modified: DateTime<Utc>,
/// Size of the object in bytes.
pub size: usize,
}
/// Result of [`ObjectStore::get`]: either a local file or a stream of bytes.
///
/// Use `bytes` to collect the whole object or `into_stream` to consume it
/// chunk by chunk.
pub enum GetResult {
/// An open local file, together with its path (used in error reports).
File(std::fs::File, std::path::PathBuf),
/// A stream of byte chunks.
Stream(BoxStream<'static, Result<Bytes>>),
}
impl Debug for GetResult {
    /// Prints only which variant this is (`GetResult(File)` or
    /// `GetResult(Stream)`); the file handle and stream are not inspected.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Self::File(..) => "File",
            Self::Stream(..) => "Stream",
        };
        write!(f, "GetResult({})", variant)
    }
}
impl GetResult {
pub async fn bytes(self) -> Result<Bytes> {
match self {
Self::File(mut file, path) => {
maybe_spawn_blocking(move || {
let len = file.seek(SeekFrom::End(0)).map_err(|source| {
local::Error::Seek {
source,
path: path.clone(),
}
})?;
file.seek(SeekFrom::Start(0)).map_err(|source| {
local::Error::Seek {
source,
path: path.clone(),
}
})?;
let mut buffer = Vec::with_capacity(len as usize);
file.read_to_end(&mut buffer).map_err(|source| {
local::Error::UnableToReadBytes { source, path }
})?;
Ok(buffer.into())
})
.await
}
Self::Stream(s) => collect_bytes(s, None).await,
}
}
pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
match self {
Self::File(file, path) => {
const CHUNK_SIZE: usize = 8 * 1024;
futures::stream::try_unfold(
(file, path, false),
|(mut file, path, finished)| {
maybe_spawn_blocking(move || {
if finished {
return Ok(None);
}
let mut buffer = Vec::with_capacity(CHUNK_SIZE);
let read = file
.by_ref()
.take(CHUNK_SIZE as u64)
.read_to_end(&mut buffer)
.map_err(|e| local::Error::UnableToReadBytes {
source: e,
path: path.clone(),
})?;
Ok(Some((buffer.into(), (file, path, read != CHUNK_SIZE))))
})
},
)
.boxed()
}
Self::Stream(s) => s,
}
}
}
/// A `Result` alias whose error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Errors returned by [`ObjectStore`] operations.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
/// An error reported by a specific store implementation; `store` names the store.
#[snafu(display("Generic {} error: {}", store, source))]
Generic {
store: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// No object exists at `path`.
#[snafu(display("Object at location {} not found: {}", path, source))]
NotFound {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// An object with an invalid path was encountered.
///
/// `context(false)` makes snafu generate `From<path::Error>`, so `?` converts directly.
#[snafu(
display("Encountered object with invalid path: {}", source),
context(false)
)]
InvalidPath { source: path::Error },
/// A spawned tokio task could not be joined (converted via a generated `From`).
#[snafu(display("Error joining spawned task: {}", source), context(false))]
JoinError { source: tokio::task::JoinError },
/// The requested operation is not supported by this store.
#[snafu(display("Operation not supported: {}", source))]
NotSupported {
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// An object already exists at `path` (e.g. returned by `copy_if_not_exists`).
#[snafu(display("Object at location {} already exists: {}", path, source))]
AlreadyExists {
path: String,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// The operation has not been implemented yet.
#[snafu(display("Operation not yet implemented."))]
NotImplemented,
/// OAuth error from the GCP client (only with the `gcp` feature).
#[cfg(feature = "gcp")]
#[snafu(display("OAuth error: {}", source), context(false))]
OAuth { source: client::oauth::Error },
}
#[cfg(test)]
mod test_util {
    use super::*;
    use futures::TryStreamExt;

    /// Lists every object under `prefix` (or the whole store for `None`) and
    /// returns only their locations, in the order the store yields them.
    ///
    /// Fails with the first error from either the listing call or the stream.
    pub async fn flatten_list_stream(
        storage: &DynObjectStore,
        prefix: Option<&Path>,
    ) -> Result<Vec<Path>> {
        let listing = storage.list(prefix).await?;
        listing.map_ok(|meta| meta.location).try_collect().await
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::flatten_list_stream;
use tokio::io::AsyncWriteExt;
/// Shared conformance test for the basic [`ObjectStore`] operations:
/// `put`, `get`, `get_range`, `get_ranges`, `head`, `delete`, `list`,
/// `list_with_delimiter` and `copy`, including paths containing the
/// delimiter, emoji and percent-encoded characters.
///
/// Removes all existing objects first (`delete_fixtures`) and deletes what
/// it creates before returning. Panics on the first failed check.
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
    let store_str = storage.to_string();

    delete_fixtures(storage).await;

    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(
        content_list.is_empty(),
        "Expected list to be empty; found: {:?}",
        content_list
    );

    let location = Path::from("test_dir/test_file.json");

    let data = Bytes::from("arbitrary data");
    let expected_data = data.clone();
    storage.put(&location, data).await.unwrap();

    let root = Path::from("/");

    // List everything
    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert_eq!(content_list, &[location.clone()]);

    // A root prefix must behave like no prefix
    let content_list = flatten_list_stream(storage, Some(&root)).await.unwrap();
    assert_eq!(content_list, &[location.clone()]);

    // List with delimiter
    let result = storage.list_with_delimiter(None).await.unwrap();
    assert_eq!(&result.objects, &[]);
    assert_eq!(result.common_prefixes.len(), 1);
    assert_eq!(result.common_prefixes[0], Path::from("test_dir"));

    // A root prefix must behave like no prefix
    let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
    assert!(result.objects.is_empty());
    assert_eq!(result.common_prefixes.len(), 1);
    assert_eq!(result.common_prefixes[0], Path::from("test_dir"));

    // Prefix that should match
    let prefix = Path::from("test_dir");
    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
    assert_eq!(content_list, &[location.clone()]);

    // Prefix that should not match
    let prefix = Path::from("something");
    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
    assert!(content_list.is_empty());

    let read_data = storage.get(&location).await.unwrap().bytes().await.unwrap();
    assert_eq!(&*read_data, expected_data);

    // Range requests
    let range = 3..7;
    let range_result = storage.get_range(&location, range.clone()).await;

    let out_of_range = 200..300;
    let out_of_range_result = storage.get_range(&location, out_of_range).await;

    if store_str.starts_with("MicrosoftAzureEmulator") {
        // The Azurite emulator (strict mode) rejects the range request header,
        // so both range requests are expected to fail with this message.
        let err = range_result.unwrap_err().to_string();
        assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);

        let err = out_of_range_result.unwrap_err().to_string();
        assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err);
    } else {
        let bytes = range_result.unwrap();
        assert_eq!(bytes, expected_data.slice(range));

        // An out-of-range request must be a (non-fatal) error
        out_of_range_result.unwrap_err();

        let ranges = vec![0..1, 2..3, 0..5];
        let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
        // `zip` below would silently ignore missing or extra chunks.
        assert_eq!(bytes.len(), ranges.len());
        for (range, bytes) in ranges.iter().zip(bytes) {
            assert_eq!(bytes, expected_data.slice(range.clone()))
        }
    }

    let head = storage.head(&location).await.unwrap();
    assert_eq!(head.size, expected_data.len());

    storage.delete(&location).await.unwrap();

    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(content_list.is_empty());

    let err = storage.get(&location).await.unwrap_err();
    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);

    let err = storage.head(&location).await.unwrap_err();
    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);

    // A path segment containing the delimiter must stay a single segment
    let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
    storage
        .put(&file_with_delimiter, Bytes::from("arbitrary"))
        .await
        .unwrap();

    let files = flatten_list_stream(storage, None).await.unwrap();
    assert_eq!(files, vec![file_with_delimiter.clone()]);

    let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
        .await
        .unwrap();
    assert!(files.is_empty());

    let files = storage
        .list_with_delimiter(Some(&Path::from("a/b")))
        .await
        .unwrap();
    assert!(files.common_prefixes.is_empty());
    assert!(files.objects.is_empty());

    let files = storage
        .list_with_delimiter(Some(&Path::from("a")))
        .await
        .unwrap();
    assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
    assert!(files.objects.is_empty());

    let files = storage
        .list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
        .await
        .unwrap();
    assert!(files.common_prefixes.is_empty());
    assert_eq!(files.objects.len(), 1);
    assert_eq!(files.objects[0].location, file_with_delimiter);

    storage.delete(&file_with_delimiter).await.unwrap();

    // Non-ASCII (emoji) paths
    let emoji_prefix = Path::from("🙀");
    let emoji_file = Path::from("🙀/😀.parquet");
    storage
        .put(&emoji_file, Bytes::from("arbitrary"))
        .await
        .unwrap();

    storage.head(&emoji_file).await.unwrap();
    storage
        .get(&emoji_file)
        .await
        .unwrap()
        .bytes()
        .await
        .unwrap();

    let files = flatten_list_stream(storage, Some(&emoji_prefix))
        .await
        .unwrap();

    assert_eq!(files, vec![emoji_file.clone()]);

    let dst = Path::from("foo.parquet");
    storage.copy(&emoji_file, &dst).await.unwrap();
    let mut files = flatten_list_stream(storage, None).await.unwrap();
    files.sort_unstable();
    assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);

    storage.delete(&emoji_file).await.unwrap();
    storage.delete(&dst).await.unwrap();
    let files = flatten_list_stream(storage, Some(&emoji_prefix))
        .await
        .unwrap();
    assert!(files.is_empty());

    // Percent-encoded paths are kept distinct from their decoded form
    let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
    let path = hello_prefix.child("foo.parquet");

    storage.put(&path, Bytes::from(vec![0, 1])).await.unwrap();
    let files = flatten_list_stream(storage, Some(&hello_prefix))
        .await
        .unwrap();
    assert_eq!(files, vec![path.clone()]);

    let files = flatten_list_stream(storage, Some(&Path::from("HELLO")))
        .await
        .unwrap();
    assert!(files.is_empty());

    let err = storage
        .head(&Path::from("HELLO/foo.parquet"))
        .await
        .unwrap_err();
    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);

    storage.delete(&path).await.unwrap();

    // A path with an invalid percent-encoding must round-trip as-is
    let path = Path::parse("%Q.parquet").unwrap();
    storage.put(&path, Bytes::from(vec![0, 1])).await.unwrap();

    let files = flatten_list_stream(storage, None).await.unwrap();
    assert_eq!(files, vec![path.clone()]);

    storage.delete(&path).await.unwrap();
}
/// Builds `num_chunks` chunks, each consisting of `chunk_length` bytes of `b'x'`.
///
/// Every chunk is a clone of one `Bytes` value, so all chunks share a single
/// underlying buffer.
fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
    let chunk = Bytes::from(vec![b'x'; chunk_length]);
    vec![chunk; num_chunks]
}
pub(crate) async fn stream_get(storage: &DynObjectStore) {
let location = Path::from("test_dir/test_upload_file.txt");
let data = get_vec_of_bytes(5_000_000, 10);
let bytes_expected = data.concat();
let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
for chunk in &data {
writer.write_all(chunk).await.unwrap();
}
let meta_res = storage.head(&location).await;
assert!(meta_res.is_err());
assert!(matches!(
meta_res.unwrap_err(),
crate::Error::NotFound { .. }
));
writer.shutdown().await.unwrap();
let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
let data = get_vec_of_bytes(5_000, 5);
let bytes_expected = data.concat();
let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
for chunk in &data {
writer.write_all(chunk).await.unwrap();
}
writer.shutdown().await.unwrap();
let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
let location = Path::from("test_dir/test_abort_upload.txt");
let (upload_id, writer) = storage.put_multipart(&location).await.unwrap();
drop(writer);
storage
.abort_multipart(&location, &upload_id)
.await
.unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(
get_res.unwrap_err(),
crate::Error::NotFound { .. }
));
let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap();
if let Some(chunk) = data.get(0) {
writer.write_all(chunk).await.unwrap();
let _ = writer.write(chunk).await.unwrap();
}
drop(writer);
storage
.abort_multipart(&location, &upload_id)
.await
.unwrap();
let get_res = storage.get(&location).await;
assert!(get_res.is_err());
assert!(matches!(
get_res.unwrap_err(),
crate::Error::NotFound { .. }
));
}
/// Checks that `list` matches prefixes on whole path segments.
///
/// With `foo/x.json` and `foo.bar/y.json` stored, prefix `foo` must return
/// only `foo/x.json`, and prefix `foo/x` (a string prefix of `foo/x.json`,
/// but not a segment) must return nothing. Deletes its fixtures afterwards.
pub(crate) async fn list_uses_directories_correctly(storage: &DynObjectStore) {
    delete_fixtures(storage).await;

    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(
        content_list.is_empty(),
        "Expected list to be empty; found: {:?}",
        content_list
    );

    let location1 = Path::from("foo/x.json");
    let location2 = Path::from("foo.bar/y.json");

    let data = Bytes::from("arbitrary data");
    storage.put(&location1, data.clone()).await.unwrap();
    storage.put(&location2, data).await.unwrap();

    let prefix = Path::from("foo");
    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
    assert_eq!(content_list, &[location1.clone()]);

    let prefix = Path::from("foo/x");
    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
    assert!(
        content_list.is_empty(),
        "Expected no match for a partial path segment; found: {:?}",
        content_list
    );

    // Remove the fixtures created above, as `list_with_delimiter` does.
    storage.delete(&location1).await.unwrap();
    storage.delete(&location2).await.unwrap();
    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(content_list.is_empty());
}
pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) {
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(content_list.is_empty());
let data = Bytes::from("arbitrary data");
let files: Vec<_> = [
"test_file",
"mydb/wb/000/000/000.segment",
"mydb/wb/000/000/001.segment",
"mydb/wb/000/000/002.segment",
"mydb/wb/001/001/000.segment",
"mydb/wb/foo.json",
"mydb/wbwbwb/111/222/333.segment",
"mydb/data/whatevs",
]
.iter()
.map(|&s| Path::from(s))
.collect();
for f in &files {
let data = data.clone();
storage.put(f, data).await.unwrap();
}
let prefix = Path::from("mydb/wb");
let expected_000 = Path::from("mydb/wb/000");
let expected_001 = Path::from("mydb/wb/001");
let expected_location = Path::from("mydb/wb/foo.json");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
assert_eq!(result.objects.len(), 1);
let object = &result.objects[0];
assert_eq!(object.location, expected_location);
assert_eq!(object.size, data.len());
let prefix = Path::from("mydb/wb/000/000/001");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert!(result.common_prefixes.is_empty());
assert_eq!(result.objects.len(), 0);
let prefix = Path::from("not_there");
let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert!(result.common_prefixes.is_empty());
assert!(result.objects.is_empty());
for f in &files {
storage.delete(f).await.unwrap();
}
let content_list = flatten_list_stream(storage, None).await.unwrap();
assert!(content_list.is_empty());
}
/// Asserts that `location` (default `this_file_should_not_exist`) is absent
/// according to `head`, then returns the outcome of `get(..).bytes()`.
///
/// The result is returned so that callers can inspect the store-specific
/// error produced for a missing object.
pub(crate) async fn get_nonexistent_object(
    storage: &DynObjectStore,
    location: Option<Path>,
) -> crate::Result<Bytes> {
    let location = location.unwrap_or_else(|| Path::from("this_file_should_not_exist"));

    // `head` must report the object as missing.
    let err = storage.head(&location).await.unwrap_err();
    assert!(matches!(err, crate::Error::NotFound { .. }));

    let fetched = storage.get(&location).await?;
    fetched.bytes().await
}
pub(crate) async fn rename_and_copy(storage: &DynObjectStore) {
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
storage.put(&path1, contents1.clone()).await.unwrap();
storage.put(&path2, contents2.clone()).await.unwrap();
storage.copy(&path1, &path2).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
storage.put(&path1, contents1.clone()).await.unwrap();
storage.put(&path2, contents2.clone()).await.unwrap();
storage.rename(&path1, &path2).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
storage.delete(&path2).await.unwrap();
}
pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");
storage.put(&path1, contents1.clone()).await.unwrap();
storage.put(&path2, contents2.clone()).await.unwrap();
let result = storage.copy_if_not_exists(&path1, &path2).await;
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
crate::Error::AlreadyExists { .. }
));
storage.delete(&path2).await.unwrap();
storage.copy_if_not_exists(&path1, &path2).await.unwrap();
storage.delete(&path1).await.unwrap();
let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap();
assert_eq!(&new_contents, &contents1);
let result = storage.get(&path1).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
storage.delete(&path2).await.unwrap();
}
/// Best-effort removal of every object currently listed in `storage`.
///
/// The full listing is collected first; a listing failure panics, while
/// individual delete failures are deliberately ignored.
async fn delete_fixtures(storage: &DynObjectStore) {
    for path in flatten_list_stream(storage, None).await.unwrap() {
        // Cleanup only: a failed delete is not a test failure.
        let _ = storage.delete(&path).await;
    }
}
/// Lists `store` under the prefix built from `path_str`.
///
/// The returned stream is tied only to `store`'s lifetime `'a`, not to the
/// `Path` built locally. `test_list_lifetimes` relies on this compiling.
async fn list_store<'a>(
    store: &'a dyn ObjectStore,
    path_str: &str,
) -> super::Result<BoxStream<'a, super::Result<ObjectMeta>>> {
    let prefix = Path::from(path_str);
    store.list(Some(&prefix)).await
}
/// Checks that a listing stream can outlive the prefix `Path` it was created
/// from. At runtime, a freshly created in-memory store is expected to list
/// nothing under `path`.
#[tokio::test]
async fn test_list_lifetimes() {
    let store = memory::InMemory::new();
    let mut listing = list_store(&store, "path").await.unwrap();
    assert!(listing.next().await.is_none());
}
}