// git_remote_object_store/object_store/azure.rs

//! Azure Blob Storage backend for the [`ObjectStore`] trait.
//!
//! [`ObjectStore`]: super::ObjectStore
//!
//! `AzureStore` wraps `azure_storage_blob`. Like the S3 backend, this
//! module owns the URL → SDK config translation, the error-code
//! classifier ([`classify`]), and the credential resolution plumbing.
//! Unlike S3, the SDK already does parallel range downloads inside
//! `BlobClient::download()`, so there is no hand-rolled multipart
//! orchestrator (asymmetric with S3 by design).
//!
//! ## Authentication
//!
//! The official `azure_storage_blob` 0.12 crate currently exposes only
//! `Arc<dyn TokenCredential>` (Entra ID) on its constructors. Azurite
//! does not implement Entra ID without an `--oauth basic` HTTPS setup,
//! and many production accounts still authenticate with shared keys.
//! To bridge both, we install our own [`auth::SharedKeySigningPolicy`]
//! as a per-try [`azure_core::http::policies::Policy`] and pass `None`
//! for the SDK's `credential` parameter. The SDK then forwards every
//! request through our policy, which signs the request using the Azure
//! Storage shared-key v2 scheme. Tracking issue:
//! `Azure/azure-sdk-for-rust#2975`.
//!
//! Resolution order for `?credential=<NAME>` in the URL:
//!
//! 1. `AZSTORE_<NAME>_KEY` — base64 account key → shared-key signing.
//! 2. `AZSTORE_<NAME>_CONNECTION_STRING` — connection string with
//!    `AccountName=` / `AccountKey=` → shared-key signing.
//! 3. `AZSTORE_<NAME>_SAS` — SAS query string appended verbatim to
//!    every outgoing request URL.
//!
//! When no `?credential=` flag is set we fall back to
//! `azure_identity::DeveloperToolsCredential` (env, workload identity,
//! managed identity, Azure CLI, ...).
//!
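/*!
A minimal sketch of the resolution order above, not the crate's actual
`auth::resolve`. The `CredentialSource` enum and `resolve_alias` function
are hypothetical names, the environment is modelled as a plain map so the
precedence can be exercised without touching the process environment, and
`<NAME>` is assumed to be substituted verbatim into the variable name.

```rust
use std::collections::HashMap;

/// Which secret a credential alias resolved to (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum CredentialSource {
    /// Value of `AZSTORE_<NAME>_KEY`.
    AccountKey(String),
    /// Value of `AZSTORE_<NAME>_CONNECTION_STRING`.
    ConnectionString(String),
    /// Value of `AZSTORE_<NAME>_SAS`.
    Sas(String),
}

/// Walk the documented precedence: `_KEY`, then `_CONNECTION_STRING`,
/// then `_SAS`. Returns `None` when none of the three is set.
fn resolve_alias(name: &str, env: &HashMap<String, String>) -> Option<CredentialSource> {
    // Assumption: `<NAME>` is used verbatim in the variable name.
    let lookup = |suffix: &str| env.get(&format!("AZSTORE_{name}_{suffix}")).cloned();
    lookup("KEY")
        .map(CredentialSource::AccountKey)
        .or_else(|| lookup("CONNECTION_STRING").map(CredentialSource::ConnectionString))
        .or_else(|| lookup("SAS").map(CredentialSource::Sas))
}
```

Because the lookups are chained with `or_else`, a later variable is only
consulted when every earlier one is unset, so setting both the `_KEY` and
the `_SAS` variable for one alias selects the key.
*/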
//! ## Conditional writes
//!
//! [`put_if_absent`][super::ObjectStore::put_if_absent] uses
//! `If-None-Match: "*"` (the SDK's
//! `BlockBlobClientUploadOptions::with_if_not_exists` convenience).
//! Azure returns 409 (`BlobAlreadyExists`) or 412
//! (`ConditionNotMet`) for the contention case; both collapse to
//! `Ok(false)`.
//!
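/*!
The collapse described above can be sketched with plain status codes.
`StoreError`, `classify_status` and `put_if_absent_outcome` are
illustrative stand-ins rather than the crate's types, and the upload
result is reduced to a `Result<(), u16>` carrying the failing HTTP status.

```rust
/// Simplified stand-in for the trait's error enum (hypothetical).
#[derive(Debug, PartialEq, Eq)]
enum StoreError {
    /// HTTP 412, e.g. `ConditionNotMet`.
    PreconditionFailed,
    /// HTTP 409, e.g. `BlobAlreadyExists`.
    Conflict,
    /// Any other failing status.
    Other(u16),
}

/// Map a failing HTTP status to the simplified error enum.
fn classify_status(status: u16) -> StoreError {
    match status {
        412 => StoreError::PreconditionFailed,
        409 => StoreError::Conflict,
        other => StoreError::Other(other),
    }
}

/// `Ok(true)` when the blob was created, `Ok(false)` when it already
/// existed, and `Err` for every other failure.
fn put_if_absent_outcome(upload: Result<(), u16>) -> Result<bool, StoreError> {
    match upload.map_err(classify_status) {
        Ok(()) => Ok(true),
        Err(StoreError::PreconditionFailed | StoreError::Conflict) => Ok(false),
        Err(other) => Err(other),
    }
}
```

Losing the race is therefore an ordinary return value for the caller,
while an authorization failure such as 403 still surfaces as an error.
*/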
//! ## Atomic `get_to_file`
//!
//! Identical to the S3 path: `head` → tempfile → `download(if_match)` →
//! persist. The SDK's `download()` aggregates parallel range fetches
//! internally, so no per-chunk semaphore here. A single retry with a
//! fresh `ETag` covers the head-then-`GET` race (412 mid-download).
//!
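/*!
The tempfile and persist steps can be illustrated with the standard
library alone. This is a simplified sketch under stated assumptions:
`write_atomically`, `stage` and the `.partial` suffix are hypothetical,
the body arrives as an in-memory slice instead of a streamed download,
and the real path uses `tempfile::NamedTempFile` with `persist_temp`
(a fixed suffix like this one is not safe for concurrent writers).

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Write the body to the staging file and flush it to disk.
fn stage(tmp_path: &Path, body: &[u8]) -> io::Result<()> {
    let mut tmp = fs::File::create(tmp_path)?;
    tmp.write_all(body)?;
    tmp.sync_all()
}

/// Stage `body` next to `dest`, then rename it into place so readers
/// never observe a partially written destination.
fn write_atomically(dest: &Path, body: &[u8]) -> io::Result<()> {
    let invalid = |msg: &str| io::Error::new(io::ErrorKind::InvalidInput, msg.to_owned());
    let parent = dest
        .parent()
        .ok_or_else(|| invalid("destination has no parent directory"))?;
    let file_name = dest
        .file_name()
        .ok_or_else(|| invalid("destination has no file name"))?;

    // Hypothetical naming scheme: `<file name>.partial` in the same directory.
    let mut tmp_name = file_name.to_os_string();
    tmp_name.push(".partial");
    let tmp_path = parent.join(tmp_name);

    let result = stage(&tmp_path, body).and_then(|()| fs::rename(&tmp_path, dest));
    if result.is_err() {
        // Best-effort cleanup; the original error is what matters.
        let _ = fs::remove_file(&tmp_path);
    }
    result
}
```

Staging in the destination's own directory keeps the final `rename` on
one filesystem, which is what lets the swap be atomic on POSIX systems.
*/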
//! ## `copy(src, dst)`
//!
//! `azure_storage_blob` 0.12 does not expose a `BlobClient::copy_from_url`
//! method (only `BlockBlobClient::upload_blob_from_url`, which requires
//! a SAS-tokened source URL or an `x-ms-copy-source-authorization`
//! header — neither integrates cleanly with our credential model). We
//! implement `copy` as a stream-through-tempfile round trip:
//! `get_to_file` writes `src` to a `NamedTempFile`, then `put_path`
//! uploads it to `dst`. Both legs already stream — `get_to_file`
//! consumes the SDK's chunked download into the file without buffering
//! the body, and `put_path` switches to our explicit
//! `stage_block` + `commit_block_list` orchestrator (see
//! [`AzureStore::multipart_put_path`]) once the body crosses
//! [`super::multipart::MULTIPART_PUT_THRESHOLD`]. Peak in-flight bytes
//! are bounded by
//! [`super::multipart::MULTIPART_PUT_MAX_CONCURRENCY`] ×
//! [`super::multipart::MULTIPART_PUT_PART_SIZE`] regardless of blob
//! size, which matters for `manage doctor`'s duplicate-bundle
//! quarantine path ([`crate::manage::doctor::Doctor::evict_losing_bundle`])
//! — that path can copy multi-GiB bundles. Zero-byte lock files still
//! round-trip fast: `get_to_file` short-circuits the GET on `size == 0`
//! and `put_path` issues a single zero-byte `Put Blob`. Body is
//! preserved; user metadata is not propagated, matching the S3 backend's
//! `CopyObject` path which similarly carries only body bytes.
//!
//! This is asymmetric with the S3 backend, which uses `CopyObject` for
//! a true server-side copy — Azure's equivalent (`Copy Blob`,
//! `Put Blob From URL`) requires a SAS-signed source URL or an
//! `x-ms-copy-source-authorization` header that the 0.12 SDK does not
//! ergonomically expose. The download+reupload path is the safe,
//! correct fallback until the SDK closes that gap.
//!
//! ## A note on `Range` and zero-byte blobs
//!
//! A `Range` request against a zero-byte blob returns HTTP 416. We
//! never issue Range requests directly — `BlobClient::download()`
//! owns that — but the zero-size short-circuit in
//! [`get_to_file`](ObjectStore::get_to_file) also avoids any download
//! SDK call against a known-empty blob, which sidesteps the issue
//! entirely.
//!
//! ## Size limits
//!
//! Azure caps a block blob at 50 000 committed blocks (~4.75 TiB at
//! the SDK's default block size) and a single `Put Blob` body at
//! 5000 MiB; above [`super::multipart::MULTIPART_PUT_THRESHOLD`] the
//! helper switches to explicit `stage_block` + `commit_block_list`,
//! so callers do not have to reason about the single-call cutoff.
//! The upload path is **not resumable** across process death — see
//! the README "Known limitations" section.
//!
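/*!
The limits above reduce to simple arithmetic. The sketch below assumes a
fixed 16 MiB part size and a concurrency of 8, the values quoted in the
`put_path` documentation in this file. `plan_parts` and
`peak_in_flight_bytes` are hypothetical helpers and do not reproduce the
signature of `plan_upload_parts`.

```rust
const MIB: u64 = 1 << 20;
/// Assumed part size (16 MiB, as quoted in the `put_path` docs).
const PART_SIZE: u64 = 16 * MIB;
/// Assumed upload concurrency (8, as quoted in the `put_path` docs).
const MAX_CONCURRENCY: u64 = 8;
/// Azure's ceiling on committed blocks per block blob.
const MAX_BLOCKS: u64 = 50_000;

/// Split `total` bytes into `(offset, len)` parts of at most
/// `PART_SIZE`, refusing plans that would exceed `MAX_BLOCKS`.
fn plan_parts(total: u64) -> Result<Vec<(u64, u64)>, String> {
    let count = total.div_ceil(PART_SIZE);
    if count > MAX_BLOCKS {
        return Err(format!("{total} bytes needs {count} blocks, above {MAX_BLOCKS}"));
    }
    Ok((0..count)
        .map(|i| {
            let offset = i * PART_SIZE;
            // The final part may be shorter than `PART_SIZE`.
            (offset, PART_SIZE.min(total - offset))
        })
        .collect())
}

/// Upper bound on part bytes buffered at once when every worker
/// holds one full part.
fn peak_in_flight_bytes() -> u64 {
    MAX_CONCURRENCY * PART_SIZE
}
```

Under these assumptions the 50 000-block ceiling is reached at
50 000 × 16 MiB = 800 000 MiB (about 781 GiB), and at most
8 × 16 MiB = 128 MiB of part data is buffered at any moment.
*/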
//! ## HTTP transport tuning
//!
//! `azure_core` 0.35's default transport keeps idle pooled connections
//! forever and never sets TCP keepalive, so a pooled connection to a
//! rotated VIP would hang an in-flight request until the OS-level TCP
//! retransmit timeout fires (~15 minutes on Linux). [`AzureStore`]
//! installs a custom [`reqwest::Client`] via [`Transport`] on
//! [`ClientOptions::transport`] with four bounds:
//!
//! - [`POOL_IDLE_TIMEOUT`] (30 s) — drops idle pooled connections
//!   before a typical DNS rotation makes them stale.
//! - [`TCP_KEEPALIVE`] (30 s) — detects a dead-but-not-closed TCP
//!   session in seconds rather than the 2-hour Linux default; covers
//!   *hot* pooled connections that pool-idle alone cannot.
//! - [`CONNECT_TIMEOUT`] (10 s) — bounds a fresh-connect attempt to
//!   a dead VIP rather than waiting on the OS connect timeout.
//! - [`READ_TIMEOUT`] (30 s) — per-read timeout that resets after a
//!   successful read, so a stuck transfer fails fast without limiting
//!   total body size.
//!
//! Together these cap a DNS-rotation hang at tens of seconds rather
//! than minutes. The custom transport leaves
//! [`ClientOptions::per_try_policies`] (where the shared-key signing
//! lives) untouched — the SDK pipeline runs per-try policies
//! independently of the transport. Tracking issue: #26.
//!
//! ## Stdout discipline
//!
//! Per `.claude/rules/protocol-stdout.md`, this module never writes to
//! stdout. Diagnostics go through `tracing` (which the helper binaries
//! configure to write to stderr).

pub mod auth;
pub(crate) mod sas;

use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use azure_core::http::headers::{HeaderName, Headers};
use azure_core::http::request::RequestContent;
use azure_core::http::{ClientOptions, Transport};
use azure_storage_blob::clients::{
    BlobClient, BlobContainerClient, BlobContainerClientOptions, BlockBlobClient,
};
use azure_storage_blob::models::method_options::BlockBlobClientUploadOptions;
use azure_storage_blob::models::{
    BlobClientDeleteOptions, BlobClientDownloadOptions, BlobClientGetPropertiesOptions,
    BlobContainerClientListBlobsOptions, BlockBlobClientCommitBlockListOptions, BlockLookupList,
};
use azure_storage_blob::stream::tokio::FileStream;
use bytes::Bytes;
use futures::StreamExt;
use tempfile::NamedTempFile;
use time::OffsetDateTime;
use tokio::io::AsyncWriteExt;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use url::Url;

use crate::url::{AzureAddressing, RemoteUrl};

use super::error::{network_boxed, other_boxed};
use super::multipart::{
    AZURE_MAX_BLOCKS, MULTIPART_PUT_MAX_CONCURRENCY, MULTIPART_PUT_PART_SIZE, UploadPart,
    plan_upload_parts, read_file_part, should_use_multipart, slice_bytes_part,
};
use super::{
    GetOpts, ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts, persist_temp,
};

/// Azure Blob's hard ceiling on a single Put Blob body for the wire
/// versions we negotiate (2019-12-12+). Reported in
/// [`ObjectStoreError::PayloadTooLarge`] when the SDK surfaces HTTP 413
/// or `RequestBodyTooLarge`, so the wire-line names a concrete number
/// rather than dumping an opaque SDK chain.
pub(crate) const SINGLE_PUT_BLOB_LIMIT_BYTES: u64 = 5_000 * (1 << 20);

/// Bound on how long an idle pooled HTTPS connection lingers before
/// the [`reqwest`] connection pool drops it. Short enough that DNS
/// rotation rarely hits a stale pooled connection; long enough that
/// bursty fetch / push batches still benefit from connection reuse.
/// See module-level "HTTP transport tuning" docs and issue #26.
pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);

/// TCP keepalive interval for the custom [`reqwest`] transport.
/// Detects dead-but-not-closed sessions in seconds rather than the
/// 2-hour Linux default. See module-level "HTTP transport tuning"
/// docs and issue #26.
pub(crate) const TCP_KEEPALIVE: Duration = Duration::from_secs(30);

/// Bound on a fresh TCP-connect attempt. `reqwest` defaults to no
/// connect timeout, so an unreachable IP would otherwise wait on the
/// OS-level connect timeout (~127 s with Linux's default
/// `tcp_syn_retries` of 6). 10 s is comfortable for an in-region or
/// even cross-region handshake while failing fast on a dead VIP. See
/// module-level "HTTP transport tuning" docs and issue #26.
pub(crate) const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Per-read timeout for the custom [`reqwest`] transport. Resets after
/// each successful read, so it caps how long a stuck connection can
/// hold a transfer without limiting total body size. Sized to match
/// [`POOL_IDLE_TIMEOUT`] / [`TCP_KEEPALIVE`] so a single rotation
/// budget covers all three knobs. See module-level "HTTP transport
/// tuning" docs and issue #26.
pub(crate) const READ_TIMEOUT: Duration = Duration::from_secs(30);

/// Production [`ObjectStore`] backed by `azure_storage_blob`.
pub struct AzureStore {
    container: BlobContainerClient,
    /// Container name as parsed from the URL — needed by SAS-token
    /// construction (issue #76) because the SDK's
    /// `BlobContainerClient::container_name()` is private. Held
    /// regardless of credential type so the field shape doesn't
    /// branch on whether SAS is reachable.
    container_name: String,
    /// Storage-key material for service-blob SAS generation
    /// ([`presigned_get_url`](ObjectStore::presigned_get_url)).
    /// `Some` when the credential alias resolves to a shared
    /// account key (KEY env var or connection string); `None` for
    /// SAS-env-var or Entra-ID paths, which return
    /// [`ObjectStoreError::Unsupported`] for presigning.
    sas_signing: Option<auth::SasSigningKey>,
}

impl std::fmt::Debug for AzureStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `BlobContainerClient` is opaque (private fields, no `Debug`);
        // surface the endpoint instead so error / log lines remain
        // useful.
        f.debug_struct("AzureStore")
            .field("endpoint", &self.container.endpoint().as_str())
            .field("container", &self.container_name)
            .field("sas_signing", &self.sas_signing)
            .finish()
    }
}

impl AzureStore {
    /// Build an `AzureStore` from a parsed [`RemoteUrl`].
    ///
    /// Like the S3 backend, the [`RemoteUrl::Azure::prefix`] field is
    /// intentionally **not** consumed here; callers compose it into keys
    /// themselves.
    ///
    /// Marked `async` for symmetry with `S3Store::from_remote_url`,
    /// which awaits the AWS provider chain. The Azure path resolves
    /// credentials synchronously today; the signature stays `async` so
    /// future credential providers (e.g. one that fetches an OIDC
    /// token at construction) can plug in without breaking callers.
    ///
    /// # Errors
    ///
    /// Returns [`ObjectStoreError::Other`] if `url` is not the Azure
    /// variant or if credential resolution fails.
    #[allow(clippy::unused_async)]
    pub async fn from_remote_url(url: &RemoteUrl) -> Result<Self, ObjectStoreError> {
        let RemoteUrl::Azure {
            endpoint,
            account,
            container,
            addressing,
            flags,
            ..
        } = url
        else {
            return Err(ObjectStoreError::Other(
                format!("AzureStore::from_remote_url called with non-Azure URL: {url}").into(),
            ));
        };

        let account_url = build_account_url(endpoint, account, *addressing);
        let resolved = auth::resolve(account, flags)?;
        let sas_signing = resolved.sas_signing_key.clone();

        let client_options = build_client_options(&resolved)?;

        let container_options = BlobContainerClientOptions {
            client_options,
            ..Default::default()
        };

        let container_client = BlobContainerClient::new(
            &account_url,
            container,
            resolved.token_credential,
            Some(container_options),
        )
        .map_err(other_boxed)?;

        Ok(Self {
            container: container_client,
            container_name: container.clone(),
            sas_signing,
        })
    }

    /// Construct a [`BlobClient`] for an individual blob.
    fn blob_client(&self, key: &str) -> BlobClient {
        self.container.blob_client(key)
    }

    /// Verify the container is reachable with the configured credentials
    /// by listing one blob (`maxresults=1`) and consuming only the first
    /// page of results. Used by [`crate::protocol::backend::build`] to
    /// fold credential / missing-container / authorization failures into
    /// categorical [`crate::protocol::backend::BackendError`] variants
    /// before the helper REPL runs its first command. Counterpart to
    /// [`crate::object_store::s3::S3Store::probe`].
    pub(crate) async fn probe(&self, prefix: &str) -> Result<(), ObjectStoreError> {
        // Pass `None` for an empty prefix per the same Azurite quirk
        // documented at the top of `list` below: a signed empty prefix
        // returns 403 from Azurite.
        let prefix_opt = (!prefix.is_empty()).then(|| prefix.to_owned());
        let opts = BlobContainerClientListBlobsOptions {
            prefix: prefix_opt,
            maxresults: Some(1),
            ..Default::default()
        };
        let mut pages = self
            .container
            .list_blobs(Some(opts))
            .map_err(|e| classify(e, prefix))?
            .into_pages();
        // Consume only the first page: probing does not need the full
        // listing — we only care that the request succeeded.
        if let Some(page_result) = pages.next().await {
            page_result.map_err(|e| classify(e, prefix))?;
        }
        Ok(())
    }
}

/// Build the [`reqwest::Client`] used by [`AzureStore`]'s custom
/// [`Transport`].
///
/// Bounds the connection pool's idle window, enables TCP keepalive,
/// and sets connect / per-read timeouts so a rotated VIP cannot wedge
/// a long-running session (see [`POOL_IDLE_TIMEOUT`] / [`TCP_KEEPALIVE`]
/// / [`CONNECT_TIMEOUT`] / [`READ_TIMEOUT`] for rationale). Returns
/// [`ObjectStoreError::Other`] if the TLS / DNS resolver layer fails
/// to initialise, which the SDK would otherwise surface as a cryptic
/// per-request error.
pub(crate) fn build_http_client() -> Result<Arc<reqwest::Client>, ObjectStoreError> {
    reqwest::Client::builder()
        .pool_idle_timeout(POOL_IDLE_TIMEOUT)
        .tcp_keepalive(TCP_KEEPALIVE)
        .connect_timeout(CONNECT_TIMEOUT)
        .read_timeout(READ_TIMEOUT)
        .build()
        .map(Arc::new)
        .map_err(other_boxed)
}

/// Build the [`ClientOptions`] [`AzureStore`] hands to the SDK.
///
/// Installs the custom [`Transport`] (see [`build_http_client`]) and
/// preserves the credential resolver's per-try signing policy. The
/// helper is split out (rather than inlined into [`AzureStore::from_remote_url`])
/// so unit tests can assert that both invariants hold without
/// constructing a real `BlobContainerClient`.
pub(crate) fn build_client_options(
    resolved: &auth::ResolvedCredentials,
) -> Result<ClientOptions, ObjectStoreError> {
    let mut opts = ClientOptions {
        transport: Some(Transport::new(build_http_client()?)),
        ..Default::default()
    };
    if let Some(policy) = &resolved.per_try_policy {
        opts.per_try_policies.push(Arc::clone(policy));
    }
    Ok(opts)
}

/// Construct the account-level endpoint URL the SDK constructors expect.
///
/// The SDK takes a separate `container_name` argument, so we strip the
/// container (and any prefix segments) from the parsed URL. For
/// virtual-hosted addressing the path becomes `/`; for path-style
/// addressing (Azurite, custom endpoints) the path becomes `/<account>`.
pub(crate) fn build_account_url(
    endpoint: &Url,
    account: &str,
    addressing: AzureAddressing,
) -> String {
    let mut rewritten = endpoint.clone();
    rewritten.set_query(None);
    rewritten.set_fragment(None);
    let path = match addressing {
        AzureAddressing::VirtualHosted => "/".to_owned(),
        AzureAddressing::PathStyle => format!("/{account}"),
    };
    rewritten.set_path(&path);
    rewritten.to_string()
}

/// Map an [`azure_core::Error`] into the trait's [`ObjectStoreError`] enum.
///
/// `key` is the operation's key/prefix context; it appears in the
/// resulting [`ObjectStoreError::NotFound`] / [`ObjectStoreError::AccessDenied`] /
/// [`ObjectStoreError::PreconditionFailed`] / [`ObjectStoreError::Conflict`] payload.
fn classify(err: azure_core::Error, key: &str) -> ObjectStoreError {
    if let azure_core::error::ErrorKind::HttpResponse {
        status, error_code, ..
    } = err.kind()
        && let Some(mapped) =
            classify_status_and_code(u16::from(*status), error_code.as_deref(), key)
    {
        return mapped;
    }
    if matches!(err.kind(), azure_core::error::ErrorKind::Io) {
        return network_boxed(err);
    }
    other_boxed(err)
}

/// Pure status/code classifier (key context, no SDK types) so unit
/// tests can exercise every branch without synthesising an SDK error.
fn classify_status_and_code(
    status: u16,
    code: Option<&str>,
    key: &str,
) -> Option<ObjectStoreError> {
    match status {
        404 => return Some(ObjectStoreError::NotFound(key.to_owned())),
        403 => return Some(ObjectStoreError::AccessDenied(key.to_owned())),
        412 => return Some(ObjectStoreError::PreconditionFailed(key.to_owned())),
        409 => return Some(ObjectStoreError::Conflict(key.to_owned())),
        // Azure surfaces a Put Blob body over the single-PUT ceiling as
        // HTTP 413 with code `RequestBodyTooLarge`; the status alone is
        // sufficient (HTTP 413 is the canonical "Payload Too Large").
        413 => {
            return Some(ObjectStoreError::PayloadTooLarge {
                limit_bytes: SINGLE_PUT_BLOB_LIMIT_BYTES,
            });
        }
        _ => {}
    }
    // Defensive backstop for the (rare) case where the SDK exposes the
    // service code without a 413 status: route on the code alone.
    match code {
        Some("RequestBodyTooLarge") => Some(ObjectStoreError::PayloadTooLarge {
            limit_bytes: SINGLE_PUT_BLOB_LIMIT_BYTES,
        }),
        _ => None,
    }
}

/// Convert the relevant `Get Blob Properties` headers into the trait's
/// [`ObjectMeta`].
///
/// Extracted so unit tests can drive the missing-content-length and
/// missing-last-modified guard branches without synthesising a full
/// `BlobClientGetPropertiesResultHeaders` value.
///
/// A missing `Content-Length` is an error rather than silent zero: a
/// 0-byte size is semantically meaningful (lock files are intentionally
/// empty) and downstream `head_then_download` takes a fast path on
/// `size == 0` that writes an empty destination file. Treating "header
/// absent" as 0 would silently produce empty bundles instead of
/// surfacing the malformed response.
fn properties_to_meta(
    key: &str,
    content_length: Option<u64>,
    last_modified: Option<OffsetDateTime>,
    etag: Option<&str>,
) -> Result<ObjectMeta, ObjectStoreError> {
    let size = content_length.ok_or_else(|| {
        ObjectStoreError::Other(
            format!("get_properties on `{key}` returned no content-length").into(),
        )
    })?;
    let last_modified = last_modified.ok_or_else(|| {
        ObjectStoreError::Other(
            format!("get_properties on `{key}` returned no last-modified").into(),
        )
    })?;
    Ok(ObjectMeta {
        key: key.to_owned(),
        size,
        last_modified,
        etag: etag.map(str::to_owned),
    })
}

/// Convert a `BlobItem`-shaped record into the trait's [`ObjectMeta`].
///
/// Extracted so unit tests can drive the missing-field guards without
/// synthesising a full `ListBlobsResponse`.
fn item_to_meta(
    name: Option<&str>,
    content_length: Option<u64>,
    last_modified: Option<OffsetDateTime>,
    etag: Option<&str>,
) -> Result<ObjectMeta, ObjectStoreError> {
    let key = name
        .ok_or_else(|| ObjectStoreError::Other("list_blobs returned a blob without a name".into()))?
        .to_owned();
    let size = content_length.unwrap_or(0);
    let last_modified = last_modified.ok_or_else(|| {
        ObjectStoreError::Other(
            format!("list_blobs returned blob `{key}` without last_modified").into(),
        )
    })?;
    Ok(ObjectMeta {
        key,
        size,
        last_modified,
        etag: etag.map(str::to_owned),
    })
}

#[async_trait::async_trait]
impl ObjectStore for AzureStore {
    async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
        // Pass `None` for an empty prefix: Azure list_blobs URL-encodes
        // `prefix=` and Azurite signs an empty value differently than
        // an absent one (treats it as a tampered query and returns
        // 403). Skipping the parameter is the wire-equivalent of "no
        // prefix filter" anyway.
        let prefix_opt = (!prefix.is_empty()).then(|| prefix.to_owned());
        let opts = BlobContainerClientListBlobsOptions {
            prefix: prefix_opt,
            ..Default::default()
        };
        let mut pages = self
            .container
            .list_blobs(Some(opts))
            .map_err(|e| classify(e, prefix))?
            .into_pages();

        let mut out = Vec::new();
        while let Some(page_result) = pages.next().await {
            let response = page_result.map_err(|e| classify(e, prefix))?;
            let body = response
                .into_body()
                .xml::<azure_storage_blob::models::ListBlobsResponse>()
                .map_err(|e| classify(e, prefix))?;
            for item in body.segment.blob_items {
                let props = item.properties.unwrap_or_default();
                let meta = item_to_meta(
                    item.name.as_deref(),
                    props.content_length,
                    props.last_modified,
                    // Listing omits ETag for parity with S3 (avoid
                    // inflating per-object metadata for callers that
                    // only need a key/size enumeration).
                    None,
                )?;
                out.push(meta);
            }
        }
        Ok(out)
    }

    async fn get_to_file(
        &self,
        key: &str,
        dest: &Path,
        opts: GetOpts,
    ) -> Result<(), ObjectStoreError> {
        let parent = dest.parent().ok_or_else(|| {
            ObjectStoreError::Other(
                format!("destination `{}` has no parent directory", dest.display()).into(),
            )
        })?;

        // Mirror S3: try once, retry once on 412 (the head→GET race).
        // After the second attempt any error — including a repeated
        // 412 — propagates.
        let progress = opts.progress.as_ref();
        match self.head_then_download(key, dest, parent, progress).await {
            Err(ObjectStoreError::PreconditionFailed(_)) => {
                tracing::warn!(key, "blob changed between head and GET; retrying");
                self.head_then_download(key, dest, parent, progress).await
            }
            other => other,
        }
    }

    async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
        let blob = self.blob_client(key);
        let result = blob.download(None).await.map_err(|e| classify(e, key))?;
        let bytes = result.body.collect().await.map_err(network_boxed)?;
        Ok(bytes)
    }

    /// Issue a Get Blob with a `Range<usize>` covering `[start, end)`.
    /// HTTP 416 maps to [`ObjectStoreError::RangeNotSatisfiable`] with
    /// the original `Range<u64>` so the wire-line names what the
    /// caller asked for. All other failures route through [`classify`].
    ///
    /// The Azure SDK exposes `BlobClientDownloadOptions::range` as
    /// `Option<Range<usize>>`. `usize` is at least 64 bits on every
    /// supported target, so converting from `u64` is lossless; a
    /// `const` assertion in the function body enforces that, so a
    /// future 32-bit port surfaces as a compile error rather than
    /// silent truncation.
    ///
    /// Azure silently truncates a ranged GET to EOF when the requested
    /// range overruns the blob: with `start < body.len() < end` it
    /// returns only `start..body.len()` with HTTP 206 and no error. The
    /// post-flight length check via [`super::verify_range_response_length`]
    /// elevates that mismatch to [`ObjectStoreError::RangeNotSatisfiable`]
    /// so callers (notably the packchain reader) cannot mistake a
    /// truncated slice for the full requested range.
    async fn get_bytes_range(
        &self,
        key: &str,
        range: std::ops::Range<u64>,
    ) -> Result<Bytes, ObjectStoreError> {
        // Compile-time guarantee: every supported target has a 64-bit
        // usize, so the `u64 → usize` conversions below cannot
        // truncate. A future 32-bit port surfaces as a build break,
        // not silent corruption.
        const _USIZE_AT_LEAST_64_BIT: () =
            assert!(usize::BITS >= 64, "Azure backend requires 64-bit usize");

        if let Some(empty) = super::precheck_range(key, &range)? {
            return Ok(empty);
        }
        let sdk_start = usize::try_from(range.start).expect("invariant: usize is at least 64 bits");
        let sdk_end = usize::try_from(range.end).expect("invariant: usize is at least 64 bits");
        let opts = BlobClientDownloadOptions {
            range: Some(sdk_start..sdk_end),
            ..Default::default()
        };
        let blob = self.blob_client(key);
        let result = match blob.download(Some(opts)).await {
            Ok(result) => result,
            Err(err) => {
                if let azure_core::error::ErrorKind::HttpResponse { status, .. } = err.kind()
                    && u16::from(*status) == 416
                {
                    return Err(ObjectStoreError::RangeNotSatisfiable {
                        key: key.to_owned(),
                        requested: range,
                    });
                }
                return Err(classify(err, key));
            }
        };
        let bytes = result.body.collect().await.map_err(network_boxed)?;
        super::verify_range_response_length(key, &range, bytes)
    }

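/*
A rough sketch of the post-flight length check used by `get_bytes_range`,
assuming it only compares the requested span with the number of bytes
received. `RangeNotSatisfiable` and `verify_len` are hypothetical
stand-ins; the real helper is `super::verify_range_response_length`,
whose signature is not shown in this file.

```rust
use std::ops::Range;

/// Simplified stand-in for the trait's range error (hypothetical).
#[derive(Debug, PartialEq, Eq)]
struct RangeNotSatisfiable {
    key: String,
    requested: Range<u64>,
}

/// Accept `body` only when it is exactly as long as the half-open
/// range `[start, end)` that was requested.
fn verify_len(
    key: &str,
    requested: &Range<u64>,
    body: Vec<u8>,
) -> Result<Vec<u8>, RangeNotSatisfiable> {
    let expected = requested.end.saturating_sub(requested.start);
    if body.len() as u64 == expected {
        Ok(body)
    } else {
        // A short body means the service truncated the read at EOF.
        Err(RangeNotSatisfiable {
            key: key.to_owned(),
            requested: requested.clone(),
        })
    }
}
```

For a 10-byte blob, a request for `4..16` comes back with 6 bytes and is
rejected, while `4..10` also returns 6 bytes and is accepted.
*/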
    async fn put_bytes(
        &self,
        key: &str,
        body: Bytes,
        opts: PutOpts,
    ) -> Result<(), ObjectStoreError> {
        // Same threshold as the S3 backend: above
        // [`MULTIPART_PUT_THRESHOLD`] use explicit `stage_block` +
        // `commit_block_list` so each block has its own retry budget,
        // predictable concurrency, and per-block progress events. Below
        // the threshold keep the single `Put Blob` round trip. Issue #53.
        let size = body.len() as u64;
        if should_use_multipart(size) {
            return self.multipart_put_bytes(key, body, size, opts).await;
        }
        let progress = opts.progress.clone();
        let blob = self.blob_client(key);
        let upload_opts = upload_options_from(opts);
        blob.upload(bytes_to_request_content(body), Some(upload_opts))
            .await
            .map_err(|e| classify(e, key))?;
        if let Some(sink) = progress
            && size > 0
        {
            sink.report(size);
        }
        Ok(())
    }

    /// Stream a local file to `key` without buffering its full body.
    ///
    /// Above [`super::multipart::MULTIPART_PUT_THRESHOLD`] this routes through explicit
    /// `stage_block` + `commit_block_list`, paralleling the S3 backend
    /// (issue #53). Below the threshold the single `Put Blob` path
    /// preserves the one-round-trip cost for small bundles and lock
    /// files.
    ///
    /// On the multipart path each task opens its own
    /// `tokio::fs::File`, seeks to its part offset, reads the part
    /// into a `Bytes`, then calls `BlockBlobClient::stage_block`. With
    /// `MULTIPART_PUT_MAX_CONCURRENCY = 8` and
    /// `MULTIPART_PUT_PART_SIZE = 16 MiB`, peak memory is bounded at
    /// 128 MiB regardless of file size.
    ///
    /// On the single-PUT path we wrap `tokio::fs::File` in
    /// [`FileStream`] so the body is delivered as
    /// `Body::SeekableStream`. The per-try signing policy reads
    /// `request.body().len()`, which `SeekableStream` reports faithfully
    /// via `len()`.
    async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
        // Open the file once and read size from the open handle. This
        // closes the metadata/upload race that would let a concurrent
        // truncate or rename produce a body whose length disagrees
        // with the size we used for multipart planning.
        let file = tokio::fs::File::open(src).await.map_err(other_boxed)?;
        let body_len = file.metadata().await.map_err(other_boxed)?.len();
        if should_use_multipart(body_len) {
            return self.multipart_put_path(key, file, body_len, opts).await;
        }
        // Below the threshold: single `Put Blob`. Wrap our already-
        // open handle in `FileStream`; the SDK does not re-open by
        // path (which would re-introduce the race).
        let stream = FileStream::builder(file)
            .build()
            .await
            .map_err(other_boxed)?;
        let body: azure_core::http::Body = stream.into();

        let blob = self.blob_client(key);
        let progress = opts.progress.clone();
        let upload_opts = upload_options_from(opts);
        blob.upload(body.into(), Some(upload_opts))
            .await
            .map_err(|e| classify(e, key))?;
        if let Some(sink) = progress
            && body_len > 0
        {
            sink.report(body_len);
        }
        Ok(())
    }

    async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
        let blob = self.blob_client(key);
        let upload_opts = BlockBlobClientUploadOptions::default().with_if_not_exists();
        let resp = blob
            .upload(bytes_to_request_content(body), Some(upload_opts))
            .await;
        match resp.map_err(|e| classify(e, key)) {
            Ok(_) => Ok(true),
            Err(ObjectStoreError::PreconditionFailed(_) | ObjectStoreError::Conflict(_)) => {
                Ok(false)
            }
            Err(other) => Err(other),
        }
    }

    async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
        let blob = self.blob_client(key);
        let resp = blob
            .get_properties(None::<BlobClientGetPropertiesOptions<'_>>)
750            .await
751            .map_err(|e| classify(e, key))?;
752        let headers = resp.headers();
753        properties_to_meta(
754            key,
755            header_u64(headers, &HeaderName::from_static("content-length")),
756            header_http_date(headers, &HeaderName::from_static("last-modified")),
757            headers.get_optional_str(&HeaderName::from_static("etag")),
758        )
759    }
760
761    async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
762        // Server-side copy via `Put Blob From URL` requires a SAS-tokened
763        // source URL or `x-ms-copy-source-authorization`, neither of
764        // which integrates with our credential model in a clean way
765        // for the SDK 0.12 surface. Stream `src` to a temp file via
766        // `get_to_file` (chunked download, no body buffer), then
767        // `put_path` it back to `dst` (block-uploaded for large bodies
768        // via `multipart_put_path`). Peak in-flight bytes are bounded
769        // by `MULTIPART_PUT_MAX_CONCURRENCY` × `MULTIPART_PUT_PART_SIZE`
770        // regardless of blob size — necessary because `manage doctor`'s
771        // duplicate-bundle quarantine path uses `copy()` and bundles
772        // can be multi-GiB.
773        let temp = NamedTempFile::new().map_err(other_boxed)?;
774        // `get_to_file` propagates `NotFound(src)` if the source is
775        // absent — exactly the trait contract for `copy`.
776        self.get_to_file(src, temp.path(), GetOpts::default())
777            .await?;
778        // A NotFound on the upload is destination-side — re-shape it
779        // so callers don't mistake it for "src absent".
780        match self.put_path(dst, temp.path(), PutOpts::default()).await {
781            Ok(()) => Ok(()),
782            Err(ObjectStoreError::NotFound(_)) => Err(ObjectStoreError::Other(
783                format!("copy `{src}` → `{dst}`: upload returned NotFound").into(),
784            )),
785            Err(other) => Err(other),
786        }
787    }
788
789    async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
790        let blob = self.blob_client(key);
791        blob.delete(None::<BlobClientDeleteOptions<'_>>)
792            .await
793            .map_err(|e| classify(e, key))?;
794        Ok(())
795    }
796
797    /// Build a service-blob SAS URL for `key` valid for `ttl`.
798    /// Used by the `bundle-uri` capability (issue #76) to advertise
799    /// time-limited download URLs against private containers.
800    ///
801    /// Only the shared-key / connection-string credential paths can
802    /// produce a SAS — the SAS env-var path has no key to re-sign
803    /// with, and the Entra-ID `TokenCredential` path requires
804    /// user-delegation SAS (out of scope per the issue). Both
805    /// fall through to [`ObjectStoreError::Unsupported`].
806    ///
807    /// # Errors
808    ///
809    /// - [`ObjectStoreError::Unsupported`] when the credential is
810    ///   not a shared key.
811    /// - [`ObjectStoreError::Other`] when SAS construction fails
812    ///   (HMAC init / base64 decode / time overflow).
813    async fn presigned_get_url(
814        &self,
815        key: &str,
816        ttl: std::time::Duration,
817    ) -> Result<String, ObjectStoreError> {
818        let signing = self.sas_signing.as_ref().ok_or_else(|| {
819            ObjectStoreError::Unsupported(
820                "Azure presigned URLs require a shared account key (KEY env var or \
821                 connection string); SAS-env-var and Entra-ID credentials cannot \
822                 derive per-blob SAS"
823                    .to_owned(),
824            )
825        })?;
826        // The SDK's `BlobClient::url()` returns the fully-qualified
827        // blob URL including the container path segment. Reuse it
828        // rather than re-deriving the URL shape per addressing
829        // mode here.
830        let blob = self.blob_client(key);
831        let base = blob.url();
832        sas::build_blob_sas_url(base, &self.container_name, key, signing, ttl)
833    }
834}
835
836impl AzureStore {
837    /// One head→tempfile→download→persist round trip.
838    ///
839    /// Factored out so [`get_to_file`](ObjectStore::get_to_file) can
840    /// invoke it twice: once normally, once more on a 412 retry.
841    async fn head_then_download(
842        &self,
843        key: &str,
844        dest: &Path,
845        parent: &Path,
846        progress: Option<&ProgressSink>,
847    ) -> Result<(), ObjectStoreError> {
848        let meta = self.head(key).await?;
849        let temp = NamedTempFile::new_in(parent).map_err(other_boxed)?;
850        if meta.size == 0 {
851            // Skip the GET entirely for zero-byte blobs (lock files):
852            // `download_streaming` would issue a plain GET for an empty
853            // body — correct but a wasted round trip.
854            return persist_temp(temp, dest);
855        }
856        self.download_streaming(key, temp.path(), meta.etag.as_deref(), progress)
857            .await?;
858        persist_temp(temp, dest)
859    }
860
861    /// Stream a blob body to `temp_path` with optional `If-Match`
862    /// guarding against mid-download mutation. When `progress` is
863    /// `Some`, fires once per SDK body chunk read off the wire.
864    async fn download_streaming(
865        &self,
866        key: &str,
867        temp_path: &Path,
868        etag: Option<&str>,
869        progress: Option<&ProgressSink>,
870    ) -> Result<(), ObjectStoreError> {
871        let blob = self.blob_client(key);
872        let mut opts = BlobClientDownloadOptions::default();
873        if let Some(etag) = etag {
874            opts.if_match = Some(etag.to_owned());
875        }
876        let mut result = blob
877            .download(Some(opts))
878            .await
879            .map_err(|e| classify(e, key))?;
880
881        let mut file = tokio::fs::OpenOptions::new()
882            .write(true)
883            .truncate(true)
884            .open(temp_path)
885            .await
886            .map_err(other_boxed)?;
887
888        while let Some(chunk) = result.body.next().await {
889            let bytes = chunk.map_err(network_boxed)?;
890            let chunk_len = bytes.len() as u64;
891            file.write_all(&bytes).await.map_err(other_boxed)?;
892            if let Some(sink) = progress
893                && chunk_len > 0
894            {
895                sink.report(chunk_len);
896            }
897        }
898        file.flush().await.map_err(other_boxed)?;
899        Ok(())
900    }
901
902    /// Drive a multipart upload from a fully-buffered `Bytes` body.
903    ///
904    /// `Bytes::slice` is zero-copy: every block borrows into the same
905    /// underlying allocation, so peak memory stays at the caller's body
906    /// size instead of growing by a copy of each block.
907    async fn multipart_put_bytes(
908        &self,
909        key: &str,
910        body: Bytes,
911        size: u64,
912        opts: PutOpts,
913    ) -> Result<(), ObjectStoreError> {
914        let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
915        let progress = opts.progress.clone();
916        let staged = self
917            .stage_blocks_with_bodies(key, &parts, progress, |part| slice_bytes_part(&body, part))
918            .await?;
919        let blob = self.blob_client(key).block_blob_client();
920        commit_block_list(&blob, key, staged, opts).await
921    }
922
923    /// Drive a multipart upload by streaming a local file block-by-block.
924    ///
925    /// All tasks share one `Arc<std::fs::File>`; per-task
926    /// `read_file_part` uses `pread` so reads are concurrent without
927    /// offset contention. Sharing one open file description closes
928    /// the metadata/upload race. With `MULTIPART_PUT_MAX_CONCURRENCY
929    /// = 8` and `MULTIPART_PUT_PART_SIZE = 16 MiB`, peak memory is
930    /// bounded at 128 MiB regardless of file size.
931    async fn multipart_put_path(
932        &self,
933        key: &str,
934        file: tokio::fs::File,
935        size: u64,
936        opts: PutOpts,
937    ) -> Result<(), ObjectStoreError> {
938        let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
939        let progress = opts.progress.clone();
940        let file: Arc<std::fs::File> = Arc::new(file.into_std().await);
941        let staged = self
942            .stage_blocks_from_file(key, file, &parts, progress)
943            .await?;
944        let blob = self.blob_client(key).block_blob_client();
945        commit_block_list(&blob, key, staged, opts).await
946    }
947
948    /// Spawn parallel `stage_block` tasks with bodies sourced from a
949    /// closure (used by `multipart_put_bytes`).
950    ///
951    /// Returns the per-block IDs in part order so the caller can build
952    /// a `BlockLookupList` for `commit_block_list`. On error,
953    /// already-staged blocks are simply not committed and Azure
954    /// auto-expires them after seven days; there is no client-side
955    /// abort call.
956    ///
957    /// `BlockBlobClient` does not implement `Clone`, so each spawned
958    /// task constructs its own via `self.blob_client(...)
959    /// .block_blob_client()`. The container's `blob_client(&self, ..)`
960    /// returns an owned `BlobClient` already (cheap-clone of internal
961    /// `Arc` state), so this stays allocation-light.
962    async fn stage_blocks_with_bodies<F>(
963        &self,
964        key: &str,
965        parts: &[UploadPart],
966        progress: Option<ProgressSink>,
967        make_body: F,
968    ) -> Result<Vec<Vec<u8>>, ObjectStoreError>
969    where
970        F: Fn(UploadPart) -> Result<Bytes, ObjectStoreError>,
971    {
972        let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
973        let mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>> = JoinSet::new();
974        for (idx, part) in parts.iter().enumerate() {
975            let part = *part;
976            let part_index = idx;
977            let block_id = block_id_for(idx);
978            let body = make_body(part)?;
979            let blob = self.blob_client(key).block_blob_client();
980            let key = key.to_owned();
981            let semaphore = Arc::clone(&semaphore);
982            let progress = progress.clone();
983            tasks.spawn(async move {
984                let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
985                blob.stage_block(&block_id, part.length, bytes_to_request_content(body), None)
986                    .await
987                    .map_err(|e| classify(e, &key))?;
988                if let Some(sink) = &progress {
989                    sink.report(part.length);
990                }
991                Ok((part_index, block_id))
992            });
993        }
994        join_staged_blocks(tasks, parts.len()).await
995    }
996
997    /// Spawn parallel `stage_block` tasks that each read their
998    /// block from the shared `Arc<std::fs::File>` via `pread`. The
999    /// shared open file description gives every task a stable view
1000    /// of the same inode (used by `multipart_put_path`).
1001    async fn stage_blocks_from_file(
1002        &self,
1003        key: &str,
1004        file: Arc<std::fs::File>,
1005        parts: &[UploadPart],
1006        progress: Option<ProgressSink>,
1007    ) -> Result<Vec<Vec<u8>>, ObjectStoreError> {
1008        let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
1009        let mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>> = JoinSet::new();
1010        for (idx, part) in parts.iter().enumerate() {
1011            let part = *part;
1012            let part_index = idx;
1013            let block_id = block_id_for(idx);
1014            let blob = self.blob_client(key).block_blob_client();
1015            let key = key.to_owned();
1016            let task_file = Arc::clone(&file);
1017            let semaphore = Arc::clone(&semaphore);
1018            let progress = progress.clone();
1019            tasks.spawn(async move {
1020                let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
1021                let body = read_file_part(task_file, part).await?;
1022                blob.stage_block(&block_id, part.length, bytes_to_request_content(body), None)
1023                    .await
1024                    .map_err(|e| classify(e, &key))?;
1025                if let Some(sink) = &progress {
1026                    sink.report(part.length);
1027                }
1028                Ok((part_index, block_id))
1029            });
1030        }
1031        join_staged_blocks(tasks, parts.len()).await
1032    }
1033}
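
The 128 MiB bound quoted above is the product of the concurrency cap and the part size. The following is a minimal, std-only sketch of that arithmetic together with one plausible way to split a body into parts. `PartSketch`, `plan_parts_sketch`, and `peak_in_flight_bytes` are hypothetical names for illustration; the crate's real `plan_upload_parts` and `UploadPart` may behave differently, in particular in how they react when the part count would exceed the block limit.

```rust
/// Hypothetical stand-in for the crate's `UploadPart`: a byte range in the source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartSketch {
    pub offset: u64,
    pub length: u64,
}

/// Split `size` bytes into contiguous parts of at most `part_size` bytes.
/// If that would need more than `max_parts` parts, use a larger part size.
/// Assumption: this growth policy is illustrative, not the crate's.
pub fn plan_parts_sketch(size: u64, part_size: u64, max_parts: u64) -> Vec<PartSketch> {
    assert!(part_size > 0 && max_parts > 0, "part_size and max_parts must be non-zero");
    // Smallest part size that keeps the part count within `max_parts`.
    let min_part = size.div_ceil(max_parts);
    let part = part_size.max(min_part).max(1);
    let mut parts = Vec::new();
    let mut offset = 0;
    while offset < size {
        // The final part may be shorter than the others.
        let length = part.min(size - offset);
        parts.push(PartSketch { offset, length });
        offset += length;
    }
    parts
}

/// Upper bound on buffered bytes when at most `concurrency` parts are in
/// flight and each part holds at most `part_size` bytes.
pub fn peak_in_flight_bytes(concurrency: u64, part_size: u64) -> u64 {
    concurrency * part_size
}
```

With a concurrency of 8 and 16 MiB parts, `peak_in_flight_bytes` gives 128 MiB, which matches the figure in the doc comments and does not depend on the total body size.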
1034
1035/// Build a deterministic Azure block ID for the `idx`-th part
1036/// (zero-indexed).
1037///
1038/// Azure requires that all block IDs in a single
1039/// `commit_block_list` request share a length pre-base64. 32 bytes
1040/// of zero-padded ASCII digits accommodates up to 10^32 parts —
1041/// vastly above [`AZURE_MAX_BLOCKS`] = 50 000.
1042fn block_id_for(idx: usize) -> Vec<u8> {
1043    format!("{:032}", idx + 1).into_bytes()
1044}
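
As a quick illustration of why the zero-padded format matters, the sketch below repeats the same `format!` call under an illustrative name and adds a small hypothetical helper, `all_same_length`, that checks the equal-length property. Equal-length inputs also produce equal-length base64 output, though the encoding step itself is not shown here.

```rust
/// Fixed-width, zero-padded decimal ID for the zero-indexed part `idx`.
/// Mirrors the formatting used above; the name is illustrative.
pub fn block_id_sketch(idx: usize) -> Vec<u8> {
    format!("{:032}", idx + 1).into_bytes()
}

/// True when every ID has the same byte length.
/// Hypothetical helper, not part of the module above.
pub fn all_same_length(ids: &[Vec<u8>]) -> bool {
    ids.windows(2).all(|pair| pair[0].len() == pair[1].len())
}
```

A side effect of the fixed width is that byte-wise ordering of the IDs matches part order, so the ID for part 9 sorts before the ID for part 10.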
1045
1046/// Drain a `JoinSet` of `stage_block` tasks into a Vec of block IDs
1047/// indexed by part order. Short-circuits on the first error.
1048async fn join_staged_blocks(
1049    mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>>,
1050    expected: usize,
1051) -> Result<Vec<Vec<u8>>, ObjectStoreError> {
1052    let mut staged: Vec<Option<Vec<u8>>> = (0..expected).map(|_| None).collect();
1053    while let Some(joined) = tasks.join_next().await {
1054        let (idx, block_id) = joined.map_err(other_boxed)??;
1055        staged[idx] = Some(block_id);
1056    }
1057    staged
1058        .into_iter()
1059        .enumerate()
1060        .map(|(idx, slot)| {
1061            slot.ok_or_else(|| {
1062                ObjectStoreError::Other(
1063                    format!("internal: stage_block task for part {idx} did not return").into(),
1064                )
1065            })
1066        })
1067        .collect()
1068}
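
The slot-filling pattern in `join_staged_blocks` can be shown without `tokio`. The sketch below is a synchronous, std-only analogue: results arrive in arbitrary completion order as `(index, value)` pairs and are written into pre-sized slots, so the output follows part order. `collect_in_order` is a hypothetical name, it reports errors as `String` rather than `ObjectStoreError`, and unlike the function above it also rejects out-of-range indices instead of indexing directly.

```rust
/// Reassemble out-of-order `(index, value)` results into index order.
/// Fails if an index is out of range or if any slot was never filled.
pub fn collect_in_order<T>(
    completed: Vec<(usize, T)>,
    expected: usize,
) -> Result<Vec<T>, String> {
    // One empty slot per expected part.
    let mut slots: Vec<Option<T>> = (0..expected).map(|_| None).collect();
    for (idx, value) in completed {
        let slot = slots
            .get_mut(idx)
            .ok_or_else(|| format!("part index {idx} out of range"))?;
        *slot = Some(value);
    }
    // Any slot still empty means a task never reported back.
    slots
        .into_iter()
        .enumerate()
        .map(|(idx, slot)| slot.ok_or_else(|| format!("part {idx} did not return")))
        .collect()
}
```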
1069
1070/// Commit the staged blocks in order, applying any
1071/// `content_disposition` / `user_metadata` from the original `PutOpts`.
1072///
1073/// Azure has no `AbortMultipartUpload` equivalent: if commit fails,
1074/// the staged blocks remain on the storage account until Azure
1075/// garbage-collects them (after seven days). Surface the commit error
1076/// directly — the caller's error handling already understands the
1077/// "operation did not succeed" outcome.
1078async fn commit_block_list(
1079    blob: &BlockBlobClient,
1080    key: &str,
1081    block_ids: Vec<Vec<u8>>,
1082    opts: PutOpts,
1083) -> Result<(), ObjectStoreError> {
1084    let block_list = BlockLookupList {
1085        latest: Some(block_ids),
1086        ..Default::default()
1087    };
1088    let body: RequestContent<_, _> = block_list.try_into().map_err(other_boxed)?;
1089    let (cd, metadata) = put_opts_blob_fields(opts);
1090    let commit_opts = BlockBlobClientCommitBlockListOptions {
1091        blob_content_disposition: cd,
1092        metadata,
1093        ..Default::default()
1094    };
1095    blob.commit_block_list(body, Some(commit_opts))
1096        .await
1097        .map_err(|e| classify(e, key))?;
1098    Ok(())
1099}
1100
1101/// Wrap `Bytes` in a `RequestContent` without copying the buffer.
1102///
1103/// `RequestContent` has an inherent `from(Vec<u8>)` constructor that
1104/// shadows the generic `From<Bytes>` trait impl, so a bare
1105/// `RequestContent::from(body)` resolves to the `Vec<u8>` overload and
1106/// re-allocates. Going through `Into` instead picks up the trait impl
1107/// and keeps the `Bytes` payload zero-copy. The return type is left
1108/// generic so the call site (which pins `Bytes` + `NoFormat` via the
1109/// `BlobClient::upload` signature) drives type inference.
1110fn bytes_to_request_content<F>(body: Bytes) -> RequestContent<Bytes, F>
1111where
1112    Bytes: Into<RequestContent<Bytes, F>>,
1113{
1114    body.into()
1115}
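
The shadowing described above is a general Rust name-resolution rule: for a path call such as `Type::from(..)`, an inherent associated function wins over a trait method of the same name. The sketch below reproduces this with a hypothetical `Payload` type and `Arc<[u8]>` standing in for `Bytes`. It does not use the Azure SDK, and the real `RequestContent` signatures may differ.

```rust
use std::sync::Arc;

/// Hypothetical payload type standing in for an SDK request body.
#[derive(Debug)]
pub struct Payload {
    pub data: Arc<[u8]>,
    /// Records which constructor ran, for demonstration only.
    pub via_inherent: bool,
}

impl Payload {
    /// Inherent constructor taking an owned `Vec<u8>`.
    /// A path call `Payload::from(..)` resolves here, not to the trait.
    pub fn from(bytes: Vec<u8>) -> Self {
        Payload { data: Arc::from(bytes), via_inherent: true }
    }
}

impl From<Arc<[u8]>> for Payload {
    /// Trait constructor: keeps the shared buffer without copying it.
    fn from(data: Arc<[u8]>) -> Self {
        Payload { data, via_inherent: false }
    }
}

/// Going through `Into` selects the `From<Arc<[u8]>>` trait impl.
pub fn payload_from_shared(data: Arc<[u8]>) -> Payload {
    data.into()
}
```

In this sketch `Payload::from(shared_arc)` would not compile, because the inherent `from` expects a `Vec<u8>`. Calling `.into()` reaches the trait impl and leaves the buffer shared.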
1116
1117/// Pull the blob-shaped `content_disposition` and `metadata` fields
1118/// out of [`PutOpts`].
1119///
1120/// Both `BlockBlobClientUploadOptions` (single `Put Blob`) and
1121/// `BlockBlobClientCommitBlockListOptions` (multipart commit) carry
1122/// the same two fields by the same names. Centralising the
1123/// conversion here keeps a single source of truth for "how a
1124/// `PutOpts` becomes Azure blob metadata."
1125fn put_opts_blob_fields(
1126    opts: PutOpts,
1127) -> (
1128    Option<String>,
1129    Option<std::collections::HashMap<String, String>>,
1130) {
1131    let metadata = (!opts.user_metadata.is_empty()).then(|| {
1132        opts.user_metadata
1133            .into_iter()
1134            .collect::<std::collections::HashMap<_, _>>()
1135    });
1136    (opts.content_disposition, metadata)
1137}
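
The `bool::then` idiom used for `metadata` maps an empty list to `None` and a non-empty list to `Some(map)`. A minimal std-only sketch follows; `metadata_or_none` is a hypothetical name, and it takes the key/value pairs directly instead of a `PutOpts`.

```rust
use std::collections::HashMap;

/// Return `None` for an empty list so no metadata is sent at all,
/// otherwise collect the pairs into a map.
pub fn metadata_or_none(pairs: Vec<(String, String)>) -> Option<HashMap<String, String>> {
    (!pairs.is_empty()).then(|| pairs.into_iter().collect())
}
```

One consequence of collecting into a `HashMap` is that duplicate keys collapse, with the later pair overwriting the earlier one.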
1138
1139/// Build a [`BlockBlobClientUploadOptions`] from the trait's [`PutOpts`].
1140fn upload_options_from(opts: PutOpts) -> BlockBlobClientUploadOptions<'static> {
1141    let (cd, metadata) = put_opts_blob_fields(opts);
1142    BlockBlobClientUploadOptions {
1143        blob_content_disposition: cd,
1144        metadata,
1145        ..Default::default()
1146    }
1147}
1148
1149fn header_u64(headers: &Headers, name: &HeaderName) -> Option<u64> {
1150    headers.get_optional_str(name).and_then(|s| s.parse().ok())
1151}
1152
1153fn header_http_date(headers: &Headers, name: &HeaderName) -> Option<OffsetDateTime> {
1154    let raw = headers.get_optional_str(name)?;
1155    OffsetDateTime::parse(raw, &time::format_description::well_known::Rfc2822).ok()
1156}
1157
1158#[cfg(test)]
1159mod tests {
1160    use super::*;
1161    use crate::url::{AzureAddressing, RemoteFlags};
1162
1163    fn parse_endpoint(s: &str) -> Url {
1164        Url::parse(s).expect("test endpoint URL parses")
1165    }
1166
1167    fn s3_url() -> RemoteUrl {
1168        RemoteUrl::S3 {
1169            endpoint: parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/"),
1170            bucket: "my-bucket".to_owned(),
1171            prefix: None,
1172            addressing: crate::url::S3Addressing::VirtualHosted,
1173            flags: RemoteFlags::default(),
1174        }
1175    }
1176
1177    // --- build_account_url --------------------------------------------
1178
1179    #[test]
1180    fn build_account_url_virtual_hosted_strips_path() {
1181        let url = parse_endpoint("https://acct.blob.core.windows.net/my-container/some/prefix");
1182        let out = build_account_url(&url, "acct", AzureAddressing::VirtualHosted);
1183        assert_eq!(out, "https://acct.blob.core.windows.net/");
1184    }
1185
1186    #[test]
1187    fn build_account_url_path_style_keeps_account() {
1188        let url = parse_endpoint("http://127.0.0.1:10000/devstoreaccount1/my-container/repo");
1189        let out = build_account_url(&url, "devstoreaccount1", AzureAddressing::PathStyle);
1190        assert_eq!(out, "http://127.0.0.1:10000/devstoreaccount1");
1191    }
1192
1193    #[test]
1194    fn build_account_url_strips_query_and_fragment() {
1195        let url = parse_endpoint("https://acct.blob.core.windows.net/c/r?credential=foo#frag");
1196        let out = build_account_url(&url, "acct", AzureAddressing::VirtualHosted);
1197        assert_eq!(out, "https://acct.blob.core.windows.net/");
1198    }
1199
1200    // --- classify_status_and_code -------------------------------------
1201
1202    #[test]
1203    fn classify_404_is_not_found() {
1204        assert!(matches!(
1205            classify_status_and_code(404, None, "k"),
1206            Some(ObjectStoreError::NotFound(s)) if s == "k"
1207        ));
1208    }
1209
1210    #[test]
1211    fn classify_403_is_access_denied() {
1212        assert!(matches!(
1213            classify_status_and_code(403, None, "k"),
1214            Some(ObjectStoreError::AccessDenied(s)) if s == "k"
1215        ));
1216    }
1217
1218    #[test]
1219    fn classify_412_is_precondition_failed() {
1220        assert!(matches!(
1221            classify_status_and_code(412, None, "k"),
1222            Some(ObjectStoreError::PreconditionFailed(s)) if s == "k"
1223        ));
1224    }
1225
1226    #[test]
1227    fn classify_409_is_conflict() {
1228        // 409 covers Azure's `BlobAlreadyExists` (the put-if-absent
1229        // contention path). Without this branch, `put_if_absent` would
1230        // surface contention as a hard error instead of `Ok(false)`.
1231        assert!(matches!(
1232            classify_status_and_code(409, None, "k"),
1233            Some(ObjectStoreError::Conflict(s)) if s == "k"
1234        ));
1235    }
1236
1237    #[test]
1238    fn classify_413_is_payload_too_large() {
1239        // Pass `code=None` so the assertion isolates the 413-status
1240        // branch. With a recognised code the test would still pass
1241        // even if the status arm regressed, because the code arm would
1242        // match instead. The 413 status alone must be sufficient.
1243        assert!(matches!(
1244            classify_status_and_code(413, None, "k"),
1245            Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
1246                if limit_bytes == SINGLE_PUT_BLOB_LIMIT_BYTES
1247        ));
1248    }
1249
1250    #[test]
1251    fn classify_request_body_too_large_code_is_payload_too_large() {
1252        // Defensive backstop: if the SDK delivers the service code on a
1253        // non-413 status (e.g. 400), the code branch still catches it.
1254        assert!(matches!(
1255            classify_status_and_code(400, Some("RequestBodyTooLarge"), "k"),
1256            Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
1257                if limit_bytes == SINGLE_PUT_BLOB_LIMIT_BYTES
1258        ));
1259    }
1260
1261    #[test]
1262    fn classify_unrecognised_status_returns_none() {
1263        assert!(classify_status_and_code(500, None, "k").is_none());
1264        assert!(classify_status_and_code(429, None, "k").is_none());
1265        assert!(classify_status_and_code(500, Some("InternalError"), "k").is_none());
1266    }
1267
1268    // --- properties_to_meta ------------------------------------------
1269
1270    #[test]
1271    fn properties_to_meta_round_trips_well_formed_response() {
1272        let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1273        let meta = properties_to_meta("k", Some(42), Some(now), Some("\"abc\""))
1274            .expect("conversion succeeds");
1275        assert_eq!(meta.key, "k");
1276        assert_eq!(meta.size, 42);
1277        assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
1278        assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
1279    }
1280
1281    #[test]
1282    fn properties_to_meta_preserves_legitimate_zero_size() {
1283        // Zero-byte lock files are legitimate; a present
1284        // `Content-Length: 0` header (`Some(0)`) must round-trip as
1285        // `size == 0`, distinct from the missing-header error.
1286        let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1287        let meta =
1288            properties_to_meta("LOCK", Some(0), Some(now), None).expect("conversion succeeds");
1289        assert_eq!(meta.size, 0);
1290    }
1291
1292    #[test]
1293    fn properties_to_meta_rejects_missing_content_length() {
1294        let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1295        let err = properties_to_meta("k", None, Some(now), None)
1296            .expect_err("missing content-length must error");
1297        match err {
1298            ObjectStoreError::Other(inner) => {
1299                let msg = inner.to_string();
1300                assert!(msg.contains("no content-length"), "names failure: {msg}");
1301                assert!(msg.contains("`k`"), "includes the key for context: {msg}");
1302            }
1303            other => {
1304                panic!("expected ObjectStoreError::Other for missing content-length, got {other:?}")
1305            }
1306        }
1307    }
1308
1309    #[test]
1310    fn properties_to_meta_rejects_missing_last_modified() {
1311        let err = properties_to_meta("k", Some(0), None, None)
1312            .expect_err("missing last_modified must error");
1313        match err {
1314            ObjectStoreError::Other(inner) => {
1315                let msg = inner.to_string();
1316                assert!(msg.contains("no last-modified"), "names failure: {msg}");
1317                assert!(msg.contains("`k`"), "includes the key for context: {msg}");
1318            }
1319            other => {
1320                panic!("expected ObjectStoreError::Other for missing last_modified, got {other:?}")
1321            }
1322        }
1323    }
1324
1325    // --- item_to_meta -------------------------------------------------
1326
1327    #[test]
1328    fn item_to_meta_round_trips_well_formed_item() {
1329        let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1330        let meta = item_to_meta(Some("k"), Some(42), Some(now), Some("\"abc\"")).unwrap();
1331        assert_eq!(meta.key, "k");
1332        assert_eq!(meta.size, 42);
1333        assert_eq!(meta.last_modified.unix_timestamp(), 1_700_000_000);
1334        assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
1335    }
1336
1337    #[test]
1338    fn item_to_meta_rejects_missing_name() {
1339        let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1340        let err = item_to_meta(None, Some(0), Some(now), None).unwrap_err();
1341        match err {
1342            ObjectStoreError::Other(inner) => {
1343                assert!(
1344                    inner.to_string().contains("without a name"),
1345                    "names failure: {inner}"
1346                );
1347            }
1348            other => panic!("expected ObjectStoreError::Other, got {other:?}"),
1349        }
1350    }
1351
1352    #[test]
1353    fn item_to_meta_rejects_missing_last_modified() {
1354        let err = item_to_meta(Some("k"), Some(0), None, None).unwrap_err();
1355        match err {
1356            ObjectStoreError::Other(inner) => {
1357                let msg = inner.to_string();
1358                assert!(
1359                    msg.contains("without last_modified"),
1360                    "names failure: {msg}"
1361                );
1362                assert!(msg.contains("`k`"), "includes the key: {msg}");
1363            }
1364            other => panic!("expected ObjectStoreError::Other, got {other:?}"),
1365        }
1366    }
1367
1368    #[test]
1369    fn item_to_meta_treats_missing_size_as_zero() {
1370        // The Azure SDK types content_length as Option<u64>; missing
1371        // values default to 0 (rather than `None` propagating through
1372        // every caller's arithmetic).
1373        let now = OffsetDateTime::from_unix_timestamp(1_700_000_000).unwrap();
1374        let meta = item_to_meta(Some("k"), None, Some(now), None).unwrap();
1375        assert_eq!(meta.size, 0);
1376    }
1377
1378    // --- upload_options_from ------------------------------------------
1379
1380    #[test]
1381    fn upload_options_from_default_is_empty() {
1382        let out = upload_options_from(PutOpts::default());
1383        assert!(out.blob_content_disposition.is_none());
1384        assert!(out.metadata.is_none());
1385    }
1386
1387    #[test]
1388    fn upload_options_from_carries_content_disposition() {
1389        let opts = PutOpts {
1390            content_disposition: Some("attachment; filename=x".into()),
1391            user_metadata: Vec::new(),
1392            progress: None,
1393        };
1394        let out = upload_options_from(opts);
1395        let cd: String = out
1396            .blob_content_disposition
1397            .expect("content_disposition should be set");
1398        assert!(cd.contains("attachment"));
1399    }
1400
1401    #[test]
1402    fn upload_options_from_collects_metadata() {
1403        let opts = PutOpts {
1404            content_disposition: None,
1405            user_metadata: vec![("x-foo".into(), "1".into()), ("x-bar".into(), "2".into())],
1406            progress: None,
1407        };
1408        let out = upload_options_from(opts);
1409        let map = out.metadata.expect("metadata set");
1410        assert_eq!(map.get("x-foo").map(String::as_str), Some("1"));
1411        assert_eq!(map.get("x-bar").map(String::as_str), Some("2"));
1412    }
1413
1414    // --- from_remote_url constructor branch ---------------------------
1415
1416    #[tokio::test]
1417    async fn from_remote_url_rejects_s3() {
1418        let result = AzureStore::from_remote_url(&s3_url()).await;
1419        match result {
1420            Err(ObjectStoreError::Other(_)) => {}
1421            Err(other) => panic!("expected ObjectStoreError::Other, got {other:?}"),
1422            Ok(_) => panic!("expected S3 URL to be rejected"),
1423        }
1424    }

    // --- HTTP transport tuning (#26 / #28) ----------------------------

    /// Pin the timeout values. A future copy-paste mistake (`from_millis`
    /// instead of `from_secs`, an accidental zero) silently disables
    /// the very behaviour these constants exist for; fail fast instead.
    /// If the constants are deliberately changed, update the expected
    /// values on the right-hand side in the same change. The test exists
    /// to make such changes deliberate, not to lock the values forever.
    #[test]
    fn transport_timeout_constants_have_expected_values() {
        assert_eq!(POOL_IDLE_TIMEOUT, Duration::from_secs(30));
        assert_eq!(TCP_KEEPALIVE, Duration::from_secs(30));
        assert_eq!(CONNECT_TIMEOUT, Duration::from_secs(10));
        assert_eq!(READ_TIMEOUT, Duration::from_secs(30));
    }

    #[test]
    fn build_http_client_succeeds() {
        build_http_client().expect("reqwest client builds with the configured timeouts");
    }

    /// The meaningful regression check: if a future refactor drops the
    /// `transport = Some(...)` line in `build_client_options`, the
    /// Azure backend silently reverts to `azure_core`'s default
    /// (unbounded) HTTP transport. This test fails when that happens.
    /// Also pins the empty-policies invariant on the no-credential
    /// branch, so a refactor that injects a fallback policy when
    /// `per_try_policy` is `None` is caught.
    #[test]
    fn build_client_options_installs_custom_transport() {
        let resolved = auth::ResolvedCredentials {
            token_credential: None,
            per_try_policy: None,
            sas_signing_key: None,
        };
        let opts = build_client_options(&resolved).expect("client options build");
        assert!(
            opts.transport.is_some(),
            "ClientOptions::transport must be Some so the SDK uses our \
             pool_idle_timeout / tcp_keepalive client (issue #26)",
        );
        assert!(
            opts.per_try_policies.is_empty(),
            "no per-try policy was supplied; the helper must not inject \
             a fallback signer of its own",
        );
    }

    /// Issue #28's Notes section explicitly calls out that the per-try
    /// signing policy must continue to fire after we install a custom
    /// transport. The SDK pipeline runs per-try policies independently
    /// of the transport, but a future refactor that confuses the two
    /// fields would silently drop signing; surface that here. The
    /// [`Arc::ptr_eq`] check pins identity so a refactor that
    /// silently *replaces* the caller's policy with a fresh one
    /// (rather than dropping it outright) also fails.
    #[test]
    fn build_client_options_preserves_per_try_policy() {
        // Azurite's published well-known account key: base64-valid
        // and safe to embed.
        const AZURITE_KEY: &str = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
        let policy: Arc<dyn azure_core::http::policies::Policy> = Arc::new(
            auth::SharedKeySigningPolicy::new("devstoreaccount1", AZURITE_KEY)
                .expect("shared-key policy constructs"),
        );
        let resolved = auth::ResolvedCredentials {
            token_credential: None,
            per_try_policy: Some(Arc::clone(&policy)),
            sas_signing_key: None,
        };
        let opts = build_client_options(&resolved).expect("client options build");
        assert!(opts.transport.is_some(), "transport still wired");
        assert_eq!(
            opts.per_try_policies.len(),
            1,
            "exactly one per-try policy is wired",
        );
        assert!(
            Arc::ptr_eq(&policy, &opts.per_try_policies[0]),
            "the policy at index 0 must be the same Arc the caller \
             supplied — not a fresh policy constructed inside the helper",
        );
    }
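
    /// Illustrative sketch, not part of the backend: why the test above
    /// checks [`Arc::ptr_eq`] in addition to the policy count. A clone
    /// of an `Arc` shares the original allocation, while a freshly
    /// built `Arc` with equal contents does not. The helper name and
    /// the `String` payload are hypothetical stand-ins for the real
    /// policy type.
    fn sketch_is_same_allocation(
        a: &std::sync::Arc<String>,
        b: &std::sync::Arc<String>,
    ) -> bool {
        std::sync::Arc::ptr_eq(a, b)
    }

    #[test]
    fn sketch_ptr_eq_distinguishes_clone_from_replacement() {
        use std::sync::Arc;
        let supplied = Arc::new(String::from("policy"));
        let forwarded = Arc::clone(&supplied);
        let replaced = Arc::new(String::from("policy"));
        // A clone points at the same allocation as the original.
        assert!(sketch_is_same_allocation(&supplied, &forwarded));
        // Equal contents, separate allocation: the identity check fails
        // even though a value comparison would pass.
        assert_eq!(*supplied, *replaced);
        assert!(!sketch_is_same_allocation(&supplied, &replaced));
    }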

    /// Pin the `should_use_multipart` predicate at and around the
    /// shared threshold (issue #53).
    ///
    /// `put_bytes` and `put_path` route through this predicate. The
    /// integration test `multipart_put_emits_per_block_progress_events`
    /// covers the dispatch *call* (only multipart emits per-block
    /// events). This unit test pins the predicate's boundary semantics
    /// so the constant can't be moved out from under that test
    /// without something failing. The Azure backend uses the same
    /// shared `MULTIPART_PUT_THRESHOLD` as S3 so a future refactor
    /// cannot accidentally raise the threshold for one backend alone.
    #[test]
    fn should_use_multipart_pins_threshold_boundary() {
        use super::super::multipart::MULTIPART_PUT_THRESHOLD;
        assert!(!should_use_multipart(MULTIPART_PUT_THRESHOLD - 1));
        assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD));
        assert!(should_use_multipart(MULTIPART_PUT_THRESHOLD + 1));
        assert!(should_use_multipart(6 * (1 << 30)));
    }
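
    /// Illustrative sketch only: the boundary pinned by the test above
    /// is consistent with an inclusive `>=` comparison against the
    /// threshold. The real `should_use_multipart` and
    /// `MULTIPART_PUT_THRESHOLD` are defined elsewhere; the function
    /// name and the 8-byte threshold used below are hypothetical.
    fn sketch_should_use_multipart(len: u64, threshold: u64) -> bool {
        len >= threshold
    }

    #[test]
    fn sketch_threshold_is_inclusive() {
        const SKETCH_THRESHOLD: u64 = 8;
        assert!(!sketch_should_use_multipart(SKETCH_THRESHOLD - 1, SKETCH_THRESHOLD));
        assert!(sketch_should_use_multipart(SKETCH_THRESHOLD, SKETCH_THRESHOLD));
        assert!(sketch_should_use_multipart(SKETCH_THRESHOLD + 1, SKETCH_THRESHOLD));
    }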

    /// Pin `block_id_for(idx)` so two parts can never collide on the
    /// same block ID, and so all IDs in a single `commit_block_list`
    /// share a length pre-base64 (Azure's hard requirement).
    #[test]
    fn block_id_for_is_unique_and_uniform_length() {
        let id_a = block_id_for(0);
        let id_b = block_id_for(1);
        let id_c = block_id_for(99_999);
        assert_eq!(id_a.len(), id_b.len(), "all IDs share length");
        assert_eq!(id_a.len(), id_c.len(), "even at the upper end");
        assert_ne!(id_a, id_b, "two parts get distinct IDs");
        // 32 ASCII bytes accommodate up to 10^32 parts, vastly above
        // [`AZURE_MAX_BLOCKS`] = 50 000.
        assert_eq!(id_a.len(), 32);
    }
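
    /// Illustrative sketch only: one way to satisfy both properties
    /// pinned above is a zero-padded decimal index. This is an
    /// assumption about the general technique, not a claim about how
    /// `block_id_for` is implemented; the helper name is hypothetical.
    fn sketch_fixed_width_block_id(idx: u32) -> String {
        // Width 32 pads every index to the same length, so IDs differ
        // only in their digits and never in their size.
        format!("{idx:032}")
    }

    #[test]
    fn sketch_fixed_width_block_id_is_uniform() {
        let first = sketch_fixed_width_block_id(0);
        let last = sketch_fixed_width_block_id(49_999);
        assert_eq!(first.len(), 32);
        assert_eq!(first.len(), last.len());
        assert_ne!(first, last);
    }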
}