use std::{ops::Range, sync::Arc, time::Duration};
use async_trait::async_trait;
use bytes::Bytes;
use futures::TryStreamExt;
use object_store::{
ClientOptions, Error as ObjError, MultipartUpload, ObjectStore, ObjectStoreExt, PutMode,
PutOptions, PutPayload, UpdateVersion,
azure::{MicrosoftAzure, MicrosoftAzureBuilder},
path::Path as ObjPath,
};
use super::{ObjectMeta, StorageError, StorageProvider, retry};
/// [`StorageProvider`] implementation backed by a single Azure Blob Storage
/// container, accessed through `object_store`'s [`MicrosoftAzure`] client.
#[derive(Debug)]
pub struct AzureStorageProvider {
    /// Name of the container the provider was constructed for.
    container: String,
    /// Key prefix joined (with one `/`) in front of every object key; empty
    /// means keys are used as given. `new_with_prefix` normalizes it so it has
    /// no leading or trailing `/`.
    prefix: String,
    /// Shared client handle; also handed out by `object_store_handle`.
    store: Arc<MicrosoftAzure>,
}
impl AzureStorageProvider {
    /// Creates a provider for `container`.
    ///
    /// Account and credential settings come from the environment via
    /// `MicrosoftAzureBuilder::from_env`. The tuned HTTP client options and
    /// the shared retry configuration are applied.
    ///
    /// # Errors
    /// Returns [`StorageError::Permanent`] if the Azure client cannot be built.
    pub fn new(container: impl Into<String>) -> Result<Self, StorageError> {
        let container: String = container.into();
        let built = MicrosoftAzureBuilder::from_env()
            .with_container_name(&container)
            .with_client_options(tuned_client_options())
            .with_retry(retry::config())
            .build();
        match built {
            Ok(store) => Ok(Self::from_object_store(container, store)),
            Err(e) => Err(StorageError::Permanent {
                uri: format!("azure://{container}"),
                source: Box::new(e),
            }),
        }
    }

    /// Like [`Self::new`], but places every key under `prefix`.
    ///
    /// Surrounding `/` characters are trimmed from `prefix` first.
    ///
    /// # Errors
    /// Propagates any error from [`Self::new`].
    pub fn new_with_prefix(
        container: impl Into<String>,
        prefix: impl Into<String>,
    ) -> Result<Self, StorageError> {
        Self::new(container).map(|provider| Self {
            prefix: normalize_prefix(prefix),
            ..provider
        })
    }

    /// Creates a provider that talks to the Azure storage emulator.
    ///
    /// Unlike [`Self::new`], no tuned client options or retry configuration
    /// are applied.
    ///
    /// # Errors
    /// Returns [`StorageError::Permanent`] if the Azure client cannot be built.
    pub fn new_with_emulator(container: impl Into<String>) -> Result<Self, StorageError> {
        let container: String = container.into();
        let built = MicrosoftAzureBuilder::new()
            .with_use_emulator(true)
            .with_container_name(&container)
            .build();
        match built {
            Ok(store) => Ok(Self::from_object_store(container, store)),
            Err(e) => Err(StorageError::Permanent {
                uri: format!("azure://{container} @ emulator"),
                source: Box::new(e),
            }),
        }
    }

    /// Wraps an already-configured [`MicrosoftAzure`] client with an empty prefix.
    pub fn from_object_store(container: impl Into<String>, store: MicrosoftAzure) -> Self {
        let store = Arc::new(store);
        Self {
            store,
            container: container.into(),
            prefix: String::new(),
        }
    }

    /// Name of the container this provider targets.
    pub fn container(&self) -> &str {
        self.container.as_str()
    }

    /// Key prefix applied to every object; empty when none is configured.
    pub fn prefix(&self) -> &str {
        self.prefix.as_str()
    }

