git_remote_object_store/object_store/azure.rs
1//! Azure Blob Storage backend for the [`ObjectStore`] trait.
2//!
3//! [`ObjectStore`]: super::ObjectStore
4//!
5//! `AzureStore` wraps `azure_storage_blob`. Like the S3 backend, this
6//! module owns the URL → SDK config translation, the error-code
7//! classifier ([`classify`]), and the credential resolution plumbing.
8//! Unlike S3, the SDK already does parallel range downloads inside
9//! `BlobClient::download()`, so there is no hand-rolled multipart
10//! orchestrator (asymmetric with S3 by design).
11//!
12//! ## Authentication
13//!
14//! The official `azure_storage_blob` 0.12 crate currently exposes only
15//! `Arc<dyn TokenCredential>` (Entra ID) on its constructors. Azurite
16//! does not implement Entra ID without an `--oauth basic` HTTPS setup,
17//! and many production accounts still authenticate with shared keys.
18//! To bridge both, we install our own [`auth::SharedKeySigningPolicy`]
19//! as a per-try [`azure_core::http::policies::Policy`] and pass `None`
20//! for the SDK's `credential` parameter. The SDK then forwards every
21//! request through our policy, which signs the request using the Azure
22//! Storage shared-key v2 scheme. Tracking issue:
23//! `Azure/azure-sdk-for-rust#2975`.
24//!
25//! Resolution order for `?credential=<NAME>` in the URL:
26//!
27//! 1. `AZSTORE_<NAME>_KEY` — base64 account key → shared-key signing.
28//! 2. `AZSTORE_<NAME>_CONNECTION_STRING` — connection string with
29//! `AccountName=` / `AccountKey=` → shared-key signing.
30//! 3. `AZSTORE_<NAME>_SAS` — SAS query string appended verbatim to
31//! every outgoing request URL.
32//!
33//! When no `?credential=` flag is set we fall back to
34//! `azure_identity::DeveloperToolsCredential` (env, workload identity,
35//! managed identity, Azure CLI, ...).
36//!
37//! ## Conditional writes
38//!
39//! [`put_if_absent`][super::ObjectStore::put_if_absent] uses
40//! `If-None-Match: "*"` (the SDK's
41//! `BlockBlobClientUploadOptions::with_if_not_exists` convenience).
42//! Azure returns 409 (`BlobAlreadyExists`) or 412
43//! (`ConditionNotMet`) for the contention case; both collapse to
44//! `Ok(false)`.
45//!
46//! ## Atomic `get_to_file`
47//!
48//! Identical to the S3 path: `head` → tempfile → `download(if_match)` →
49//! persist. The SDK's `download()` aggregates parallel range fetches
50//! internally, so no per-chunk semaphore here. A single retry with a
51//! fresh `ETag` covers the head-then-`GET` race (412 mid-download).
52//!
53//! ## `copy(src, dst)`
54//!
55//! `azure_storage_blob` 0.12 does not expose a `BlobClient::copy_from_url`
56//! method (only `BlockBlobClient::upload_blob_from_url`, which requires
57//! a SAS-tokened source URL or an `x-ms-copy-source-authorization`
58//! header — neither integrates cleanly with our credential model). We
59//! implement `copy` as a stream-through-tempfile round trip:
60//! `get_to_file` writes `src` to a `NamedTempFile`, then `put_path`
61//! uploads it to `dst`. Both legs already stream — `get_to_file`
62//! consumes the SDK's chunked download into the file without buffering
63//! the body, and `put_path` switches to our explicit
64//! `stage_block` + `commit_block_list` orchestrator (see
65//! [`AzureStore::multipart_put_path`]) once the body crosses
66//! [`super::multipart::MULTIPART_PUT_THRESHOLD`]. Peak in-flight bytes
67//! are bounded by
68//! [`super::multipart::MULTIPART_PUT_MAX_CONCURRENCY`] ×
69//! [`super::multipart::MULTIPART_PUT_PART_SIZE`] regardless of blob
70//! size, which matters for `manage doctor`'s duplicate-bundle
71//! quarantine path ([`crate::manage::doctor::Doctor::evict_losing_bundle`])
72//! — that path can copy multi-GiB bundles. Zero-byte lock files still
73//! round-trip fast: `get_to_file` short-circuits the GET on `size == 0`
74//! and `put_path` issues a single zero-byte `Put Blob`. Body is
75//! preserved; user metadata is not propagated, matching the S3 backend's
76//! `CopyObject` path which similarly carries only body bytes.
77//!
78//! This is asymmetric with the S3 backend, which uses `CopyObject` for
79//! a true server-side copy — Azure's equivalent (`Copy Blob`,
80//! `Put Blob From URL`) requires a SAS-signed source URL or an
81//! `x-ms-copy-source-authorization` header that the 0.12 SDK does not
//! ergonomically expose. The download+reupload path is the safe,
//! correct fallback until the SDK closes that gap.
84//!
85//! ## A note on `Range` and zero-byte blobs
86//!
87//! A `Range` request against a zero-byte blob returns HTTP 416. We
88//! never issue Range requests directly — `BlobClient::download()`
89//! owns that — but the zero-size short-circuit in
90//! [`get_to_file`](ObjectStore::get_to_file) also avoids any download
91//! SDK call against a known-empty blob, which sidesteps the issue
92//! entirely.
93//!
94//! ## Size limits
95//!
96//! Azure caps a block blob at 50 000 committed blocks (~4.75 TiB at
97//! the SDK's default block size) and a single `Put Blob` body at
98//! 5000 MiB; above [`super::multipart::MULTIPART_PUT_THRESHOLD`] the
99//! helper switches to explicit `stage_block` + `commit_block_list`,
100//! so callers do not have to reason about the single-call cutoff.
101//! The upload path is **not resumable** across process death — see
102//! the README "Known limitations" section.
103//!
104//! ## HTTP transport tuning
105//!
106//! `azure_core` 0.35's default transport keeps idle pooled connections
107//! forever and never sets TCP keepalive, so a pooled connection to a
108//! rotated VIP would hang an in-flight request until the OS-level TCP
109//! retransmit timeout fires (~15 minutes on Linux). [`AzureStore`]
110//! installs a custom [`reqwest::Client`] via [`Transport`] on
111//! [`ClientOptions::transport`] with four bounds:
112//!
113//! - [`POOL_IDLE_TIMEOUT`] (30 s) — drops idle pooled connections
114//! before a typical DNS rotation makes them stale.
115//! - [`TCP_KEEPALIVE`] (30 s) — detects a dead-but-not-closed TCP
116//! session in seconds rather than the 2-hour Linux default; covers
117//! *hot* pooled connections that pool-idle alone cannot.
118//! - [`CONNECT_TIMEOUT`] (10 s) — bounds a fresh-connect attempt to
119//! a dead VIP rather than waiting on the OS connect timeout.
120//! - [`READ_TIMEOUT`] (30 s) — per-read timeout that resets after a
121//! successful read, so a stuck transfer fails fast without limiting
122//! total body size.
123//!
124//! Together these cap a DNS-rotation hang at tens of seconds rather
125//! than minutes. The custom transport leaves
126//! [`ClientOptions::per_try_policies`] (where the shared-key signing
127//! lives) untouched — the SDK pipeline runs per-try policies
128//! independently of the transport. Tracking issue: #26.
129//!
130//! ## Stdout discipline
131//!
132//! Per `.claude/rules/protocol-stdout.md`, this module never writes to
133//! stdout. Diagnostics go through `tracing` (which the helper binaries
134//! configure to write to stderr).
135
136pub mod auth;
137pub(crate) mod sas;
138
139use std::path::Path;
140use std::sync::Arc;
141use std::time::Duration;
142
143use azure_core::http::headers::{HeaderName, Headers};
144use azure_core::http::request::RequestContent;
145use azure_core::http::{ClientOptions, Transport};
146use azure_storage_blob::clients::{
147 BlobClient, BlobContainerClient, BlobContainerClientOptions, BlockBlobClient,
148};
149use azure_storage_blob::models::method_options::BlockBlobClientUploadOptions;
150use azure_storage_blob::models::{
151 BlobClientDeleteOptions, BlobClientDownloadOptions, BlobClientGetPropertiesOptions,
152 BlobContainerClientListBlobsOptions, BlockBlobClientCommitBlockListOptions, BlockLookupList,
153};
154use azure_storage_blob::stream::tokio::FileStream;
155use bytes::Bytes;
156use futures::StreamExt;
157use tempfile::NamedTempFile;
158use time::OffsetDateTime;
159use tokio::io::AsyncWriteExt;
160use tokio::sync::Semaphore;
161use tokio::task::JoinSet;
162use url::Url;
163
164use crate::url::{AzureAddressing, RemoteUrl};
165
166use super::error::{network_boxed, other_boxed};
167use super::multipart::{
168 AZURE_MAX_BLOCKS, MULTIPART_PUT_MAX_CONCURRENCY, MULTIPART_PUT_PART_SIZE, UploadPart,
169 plan_upload_parts, read_file_part, should_use_multipart, slice_bytes_part,
170};
171use super::{
172 GetOpts, ObjectMeta, ObjectStore, ObjectStoreError, ProgressSink, PutOpts, persist_temp,
173};
174
/// Largest body, in bytes, that a single `Put Blob` call may carry on
/// the wire versions we negotiate (2019-12-12+): 5000 MiB.
///
/// Carried in [`ObjectStoreError::PayloadTooLarge`] whenever the SDK
/// surfaces HTTP 413 or `RequestBodyTooLarge`, so the wire-line names a
/// concrete number instead of dumping an opaque SDK error chain.
pub(crate) const SINGLE_PUT_BLOB_LIMIT_BYTES: u64 = 5_000 * 1024 * 1024;
181
/// How long the [`reqwest`] connection pool keeps an idle HTTPS
/// connection before dropping it.
///
/// Chosen to be short enough that a DNS rotation rarely finds a stale
/// pooled connection, yet long enough that bursty fetch / push batches
/// still reuse connections. See the module-level "HTTP transport
/// tuning" docs and issue #26.
pub(crate) const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);

