pub mod auth;
pub(crate) mod sas;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use azure_core::http::headers::{HeaderName, Headers};
use azure_core::http::request::RequestContent;
use azure_core::http::{ClientOptions, Transport};
use azure_storage_blob::clients::{
BlobClient, BlobContainerClient, BlobContainerClientOptions, BlockBlobClient,
};
use azure_storage_blob::models::method_options::BlockBlobClientUploadOptions;
use azure_storage_blob::models::{
BlobClientDeleteOptions, BlobClientDownloadOptions, BlobClientGetPropertiesOptions,
BlobContainerClientListBlobsOptions, BlockBlobClientCommitBlockListOptions, BlockLookupList,
};
use azure_storage_blob::stream::tokio::FileStream;
use bytes::Bytes;
use futures::StreamExt;
use tempfile::NamedTempFile;
use time::OffsetDateTime;
use tokio::io::AsyncWriteExt;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use url::Url;
use crate::url::{AzureAddressing, RemoteUrl};
use super::error::{network_boxed, other_boxed};
use super::multipart::{
AZURE_MAX_BLOCKS, MULTIPART_PUT_MAX_CONCURRENCY, MULTIPART_PUT_PART_SIZE, UploadPart,
plan_upload_parts, read_file_part, should_use_multipart, slice_bytes_part,
};
use super::{
GetOpts, ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts, persist_temp,
};
/// Size limit, in bytes, reported inside `ObjectStoreError::PayloadTooLarge` by this
/// backend's error classification: 5,000 MiB.
pub(crate) const SINGLE_PUT_BLOB_LIMIT_BYTES: u64 = 5_000 * (1 << 20);
/// Pool idle timeout passed to the `reqwest` client builder in `build_http_client`.
pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
/// TCP keepalive value passed to the `reqwest` client builder in `build_http_client`.
pub(crate) const TCP_KEEPALIVE: Duration = Duration::from_secs(30);
/// Connect timeout passed to the `reqwest` client builder in `build_http_client`.
pub(crate) const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Read timeout passed to the `reqwest` client builder in `build_http_client`.
pub(crate) const READ_TIMEOUT: Duration = Duration::from_secs(30);
/// `ObjectStore` backend that talks to a single Azure Blob Storage container.
pub struct AzureStore {
/// SDK client for the container; per-blob clients are derived from it.
container: BlobContainerClient,
/// Container name copied from the remote URL; used in `Debug` output and SAS URL building.
container_name: String,
/// Signing key used by `presigned_get_url`; when `None`, presigning returns `Unsupported`.
sas_signing: Option<auth::SasSigningKey>,
}
/// Debug representation listing the container endpoint, the container name and
/// the `sas_signing` field.
// NOTE(review): `sas_signing` is printed through its own `Debug` impl — confirm
// that impl redacts key material.
impl std::fmt::Debug for AzureStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let endpoint = self.container.endpoint();
        let mut out = f.debug_struct("AzureStore");
        out.field("endpoint", &endpoint.as_str());
        out.field("container", &self.container_name);
        out.field("sas_signing", &self.sas_signing);
        out.finish()
    }
}
impl AzureStore {
    /// Builds a store from an Azure [`RemoteUrl`].
    ///
    /// The account URL is derived with `build_account_url`, credentials come
    /// from `auth::resolve`, and the SDK client is configured with the options
    /// produced by `build_client_options`.
    ///
    /// # Errors
    ///
    /// Returns `ObjectStoreError::Other` when `url` is not the `Azure` variant,
    /// propagates failures from credential resolution and client-option
    /// construction, and passes a client-construction failure through
    /// `other_boxed`.
    // The body contains no `.await`; the lint is silenced and the `async`
    // signature is kept as-is.
    #[allow(clippy::unused_async)]
    pub async fn from_remote_url(url: &RemoteUrl) -> Result<Self, ObjectStoreError> {
        let RemoteUrl::Azure {
            endpoint,
            account,
            container,
            addressing,
            flags,
            ..
        } = url
        else {
            let msg = format!("AzureStore::from_remote_url called with non-Azure URL: {url}");
            return Err(ObjectStoreError::Other(msg.into()));
        };

        let account_url = build_account_url(endpoint, account, *addressing);
        let resolved = auth::resolve(account, flags)?;
        // Clone the signing key before `token_credential` is moved out of `resolved`.
        let sas_signing = resolved.sas_signing_key.clone();
        let options = BlobContainerClientOptions {
            client_options: build_client_options(&resolved)?,
            ..Default::default()
        };

        let client = BlobContainerClient::new(
            &account_url,
            container,
            resolved.token_credential,
            Some(options),
        )
        .map_err(other_boxed)?;

        Ok(Self {
            container: client,
            container_name: container.clone(),
            sas_signing,
        })
    }

    /// Returns the SDK client for the blob named `key` in this container.
    fn blob_client(&self, key: &str) -> BlobClient {
        self.container.blob_client(key)
    }