    /// Maps a caller-supplied `uri` to the full object key.
    ///
    /// Leading `/` characters are dropped, then the result is joined under the
    /// configured prefix (if any) with a single `/`.
    fn key(&self, uri: &str) -> String {
        let relative = uri.trim_start_matches('/');
        match self.prefix.as_str() {
            "" => relative.to_string(),
            prefix => format!("{prefix}/{relative}"),
        }
    }

    /// Parses the full key for `uri` into an `object_store` path.
    ///
    /// Parse failures are reported as [`StorageError::Permanent`] tagged with
    /// the caller's original `uri`.
    fn path(&self, uri: &str) -> Result<ObjPath, StorageError> {
        let full_key = self.key(uri);
        match ObjPath::parse(&full_key) {
            Ok(parsed) => Ok(parsed),
            Err(e) => Err(StorageError::Permanent {
                uri: uri.into(),
                source: Box::new(e),
            }),
        }
    }
}
/// Removes every leading and trailing `/` from `prefix`, so it can later be
/// joined to keys with exactly one separator. An empty or all-slash input
/// yields `""`.
fn normalize_prefix(prefix: impl Into<String>) -> String {
    let raw: String = prefix.into();
    raw.trim_start_matches('/').trim_end_matches('/').to_owned()
}
/// Value passed to `ClientOptions::with_pool_max_idle_per_host`.
const AZURE_POOL_MAX_IDLE_PER_HOST: usize = 1024;
/// Value passed to `ClientOptions::with_pool_idle_timeout`.
const AZURE_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(90);
/// Value passed to `ClientOptions::with_connect_timeout`.
const AZURE_CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Value passed to `ClientOptions::with_timeout` (per-request timeout).
const AZURE_REQUEST_TIMEOUT: Duration = Duration::from_secs(300);

/// Builds the HTTP client options that `AzureStorageProvider::new` installs:
/// a large idle connection pool plus connect and request timeouts.
fn tuned_client_options() -> ClientOptions {
    let mut options = ClientOptions::new();
    options = options.with_pool_max_idle_per_host(AZURE_POOL_MAX_IDLE_PER_HOST);
    options = options.with_pool_idle_timeout(AZURE_POOL_IDLE_TIMEOUT);
    options = options.with_connect_timeout(AZURE_CONNECT_TIMEOUT);
    options = options.with_timeout(AZURE_REQUEST_TIMEOUT);
    options
}
/// Converts an `object_store` error into the provider-neutral [`StorageError`].
///
/// Mapping:
/// - `NotFound` becomes `StorageError::NotFound`.
/// - `AlreadyExists` and `Precondition` become `StorageError::PreconditionFailed`.
/// - `Generic` becomes `StorageError::TransientExhausted`, keeping its source.
/// - Every other variant becomes `StorageError::Permanent`, boxing the
///   original error as its source.
fn translate(uri: &str, e: ObjError) -> StorageError {
    if matches!(e, ObjError::NotFound { .. }) {
        return StorageError::NotFound { uri: uri.into() };
    }
    if matches!(e, ObjError::AlreadyExists { .. } | ObjError::Precondition { .. }) {
        return StorageError::PreconditionFailed { uri: uri.into() };
    }
    match e {
        ObjError::Generic { source, .. } => StorageError::TransientExhausted {
            uri: uri.into(),
            source,
        },
        unmapped => StorageError::Permanent {
            uri: uri.into(),
            source: Box::new(unmapped),
        },
    }
}
#[async_trait]
impl StorageProvider for AzureStorageProvider {
    /// Returns size, ETag and last-modified time for `uri` without fetching
    /// the body.
    async fn head(&self, uri: &str) -> Result<ObjectMeta, StorageError> {
        let path = self.path(uri)?;
        let meta = self
            .store
            .head(&path)
            .await
            .map_err(|e| translate(uri, e))?;
        Ok(ObjectMeta {
            // `object_store` already reports sizes as `u64`; no cast needed.
            size: meta.size,
            etag: meta.e_tag,
            last_modified: meta.last_modified.into(),
        })
    }