/// TCP keepalive interval applied by the custom [`reqwest`] transport.
///
/// Surfaces a dead-but-not-closed session within seconds instead of
/// the 2-hour Linux default. See the module-level "HTTP transport
/// tuning" docs and issue #26.
pub(crate) const TCP_KEEPALIVE: Duration = Duration::from_secs(30);

/// Upper bound on a fresh TCP-connect attempt.
///
/// `reqwest` has no connect timeout by default, so an unreachable IP
/// would otherwise wait on the OS-level connect timeout (~75 s on Linux
/// defaults). 10 s leaves room for an in-region or cross-region
/// handshake while still failing fast on a dead VIP. See the
/// module-level "HTTP transport tuning" docs and issue #26.
pub(crate) const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Per-read timeout applied by the custom [`reqwest`] transport.
///
/// The timer resets after every successful read, so it limits how long
/// a stuck connection can stall a transfer without capping the total
/// body size. Kept equal to [`POOL_IDLE_TIMEOUT`] / [`TCP_KEEPALIVE`]
/// so one rotation budget covers all three knobs. See the module-level
/// "HTTP transport tuning" docs and issue #26.
pub(crate) const READ_TIMEOUT: Duration = Duration::from_secs(30);
210
/// Production [`ObjectStore`] backed by `azure_storage_blob`.
///
/// Every blob operation goes through the container-scoped SDK client;
/// the remaining fields exist only to support SAS URL generation in
/// [`presigned_get_url`](ObjectStore::presigned_get_url).
pub struct AzureStore {
    /// Container-scoped SDK client. Per-blob clients are derived from
    /// it, and listing / probing call it directly.
    container: BlobContainerClient,
    /// Container name as parsed from the URL — needed by SAS-token
    /// construction (issue #76) because the SDK's
    /// `BlobContainerClient::container_name()` is private. Held
    /// regardless of credential type so the field shape doesn't
    /// branch on whether SAS is reachable.
    container_name: String,
    /// Storage-key material for service-blob SAS generation
    /// ([`presigned_get_url`](ObjectStore::presigned_get_url)).
    /// `Some` when the credential alias resolves to a shared
    /// account key (KEY env var or connection string); `None` for
    /// SAS-env-var or Entra-ID paths, which return
    /// [`ObjectStoreError::Unsupported`] for presigning.
    sas_signing: Option<auth::SasSigningKey>,
}
228
229impl std::fmt::Debug for AzureStore {
230 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231 // `BlobContainerClient` is opaque (private fields, no `Debug`);
232 // surface the endpoint instead so error / log lines remain
233 // useful.
234 f.debug_struct("AzureStore")
235 .field("endpoint", &self.container.endpoint().as_str())
236 .field("container", &self.container_name)
237 .field("sas_signing", &self.sas_signing)
238 .finish()
239 }
240}
241
242impl AzureStore {
243 /// Build an `AzureStore` from a parsed [`RemoteUrl`].
244 ///
245 /// Like the S3 backend, the [`RemoteUrl::Azure::prefix`] field is
246 /// intentionally **not** consumed here; callers compose it into keys
247 /// themselves.
248 ///
249 /// Marked `async` for symmetry with `S3Store::from_remote_url`,
250 /// which awaits the AWS provider chain. The Azure path resolves
251 /// credentials synchronously today; the signature stays `async` so
252 /// future credential providers (e.g. one that fetches an OIDC
253 /// token at construction) can plug in without breaking callers.
254 ///
255 /// # Errors
256 ///
257 /// Returns [`ObjectStoreError::Other`] if `url` is not the Azure
258 /// variant or if credential resolution fails.
259 #[allow(clippy::unused_async)]
260 pub async fn from_remote_url(url: &RemoteUrl) -> Result<Self, ObjectStoreError> {
261 let RemoteUrl::Azure {
262 endpoint,
263 account,
264 container,
265 addressing,
266 flags,
267 ..
268 } = url
269 else {
270 return Err(ObjectStoreError::Other(
271 format!("AzureStore::from_remote_url called with non-Azure URL: {url}").into(),
272 ));
273 };
274
275 let account_url = build_account_url(endpoint, account, *addressing);
276 let resolved = auth::resolve(account, flags)?;
277 let sas_signing = resolved.sas_signing_key.clone();
278
279 let client_options = build_client_options(&resolved)?;
280
281 let container_options = BlobContainerClientOptions {
282 client_options,
283 ..Default::default()
284 };
285
286 let container_client = BlobContainerClient::new(
287 &account_url,
288 container,
289 resolved.token_credential,
290 Some(container_options),
291 )
292 .map_err(other_boxed)?;
293
294 Ok(Self {
295 container: container_client,
296 container_name: container.clone(),
297 sas_signing,
298 })
299 }
300
301 /// Construct a [`BlobClient`] for an individual blob.
302 fn blob_client(&self, key: &str) -> BlobClient {
303 self.container.blob_client(key)
304 }
305
306 /// Verify the container is reachable with the configured credentials
307 /// by listing one blob (`maxresults=1`) and consuming only the first
308 /// page of results. Used by [`crate::protocol::backend::build`] to
309 /// fold credential / missing-container / authorization failures into
310 /// categorical [`crate::protocol::backend::BackendError`] variants
311 /// before the helper REPL runs its first command. Counterpart to
312 /// [`crate::object_store::s3::S3Store::probe`].
313 pub(crate) async fn probe(&self, prefix: &str) -> Result<(), ObjectStoreError> {
314 // Pass `None` for an empty prefix per the same Azurite quirk
315 // documented at the top of `list` above: a signed empty prefix
316 // returns 403 from Azurite.
317 let prefix_opt = (!prefix.is_empty()).then(|| prefix.to_owned());
318 let opts = BlobContainerClientListBlobsOptions {
319 prefix: prefix_opt,
320 maxresults: Some(1),
321 ..Default::default()
322 };
323 let mut pages = self
324 .container
325 .list_blobs(Some(opts))
326 .map_err(|e| classify(e, prefix))?
327 .into_pages();
328 // Consume only the first page: probing does not need the full
329 // listing — we only care that the request succeeded.
330 if let Some(page_result) = pages.next().await {
331 page_result.map_err(|e| classify(e, prefix))?;
332 }
333 Ok(())
334 }
335}
336
337/// Build the [`reqwest::Client`] used by [`AzureStore`]'s custom
338/// [`Transport`].
339///
340/// Bounds the connection pool's idle window, enables TCP keepalive,
341/// and sets connect / per-read timeouts so a rotated VIP cannot wedge
342/// a long-running session (see [`POOL_IDLE_TIMEOUT`] / [`TCP_KEEPALIVE`]
343/// / [`CONNECT_TIMEOUT`] / [`READ_TIMEOUT`] for rationale). Returns
344/// [`ObjectStoreError::Other`] if the TLS / DNS resolver layer fails
345/// to initialise, which the SDK would otherwise surface as a cryptic
346/// per-request error.
347pub(crate) fn build_http_client() -> Result<Arc<reqwest::Client>, ObjectStoreError> {
348 reqwest::Client::builder()
349 .pool_idle_timeout(POOL_IDLE_TIMEOUT)
350 .tcp_keepalive(TCP_KEEPALIVE)
351 .connect_timeout(CONNECT_TIMEOUT)
352 .read_timeout(READ_TIMEOUT)
353 .build()
354 .map(Arc::new)
355 .map_err(other_boxed)
356}
357
358/// Build the [`ClientOptions`] [`AzureStore`] hands to the SDK.
359///
360/// Installs the custom [`Transport`] (see [`build_http_client`]) and
361/// preserves the credential resolver's per-try signing policy. The
362/// helper is split out (rather than inlined into [`AzureStore::from_remote_url`])
363/// so unit tests can assert that both invariants hold without
364/// constructing a real `BlobContainerClient`.
365pub(crate) fn build_client_options(
366 resolved: &auth::ResolvedCredentials,
367) -> Result<ClientOptions, ObjectStoreError> {
368 let mut opts = ClientOptions {
369 transport: Some(Transport::new(build_http_client()?)),
370 ..Default::default()
371 };
372 if let Some(policy) = &resolved.per_try_policy {
373 opts.per_try_policies.push(Arc::clone(policy));
374 }
375 Ok(opts)
376}
377
378/// Construct the account-level endpoint URL the SDK constructors expect.
379///
380/// The SDK takes a separate `container_name` argument, so we strip the
381/// container (and any prefix segments) from the parsed URL. For
382/// virtual-hosted addressing the path becomes `/`; for path-style
383/// addressing (Azurite, custom endpoints) the path becomes `/<account>`.
384pub(crate) fn build_account_url(
385 endpoint: &Url,
386 account: &str,
387 addressing: AzureAddressing,
388) -> String {
389 let mut rewritten = endpoint.clone();
390 rewritten.set_query(None);
391 rewritten.set_fragment(None);
392 let path = match addressing {
393 AzureAddressing::VirtualHosted => "/".to_owned(),
394 AzureAddressing::PathStyle => format!("/{account}"),
395 };
396 rewritten.set_path(&path);
397 rewritten.to_string()
398}
399
400/// Map an [`azure_core::Error`] into the trait's [`ObjectStoreError`] enum.
401///
402/// `key` is the operation's key/prefix context; it appears in the
403/// resulting [`ObjectStoreError::NotFound`] / [`ObjectStoreError::AccessDenied`] /
404/// [`ObjectStoreError::PreconditionFailed`] / [`ObjectStoreError::Conflict`] payload.
405fn classify(err: azure_core::Error, key: &str) -> ObjectStoreError {
406 if let azure_core::error::ErrorKind::HttpResponse {
407 status, error_code, ..
408 } = err.kind()
409 && let Some(mapped) =
410 classify_status_and_code(u16::from(*status), error_code.as_deref(), key)
411 {
412 return mapped;
413 }
414 if matches!(err.kind(), azure_core::error::ErrorKind::Io) {
415 return network_boxed(err);
416 }
417 other_boxed(err)
418}
419
420/// Pure status/code classifier (key context, no SDK types) so unit
421/// tests can exercise every branch without synthesising an SDK error.
422fn classify_status_and_code(
423 status: u16,
424 code: Option<&str>,
425 key: &str,
426) -> Option<ObjectStoreError> {
427 match status {
428 404 => return Some(ObjectStoreError::NotFound(key.to_owned())),
429 403 => return Some(ObjectStoreError::AccessDenied(key.to_owned())),
430 412 => return Some(ObjectStoreError::PreconditionFailed(key.to_owned())),
431 409 => return Some(ObjectStoreError::Conflict(key.to_owned())),
432 // Azure surfaces a Put Blob body over the single-PUT ceiling as
433 // HTTP 413 with code `RequestBodyTooLarge`; the status alone is
434 // sufficient (HTTP 413 is the canonical "Payload Too Large").
435 413 => {
436 return Some(ObjectStoreError::PayloadTooLarge {
437 limit_bytes: SINGLE_PUT_BLOB_LIMIT_BYTES,
438 });
439 }
440 _ => {}
441 }
442 // Defensive backstop for the (rare) case where the SDK exposes the
443 // service code without a 413 status: route on the code alone.
444 match code {
445 Some("RequestBodyTooLarge") => Some(ObjectStoreError::PayloadTooLarge {
446 limit_bytes: SINGLE_PUT_BLOB_LIMIT_BYTES,
447 }),
448 _ => None,
449 }
450}
451
452/// Convert the relevant `Get Blob Properties` headers into the trait's
453/// [`ObjectMeta`].
454///
455/// Extracted so unit tests can drive the missing-content-length and
456/// missing-last-modified guard branches without synthesising a full
457/// `BlobClientGetPropertiesResultHeaders` value.
458///
459/// A missing `Content-Length` is an error rather than silent zero: a
460/// 0-byte size is semantically meaningful (lock files are intentionally
461/// empty) and downstream `head_then_download` takes a fast path on
462/// `size == 0` that writes an empty destination file. Treating "header
463/// absent" as 0 would silently produce empty bundles instead of
464/// surfacing the malformed response.
465fn properties_to_meta(
466 key: &str,
467 content_length: Option<u64>,
468 last_modified: Option<OffsetDateTime>,
469 etag: Option<&str>,
470) -> Result<ObjectMeta, ObjectStoreError> {
471 let size = content_length.ok_or_else(|| {
472 ObjectStoreError::Other(
473 format!("get_properties on `{key}` returned no content-length").into(),
474 )
475 })?;
476 let last_modified = last_modified.ok_or_else(|| {
477 ObjectStoreError::Other(
478 format!("get_properties on `{key}` returned no last-modified").into(),
479 )
480 })?;
481 Ok(ObjectMeta {
482 key: key.to_owned(),
483 size,
484 last_modified,
485 etag: etag.map(str::to_owned),
486 })
487}
488
489/// Convert a `BlobItem`-shaped record into the trait's [`ObjectMeta`].
490///
491/// Extracted so unit tests can drive the missing-field guards without
492/// synthesising a full `ListBlobsResponse`.
493fn item_to_meta(
494 name: Option<&str>,
495 content_length: Option<u64>,
496 last_modified: Option<OffsetDateTime>,
497 etag: Option<&str>,
498) -> Result<ObjectMeta, ObjectStoreError> {
499 let key = name
500 .ok_or_else(|| ObjectStoreError::Other("list_blobs returned a blob without a name".into()))?
501 .to_owned();
502 let size = content_length.unwrap_or(0);
503 let last_modified = last_modified.ok_or_else(|| {
504 ObjectStoreError::Other(
505 format!("list_blobs returned blob `{key}` without last_modified").into(),
506 )
507 })?;
508 Ok(ObjectMeta {
509 key,
510 size,
511 last_modified,
512 etag: etag.map(str::to_owned),
513 })
514}
515
516#[async_trait::async_trait]
517impl ObjectStore for AzureStore {
518 async fn list(&self, prefix: &str) -> Result<Vec<ObjectMeta>, ObjectStoreError> {
519 // Pass `None` for an empty prefix: Azure list_blobs URL-encodes
520 // `prefix=` and Azurite signs an empty value differently than
521 // an absent one (treats it as a tampered query and returns
522 // 403). Skipping the parameter is the wire-equivalent of "no
523 // prefix filter" anyway.
524 let prefix_opt = (!prefix.is_empty()).then(|| prefix.to_owned());
525 let opts = BlobContainerClientListBlobsOptions {
526 prefix: prefix_opt,
527 ..Default::default()
528 };
529 let mut pages = self
530 .container
531 .list_blobs(Some(opts))
532 .map_err(|e| classify(e, prefix))?
533 .into_pages();
534
535 let mut out = Vec::new();
536 while let Some(page_result) = pages.next().await {
537 let response = page_result.map_err(|e| classify(e, prefix))?;
538 let body = response
539 .into_body()
540 .xml::<azure_storage_blob::models::ListBlobsResponse>()
541 .map_err(|e| classify(e, prefix))?;
542 for item in body.segment.blob_items {
543 let props = item.properties.unwrap_or_default();
544 let meta = item_to_meta(
545 item.name.as_deref(),
546 props.content_length,
547 props.last_modified,
548 // Listing omits ETag for parity with S3 (avoid
549 // inflating per-object metadata for callers that
550 // only need a key/size enumeration).
551 None,
552 )?;
553 out.push(meta);
554 }
555 }
556 Ok(out)
557 }
558
559 async fn get_to_file(
560 &self,
561 key: &str,
562 dest: &Path,
563 opts: GetOpts,
564 ) -> Result<(), ObjectStoreError> {
565 let parent = dest.parent().ok_or_else(|| {
566 ObjectStoreError::Other(
567 format!("destination `{}` has no parent directory", dest.display()).into(),
568 )
569 })?;
570
571 // Mirror S3: try once, retry once on 412 (the head→GET race).
572 // After the second attempt any error — including a repeated
573 // 412 — propagates.
574 let progress = opts.progress.as_ref();
575 match self.head_then_download(key, dest, parent, progress).await {
576 Err(ObjectStoreError::PreconditionFailed(_)) => {
577 tracing::warn!(key, "blob changed between head and GET; retrying");
578 self.head_then_download(key, dest, parent, progress).await
579 }
580 other => other,
581 }
582 }
583
584 async fn get_bytes(&self, key: &str) -> Result<Bytes, ObjectStoreError> {
585 let blob = self.blob_client(key);
586 let result = blob.download(None).await.map_err(|e| classify(e, key))?;
587 let bytes = result.body.collect().await.map_err(network_boxed)?;
588 Ok(bytes)
589 }
590
591 /// Issue a Get Blob with a `Range<usize>` covering `[start, end)`.
592 /// HTTP 416 maps to [`ObjectStoreError::RangeNotSatisfiable`] with
593 /// the original `Range<u64>` so the wire-line names what the
594 /// caller asked for. All other failures route through [`classify`].
595 ///
596 /// The Azure SDK exposes `BlobClientDownloadOptions::range` as
597 /// `Option<Range<usize>>`. `usize` is at least 64 bits on every
598 /// supported target, so casting from `u64` is lossless; the cast
599 /// is documented here so a future 32-bit port surfaces as a
600 /// compile error rather than silent truncation.
601 ///
602 /// Azure silently truncates a ranged GET to EOF when the requested
603 /// range overruns the blob — `start < body.len() <= end` returns
604 /// `start..body.len()` bytes with HTTP 206 and no error. The
605 /// post-flight length check via [`super::verify_range_response_length`]
606 /// elevates that mismatch to [`ObjectStoreError::RangeNotSatisfiable`]
607 /// so callers (notably the packchain reader) cannot mistake a
608 /// truncated slice for the full requested range.
609 async fn get_bytes_range(
610 &self,
611 key: &str,
612 range: std::ops::Range<u64>,
613 ) -> Result<Bytes, ObjectStoreError> {
614 // Compile-time guarantee: every supported target has a 64-bit
615 // usize, so the `u64 → usize` conversions below cannot
616 // truncate. A future 32-bit port surfaces as a build break,
617 // not silent corruption.
618 const _USIZE_AT_LEAST_64_BIT: () =
619 assert!(usize::BITS >= 64, "Azure backend requires 64-bit usize");
620
621 if let Some(empty) = super::precheck_range(key, &range)? {
622 return Ok(empty);
623 }
624 let sdk_start = usize::try_from(range.start).expect("invariant: usize is at least 64 bits");
625 let sdk_end = usize::try_from(range.end).expect("invariant: usize is at least 64 bits");
626 let opts = BlobClientDownloadOptions {
627 range: Some(sdk_start..sdk_end),
628 ..Default::default()
629 };
630 let blob = self.blob_client(key);
631 let result = match blob.download(Some(opts)).await {
632 Ok(result) => result,
633 Err(err) => {
634 if let azure_core::error::ErrorKind::HttpResponse { status, .. } = err.kind()
635 && u16::from(*status) == 416
636 {
637 return Err(ObjectStoreError::RangeNotSatisfiable {
638 key: key.to_owned(),
639 requested: range,
640 });
641 }
642 return Err(classify(err, key));
643 }
644 };
645 let bytes = result.body.collect().await.map_err(network_boxed)?;
646 super::verify_range_response_length(key, &range, bytes)
647 }
648
649 async fn put_bytes(
650 &self,
651 key: &str,
652 body: Bytes,
653 opts: PutOpts,
654 ) -> Result<(), ObjectStoreError> {
655 // Same threshold as the S3 backend: above
656 // [`MULTIPART_PUT_THRESHOLD`] use explicit `stage_block` +
657 // `commit_block_list` so each block has its own retry budget,
658 // predictable concurrency, and per-block progress events. Below
659 // the threshold keep the single `Put Blob` round trip. Issue #53.
660 let size = body.len() as u64;
661 if should_use_multipart(size) {
662 return self.multipart_put_bytes(key, body, size, opts).await;
663 }
664 let progress = opts.progress.clone();
665 let blob = self.blob_client(key);
666 let upload_opts = upload_options_from(opts);
667 blob.upload(bytes_to_request_content(body), Some(upload_opts))
668 .await
669 .map_err(|e| classify(e, key))?;
670 if let Some(sink) = progress
671 && size > 0
672 {
673 sink.report(size);
674 }
675 Ok(())
676 }
677
678 /// Stream a local file to `key` without buffering its full body.
679 ///
680 /// Above [`super::multipart::MULTIPART_PUT_THRESHOLD`] this routes through explicit
681 /// `stage_block` + `commit_block_list`, paralleling the S3 backend
682 /// (issue #53). Below the threshold the single `Put Blob` path
683 /// preserves the one-round-trip cost for small bundles and lock
684 /// files.
685 ///
686 /// On the multipart path each task opens its own
687 /// `tokio::fs::File`, seeks to its part offset, reads the part
688 /// into a `Bytes`, then calls `BlockBlobClient::stage_block`. With
689 /// `MULTIPART_PUT_MAX_CONCURRENCY = 8` and
690 /// `MULTIPART_PUT_PART_SIZE = 16 MiB`, peak memory is bounded at
691 /// 128 MiB regardless of file size.
692 ///
693 /// On the single-PUT path we wrap `tokio::fs::File` in
694 /// [`FileStream`] so the body is delivered as
695 /// `Body::SeekableStream`. The per-try signing policy reads
696 /// `request.body().len()`, which `SeekableStream` reports faithfully
697 /// via `len()`.
698 async fn put_path(&self, key: &str, src: &Path, opts: PutOpts) -> Result<(), ObjectStoreError> {
699 // Open the file once and read size from the open handle. This
700 // closes the metadata/upload race that would let a concurrent
701 // truncate or rename produce a body whose length disagrees
702 // with the size we used for multipart planning.
703 let file = tokio::fs::File::open(src).await.map_err(other_boxed)?;
704 let body_len = file.metadata().await.map_err(other_boxed)?.len();
705 if should_use_multipart(body_len) {
706 return self.multipart_put_path(key, file, body_len, opts).await;
707 }
708 // Below the threshold: single `Put Blob`. Wrap our already-
709 // open handle in `FileStream`; the SDK does not re-open by
710 // path (which would re-introduce the race).
711 let stream = FileStream::builder(file)
712 .build()
713 .await
714 .map_err(other_boxed)?;
715 let body: azure_core::http::Body = stream.into();
716
717 let blob = self.blob_client(key);
718 let progress = opts.progress.clone();
719 let upload_opts = upload_options_from(opts);
720 blob.upload(body.into(), Some(upload_opts))
721 .await
722 .map_err(|e| classify(e, key))?;
723 if let Some(sink) = progress
724 && body_len > 0
725 {
726 sink.report(body_len);
727 }
728 Ok(())
729 }
730
731 async fn put_if_absent(&self, key: &str, body: Bytes) -> Result<bool, ObjectStoreError> {
732 let blob = self.blob_client(key);
733 let upload_opts = BlockBlobClientUploadOptions::default().with_if_not_exists();
734 let resp = blob
735 .upload(bytes_to_request_content(body), Some(upload_opts))
736 .await;
737 match resp.map_err(|e| classify(e, key)) {
738 Ok(_) => Ok(true),
739 Err(ObjectStoreError::PreconditionFailed(_) | ObjectStoreError::Conflict(_)) => {
740 Ok(false)
741 }
742 Err(other) => Err(other),
743 }
744 }
745
746 async fn head(&self, key: &str) -> Result<ObjectMeta, ObjectStoreError> {
747 let blob = self.blob_client(key);
748 let resp = blob
749 .get_properties(None::<BlobClientGetPropertiesOptions<'_>>)
750 .await
751 .map_err(|e| classify(e, key))?;
752 let headers = resp.headers();
753 properties_to_meta(
754 key,
755 header_u64(headers, &HeaderName::from_static("content-length")),
756 header_http_date(headers, &HeaderName::from_static("last-modified")),
757 headers.get_optional_str(&HeaderName::from_static("etag")),
758 )
759 }
760
761 async fn copy(&self, src: &str, dst: &str) -> Result<(), ObjectStoreError> {
762 // Server-side copy via `Put Blob From URL` requires a SAS-tokened
763 // source URL or `x-ms-copy-source-authorization`, neither of
764 // which integrates with our credential model in a clean way
765 // for the SDK 0.12 surface. Stream `src` to a temp file via
766 // `get_to_file` (chunked download, no body buffer), then
767 // `put_path` it back to `dst` (block-uploaded for large bodies
768 // via `multipart_put_path`). Peak in-flight bytes are bounded
769 // by `MULTIPART_PUT_MAX_CONCURRENCY` × `MULTIPART_PUT_PART_SIZE`
770 // regardless of blob size — necessary because `manage doctor`'s
771 // duplicate-bundle quarantine path uses `copy()` and bundles
772 // can be multi-GiB.
773 let temp = NamedTempFile::new().map_err(other_boxed)?;
774 // `get_to_file` propagates `NotFound(src)` if the source is
775 // absent — exactly the trait contract for `copy`.
776 self.get_to_file(src, temp.path(), GetOpts::default())
777 .await?;
778 // A NotFound on the upload is destination-side — re-shape it
779 // so callers don't mistake it for "src absent".
780 match self.put_path(dst, temp.path(), PutOpts::default()).await {
781 Ok(()) => Ok(()),
782 Err(ObjectStoreError::NotFound(_)) => Err(ObjectStoreError::Other(
783 format!("copy `{src}` → `{dst}`: upload returned NotFound").into(),
784 )),
785 Err(other) => Err(other),
786 }
787 }
788
789 async fn delete(&self, key: &str) -> Result<(), ObjectStoreError> {
790 let blob = self.blob_client(key);
791 blob.delete(None::<BlobClientDeleteOptions<'_>>)
792 .await
793 .map_err(|e| classify(e, key))?;
794 Ok(())
795 }
796
797 /// Build a service-blob SAS URL for `key` valid for `ttl`.
798 /// Used by the `bundle-uri` capability (issue #76) to advertise
799 /// time-limited download URLs against private containers.
800 ///
801 /// Only the shared-key / connection-string credential paths can
802 /// produce a SAS — the SAS env-var path has no key to re-sign
803 /// with, and the Entra-ID `TokenCredential` path requires
804 /// user-delegation SAS (out of scope per the issue). Both
805 /// fall through to [`ObjectStoreError::Unsupported`].
806 ///
807 /// # Errors
808 ///
809 /// - [`ObjectStoreError::Unsupported`] when the credential is
810 /// not a shared key.
811 /// - [`ObjectStoreError::Other`] when SAS construction fails
812 /// (HMAC init / base64 decode / time overflow).
813 async fn presigned_get_url(
814 &self,
815 key: &str,
816 ttl: std::time::Duration,
817 ) -> Result<String, ObjectStoreError> {
818 let signing = self.sas_signing.as_ref().ok_or_else(|| {
819 ObjectStoreError::Unsupported(
820 "Azure presigned URLs require a shared account key (KEY env var or \
821 connection string); SAS-env-var and Entra-ID credentials cannot \
822 derive per-blob SAS"
823 .to_owned(),
824 )
825 })?;
826 // The SDK's `BlobClient::url()` returns the fully-qualified
827 // blob URL including the container path segment. Reuse it
828 // rather than re-deriving the URL shape per addressing
829 // mode here.
830 let blob = self.blob_client(key);
831 let base = blob.url();
832 sas::build_blob_sas_url(base, &self.container_name, key, signing, ttl)
833 }
834}
835
836impl AzureStore {
837 /// One head→tempfile→download→persist round trip.
838 ///
839 /// Factored out so [`get_to_file`](ObjectStore::get_to_file) can
840 /// invoke it twice: once normally, once more on a 412 retry.
841 async fn head_then_download(
842 &self,
843 key: &str,
844 dest: &Path,
845 parent: &Path,
846 progress: Option<&ProgressSink>,
847 ) -> Result<(), ObjectStoreError> {
848 let meta = self.head(key).await?;
849 let temp = NamedTempFile::new_in(parent).map_err(other_boxed)?;
850 if meta.size == 0 {
851 // Skip the GET entirely for zero-byte blobs (lock files):
852 // `download_streaming` would issue a plain GET for an empty
853 // body — correct but a wasted round trip.
854 return persist_temp(temp, dest);
855 }
856 self.download_streaming(key, temp.path(), meta.etag.as_deref(), progress)
857 .await?;
858 persist_temp(temp, dest)
859 }
860
861 /// Stream a blob body to `temp_path` with optional `If-Match`
862 /// guarding against mid-download mutation. When `progress` is
863 /// `Some`, fires once per SDK body chunk read off the wire.
864 async fn download_streaming(
865 &self,
866 key: &str,
867 temp_path: &Path,
868 etag: Option<&str>,
869 progress: Option<&ProgressSink>,
870 ) -> Result<(), ObjectStoreError> {
871 let blob = self.blob_client(key);
872 let mut opts = BlobClientDownloadOptions::default();
873 if let Some(etag) = etag {
874 opts.if_match = Some(etag.to_owned());
875 }
876 let mut result = blob
877 .download(Some(opts))
878 .await
879 .map_err(|e| classify(e, key))?;
880
881 let mut file = tokio::fs::OpenOptions::new()
882 .write(true)
883 .truncate(true)
884 .open(temp_path)
885 .await
886 .map_err(other_boxed)?;
887
888 while let Some(chunk) = result.body.next().await {
889 let bytes = chunk.map_err(network_boxed)?;
890 let chunk_len = bytes.len() as u64;
891 file.write_all(&bytes).await.map_err(other_boxed)?;
892 if let Some(sink) = progress
893 && chunk_len > 0
894 {
895 sink.report(chunk_len);
896 }
897 }
898 file.flush().await.map_err(other_boxed)?;
899 Ok(())
900 }
901
902 /// Drive a multipart upload from a fully-buffered `Bytes` body.
903 ///
904 /// `Bytes::slice` is zero-copy — every block borrows into the same
905 /// underlying allocation, so peak memory equals the caller's body
906 /// rather than `body × blocks`.
907 async fn multipart_put_bytes(
908 &self,
909 key: &str,
910 body: Bytes,
911 size: u64,
912 opts: PutOpts,
913 ) -> Result<(), ObjectStoreError> {
914 let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
915 let progress = opts.progress.clone();
916 let staged = self
917 .stage_blocks_with_bodies(key, &parts, progress, |part| slice_bytes_part(&body, part))
918 .await?;
919 let blob = self.blob_client(key).block_blob_client();
920 commit_block_list(&blob, key, staged, opts).await
921 }
922
923 /// Drive a multipart upload by streaming a local file block-by-block.
924 ///
925 /// All tasks share one `Arc<std::fs::File>`; per-task
926 /// `read_file_part` uses `pread` so reads are concurrent without
927 /// offset contention. Sharing one open file description closes
928 /// the metadata/upload race. With `MULTIPART_PUT_MAX_CONCURRENCY
929 /// = 8` and `MULTIPART_PUT_PART_SIZE = 16 MiB`, peak memory is
930 /// bounded at 128 MiB regardless of file size.
931 async fn multipart_put_path(
932 &self,
933 key: &str,
934 file: tokio::fs::File,
935 size: u64,
936 opts: PutOpts,
937 ) -> Result<(), ObjectStoreError> {
938 let parts = plan_upload_parts(size, MULTIPART_PUT_PART_SIZE, AZURE_MAX_BLOCKS);
939 let progress = opts.progress.clone();
940 let file: Arc<std::fs::File> = Arc::new(file.into_std().await);
941 let staged = self
942 .stage_blocks_from_file(key, file, &parts, progress)
943 .await?;
944 let blob = self.blob_client(key).block_blob_client();
945 commit_block_list(&blob, key, staged, opts).await
946 }
947
948 /// Spawn parallel `stage_block` tasks with bodies sourced from a
949 /// closure (used by `multipart_put_bytes`).
950 ///
951 /// Returns the per-block IDs in part order so the caller can build
952 /// a `BlockLookupList` for `commit_block_list`. On error,
953 /// already-staged blocks are simply not committed and Azure
954 /// auto-expires them after seven days; there is no client-side
955 /// abort call.
956 ///
957 /// `BlockBlobClient` does not implement `Clone`, so each spawned
958 /// task constructs its own via `self.blob_client(...)
959 /// .block_blob_client()`. The container's `blob_client(&self, ..)`
960 /// returns an owned `BlobClient` already (cheap-clone of internal
961 /// `Arc` state), so this stays allocation-light.
962 async fn stage_blocks_with_bodies<F>(
963 &self,
964 key: &str,
965 parts: &[UploadPart],
966 progress: Option<ProgressSink>,
967 make_body: F,
968 ) -> Result<Vec<Vec<u8>>, ObjectStoreError>
969 where
970 F: Fn(UploadPart) -> Result<Bytes, ObjectStoreError>,
971 {
972 let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
973 let mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>> = JoinSet::new();
974 for (idx, part) in parts.iter().enumerate() {
975 let part = *part;
976 let part_index = idx;
977 let block_id = block_id_for(idx);
978 let body = make_body(part)?;
979 let blob = self.blob_client(key).block_blob_client();
980 let key = key.to_owned();
981 let semaphore = Arc::clone(&semaphore);
982 let progress = progress.clone();
983 tasks.spawn(async move {
984 let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
985 blob.stage_block(&block_id, part.length, bytes_to_request_content(body), None)
986 .await
987 .map_err(|e| classify(e, &key))?;
988 if let Some(sink) = &progress {
989 sink.report(part.length);
990 }
991 Ok((part_index, block_id))
992 });
993 }
994 join_staged_blocks(tasks, parts.len()).await
995 }
996
997 /// Spawn parallel `stage_block` tasks that each read their
998 /// block from the shared `Arc<std::fs::File>` via `pread`. The
999 /// shared open file description gives every task a stable view
1000 /// of the same inode (used by `multipart_put_path`).
1001 async fn stage_blocks_from_file(
1002 &self,
1003 key: &str,
1004 file: Arc<std::fs::File>,
1005 parts: &[UploadPart],
1006 progress: Option<ProgressSink>,
1007 ) -> Result<Vec<Vec<u8>>, ObjectStoreError> {
1008 let semaphore = Arc::new(Semaphore::new(MULTIPART_PUT_MAX_CONCURRENCY));
1009 let mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>> = JoinSet::new();
1010 for (idx, part) in parts.iter().enumerate() {
1011 let part = *part;
1012 let part_index = idx;
1013 let block_id = block_id_for(idx);
1014 let blob = self.blob_client(key).block_blob_client();
1015 let key = key.to_owned();
1016 let task_file = Arc::clone(&file);
1017 let semaphore = Arc::clone(&semaphore);
1018 let progress = progress.clone();
1019 tasks.spawn(async move {
1020 let _permit = semaphore.acquire_owned().await.map_err(other_boxed)?;
1021 let body = read_file_part(task_file, part).await?;
1022 blob.stage_block(&block_id, part.length, bytes_to_request_content(body), None)
1023 .await
1024 .map_err(|e| classify(e, &key))?;
1025 if let Some(sink) = &progress {
1026 sink.report(part.length);
1027 }
1028 Ok((part_index, block_id))
1029 });
1030 }
1031 join_staged_blocks(tasks, parts.len()).await
1032 }
1033}
1034
/// Derive the deterministic Azure block ID for part number `idx`
/// (zero-indexed).
///
/// The ID is the one-based part ordinal rendered as 32 zero-padded
/// ASCII digits. Azure requires every block ID in a single
/// `commit_block_list` request to have the same pre-base64 length;
/// 32 digits leaves room for 10^32 parts, vastly above
/// [`AZURE_MAX_BLOCKS`] = 50 000.
fn block_id_for(idx: usize) -> Vec<u8> {
    let ordinal = idx + 1;
    format!("{ordinal:032}").into_bytes()
}
1045
1046/// Drain a `JoinSet` of `stage_block` tasks into a Vec of block IDs
1047/// indexed by part order. Short-circuits on the first error.
1048async fn join_staged_blocks(
1049 mut tasks: JoinSet<Result<(usize, Vec<u8>), ObjectStoreError>>,
1050 expected: usize,
1051) -> Result<Vec<Vec<u8>>, ObjectStoreError> {
1052 let mut staged: Vec<Option<Vec<u8>>> = (0..expected).map(|_| None).collect();
1053 while let Some(joined) = tasks.join_next().await {
1054 let (idx, block_id) = joined.map_err(other_boxed)??;
1055 staged[idx] = Some(block_id);
1056 }
1057 staged
1058 .into_iter()
1059 .enumerate()
1060 .map(|(idx, slot)| {
1061 slot.ok_or_else(|| {
1062 ObjectStoreError::Other(
1063 format!("internal: stage_block task for part {idx} did not return").into(),
1064 )
1065 })
1066 })
1067 .collect()
1068}
1069
1070/// Commit the staged blocks in order, applying any
1071/// `content_disposition` / `user_metadata` from the original `PutOpts`.
1072///
1073/// Azure has no `AbortMultipartUpload` equivalent: if commit fails
1074/// the staged blocks remain on the storage account and expire
1075/// automatically (default seven days). Surface the commit error
1076/// directly — the caller's error handling already understands the
1077/// "operation did not succeed" outcome.
1078async fn commit_block_list(
1079 blob: &BlockBlobClient,
1080 key: &str,
1081 block_ids: Vec<Vec<u8>>,
1082 opts: PutOpts,
1083) -> Result<(), ObjectStoreError> {
1084 let block_list = BlockLookupList {
1085 latest: Some(block_ids),
1086 ..Default::default()
1087 };
1088 let body: RequestContent<_, _> = block_list.try_into().map_err(other_boxed)?;
1089 let (cd, metadata) = put_opts_blob_fields(opts);
1090 let commit_opts = BlockBlobClientCommitBlockListOptions {
1091 blob_content_disposition: cd,
1092 metadata,
1093 ..Default::default()
1094 };
1095 blob.commit_block_list(body, Some(commit_opts))
1096 .await
1097 .map_err(|e| classify(e, key))?;
1098 Ok(())
1099}
1100
1101/// Wrap `Bytes` in a `RequestContent` without copying the buffer.
1102///
1103/// `RequestContent` has an inherent `from(Vec<u8>)` constructor that
1104/// shadows the generic `From<Bytes>` trait impl, so a bare
1105/// `RequestContent::from(body)` resolves to the `Vec<u8>` overload and
1106/// re-allocates. Going through `Into` instead picks up the trait impl
1107/// and keeps the `Bytes` payload zero-copy. The return type is left
1108/// generic so the call site (which pins `Bytes` + `NoFormat` via the
1109/// `BlobClient::upload` signature) drives type inference.
1110fn bytes_to_request_content<F>(body: Bytes) -> RequestContent<Bytes, F>
1111where
1112 Bytes: Into<RequestContent<Bytes, F>>,
1113{
1114 body.into()
1115}
1116
1117/// Pull the blob-shaped `content_disposition` and `metadata` fields
1118/// out of [`PutOpts`].
1119///
1120/// Both `BlockBlobClientUploadOptions` (single `Put Blob`) and
1121/// `BlockBlobClientCommitBlockListOptions` (multipart commit) carry
1122/// the same two fields by the same names. Centralising the
1123/// conversion here keeps a single source of truth for "how a
1124/// `PutOpts` becomes Azure blob metadata."
1125fn put_opts_blob_fields(
1126 opts: PutOpts,
1127) -> (
1128 Option<String>,
1129 Option<std::collections::HashMap<String, String>>,
1130) {
1131 let metadata = (!opts.user_metadata.is_empty()).then(|| {
1132 opts.user_metadata
1133 .into_iter()
1134 .collect::<std::collections::HashMap<_, _>>()
1135 });
1136 (opts.content_disposition, metadata)
1137}
1138
1139/// Build a [`BlockBlobClientUploadOptions`] from the trait's [`PutOpts`].
1140fn upload_options_from(opts: PutOpts) -> BlockBlobClientUploadOptions<'static> {
1141 let (cd, metadata) = put_opts_blob_fields(opts);
1142 BlockBlobClientUploadOptions {
1143 blob_content_disposition: cd,
1144 metadata,
1145 ..Default::default()
1146 }
1147}
1148
1149fn header_u64(headers: &Headers, name: &HeaderName) -> Option<u64> {
1150 headers.get_optional_str(name).and_then(|s| s.parse().ok())
1151}
1152
1153fn header_http_date(headers: &Headers, name: &HeaderName) -> Option<OffsetDateTime> {
1154 let raw = headers.get_optional_str(name)?;
1155 OffsetDateTime::parse(raw, &time::format_description::well_known::Rfc2822).ok()
1156}
1157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::url::{AzureAddressing, RemoteFlags};

    /// Unix timestamp shared by the metadata-conversion fixtures.
    const FIXTURE_UNIX_TS: i64 = 1_700_000_000;

    /// Parse a literal endpoint URL, panicking on malformed fixtures.
    fn parse_endpoint(raw: &str) -> Url {
        Url::parse(raw).expect("test endpoint URL parses")
    }

    /// The fixed `last_modified` instant used by the conversion tests.
    fn fixture_time() -> OffsetDateTime {
        OffsetDateTime::from_unix_timestamp(FIXTURE_UNIX_TS).unwrap()
    }

    /// A well-formed S3 remote, used to prove the Azure constructor
    /// refuses non-Azure URLs.
    fn s3_url() -> RemoteUrl {
        RemoteUrl::S3 {
            endpoint: parse_endpoint("https://my-bucket.s3.us-west-2.amazonaws.com/"),
            bucket: "my-bucket".to_owned(),
            prefix: None,
            addressing: crate::url::S3Addressing::VirtualHosted,
            flags: RemoteFlags::default(),
        }
    }

    // --- build_account_url --------------------------------------------

    #[test]
    fn build_account_url_virtual_hosted_strips_path() {
        let endpoint =
            parse_endpoint("https://acct.blob.core.windows.net/my-container/some/prefix");
        assert_eq!(
            build_account_url(&endpoint, "acct", AzureAddressing::VirtualHosted),
            "https://acct.blob.core.windows.net/"
        );
    }

    #[test]
    fn build_account_url_path_style_keeps_account() {
        let endpoint =
            parse_endpoint("http://127.0.0.1:10000/devstoreaccount1/my-container/repo");
        assert_eq!(
            build_account_url(&endpoint, "devstoreaccount1", AzureAddressing::PathStyle),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
    }

    #[test]
    fn build_account_url_strips_query_and_fragment() {
        let endpoint =
            parse_endpoint("https://acct.blob.core.windows.net/c/r?credential=foo#frag");
        assert_eq!(
            build_account_url(&endpoint, "acct", AzureAddressing::VirtualHosted),
            "https://acct.blob.core.windows.net/"
        );
    }

    // --- classify_status_and_code -------------------------------------

    #[test]
    fn classify_404_is_not_found() {
        let classified = classify_status_and_code(404, None, "k");
        assert!(matches!(
            classified,
            Some(ObjectStoreError::NotFound(s)) if s == "k"
        ));
    }

    #[test]
    fn classify_403_is_access_denied() {
        let classified = classify_status_and_code(403, None, "k");
        assert!(matches!(
            classified,
            Some(ObjectStoreError::AccessDenied(s)) if s == "k"
        ));
    }

    #[test]
    fn classify_412_is_precondition_failed() {
        let classified = classify_status_and_code(412, None, "k");
        assert!(matches!(
            classified,
            Some(ObjectStoreError::PreconditionFailed(s)) if s == "k"
        ));
    }

    #[test]
    fn classify_409_is_conflict() {
        // Azure reports `BlobAlreadyExists` (the put-if-absent
        // contention path) as a 409. If this branch went missing,
        // `put_if_absent` would turn contention into a hard error
        // instead of `Ok(false)`.
        let classified = classify_status_and_code(409, None, "k");
        assert!(matches!(
            classified,
            Some(ObjectStoreError::Conflict(s)) if s == "k"
        ));
    }

    #[test]
    fn classify_413_is_payload_too_large() {
        // `code` is deliberately `None`: with a recognised code the
        // assertion would keep passing even if the status arm
        // regressed, because the code arm would catch it. The bare
        // "Payload Too Large" status must be enough on its own.
        let classified = classify_status_and_code(413, None, "k");
        assert!(matches!(
            classified,
            Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
                if limit_bytes == SINGLE_PUT_BLOB_LIMIT_BYTES
        ));
    }

    #[test]
    fn classify_request_body_too_large_code_is_payload_too_large() {
        // Backstop: should the SDK deliver the service code on a
        // non-413 status (e.g. 400), the code branch still catches it.
        let classified = classify_status_and_code(400, Some("RequestBodyTooLarge"), "k");
        assert!(matches!(
            classified,
            Some(ObjectStoreError::PayloadTooLarge { limit_bytes })
                if limit_bytes == SINGLE_PUT_BLOB_LIMIT_BYTES
        ));
    }

    #[test]
    fn classify_unrecognised_status_returns_none() {
        let server_error = classify_status_and_code(500, None, "k");
        let throttled = classify_status_and_code(429, None, "k");
        let coded_server_error = classify_status_and_code(500, Some("InternalError"), "k");
        assert!(server_error.is_none());
        assert!(throttled.is_none());
        assert!(coded_server_error.is_none());
    }

    // --- properties_to_meta ------------------------------------------

    #[test]
    fn properties_to_meta_round_trips_well_formed_response() {
        let meta = properties_to_meta("k", Some(42), Some(fixture_time()), Some("\"abc\""))
            .expect("conversion succeeds");
        assert_eq!(meta.key, "k");
        assert_eq!(meta.size, 42);
        assert_eq!(meta.last_modified.unix_timestamp(), FIXTURE_UNIX_TS);
        assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
    }

    #[test]
    fn properties_to_meta_preserves_legitimate_zero_size() {
        // Zero-byte lock files are real objects: a present
        // `Content-Length: 0` header (`Some(0)`) has to come back as
        // `size == 0`, which is distinct from the missing-header error.
        let meta = properties_to_meta("LOCK", Some(0), Some(fixture_time()), None)
            .expect("conversion succeeds");
        assert_eq!(meta.size, 0);
    }

    #[test]
    fn properties_to_meta_rejects_missing_content_length() {
        let err = properties_to_meta("k", None, Some(fixture_time()), None)
            .expect_err("missing content-length must error");
        let msg = match err {
            ObjectStoreError::Other(inner) => inner.to_string(),
            other => {
                panic!("expected ObjectStoreError::Other for missing content-length, got {other:?}")
            }
        };
        assert!(msg.contains("no content-length"), "names failure: {msg}");
        assert!(msg.contains("`k`"), "includes the key for context: {msg}");
    }

    #[test]
    fn properties_to_meta_rejects_missing_last_modified() {
        let err = properties_to_meta("k", Some(0), None, None)
            .expect_err("missing last_modified must error");
        let msg = match err {
            ObjectStoreError::Other(inner) => inner.to_string(),
            other => {
                panic!("expected ObjectStoreError::Other for missing last_modified, got {other:?}")
            }
        };
        assert!(msg.contains("no last-modified"), "names failure: {msg}");
        assert!(msg.contains("`k`"), "includes the key for context: {msg}");
    }

    // --- item_to_meta -------------------------------------------------

    #[test]
    fn item_to_meta_round_trips_well_formed_item() {
        let meta =
            item_to_meta(Some("k"), Some(42), Some(fixture_time()), Some("\"abc\"")).unwrap();
        assert_eq!(meta.key, "k");
        assert_eq!(meta.size, 42);
        assert_eq!(meta.last_modified.unix_timestamp(), FIXTURE_UNIX_TS);
        assert_eq!(meta.etag.as_deref(), Some("\"abc\""));
    }

    #[test]
    fn item_to_meta_rejects_missing_name() {
        let err = item_to_meta(None, Some(0), Some(fixture_time()), None).unwrap_err();
        let msg = match err {
            ObjectStoreError::Other(inner) => inner.to_string(),
            other => panic!("expected ObjectStoreError::Other, got {other:?}"),
        };
        assert!(msg.contains("without a name"), "names failure: {msg}");
    }

    #[test]
    fn item_to_meta_rejects_missing_last_modified() {
        let err = item_to_meta(Some("k"), Some(0), None, None).unwrap_err();
        let msg = match err {
            ObjectStoreError::Other(inner) => inner.to_string(),
            other => panic!("expected ObjectStoreError::Other, got {other:?}"),
        };
        assert!(
            msg.contains("without last_modified"),
            "names failure: {msg}"
        );
        assert!(msg.contains("`k`"), "includes the key: {msg}");
    }

    #[test]
    fn item_to_meta_treats_missing_size_as_zero() {
        // The Azure SDK types content_length as Option<u64>; a missing
        // value defaults to 0 (rather than letting `None` propagate
        // through every caller's arithmetic).
        let meta = item_to_meta(Some("k"), None, Some(fixture_time()), None).unwrap();
        assert_eq!(meta.size, 0);
    }

    // --- upload_options_from ------------------------------------------

    #[test]
    fn upload_options_from_default_is_empty() {
        let built = upload_options_from(PutOpts::default());
        assert!(built.blob_content_disposition.is_none());
        assert!(built.metadata.is_none());
    }

    #[test]
    fn upload_options_from_carries_content_disposition() {
        let built = upload_options_from(PutOpts {
            content_disposition: Some(String::from("attachment; filename=x")),
            user_metadata: Vec::new(),
            progress: None,
        });
        let disposition: String = built
            .blob_content_disposition
            .expect("content_disposition should be set");
        assert!(disposition.contains("attachment"));
    }

    #[test]
    fn upload_options_from_collects_metadata() {
        let pairs = vec![
            (String::from("x-foo"), String::from("1")),
            (String::from("x-bar"), String::from("2")),
        ];
        let built = upload_options_from(PutOpts {
            content_disposition: None,
            user_metadata: pairs,
            progress: None,
        });
        let metadata = built.metadata.expect("metadata set");
        assert_eq!(metadata.get("x-foo").map(String::as_str), Some("1"));
        assert_eq!(metadata.get("x-bar").map(String::as_str), Some("2"));
    }

    // --- from_remote_url constructor branch ---------------------------

    #[tokio::test]
    async fn from_remote_url_rejects_s3() {
        let err = match AzureStore::from_remote_url(&s3_url()).await {
            Ok(_) => panic!("expected S3 URL to be rejected"),
            Err(err) => err,
        };
        assert!(
            matches!(err, ObjectStoreError::Other(_)),
            "expected ObjectStoreError::Other, got {err:?}"
        );
    }

    // --- HTTP transport tuning (#26 / #28) ----------------------------

    /// Pins the timeout values. A copy-paste slip (`from_millis` in
    /// place of `from_secs`, an accidental zero) would silently switch
    /// off the behaviour these constants exist for, so fail fast
    /// instead. When the constants are changed on purpose, update the
    /// expected values here in the same change — the point is to make
    /// such changes deliberate, not to freeze the values forever.
    #[test]
    fn transport_timeout_constants_have_expected_values() {
        let expectations = [
            (POOL_IDLE_TIMEOUT, 30),
            (TCP_KEEPALIVE, 30),
            (CONNECT_TIMEOUT, 10),
            (READ_TIMEOUT, 30),
        ];
        for (actual, expected_secs) in expectations {
            assert_eq!(actual, Duration::from_secs(expected_secs));
        }
    }

    #[test]
    fn build_http_client_succeeds() {
        build_http_client().expect("reqwest client builds with the configured timeouts");
    }

    /// The regression this guards against: a refactor that drops the
    /// `transport = Some(...)` line in `build_client_options` would
    /// silently put the Azure backend back on `azure_core`'s default
    /// (unbounded) HTTP transport. It also pins the empty-policies
    /// invariant on the no-credential branch, catching a refactor that
    /// injects a fallback policy when `per_try_policy` is `None`.
    #[test]
    fn build_client_options_installs_custom_transport() {
        let no_credentials = auth::ResolvedCredentials {
            token_credential: None,
            per_try_policy: None,
            sas_signing_key: None,
        };
        let built = build_client_options(&no_credentials).expect("client options build");
        assert!(
            built.transport.is_some(),
            "ClientOptions::transport must be Some so the SDK uses our \
             pool_idle_timeout / tcp_keepalive client (issue #26)",
        );
        assert!(
            built.per_try_policies.is_empty(),
            "no per-try policy was supplied; the helper must not inject \
             a fallback signer of its own",
        );
    }

    /// Issue #28's Notes section calls this out explicitly: the
    /// per-try signing policy has to keep firing once a custom
    /// transport is installed. The SDK pipeline runs policies
    /// independently of the transport, but a refactor that mixes up
    /// the two fields would silently drop signing. The
    /// [`Arc::ptr_eq`] check pins identity, so a refactor that quietly
    /// *replaces* the caller's policy with a fresh one (rather than
    /// dropping it outright) fails as well.
    #[test]
    fn build_client_options_preserves_per_try_policy() {
        // Azurite's published well-known account key — base64-valid
        // and safe to embed.
        const AZURITE_KEY: &str = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
        let signer = auth::SharedKeySigningPolicy::new("devstoreaccount1", AZURITE_KEY)
            .expect("shared-key policy constructs");
        let policy: Arc<dyn azure_core::http::policies::Policy> = Arc::new(signer);
        let with_policy = auth::ResolvedCredentials {
            token_credential: None,
            per_try_policy: Some(Arc::clone(&policy)),
            sas_signing_key: None,
        };
        let built = build_client_options(&with_policy).expect("client options build");
        assert!(built.transport.is_some(), "transport still wired");
        assert_eq!(
            built.per_try_policies.len(),
            1,
            "exactly one per-try policy is wired",
        );
        assert!(
            Arc::ptr_eq(&policy, &built.per_try_policies[0]),
            "the policy at index 0 must be the same Arc the caller \
             supplied — not a fresh policy constructed inside the helper",
        );
    }

    /// Pins the `should_use_multipart` predicate at and around the
    /// shared threshold (issue #53).
    ///
    /// Both `put_bytes` and `put_path` route through this predicate.
    /// The integration test
    /// `multipart_put_emits_per_block_progress_events` covers the
    /// dispatch *call* (only multipart emits per-block events); this
    /// unit test pins the boundary semantics so the constant cannot be
    /// moved out from under that test without something failing. The
    /// Azure backend shares `MULTIPART_PUT_THRESHOLD` with S3 so that a
    /// refactor cannot raise the threshold for one backend alone.
    #[test]
    fn should_use_multipart_pins_threshold_boundary() {
        use super::super::multipart::MULTIPART_PUT_THRESHOLD;
        let just_below = should_use_multipart(MULTIPART_PUT_THRESHOLD - 1);
        let at_threshold = should_use_multipart(MULTIPART_PUT_THRESHOLD);
        let just_above = should_use_multipart(MULTIPART_PUT_THRESHOLD + 1);
        assert!(!just_below);
        assert!(at_threshold);
        assert!(just_above);
        assert!(should_use_multipart(6 * (1 << 30)));
    }

    /// Pins `block_id_for(idx)`: two parts must never share a block
    /// ID, and every ID in one `commit_block_list` must have the same
    /// pre-base64 length (Azure's hard requirement).
    #[test]
    fn block_id_for_is_unique_and_uniform_length() {
        let first = block_id_for(0);
        let second = block_id_for(1);
        let far = block_id_for(99_999);
        assert_eq!(first.len(), second.len(), "all IDs share length");
        assert_eq!(first.len(), far.len(), "even at the upper end");
        assert_ne!(first, second, "two parts get distinct IDs");
        // 32 ASCII bytes leave room for 10^32 parts — vastly above
        // [`AZURE_MAX_BLOCKS`] = 50 000.
        assert_eq!(first.len(), 32);
    }
}