pub mod azure;
pub mod error;
pub(crate) mod multipart;
pub mod s3;
#[cfg(any(test, feature = "test-util"))]
pub mod mock;
#[cfg(test)]
pub(crate) mod test_support;
use std::path::Path;
use std::sync::Arc;
use bytes::Bytes;
use tempfile::NamedTempFile;
use time::OffsetDateTime;
use tracing::warn;
use self::error::other_boxed;
pub use self::error::{BoxError, ObjectStoreError};
/// Cheaply clonable, thread-safe callback used to report transfer progress.
///
/// The closure lives behind an [`Arc`], so every clone of a sink invokes the very same
/// function. The `Send + Sync` bounds let a sink be moved into and shared between tasks.
#[derive(Clone)]
pub struct ProgressSink(Arc<dyn Fn(u64) + Send + Sync>);

impl ProgressSink {
    /// Wraps `f` as a shareable progress sink.
    pub fn new<F>(f: F) -> Self
    where
        F: Fn(u64) + Send + Sync + 'static,
    {
        let callback: Arc<dyn Fn(u64) + Send + Sync> = Arc::new(f);
        ProgressSink(callback)
    }

    /// Invokes the wrapped callback with `bytes_amount`.
    ///
    /// NOTE(review): the value looks like a byte count (the default `put_path` reports
    /// the whole file length once); whether callers pass deltas or running totals is
    /// up to each call site — confirm before relying on either.
    pub fn report(&self, bytes_amount: u64) {
        let callback: &(dyn Fn(u64) + Send + Sync) = &*self.0;
        callback(bytes_amount);
    }
}
/// Debug output for [`ProgressSink`].
///
/// The wrapped closure cannot be printed, so this emits only the type name with a
/// non-exhaustive marker (`ProgressSink { .. }`).
impl std::fmt::Debug for ProgressSink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("ProgressSink");
        builder.finish_non_exhaustive()
    }
}
pub(crate) fn persist_temp(temp: NamedTempFile, dest: &Path) -> Result<(), ObjectStoreError> {
temp.persist(dest)
.map_err(|e| ObjectStoreError::Other(Box::new(e.error)))?;
Ok(())
}
/// Metadata describing a single stored object, as returned by listing and `head`.
#[derive(Clone, Debug)]
pub struct ObjectMeta {
    /// Full key of the object within the store.
    pub key: String,
    /// Object size as reported by the backend (presumably in bytes — confirm per backend).
    pub size: u64,
    /// Last-modified timestamp reported by the backend.
    pub last_modified: OffsetDateTime,
    /// Entity tag, when the backend returned one.
    pub etag: Option<String>,
}
/// Optional settings for uploads (`put_bytes` / `put_path`).
///
/// `PutOpts::default()` means no content disposition, no user metadata and no progress
/// reporting.
#[derive(Clone, Debug, Default)]
pub struct PutOpts {
    /// Value to store as the object's content disposition, if any (presumably sent as the
    /// `Content-Disposition` header — backend-specific).
    pub content_disposition: Option<String>,
    /// Key/value pairs to attach to the object as user metadata.
    pub user_metadata: Vec<(String, String)>,
    /// Optional sink that receives upload progress.
    pub progress: Option<ProgressSink>,
}
/// Optional settings for downloads (`get_to_file`).
///
/// `GetOpts::default()` disables progress reporting.
#[derive(Clone, Debug, Default)]
pub struct GetOpts {
    /// Optional sink that receives download progress.
    pub progress: Option<ProgressSink>,
}
/// Validates a requested byte range before any backend request is made.
///
/// Returns:
/// * `Ok(Some(empty))` for a zero-length range (`start == end`), so the caller can return
///   immediately without a network round-trip;
/// * `Ok(None)` for a well-formed, non-empty range, meaning the caller should proceed;
/// * `Err(ObjectStoreError::RangeNotSatisfiable)` for an inverted range (`start > end`).
pub(crate) fn precheck_range(
    key: &str,
    range: &std::ops::Range<u64>,
) -> Result<Option<Bytes>, ObjectStoreError> {
    use std::cmp::Ordering;

    match range.start.cmp(&range.end) {
        Ordering::Less => Ok(None),
        Ordering::Equal => Ok(Some(Bytes::new())),
        Ordering::Greater => Err(ObjectStoreError::RangeNotSatisfiable {
            key: key.to_owned(),
            requested: range.clone(),
        }),
    }
}
/// Checks that a ranged-read response body is exactly as long as the range requested.
///
/// This guards against a backend returning a truncated or overlong body for a ranged read.
/// On success the body is handed back unchanged.
///
/// # Errors
/// Returns [`ObjectStoreError::RangeNotSatisfiable`] when `body.len()` differs from
/// `range.end - range.start`, or when `range` is inverted (`start > end`).
pub(crate) fn verify_range_response_length(
    key: &str,
    range: &std::ops::Range<u64>,
    body: Bytes,
) -> Result<Bytes, ObjectStoreError> {
    // `checked_sub` keeps an inverted range from underflowing. Plain subtraction panics in
    // debug builds and wraps to a huge length in release builds. An inverted range can
    // never be satisfied, consistent with `precheck_range`.
    let expected = range.end.checked_sub(range.start);
    // usize -> u64 is lossless on every pointer width Rust supports (<= 64 bits).
    let actual = body.len() as u64;
    if expected == Some(actual) {
        return Ok(body);
    }
    Err(ObjectStoreError::RangeNotSatisfiable {
        key: key.to_owned(),
        requested: range.clone(),
    })
}
/// Backend-agnostic interface for reading, writing, listing, copying and deleting
/// objects by key.
///
/// The trait is declared through `#[async_trait]`, so each method returns a boxed future.
/// `put_path` and `presigned_get_url` come with default bodies that backends may override.
#[async_trait::async_trait]
pub trait ObjectStore: Send + Sync {
    /// Lists objects under `prefix`. Exact prefix-matching semantics are backend-defined.
    async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError>;