    /// Downloads the whole object at `uri` together with its metadata.
    async fn get(&self, uri: &str) -> Result<(Bytes, ObjectMeta), StorageError> {
        let path = self.path(uri)?;
        // The GET and the body read share one closure. Presumably this lets
        // `retry::with_reissue` re-issue the request if the body stream fails
        // mid-read (confirm against `retry`).
        retry::with_reissue(|| async {
            let result = self.store.get(&path).await.map_err(|e| translate(uri, e))?;
            let meta = ObjectMeta {
                size: result.meta.size,
                etag: result.meta.e_tag.clone(),
                last_modified: result.meta.last_modified.into(),
            };
            let bytes = result.bytes().await.map_err(|e| translate(uri, e))?;
            Ok((bytes, meta))
        })
        .await
    }

    /// Fetches the byte `range` of the object at `uri`.
    async fn get_range(&self, uri: &str, range: Range<u64>) -> Result<Bytes, StorageError> {
        let path = self.path(uri)?;
        // `retry::complete_range` drives the closure with (sub-)ranges; it
        // presumably re-requests whatever a short read left missing.
        retry::complete_range(uri, range, |r| async {
            self.store
                .get_range(&path, r)
                .await
                .map_err(|e| translate(uri, e))
        })
        .await
    }

    /// Returns the last `len` bytes of the object (or the whole object if it
    /// is shorter) together with the object's total size.
    async fn tail(&self, uri: &str, len: u64) -> Result<(Bytes, u64), StorageError> {
        let size = self.head(uri).await?.size;
        // Nothing to fetch when zero bytes were requested or the object is
        // empty. For an empty object, `start..size` would be `0..0`, an empty
        // range that object_store may reject as inconsistent.
        if len == 0 || size == 0 {
            return Ok((Bytes::new(), size));
        }
        let start = size.saturating_sub(len);
        let bytes = self.get_range(uri, start..size).await?;
        Ok((bytes, size))
    }

    /// Creates the object only if it does not exist yet.
    ///
    /// An existing object surfaces as `StorageError::PreconditionFailed`.
    /// Returns the new ETag when the service reports one.
    async fn put_atomic(&self, uri: &str, bytes: Bytes) -> Result<Option<String>, StorageError> {
        // Identical to a conditional put with no expected ETag (create-only).
        self.put_if_match(uri, bytes, None).await
    }

    /// Conditional write.
    ///
    /// With `expected_etag == None` the object must not exist yet. Otherwise
    /// the current ETag must equal `expected_etag`. A failed condition
    /// surfaces as `StorageError::PreconditionFailed`. Returns the new ETag
    /// when the service reports one.
    async fn put_if_match(
        &self,
        uri: &str,
        bytes: Bytes,
        expected_etag: Option<&str>,
    ) -> Result<Option<String>, StorageError> {
        let path = self.path(uri)?;
        let mode = match expected_etag {
            None => PutMode::Create,
            Some(expected) => PutMode::Update(UpdateVersion {
                e_tag: Some(expected.to_string()),
                version: None,
            }),
        };
        let opts = PutOptions {
            mode,
            ..Default::default()
        };
        self.store
            .put_opts(&path, PutPayload::from_bytes(bytes), opts)
            .await
            .map(|r| r.e_tag)
            .map_err(|e| translate(uri, e))
    }

    /// Starts a multipart upload targeting `uri`.
    async fn put_multipart(&self, uri: &str) -> Result<Box<dyn MultipartUpload>, StorageError> {
        let path = self.path(uri)?;
        self.store
            .put_multipart(&path)
            .await
            .map_err(|e| translate(uri, e))
    }

    /// Deletes the object at `uri`; deleting a missing object counts as success.
    async fn delete(&self, uri: &str) -> Result<(), StorageError> {
        let path = self.path(uri)?;
        match self.store.delete(&path).await {
            Ok(()) | Err(ObjError::NotFound { .. }) => Ok(()),
            Err(e) => Err(translate(uri, e)),
        }
    }