    /// Issues a list-blobs request capped at one result and fetches only the
    /// first page, discarding its contents.
    ///
    /// An empty `prefix` sends no prefix filter. Any SDK error is mapped
    /// through `classify` with `prefix` as the key.
    pub(crate) async fn probe(&self, prefix: &str) -> Result<(), ObjectStoreError> {
        let prefix_filter = if prefix.is_empty() {
            None
        } else {
            Some(prefix.to_owned())
        };
        let list_opts = BlobContainerClientListBlobsOptions {
            prefix: prefix_filter,
            maxresults: Some(1),
            ..Default::default()
        };

        let pager = self
            .container
            .list_blobs(Some(list_opts))
            .map_err(|e| classify(e, prefix))?;
        let mut pages = pager.into_pages();

        // Only the first page matters: an error fails the probe, anything else passes.
        match pages.next().await {
            Some(Err(e)) => Err(classify(e, prefix)),
            Some(Ok(_)) | None => Ok(()),
        }
    }
}
pub(crate) fn build_http_client() -> Result<Arc<reqwest::Client>, ObjectStoreError> {
reqwest::Client::builder()
.pool_idle_timeout(POOL_IDLE_TIMEOUT)
.tcp_keepalive(TCP_KEEPALIVE)
.connect_timeout(CONNECT_TIMEOUT)
.read_timeout(READ_TIMEOUT)
.build()
.map(Arc::new)
.map_err(other_boxed)
}
pub(crate) fn build_client_options(
resolved: &auth::ResolvedCredentials,
) -> Result<ClientOptions, ObjectStoreError> {
let mut opts = ClientOptions {
transport: Some(Transport::new(build_http_client()?)),
..Default::default()
};
if let Some(policy) = &resolved.per_try_policy {
opts.per_try_policies.push(Arc::clone(policy));
}
Ok(opts)
}
/// Derives the account-level URL from `endpoint`.
///
/// Query and fragment are removed, and the path is replaced with `/` for
/// virtual-hosted addressing or `/{account}` for path-style addressing.
pub(crate) fn build_account_url(
    endpoint: &Url,
    account: &str,
    addressing: AzureAddressing,
) -> String {
    let mut account_url = endpoint.clone();
    account_url.set_query(None);
    account_url.set_fragment(None);
    match addressing {
        AzureAddressing::VirtualHosted => account_url.set_path("/"),
        AzureAddressing::PathStyle => account_url.set_path(&format!("/{account}")),
    }
    account_url.to_string()
}
fn classify(err: azure_core::Error, key: &str) -> ObjectStoreError {
if let azure_core::error::ErrorKind::HttpResponse {
status, error_code, ..
} = err.kind()
&& let Some(mapped) =
classify_status_and_code(u16::from(*status), error_code.as_deref(), key)
{
return mapped;
}
if matches!(err.kind(), azure_core::error::ErrorKind::Io) {
return network_boxed(err);
}
other_boxed(err)
}
/// Maps an HTTP status (and optional service error code) to a typed error.
///
/// The status takes precedence: 404, 403, 412, 409 and 413 map directly. Only
/// when the status is none of those is the `RequestBodyTooLarge` error code
/// consulted. Returns `None` when nothing matches.
fn classify_status_and_code(
    status: u16,
    code: Option<&str>,
    key: &str,
) -> Option<ObjectStoreError> {
    // Arms are tried top to bottom, so the status arms win over the code arm.
    let mapped = match (status, code) {
        (404, _) => ObjectStoreError::NotFound(key.to_owned()),
        (403, _) => ObjectStoreError::AccessDenied(key.to_owned()),
        (412, _) => ObjectStoreError::PreconditionFailed(key.to_owned()),
        (409, _) => ObjectStoreError::Conflict(key.to_owned()),
        (413, _) | (_, Some("RequestBodyTooLarge")) => ObjectStoreError::PayloadTooLarge {
            limit_bytes: SINGLE_PUT_BLOB_LIMIT_BYTES,
        },
        _ => return None,
    };
    Some(mapped)
}
/// Builds an `ObjectMeta` for `key` from the values of a get-properties response.
///
/// # Errors
///
/// Returns `ObjectStoreError::Other` when the content length is missing, or —
/// checked second — when the last-modified time is missing. A missing ETag is
/// accepted and stored as `None`.
fn properties_to_meta(
    key: &str,
    content_length: Option<u64>,
    last_modified: Option<OffsetDateTime>,
    etag: Option<&str>,
) -> Result<ObjectMeta, ObjectStoreError> {
    let Some(size) = content_length else {
        let msg = format!("get_properties on `{key}` returned no content-length");
        return Err(ObjectStoreError::Other(msg.into()));
    };
    let Some(last_modified) = last_modified else {
        let msg = format!("get_properties on `{key}` returned no last-modified");
        return Err(ObjectStoreError::Other(msg.into()));
    };

    Ok(ObjectMeta {
        key: key.to_owned(),
        size,
        last_modified,
        etag: etag.map(String::from),
    })
}
/// Builds an `ObjectMeta` from the fields of one list-blobs item.
///
/// A missing content length is treated as size 0.
///
/// # Errors
///
/// Returns `ObjectStoreError::Other` when the name is missing, or — checked
/// second — when the last-modified time is missing.
fn item_to_meta(
    name: Option<&str>,
    content_length: Option<u64>,
    last_modified: Option<OffsetDateTime>,
    etag: Option<&str>,
) -> Result<ObjectMeta, ObjectStoreError> {
    let key = match name {
        Some(name) => name.to_owned(),
        None => {
            return Err(ObjectStoreError::Other(
                "list_blobs returned a blob without a name".into(),
            ));
        }
    };
    let last_modified = match last_modified {
        Some(timestamp) => timestamp,
        None => {
            let msg = format!("list_blobs returned blob `{key}` without last_modified");
            return Err(ObjectStoreError::Other(msg.into()));
        }
    };

    Ok(ObjectMeta {
        key,
        size: content_length.unwrap_or(0),
        last_modified,
        etag: etag.map(String::from),
    })
}
// `ObjectStore` implementation backed by the Azure Blob SDK. SDK errors are
// translated with `classify`, using the blob key (or list prefix) as context.
#[async_trait::async_trait]
impl ObjectStore for AzureStore {
/// Lists all blobs under `prefix`, following every result page.
///
/// An empty `prefix` sends no prefix filter. The returned entries carry no
/// ETag (`None` is passed for every item).
async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
let prefix_opt = (!prefix.is_empty()).then(|| prefix.to_owned());
let opts = BlobContainerClientListBlobsOptions {
prefix: prefix_opt,
..Default::default()
};
let mut pages = self
.container
.list_blobs(Some(opts))
.map_err(|e| classify(e, prefix))?
.into_pages();
let mut out = Vec::new();
while let Some(page_result) = pages.next().await {
let response = page_result.map_err(|e| classify(e, prefix))?;
let body = response
.into_body()
.xml::<azure_storage_blob::models::ListBlobsResponse>()
.map_err(|e| classify(e, prefix))?;
for item in body.segment.blob_items {
// Items without a properties element fall back to default (empty) properties.
let props = item.properties.unwrap_or_default();
let meta = item_to_meta(
item.name.as_deref(),
props.content_length,
props.last_modified,
None,
)?;
out.push(meta);
}
}
Ok(out)
}
/// Downloads `key` to `dest` through a temporary file created in `dest`'s
/// parent directory.
///
/// If the first attempt fails with `PreconditionFailed`, the whole
/// head-then-download sequence is retried exactly once.
///
/// # Errors
///
/// Returns `ObjectStoreError::Other` when `dest` has no parent directory;
/// otherwise propagates the error from `head_then_download`.
async fn get_to_file(
&self,
key: &str,
dest: &Path,
opts: GetOpts,
) -> Result<(), ObjectStoreError> {
let parent = dest.parent().ok_or_else(|| {
ObjectStoreError::Other(
format!("destination `{}` has no parent directory", dest.display()).into(),
)
})?;
let progress = opts.progress.as_ref();
match self.head_then_download(key, dest, parent, progress).await {
Err(ObjectStoreError::PreconditionFailed(_)) => {
tracing::warn!(key, "blob changed between head and GET; retrying");
self.head_then_download(key, dest, parent, progress).await
}
other => other,
}
}
/// Downloads the whole blob `key` into memory.
///
/// Request errors go through `classify`; errors while collecting the body
/// go through `network_boxed`.
async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
let blob = self.blob_client(key);
let result = blob.download(None).await.map_err(|e| classify(e, key))?;
let bytes = result.body.collect().await.map_err(network_boxed)?;
Ok(bytes)
}
/// Downloads the byte range `range` of blob `key` into memory.
///
/// `super::precheck_range` may short-circuit with an already-computed
/// result. An HTTP 416 response becomes `RangeNotSatisfiable`; the collected
/// body is passed through `super::verify_range_response_length` before
/// being returned.
async fn get_bytes_range(
&self,
key: &str,
range: std::ops::Range<u64>,
) -> Result<Bytes, ObjectStoreError> {
// Compile-time guard: the build fails where `usize` is narrower than 64 bits,
// which is what the `expect` calls on the conversions below rely on.
const _USIZE_AT_LEAST_64_BIT: () =
assert!(usize::BITS >= 64, "Azure backend requires 64-bit usize");
if let Some(empty) = super::precheck_range(key, &range)? {
return Ok(empty);
}
let sdk_start = usize::try_from(range.start).expect("invariant: usize is at least 64 bits");
let sdk_end = usize::try_from(range.end).expect("invariant: usize is at least 64 bits");
let opts = BlobClientDownloadOptions {
range: Some(sdk_start..sdk_end),
..Default::default()
};
let blob = self.blob_client(key);
let result = match blob.download(Some(opts)).await {
Ok(result) => result,
Err(err) => {
// 416 is handled here, before the generic classification, so the
// caller gets the requested range back in the error.
if let azure_core::error::ErrorKind::HttpResponse { status, .. } = err.kind()
&& u16::from(*status) == 416
{
return Err(ObjectStoreError::RangeNotSatisfiable {
key: key.to_owned(),
requested: range,
});
}
return Err(classify(err, key));
}
};
let bytes = result.body.collect().await.map_err(network_boxed)?;
super::verify_range_response_length(key, &range, bytes)
}
/// Uploads `body` as blob `key`.
///
/// Bodies for which `should_use_multipart` returns true are delegated to
/// `multipart_put_bytes`; otherwise a single upload request is made. On the
/// single-request path, progress is reported once, with the full size,
/// after the upload succeeds and only when the body is non-empty.
async fn put_bytes(
&self,
key: &str,
body: Bytes,
opts: PutOpts,
) -> Result<(), ObjectStoreError> {
let size = body.len() as u64;
if should_use_multipart(size) {
return self.multipart_put_bytes(key, body, size, opts).await;
}
// Keep a handle to the sink: `opts` is consumed by `upload_options_from`.
let progress = opts.progress.clone();
let blob = self.blob_client(key);
let upload_opts = upload_options_from(opts);
blob.upload(bytes_to_request_content(body), Some(upload_opts))
.await
.map_err(|e| classify(e, key))?;
if let Some(sink) = progress
&& size > 0
{
sink.report(size);
}
Ok(())
}
/// Uploads the file at `src` as blob `key`.
///
/// The file length from metadata decides between `multipart_put_path` and a
/// single streamed upload. On the single-request path, progress is reported
/// once, with the full length, after the upload succeeds and only when the
/// file is non-empty. Failures opening or inspecting the file go through
/// `other_boxed`.
async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
let file = tokio::fs::File::open(src).await.map_err(other_boxed)?;
let body_len = file.metadata().await.map_err(other_boxed)?.len();
if should_use_multipart(body_len) {
return self.multipart_put_path(key, file, body_len, opts).await;
}
let stream = FileStream::builder(file)
.build()
.await
.map_err(other_boxed)?;
let body: azure_core::http::Body = stream.into();
let blob = self.blob_client(key);
// Keep a handle to the sink: `opts` is consumed by `upload_options_from`.
let progress = opts.progress.clone();
let upload_opts = upload_options_from(opts);
blob.upload(body.into(), Some(upload_opts))
.await
.map_err(|e| classify(e, key))?;
if let Some(sink) = progress
&& body_len > 0
{
sink.report(body_len);
}
Ok(())
}
/// Uploads `body` as blob `key` using the SDK's `with_if_not_exists` upload option.
///
/// Returns `Ok(true)` when the upload succeeds and `Ok(false)` when it is
/// rejected with `PreconditionFailed` or `Conflict`; any other error is
/// returned as-is.
async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
let blob = self.blob_client(key);
let upload_opts = BlockBlobClientUploadOptions::default().with_if_not_exists();
let resp = blob
.upload(bytes_to_request_content(body), Some(upload_opts))
.await;
match resp.map_err(|e| classify(e, key)) {
Ok(_) => Ok(true),
Err(ObjectStoreError::PreconditionFailed(_) | ObjectStoreError::Conflict(_)) => {
Ok(false)
}
Err(other) => Err(other),
}
}
/// Fetches the properties of blob `key` and converts the `content-length`,
/// `last-modified` and `etag` response headers into an `ObjectMeta`.
///
/// A header that is absent or fails to parse is passed on as `None`;
/// `properties_to_meta` decides which of those are errors.
async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
let blob = self.blob_client(key);
let resp = blob
.get_properties(None::<BlobClientGetPropertiesOptions<'_>>)
.await
.map_err(|e| classify(e, key))?;
let headers = resp.headers();
properties_to_meta(
key,
header_u64(headers, &HeaderName::from_static("content-length")),
header_http_date(headers, &HeaderName::from_static("last-modified")),
headers.get_optional_str(&HeaderName::from_static("etag")),
)
}
/// Copies blob `src` to `dst` by downloading it to a local temporary file
/// and uploading that file with default put options.
///
/// A `NotFound` from the upload step is rewritten into
/// `ObjectStoreError::Other` naming both keys; a `NotFound` from the
/// download step is returned unchanged.
async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
let temp = NamedTempFile::new().map_err(other_boxed)?;
self.get_to_file(src, temp.path(), GetOpts::default())
.await?;
match self.put_path(dst, temp.path(), PutOpts::default()).await {
Ok(()) => Ok(()),
Err(ObjectStoreError::NotFound(_)) => Err(ObjectStoreError::Other(
format!("copy `{src}` → `{dst}`: upload returned NotFound").into(),
)),
Err(other) => Err(other),
}
}
/// Deletes blob `key`; SDK errors are mapped through `classify`.
async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
let blob = self.blob_client(key);
blob.delete(None::<BlobClientDeleteOptions<'_>>)
.await
.map_err(|e| classify(e, key))?;
Ok(())
}
/// Builds a SAS URL for blob `key` with lifetime `ttl` via
/// `sas::build_blob_sas_url`.
///
/// # Errors
///
/// Returns `ObjectStoreError::Unsupported` when the store holds no SAS
/// signing key.
async fn presigned_get_url(
&self,
key: &str,
ttl: std::time::Duration,
) -> Result<String, ObjectStoreError> {
let signing = self.sas_signing.as_ref().ok_or_else(|| {
ObjectStoreError::Unsupported(
"Azure presigned URLs require a shared account key (KEY env var or \
connection string); SAS-env-var and Entra-ID credentials cannot \
derive per-blob SAS"
.to_owned(),
)
})?;
let blob = self.blob_client(key);
let base = blob.url();
sas::build_blob_sas_url(base, &self.container_name, key, signing, ttl)
}
}
// Private download and multipart-upload helpers used by the `ObjectStore` impl.
impl AzureStore {
/// One download attempt: heads `key`, then streams it into a temp file in
/// `parent` and persists that file to `dest`.
///
/// When the head reports size 0 the empty temp file is persisted without
/// issuing a download. Otherwise the ETag from the head (if any) is passed
/// to `download_streaming`.
async fn head_then_download(
&self,
key: &str,
dest: &Path,
parent: &Path,
progress: Option<&ProgressSink>,
) -> Result<(), ObjectStoreError> {
let meta = self.head(key).await?;
let temp = NamedTempFile::new_in(parent).map_err(other_boxed)?;
if meta.size == 0 {
return persist_temp(temp, dest);
}
self.download_streaming(key, temp.path(), meta.etag.as_deref(), progress)
.await?;
persist_temp(temp, dest)
}
/// Streams blob `key` into the existing file at `temp_path`.
///
/// When `etag` is given it is sent as the download's `if_match` option. The
/// file is opened for writing with truncation, each body chunk is written
/// as it arrives, and `progress` is told the length of every non-empty
/// chunk. The file is flushed before returning.
async fn download_streaming(
&self,
key: &str,
temp_path: &Path,
etag: Option<&str>,
progress: Option<&ProgressSink>,
) -> Result<(), ObjectStoreError> {
let blob = self.blob_client(key);
let mut opts = BlobClientDownloadOptions::default();
if let Some(etag) = etag {
opts.if_match = Some(etag.to_owned());
}
let mut result = blob
.download(Some(opts))
.await
.map_err(|e| classify(e, key))?;
let mut file = tokio::fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(temp_path)
.await
.map_err(other_boxed)?;
while let Some(chunk) = result.body.next().await {
// Body-stream failures are network errors; local write failures are not.
let bytes = chunk.map_err(network_boxed)?;
let chunk_len = bytes.len() as u64;
file.write_all(&bytes).await.map_err(other_boxed)?;
if let Some(sink) = progress
&& chunk_len > 0
{
sink.report(chunk_len);
}
}
file.flush().await.map_err(other_boxed)?;
Ok(())
}
/// Multipart upload of an in-memory body: plans the parts, stages one block
/// per part from slices of `body`, then commits the block list with `opts`.
async fn multipart_put_bytes(
&self,
key: &str,
body: Bytes,
size: u64,
opts: PutOpts,
) -> Result<(), ObjectStoreError> {
let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
let progress = opts.progress.clone();
let staged = self
.stage_blocks_with_bodies(key, &parts, progress, |part| slice_bytes_part(&body, part))
.await?;
let blob = self.blob_client(key).block_blob_client();
commit_block_list(&blob, key, staged, opts).await
}
/// Multipart upload of a file: plans the parts, stages one block per part
/// read from `file`, then commits the block list with `opts`.
async fn multipart_put_path(
&self,
key: &str,
file: tokio::fs::File,
size: u64,
opts: PutOpts,
) -> Result<(), ObjectStoreError> {
let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
let progress = opts.progress.clone();
// Convert to a std file handle shared by all staging tasks via `Arc`.
let file: Arc<std::fs::File> = Arc::new(file.into_std().await);
let staged = self
.stage_blocks_from_file(key, file, &parts, progress)
.await?;
let blob = self.blob_client(key).block_blob_client();
commit_block_list(&blob, key, staged, opts).await
}
/// Stages one block per entry of `parts`, with bodies produced by `make_body`.
///
/// `make_body` runs on the calling task, before each staging task is
/// spawned; a failure there returns immediately. The spawned tasks each
/// take a semaphore permit (at most `MULTIPART_PUT_MAX_CONCURRENCY` run the
/// request at once), stage the block, and report `part.length` to
/// `progress`. Returns the block IDs in part order via `join_staged_blocks`.
async fn stage_blocks_with_bodies<F>(
&self,
key: &str,
parts: &[UploadPart],
progress: Option<ProgressSink>,
make_body: F,
) -> Result<Vec<Vec<u8>>, ObjectStoreError>
where
F: Fn(UploadPart) -> Result<Bytes, ObjectStoreError>,
{
let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
let mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>> = JoinSet::new();
for (idx, part) in parts.iter().enumerate() {
let part = *part;
let part_index = idx;
let block_id = block_id_for(idx);
let body = make_body(part)?;
let blob = self.blob_client(key).block_blob_client();
// Owned copies so the spawned future does not borrow from this call.
let key = key.to_owned();
let semaphore = Arc::clone(&semaphore);
let progress = progress.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
blob.stage_block(&block_id, part.length, bytes_to_request_content(body), None)
.await
.map_err(|e| classify(e, &key))?;
if let Some(sink) = &progress {
sink.report(part.length);
}
Ok((part_index, block_id))
});
}
join_staged_blocks(tasks, parts.len()).await
}
/// Stages one block per entry of `parts`, reading each part from `file`.
///
/// Unlike `stage_blocks_with_bodies`, the part body is read inside the
/// spawned task, after the semaphore permit has been acquired. Each task
/// reports `part.length` to `progress` once its block is staged. Returns
/// the block IDs in part order via `join_staged_blocks`.
async fn stage_blocks_from_file(
&self,
key: &str,
file: Arc<std::fs::File>,
parts: &[UploadPart],
progress: Option<ProgressSink>,
) -> Result<Vec<Vec<u8>>, ObjectStoreError> {
let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
let mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>> = JoinSet::new();
for (idx, part) in parts.iter().enumerate() {
let part = *part;
let part_index = idx;
let block_id = block_id_for(idx);
let blob = self.blob_client(key).block_blob_client();
// Owned copies so the spawned future does not borrow from this call.
let key = key.to_owned();
let task_file = Arc::clone(&file);
let semaphore = Arc::clone(&semaphore);
let progress = progress.clone();
tasks.spawn(async move {
let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
let body = read_file_part(task_file, part).await?;
blob.stage_block(&block_id, part.length, bytes_to_request_content(body), None)
.await
.map_err(|e| classify(e, &key))?;
if let Some(sink) = &progress {
sink.report(part.length);
}
Ok((part_index, block_id))
});
}
join_staged_blocks(tasks, parts.len()).await
}
}
/// Returns the block ID for the part at zero-based position `idx`: the
/// one-based part number rendered as a 32-character, zero-padded decimal
/// string, as bytes.
fn block_id_for(idx: usize) -> Vec<u8> {
    let part_number = idx + 1;
    let padded = format!("{:032}", part_number);
    padded.into_bytes()
}
/// Waits for every staging task and returns the block IDs ordered by part index.
///
/// `expected` is the number of planned parts; each task yields its part index
/// together with the block ID it staged.
///
/// # Errors
///
/// Returns the first task error (or join failure, converted with
/// `other_boxed`) as soon as it is observed. Returning early drops `tasks`;
/// per tokio's `JoinSet` documentation, dropping the set aborts the tasks still
/// in it. Also returns `ObjectStoreError::Other` when a task reports a part
/// index outside `0..expected`, or when some part index was never reported.
async fn join_staged_blocks(
    mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>>,
    expected: usize,
) -> Result<Vec<Vec<u8>>, ObjectStoreError> {
    let mut staged: Vec<Option<Vec<u8>>> = vec![None; expected];
    while let Some(joined) = tasks.join_next().await {
        let (idx, block_id) = joined.map_err(other_boxed)??;
        // Checked access: an out-of-range index is reported as an internal
        // error instead of panicking inside the upload path.
        let slot = staged.get_mut(idx).ok_or_else(|| {
            ObjectStoreError::Other(
                format!(
                    "internal: stage_block task reported part index {idx} \
                     but only {expected} parts were planned"
                )
                .into(),
            )
        })?;
        *slot = Some(block_id);
    }
    staged
        .into_iter()
        .enumerate()
        .map(|(idx, slot)| {
            slot.ok_or_else(|| {
                ObjectStoreError::Other(
                    format!("internal: stage_block task for part {idx} did not return").into(),
                )
            })
        })
        .collect()
}
/// Commits `block_ids` (in the given order, as the `latest` list) to `blob`,
/// applying the content disposition and metadata carried by `opts`.
///
/// Failure to serialize the block list goes through `other_boxed`; a failed
/// commit request is mapped through `classify` with `key` as context.
async fn commit_block_list(
    blob: &BlockBlobClient,
    key: &str,
    block_ids: Vec<Vec<u8>>,
    opts: PutOpts,
) -> Result<(), ObjectStoreError> {
    let lookup = BlockLookupList {
        latest: Some(block_ids),
        ..Default::default()
    };
    let request_body: RequestContent<_, _> = lookup.try_into().map_err(other_boxed)?;

    let (blob_content_disposition, metadata) = put_opts_blob_fields(opts);
    let commit_opts = BlockBlobClientCommitBlockListOptions {
        blob_content_disposition,
        metadata,
        ..Default::default()
    };

    match blob.commit_block_list(request_body, Some(commit_opts)).await {
        Ok(_) => Ok(()),
        Err(err) => Err(classify(err, key)),
    }
}
/// Converts `body` into a `RequestContent<Bytes, F>` through the existing
/// `Into` conversion, with the format parameter `F` chosen by the call site.
fn bytes_to_request_content<F>(body: Bytes) -> RequestContent<Bytes, F>
where
    Bytes: Into<RequestContent<Bytes, F>>,
{
    <Bytes as Into<RequestContent<Bytes, F>>>::into(body)
}
fn put_opts_blob_fields(
opts: PutOpts,
) -> (
Option<String>,
Option<std::collections::HashMap<String, String>>,
) {
let metadata = (!opts.user_metadata.is_empty()).then(|| {
opts.user_metadata
.into_iter()
.collect::<std::collections::HashMap<_, _>>()
});
(opts.content_disposition, metadata)
}
/// Builds single-request upload options from `opts`: content disposition and
/// metadata come from `put_opts_blob_fields`, every other option is left at its
/// default.
fn upload_options_from(opts: PutOpts) -> BlockBlobClientUploadOptions<'static> {
    let (blob_content_disposition, metadata) = put_opts_blob_fields(opts);
    BlockBlobClientUploadOptions {
        blob_content_disposition,
        metadata,
        ..Default::default()
    }
}
/// Reads header `name` and parses it as a `u64`; returns `None` when the
/// header is absent or does not parse.
fn header_u64(headers: &Headers, name: &HeaderName) -> Option<u64> {
    let raw = headers.get_optional_str(name)?;
    raw.parse::<u64>().ok()
}
/// Reads header `name` and parses it with the `time` crate's RFC 2822 format;
/// returns `None` when the header is absent or does not parse.
fn header_http_date(headers: &Headers, name: &HeaderName) -> Option<OffsetDateTime> {
    use time::format_description::well_known::Rfc2822;

    headers
        .get_optional_str(name)
        .and_then(|raw| OffsetDateTime::parse(raw, &Rfc2822).ok())
}
// Unit tests for the pure helpers in this module. None of them contacts a
// storage service.
#[cfg(test)]
mod tests {
use super::*;
use crate::url::{AzureAddressing, RemoteFlags};
// Parses a literal endpoint URL, panicking on malformed test input.
fn parse_endpoint(s: &str) -> Url {
Url::parse(s).expect("test endpoint URL parses")
}
// A non-Azure URL used to exercise the rejection path of `from_remote_url`.
fn s3_url() -> RemoteUrl {
RemoteUrl::S3 {
endpoint: parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/"),
bucket: "my-bucket".to_owned(),
prefix: None,
addressing: crate::url::S3Addressing::VirtualHosted,
flags: RemoteFlags::default(),
}
}
// --- build_account_url ---
// Virtual-hosted addressing replaces the whole path with "/".
#[test]
fn build_account_url_virtual_hosted_strips_path() {
let url = parse_endpoint("https://acct.blob.core.windows.net/my-container/some/prefix");
let out = build_account_url(&url, "acct", AzureAddressing::VirtualHosted);
assert_eq!(out, "https://acct.blob.core.windows.net/");
}
// Path-style addressing keeps only the account segment of the path.
#[test]
fn build_account_url_path_style_keeps_account() {
let url = parse_endpoint("http://127.0.0.1:10000/devstoreaccount1/my-container/repo");
let out = build_account_url(&url, "devstoreaccount1", AzureAddressing::PathStyle);
assert_eq!(out, "http://127.0.0.1:10000/devstoreaccount1");
}
// Query string and fragment never survive into the account URL.
#[test]
fn build_account_url_strips_query_and_fragment() {
let url = parse_endpoint("https://acct.blob.core.windows.net/c/r?credential=foo#frag");
let out = build_account_url(&url, "acct", AzureAddressing::VirtualHosted);
assert_eq!(out, "https://acct.blob.core.windows.net/");
}
// --- classify_status_and_code: one test per mapped status, each checking
// that the key is carried into the error where the variant holds one ---
#[test]
fn classify_404_is_not_found() {
assert!(matches!(
classify_status_and_code(404, None, "k"),
Some(ObjectStoreError::NotFound(s)) if s == "k"
));
}
#[test]
fn classify_403_is_access_denied() {
assert!(matches!(
classify_status_and_code(403, None, "k"),
Some(ObjectStoreError::AccessDenied(s)) if s == "k"
));
}
#[test]
fn classify_412_is_precondition_failed() {
assert!(matches!(
classify_status_and_code(412, None, "k"),
Some(ObjectStoreError::PreconditionFailed(s)) if s == "k"
));
}
#[test]
fn classify_409_is_conflict() {
assert!(matches!(
classify_status_and_code(409, None, "k"),
Some(ObjectStoreError::Conflict(s)) if s == "k"
));
}
#[test]
fn classify_413_is_payload_too_large() {
assert!(matches!(
classify_status_and_code(413, None, "k"),
Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
if limit_bytes == SINGLE_PUT_BLOB_LIMIT_BYTES
));
}
// The error code alone is enough when the status is not one of the mapped ones.
#[test]
fn classify_request_body_too_large_code_is_payload_too_large() {
assert!(matches!(
classify_status_and_code(400, Some("RequestBodyTooLarge"), "k"),
Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
if limit_bytes == SINGLE_PUT_BLOB_LIMIT_BYTES
));
}
// Unmapped statuses and codes fall through to `None`.
#[test]
fn classify_unrecognised_status_returns_none() {
assert!(classify_status_and_code(500, None, "k").is_none());
assert!(classify_status_and_code(429, None, "k").is_none());
assert!(classify_status_and_code(500, Some("InternalError"), "k").is_none());
}
// --- properties_to_meta ---
#[test]
fn properties_to_meta_round_trips_well_formed_response() {
let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
let meta = properties_to_meta("k", Some(42), Some(now), Some("\"abc\""))
.expect("conversion succeeds");
assert_eq!(meta.key, "k");
assert_eq!(meta.size, 42);
assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
}
// An explicit size of 0 is valid and distinct from a missing size.
#[test]
fn properties_to_meta_preserves_legitimate_zero_size() {
let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
let meta =
properties_to_meta("LOCK", Some(0), Some(now), None).expect("conversion succeeds");
assert_eq!(meta.size, 0);
}
#[test]
fn properties_to_meta_rejects_missing_content_length() {
let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
let err = properties_to_meta("k", None, Some(now), None)
.expect_err("missing content-length must error");
match err {
ObjectStoreError::Other(inner) => {
let msg = inner.to_string();
assert!(msg.contains("no content-length"), "names failure: {msg}");
assert!(msg.contains("`k`"), "includes the key for context: {msg}");
}
other => {
panic!("expected ObjectStoreError::Other for missing content-length, got {other:?}")
}
}
}
#[test]
fn properties_to_meta_rejects_missing_last_modified() {
let err = properties_to_meta("k", Some(0), None, None)
.expect_err("missing last_modified must error");
match err {
ObjectStoreError::Other(inner) => {
let msg = inner.to_string();
assert!(msg.contains("no last-modified"), "names failure: {msg}");
assert!(msg.contains("`k`"), "includes the key for context: {msg}");
}
other => {
panic!("expected ObjectStoreError::Other for missing last_modified, got {other:?}")
}
}
}
// --- item_to_meta ---
#[test]
fn item_to_meta_round_trips_well_formed_item() {
let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
let meta = item_to_meta(Some("k"), Some(42), Some(now), Some("\"abc\"")).unwrap();
assert_eq!(meta.key, "k");
assert_eq!(meta.size, 42);
assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
}
#[test]
fn item_to_meta_rejects_missing_name() {
let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
let err = item_to_meta(None, Some(0), Some(now), None).unwrap_err();
match err {
ObjectStoreError::Other(inner) => {
assert!(
inner.to_string().contains("without a name"),
"names failure: {inner}"
);
}
other => panic!("expected ObjectStoreError::Other, got {other:?}"),
}
}
#[test]
fn item_to_meta_rejects_missing_last_modified() {
let err = item_to_meta(Some("k"), Some(0), None, None).unwrap_err();
match err {
ObjectStoreError::Other(inner) => {
let msg = inner.to_string();
assert!(
msg.contains("without last_modified"),
"names failure: {msg}"
);
assert!(msg.contains("`k`"), "includes the key: {msg}");
}
other => panic!("expected ObjectStoreError::Other, got {other:?}"),
}
}
// Unlike `properties_to_meta`, a missing size in a list item defaults to 0.
#[test]
fn item_to_meta_treats_missing_size_as_zero() {
let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
let meta = item_to_meta(Some("k"), None, Some(now), None).unwrap();
assert_eq!(meta.size, 0);
}
// --- upload_options_from ---
#[test]
fn upload_options_from_default_is_empty() {
let out = upload_options_from(PutOpts::default());
assert!(out.blob_content_disposition.is_none());
assert!(out.metadata.is_none());
}
#[test]
fn upload_options_from_carries_content_disposition() {
let opts = PutOpts {
content_disposition: Some("attachment; filename=x".into()),
user_metadata: Vec::new(),
progress: None,
};
let out = upload_options_from(opts);
let cd: String = out
.blob_content_disposition
.expect("content_disposition should be set");
assert!(cd.contains("attachment"));
}
#[test]
fn upload_options_from_collects_metadata() {
let opts = PutOpts {
content_disposition: None,
user_metadata: vec![("x-foo".into(), "1".into()), ("x-bar".into(), "2".into())],
progress: None,
};
let out = upload_options_from(opts);
let map = out.metadata.expect("metadata set");
assert_eq!(map.get("x-foo").map(String::as_str), Some("1"));
assert_eq!(map.get("x-bar").map(String::as_str), Some("2"));
}
// --- construction and transport wiring ---
// A non-Azure URL must be rejected with `ObjectStoreError::Other`.
#[tokio::test]
async fn from_remote_url_rejects_s3() {
let result = AzureStore::from_remote_url(&s3_url()).await;
match result {
Err(ObjectStoreError::Other(_)) => {}
Err(other) => panic!("expected ObjectStoreError::Other, got {other:?}"),
Ok(_) => panic!("expected S3 URL to be rejected"),
}
}
// Pins the timeout constants so a change to them is a deliberate test update.
#[test]
fn transport_timeout_constants_have_expected_values() {
assert_eq!(POOL_IDLE_TIMEOUT, Duration::from_secs(30));
assert_eq!(TCP_KEEPALIVE, Duration::from_secs(30));
assert_eq!(CONNECT_TIMEOUT, Duration::from_secs(10));
assert_eq!(READ_TIMEOUT, Duration::from_secs(30));
}
#[test]
fn build_http_client_succeeds() {
build_http_client().expect("reqwest client builds with the configured timeouts");
}
// Without credentials the options still carry the custom transport and no policies.
#[test]
fn build_client_options_installs_custom_transport() {
let resolved = auth::ResolvedCredentials {
token_credential: None,
per_try_policy: None,
sas_signing_key: None,
};
let opts = build_client_options(&resolved).expect("client options build");
assert!(
opts.transport.is_some(),
"ClientOptions::transport must be Some so the SDK uses our \
pool_idle_timeout / tcp_keepalive client (issue #26)",
);
assert!(
opts.per_try_policies.is_empty(),
"no per-try policy was supplied; the helper must not inject \
a fallback signer of its own",
);
}
// A supplied per-try policy is forwarded as the very same `Arc`.
#[test]
fn build_client_options_preserves_per_try_policy() {
const AZURITE_KEY: &str = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
let policy: Arc<dyn azure_core::http::policies::Policy> = Arc::new(
auth::SharedKeySigningPolicy::new("devstoreaccount1", AZURITE_KEY)
.expect("shared-key policy constructs"),
);
let resolved = auth::ResolvedCredentials {
token_credential: None,
per_try_policy: Some(Arc::clone(&policy)),
sas_signing_key: None,
};
let opts = build_client_options(&resolved).expect("client options build");
assert!(opts.transport.is_some(), "transport still wired");
assert_eq!(
opts.per_try_policies.len(),
1,
"exactly one per-try policy is wired",
);
assert!(
Arc::ptr_eq(&policy, &opts.per_try_policies[0]),
"the policy at index 0 must be the same Arc the caller \
supplied — not a fresh policy constructed inside the helper",
);
}
// --- multipart helpers ---
// The multipart switch is inclusive at the threshold.
#[test]
fn should_use_multipart_pins_threshold_boundary() {
use super::super::multipart::MULTIPART_PUT_THRESHOLD;
assert!(!should_use_multipart(MULTIPART_PUT_THRESHOLD - 1));
assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD));
assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD + 1));
assert!(should_use_multipart(6 * (1 << 30)));
}
// Block IDs are distinct per part and always 32 bytes long.
#[test]
fn block_id_for_is_unique_and_uniform_length() {
let id_a = block_id_for(0);
let id_b = block_id_for(1);
let id_c = block_id_for(99_999);
assert_eq!(id_a.len(), id_b.len(), "all IDs share length");
assert_eq!(id_a.len(), id_c.len(), "even at the upper end");
assert_ne!(id_a, id_b, "two parts get distinct IDs");
assert_eq!(id_a.len(), 32);
}
}