    /// Downloads the object at `key` into the local file at `dest`.
    async fn get_to_file(
        &self,
        key: &str,
        dest: &Path,
        opts: GetOpts,
    ) -> Result<(), ObjectStoreError>;

    /// Reads the whole object at `key` into memory.
    async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError>;

    /// Reads the half-open byte range `range` of the object at `key`.
    async fn get_bytes_range(
        &self,
        key: &str,
        range: std::ops::Range<u64>,
    ) -> Result<Bytes, ObjectStoreError>;

    /// Uploads `body` to `key`.
    async fn put_bytes(
        &self,
        key: &str,
        body: Bytes,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError>;

    /// Uploads the local file at `src` to `key`.
    ///
    /// The default implementation does four things in order:
    /// 1. Logs a warning.
    /// 2. Reads the entire file into memory.
    /// 3. Delegates to [`ObjectStore::put_bytes`] with the progress sink removed.
    /// 4. Reports the full file length to that sink once, after the upload succeeds.
    ///
    /// Nothing is reported for an empty file. Backends that can stream from disk should
    /// override this.
    ///
    /// # Errors
    /// Fails if the file cannot be read or if `put_bytes` fails.
    async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
        warn!(
            key,
            path = %src.display(),
            "put_path: falling back to read-then-put_bytes; override this method to avoid \
             buffering the entire file in memory"
        );
        let contents = tokio::fs::read(src).await.map_err(other_boxed)?;
        let total = contents.len() as u64;

        // Detach the sink so `put_bytes` never reports on its own. The whole length is
        // reported exactly once below, and only after the upload has succeeded.
        let mut opts = opts;
        let sink = opts.progress.take();
        self.put_bytes(key, Bytes::from(contents), opts).await?;

        if let Some(sink) = sink.filter(|_| total > 0) {
            sink.report(total);
        }
        Ok(())
    }

    /// Writes `body` to `key` only if no object exists there yet.
    ///
    /// NOTE(review): the returned `bool` presumably reports whether the write happened —
    /// confirm against each backend.
    async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError>;

    /// Fetches the metadata of the object at `key`.
    async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError>;

    /// Copies the object at `src` to `dst`.
    async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError>;

    /// Deletes the object at `key`.
    async fn delete(&self, key: &str) -> Result<(), ObjectStoreError>;