    /// Lists every object under `prefix` together with its metadata.
    ///
    /// Like every other method here, `prefix` is interpreted relative to the
    /// provider's own prefix. The returned keys are relative to the provider
    /// prefix as well, so they can be passed straight back to `head`, `get`
    /// or `delete`. With no provider prefix, keys are returned unchanged.
    async fn list_with_prefix_metadata(
        &self,
        prefix: &str,
    ) -> Result<Vec<(String, ObjectMeta)>, StorageError> {
        // Resolve through `self.path` so the listing is scoped under the
        // provider prefix and uses the same `ObjPath::parse` encoding as writes.
        let path = self.path(prefix)?;
        // Locations come back as "<provider prefix>/<key>"; strip that part.
        let root = if self.prefix.is_empty() {
            String::new()
        } else {
            format!("{}/", self.prefix)
        };
        let mut stream = self.store.list(Some(&path));
        let mut out = Vec::new();
        while let Some(meta) = stream.try_next().await.map_err(|e| translate(prefix, e))? {
            let mut key = meta.location.to_string();
            if key.starts_with(root.as_str()) {
                key.replace_range(..root.len(), "");
            }
            out.push((
                key,
                ObjectMeta {
                    size: meta.size,
                    etag: meta.e_tag,
                    last_modified: meta.last_modified.into(),
                },
            ));
        }
        Ok(out)
    }