    /// Returns a URL for downloading `key` that is meant to expire after `ttl`.
    ///
    /// # Errors
    /// The default implementation always returns [`ObjectStoreError::Unsupported`].
    /// Backends that can presign URLs override it.
    async fn presigned_get_url(
        &self,
        key: &str,
        ttl: std::time::Duration,
    ) -> Result<String, ObjectStoreError> {
        // The default has no use for its arguments; bind them to silence unused warnings.
        let _ = (key, ttl);
        Err(ObjectStoreError::Unsupported(String::from(
            "presigned URLs are not supported by this backend",
        )))
    }
}
#[async_trait::async_trait]
impl<T: ObjectStore + ?Sized> ObjectStore for Arc<T> {
async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
(**self).list(prefix).await
}
async fn get_to_file(
&self,
key: &str,
dest: &Path,
opts: GetOpts,
) -> Result<(), ObjectStoreError> {
(**self).get_to_file(key, dest, opts).await
}
async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
(**self).get_bytes(key).await
}
async fn get_bytes_range(
&self,
key: &str,
range: std::ops::Range<u64>,
) -> Result<Bytes, ObjectStoreError> {
(**self).get_bytes_range(key, range).await
}
async fn put_bytes(
&self,
key: &str,
body: Bytes,
opts: PutOpts,
) -> Result<(), ObjectStoreError> {
(**self).put_bytes(key, body, opts).await
}
async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
(**self).put_path(key, src, opts).await
}
async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
(**self).put_if_absent(key, body).await
}
async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
(**self).head(key).await
}
async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
(**self).copy(src, dst).await
}
async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
(**self).delete(key).await
}
async fn presigned_get_url(
&self,
key: &str,
ttl: std::time::Duration,
) -> Result<String, ObjectStoreError> {
(**self).presigned_get_url(key, ttl).await
}
}
/// Unit tests for the range-validation helpers `precheck_range` and
/// `verify_range_response_length`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn precheck_range_empty_short_circuits_with_empty_bytes() {
        let out = precheck_range("k", &(5..5)).expect("empty range is valid");
        let bytes = out.expect("empty range short-circuits with Some");
        assert!(bytes.is_empty());
    }

    #[test]
    fn precheck_range_inverted_returns_range_not_satisfiable() {
        // Built via the struct literal so clippy's `reversed_empty_ranges` lint does not
        // fire on an intentionally inverted range.
        let range = std::ops::Range { start: 7, end: 3 };
        let err = precheck_range("k", &range).expect_err("inverted range must error");
        assert!(matches!(
            err,
            ObjectStoreError::RangeNotSatisfiable {
                ref key,
                requested: ref r,
            } if key == "k" && r.start == 7 && r.end == 3
        ));
    }

    #[test]
    fn precheck_range_well_formed_returns_none() {
        let out = precheck_range("k", &(2..6)).expect("valid range");
        assert!(out.is_none(), "well-formed range proceeds to SDK call");
    }

    #[test]
    fn verify_range_response_length_passes_exact_length() {
        let range = 2..6;
        let body = Bytes::from_static(b"abcd");
        let out = verify_range_response_length("k", &range, body.clone())
            .expect("exact-length body must pass");
        assert_eq!(out, body);
    }

    #[test]
    fn verify_range_response_length_rejects_truncated_body() {
        let range = 2..6;
        let body = Bytes::from_static(b"ab");
        let err = verify_range_response_length("pack-key", &range, body)
            .expect_err("short body must be rejected");
        assert!(
            matches!(
                err,
                ObjectStoreError::RangeNotSatisfiable {
                    ref key,
                    requested: ref r,
                } if key == "pack-key" && r.start == 2 && r.end == 6,
            ),
            "expected RangeNotSatisfiable(pack-key, 2..6), got {err:?}"
        );
    }

    #[test]
    fn verify_range_response_length_rejects_overlong_body() {
        let range = 0..4;
        let body = Bytes::from_static(b"abcdef");
        let err = verify_range_response_length("k", &range, body)
            .expect_err("overlong body must be rejected");
        assert!(matches!(err, ObjectStoreError::RangeNotSatisfiable { .. }));
    }

    #[test]
    fn verify_range_response_length_single_byte_round_trip() {
        let range = 7..8;
        let body = Bytes::from_static(b"x");
        let out = verify_range_response_length("k", &range, body.clone())
            .expect("single-byte body must pass");
        assert_eq!(out, body);
    }

    #[test]
    fn verify_range_response_length_accepts_empty_body_for_empty_range() {
        let range = 3..3;
        let out = verify_range_response_length("k", &range, Bytes::new())
            .expect("zero-length range with empty body must pass");
        assert!(out.is_empty());
    }
}