    /// Exposes the raw store plus the fully prefixed path for `uri`, for
    /// callers that need direct `object_store` access. Returns `None` if
    /// `uri` does not parse as a path.
    fn object_store_handle(&self, uri: &str) -> Option<(Arc<dyn ObjectStore>, ObjPath)> {
        let path = self.path(uri).ok()?;
        Some((Arc::clone(&self.store) as Arc<dyn ObjectStore>, path))
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for error translation and key/path construction. They
    //! exercise only local logic on an emulator-configured provider.
    use super::*;

    /// Provider configured for the Azure storage emulator, with an empty prefix.
    fn test_provider() -> AzureStorageProvider {
        AzureStorageProvider::new_with_emulator("test-container")
            .expect("construct emulator provider")
    }

    #[test]
    fn translate_not_found_to_typed_variant() {
        let err = translate(
            "some/key",
            ObjError::NotFound {
                path: "some/key".into(),
                source: "raw".into(),
            },
        );
        match err {
            StorageError::NotFound { uri } => assert_eq!(uri, "some/key"),
            other => panic!("expected NotFound; got {other:?}"),
        }
    }

    #[test]
    fn translate_already_exists_to_precondition_failed() {
        let err = translate(
            "k",
            ObjError::AlreadyExists {
                path: "k".into(),
                source: "raw".into(),
            },
        );
        assert!(matches!(err, StorageError::PreconditionFailed { uri } if uri == "k"));
    }

    #[test]
    fn translate_precondition_to_precondition_failed() {
        let err = translate(
            "k",
            ObjError::Precondition {
                path: "k".into(),
                source: "raw".into(),
            },
        );
        assert!(matches!(err, StorageError::PreconditionFailed { uri } if uri == "k"));
    }

    #[test]
    fn translate_generic_to_transient_exhausted() {
        let err = translate(
            "k",
            ObjError::Generic {
                store: "MicrosoftAzure",
                source: "boom".into(),
            },
        );
        match err {
            StorageError::TransientExhausted { uri, .. } => assert_eq!(uri, "k"),
            other => panic!("expected TransientExhausted; got {other:?}"),
        }
    }

    #[test]
    fn translate_other_variant_to_permanent() {
        let err = translate(
            "k",
            ObjError::UnknownConfigurationKey {
                store: "MicrosoftAzure",
                key: "foo".into(),
            },
        );
        match err {
            StorageError::Permanent { uri, .. } => assert_eq!(uri, "k"),
            other => panic!("expected Permanent; got {other:?}"),
        }
    }

    #[test]
    fn path_parses_simple_uri() {
        let p = test_provider().path("foo/bar.txt").expect("parse");
        assert_eq!(p.to_string(), "foo/bar.txt");
    }

    #[test]
    fn path_parses_nested_uri() {
        let p = test_provider()
            .path("manifest-lists/list-000042.json")
            .expect("parse");
        assert_eq!(p.to_string(), "manifest-lists/list-000042.json");
    }

    #[test]
    fn path_applies_prefix() {
        let mut p = test_provider();
        p.prefix = "tbl".into();
        // A nested key lands under the prefix.
        assert_eq!(
            p.path("data/seg-1").expect("parse").to_string(),
            "tbl/data/seg-1"
        );
        // An empty key resolves to the prefix itself: `key` yields "tbl/" and
        // `ObjPath::parse` drops the trailing delimiter.
        assert_eq!(p.key(""), "tbl/");
        assert_eq!(p.path("").expect("parse").to_string(), "tbl");
    }

    #[test]
    fn path_rejects_empty_segment_as_permanent() {
        let err = test_provider().path("a//b").expect_err("empty segment");
        assert!(matches!(err, StorageError::Permanent { .. }));
    }

    #[test]
    fn new_with_emulator_builds_and_exposes_container() {
        let p = AzureStorageProvider::new_with_emulator("emu-container")
            .expect("construct with emulator");
        assert_eq!(p.container(), "emu-container");
    }

    #[test]
    fn from_object_store_preserves_container() {
        let store = MicrosoftAzureBuilder::new()
            .with_endpoint("http://127.0.0.1:1".to_string())
            .with_container_name("hatch-container")
            .with_account("devstoreaccount1")
            .with_access_key("dGVzdC1rZXk=")
            .with_allow_http(true)
            .build()
            .expect("build MicrosoftAzure");
        let p = AzureStorageProvider::from_object_store("hatch-container", store);
        assert_eq!(p.container(), "hatch-container");
    }

    #[test]
    fn debug_impl_does_not_panic() {
        let p = test_provider();
        let s = format!("{p:?}");
        assert!(s.contains("AzureStorageProvider"));
    }

    #[test]
    fn normalize_prefix_trims_surrounding_slashes() {
        assert_eq!(normalize_prefix("/tbl/"), "tbl");
        assert_eq!(normalize_prefix("///a/b///"), "a/b");
        assert_eq!(normalize_prefix("plain"), "plain");
        assert_eq!(normalize_prefix(""), "");
    }

    #[test]
    fn key_without_prefix_strips_leading_slash() {
        let p = test_provider();
        assert_eq!(p.prefix(), "");
        assert_eq!(p.key("/foo/bar"), "foo/bar");
        assert_eq!(p.key("foo/bar"), "foo/bar");
    }

    #[test]
    fn key_with_prefix_prepends_and_strips_leading_slash() {
        let mut p = test_provider();
        p.prefix = "tbl".into();
        assert_eq!(p.prefix(), "tbl");
        assert_eq!(p.key("data/seg-1"), "tbl/data/seg-1");
        assert_eq!(p.key("/data/seg-1"), "tbl/data/seg-1");
    }

    #[test]
    fn path_with_prefix_parses_under_prefix() {
        let mut p = test_provider();
        p.prefix = "tbl".into();
        let parsed = p.path("data/seg-1").expect("parse");
        assert_eq!(parsed.to_string(), "tbl/data/seg-1");
    }

    #[test]
    fn object_store_handle_returns_path_under_prefix() {
        let mut p = test_provider();
        p.prefix = "tbl".into();
        let (_, path) = p
            .object_store_handle("data/seg-1")
            .expect("handle for valid uri");
        assert_eq!(path.to_string(), "tbl/data/seg-1");
    }
}