use std::borrow::Cow;
use std::sync::Arc;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use chrono::Utc;
use crc64fast_nvme::Digest;
use http_cache_stream_reqwest::Cache;
use http_cache_stream_reqwest::semantics;
use http_cache_stream_reqwest::storage::DefaultCacheStorage;
use reqwest::Body;
use reqwest::Request;
use reqwest::Response;
use reqwest::StatusCode;
use reqwest::header;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderValue;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::broadcast;
use tracing::debug;
use url::Url;
use crate::AzureAuthConfig;
use crate::BLOCK_SIZE_THRESHOLD;
use crate::Config;
use crate::DateTimeExt;
use crate::Error;
use crate::HttpClient;
use crate::ONE_MEBIBYTE;
use crate::Result;
use crate::TransferEvent;
use crate::USER_AGENT;
use crate::UrlExt;
use crate::backend::StorageBackend;
use crate::backend::Upload;
use crate::backend::auth::azure::RequestSigner;
use crate::backend::format_range_header;
use crate::generator::Alphanumeric;
use crate::streams::ByteStream;
use crate::streams::TransferStream;
/// Root DNS domain for Azure Blob Storage accounts (`<account>.blob.core.windows.net`).
const AZURE_BLOB_STORAGE_ROOT_DOMAIN: &str = "blob.core.windows.net";
/// Root domain used when targeting a local Azurite emulator.
const AZURITE_ROOT_DOMAIN: &str = "blob.core.windows.net.localhost";
/// Default block size (4 MiB) used when no block size is configured.
const DEFAULT_BLOCK_SIZE: u64 = 4 * ONE_MEBIBYTE;
/// Maximum size of a single staged block (4000 MiB).
const MAX_BLOCK_SIZE: u64 = 4000 * ONE_MEBIBYTE;
/// Maximum number of blocks a single block blob may contain.
const MAX_BLOCK_COUNT: u64 = 50000;
/// Maximum total size of a block blob (max block size × max block count).
const MAX_BLOB_SIZE: u64 = MAX_BLOCK_SIZE * MAX_BLOCK_COUNT;
/// Request header carrying the Azure Storage service version.
const AZURE_VERSION_HEADER: &str = "x-ms-version";
/// Request header identifying the blob type on upload requests.
const AZURE_BLOB_TYPE_HEADER: &str = "x-ms-blob-type";
/// Request header carrying the CRC64 checksum of the request content.
const AZURE_CONTENT_CRC_HEADER: &str = "x-ms-content-crc64";
/// Azure Storage service version sent with every request.
const AZURE_STORAGE_VERSION: &str = "2025-05-05";
/// Blob type used for all uploads performed by this backend.
const AZURE_BLOB_TYPE: &str = "BlockBlob";
/// Name of the implicit root container.
const AZURE_ROOT_CONTAINER: &str = "$root";
/// Metadata header used to store a content digest alongside a blob.
pub(crate) const AZURE_CONTENT_DIGEST_HEADER: &str = "x-ms-meta-content_digest";
/// Signs `request` with the given Azure credentials and attaches the
/// resulting `Authorization` header to it.
///
/// # Errors
///
/// Returns [`AzureError::InvalidAccessKey`] when the request cannot be signed.
fn insert_authentication_header(auth: &AzureAuthConfig, request: &mut Request) -> Result<()> {
    let signature = RequestSigner::new(auth)
        .sign(request)
        .ok_or(AzureError::InvalidAccessKey)?;
    let value = HeaderValue::try_from(signature).expect("value should be valid");
    request.headers_mut().insert(header::AUTHORIZATION, value);
    Ok(())
}
/// Cache revalidation hook: re-signs requests bound for an Azure (or Azurite)
/// domain so the `Authorization` header remains valid for the new request.
///
/// Requests for non-Azure hosts (or when no auth is configured) pass through
/// unchanged.
///
/// # Errors
///
/// Returns [`AzureError::InvalidAccessKey`] when signing fails.
pub(crate) fn revalidation_hook(
    config: &Config,
    request: &dyn semantics::RequestLike,
    headers: &mut HeaderMap,
) -> Result<()> {
    // Only Azure-bound requests need re-signing.
    match request.uri().host() {
        Some(host) if is_azure_domain(config, host) => {}
        _ => return Ok(()),
    }

    let Some(auth) = config.azure().auth() else {
        return Ok(());
    };

    let signature = RequestSigner::new(auth)
        .sign_revalidation(request, headers)
        .ok_or(AzureError::InvalidAccessKey)?;
    headers.insert(
        header::AUTHORIZATION,
        HeaderValue::try_from(signature).expect("value should be valid"),
    );
    Ok(())
}
/// Returns `true` when `domain` (an `<account>.<root>` host name) refers to
/// Azure Blob Storage or, when Azurite support is enabled, to the local
/// Azurite root domain.
fn is_azure_domain(config: &Config, domain: &str) -> bool {
    // Strip the account name (everything before the first dot) and compare
    // the remaining root domain case-insensitively.
    let Some((_, root)) = domain.split_once('.') else {
        return false;
    };
    // Use short-circuiting `||` rather than bitwise `|`: the Azurite branch
    // only needs evaluating when the Azure comparison fails.
    root.eq_ignore_ascii_case(AZURE_BLOB_STORAGE_ROOT_DOMAIN)
        || (config.azure().use_azurite() && root.eq_ignore_ascii_case(AZURITE_ROOT_DOMAIN))
}
/// Errors specific to the Azure Blob Storage backend.
#[derive(Debug, thiserror::Error)]
pub enum AzureError {
    /// A configured block size exceeds the Azure per-block limit.
    #[error("Azure blob block size cannot exceed {MAX_BLOCK_SIZE} bytes")]
    InvalidBlockSize,
    /// The source file cannot fit within the maximum block blob size.
    #[error("the size of the source file exceeds the supported maximum of {MAX_BLOB_SIZE} bytes")]
    MaximumSizeExceeded,
    /// An `az://` URL was malformed (no account host or no path).
    #[error("invalid URL with `az` scheme: the URL is not in a supported format")]
    InvalidScheme,
    /// Directory uploads to the implicit root container are rejected.
    #[error("uploading a directory to the root container is not supported by Azure")]
    RootDirectoryUploadNotSupported,
    /// A response body could not be deserialized as the expected XML.
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        status: reqwest::StatusCode,
        error: serde_xml_rs::Error,
    },
    /// The URL does not contain a blob name to operate on.
    #[error("a blob name is missing from the provided URL")]
    BlobNameMissing,
    /// The configured storage access key could not be used for signing.
    #[error("invalid Azure Storage access key")]
    InvalidAccessKey,
}
/// A single blob entry in a container listing response.
#[derive(Debug, Deserialize)]
struct Blob {
    // The blob's full name (path) within its container.
    #[serde(rename = "Name")]
    name: String,
}
/// The `<Blobs>` collection element of a container listing response.
#[derive(Default, Debug, Deserialize)]
struct Blobs {
    // Individual `<Blob>` entries; empty when the listing has no matches.
    #[serde(default, rename = "Blob")]
    items: Vec<Blob>,
}
/// Deserialized container listing response (`<EnumerationResults>`).
#[derive(Debug, Deserialize)]
#[serde(rename = "EnumerationResults")]
struct Results {
    // Blobs matching the requested prefix.
    #[serde(default, rename = "Blobs")]
    blobs: Blobs,
    // Continuation marker for fetching the next page, if any.
    #[serde(rename = "NextMarker", default)]
    next: Option<String>,
}
/// Serialized body of a `Put Block List` request.
#[derive(Serialize)]
#[serde(rename = "BlockList")]
struct BlockList<'a> {
    // Block ids to commit, each referencing its most recently staged version.
    #[serde(rename = "Latest")]
    latest: &'a [String],
}
/// Extension trait for converting an unsuccessful HTTP response into an [`Error`].
trait ResponseExt {
    /// Consumes the response and produces the most specific error possible,
    /// reading and parsing the body in the process.
    async fn into_error(self) -> Error;
}
impl ResponseExt for Response {
async fn into_error(self) -> Error {
#[derive(Default, Deserialize)]
#[serde(rename = "Error")]
struct ErrorResponse {
#[serde(rename = "Message")]
message: String,
}
let status = self.status();
let text: String = match self.text().await {
Ok(text) => text,
Err(e) => return e.into(),
};
if text.is_empty() {
return Error::Server {
status,
message: text,
};
}
let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
Ok(response) => response.message,
Err(e) => {
return AzureError::UnexpectedResponse { status, error: e }.into();
}
};
Error::Server { status, message }
}
}
/// An in-progress block blob upload to Azure Blob Storage.
pub struct AzureBlobUpload {
    // Shared application configuration (auth, overwrite behavior, etc.).
    config: Config,
    // HTTP client used to issue the upload requests.
    client: HttpClient,
    // Destination blob URL.
    url: Url,
    // Random prefix shared by all block ids belonging to this upload.
    block_id: Arc<String>,
    // Optional content digest recorded as blob metadata on finalize.
    digest: Option<String>,
    // Optional channel for reporting transfer progress events.
    events: Option<broadcast::Sender<TransferEvent>>,
}
impl AzureBlobUpload {
    /// Creates a new block blob upload targeting `url`.
    ///
    /// `block_id` is a random prefix shared by every block id of this upload,
    /// `digest` is an optional content digest recorded as blob metadata when
    /// the upload is finalized, and `events` optionally receives transfer
    /// progress events.
    fn new(
        config: Config,
        client: HttpClient,
        url: Url,
        block_id: Arc<String>,
        digest: Option<String>,
        events: Option<broadcast::Sender<TransferEvent>>,
    ) -> Self {
        Self {
            config,
            client,
            url,
            block_id,
            digest,
            events,
        }
    }
}
impl Upload for AzureBlobUpload {
    // Each part is the base64-encoded id of a successfully staged block.
    type Part = String;
    /// Stages one block of the blob via a `Put Block` request.
    ///
    /// `id` identifies the overall transfer for progress events, `block` is
    /// the block number, and `bytes` is the block content. Returns `Ok(None)`
    /// when `bytes` is empty (nothing is staged); on success, returns the
    /// block id to later pass to [`finalize`](Self::finalize).
    async fn put(&self, id: u64, block: u64, bytes: bytes::Bytes) -> Result<Option<Self::Part>> {
        if bytes.is_empty() {
            return Ok(None);
        }
        // Azure requires base64 block ids of equal length within one blob;
        // zero-padding the block number keeps the ids uniform.
        let block_id =
            BASE64_STANDARD.encode(format!("{block_id}:{block:05}", block_id = self.block_id));
        debug!(
            "uploading block {block} with id `{block_id}` for `{url}`",
            url = self.url.display()
        );
        let mut url = self.url.clone();
        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("comp", "block");
            pairs.append_pair("blockid", &block_id);
        }
        // Compute the CRC64 of the block so the service can verify integrity.
        // NOTE(review): the checksum bytes are little-endian; confirm this is
        // what the service expects for `x-ms-content-crc64`.
        let mut crc64 = Digest::new();
        crc64.write(&bytes);
        let checksum = BASE64_STANDARD.encode(crc64.sum64().to_le_bytes());
        let length = bytes.len();
        // Wrap the bytes in a TransferStream so progress events are emitted
        // while the body is being read.
        let body = Body::wrap_stream(TransferStream::new(
            ByteStream::new(bytes),
            id,
            block,
            0,
            self.events.clone(),
        ));
        let mut request = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, length)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .header(header::DATE, Utc::now().to_http_date())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .header(AZURE_BLOB_TYPE_HEADER, AZURE_BLOB_TYPE)
            .header(AZURE_CONTENT_CRC_HEADER, checksum)
            .body(body)
            .build()?;
        // Signing happens last so the signature covers the final header set.
        if let Some(auth) = self.config.azure().auth() {
            insert_authentication_header(auth, &mut request)?;
        }
        let response = self.client.execute(request).await?;
        if response.status() == StatusCode::CREATED {
            Ok(Some(block_id))
        } else {
            Err(response.into_error().await)
        }
    }
    /// Commits the staged blocks with a `Put Block List` request.
    ///
    /// `parts` are the block ids returned by [`put`](Self::put), in blob
    /// order. When a content digest was supplied, it is attached as blob
    /// metadata on the same request.
    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
        debug!("uploading block list for `{url}`", url = self.url.display());
        let mut url = self.url.clone();
        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("comp", "blocklist");
        }
        let body = serde_xml_rs::to_string(&BlockList { latest: parts }).expect("should serialize");
        let mut request = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, body.len())
            .header(header::CONTENT_TYPE, "application/xml")
            .header(header::DATE, Utc::now().to_http_date())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .body(body)
            .build()?;
        // Record the content digest as blob metadata, if one was provided.
        if let Some(digest) = &self.digest {
            request.headers_mut().insert(
                AZURE_CONTENT_DIGEST_HEADER,
                digest
                    .try_into()
                    .expect("invalid content digest header value"),
            );
        }
        // Signing happens last so the signature covers the final header set.
        if let Some(auth) = self.config.azure().auth() {
            insert_authentication_header(auth, &mut request)?;
        }
        let response = self.client.execute(request).await?;
        if response.status() == StatusCode::CREATED {
            Ok(())
        } else {
            Err(response.into_error().await)
        }
    }
}
/// A storage backend implementation for Azure Blob Storage (and Azurite).
#[derive(Clone)]
pub struct AzureBlobStorageBackend {
    // Shared application configuration.
    config: Config,
    // HTTP client used for all requests.
    client: HttpClient,
    // Optional channel for reporting transfer progress events.
    events: Option<broadcast::Sender<TransferEvent>>,
}
impl AzureBlobStorageBackend {
    /// Creates a new Azure Blob Storage backend.
    ///
    /// `events`, when provided, receives transfer progress events for all
    /// operations performed through this backend.
    pub fn new(
        config: Config,
        client: HttpClient,
        events: Option<broadcast::Sender<TransferEvent>>,
    ) -> Self {
        Self {
            config,
            client,
            events,
        }
    }
}
impl StorageBackend for AzureBlobStorageBackend {
    type Upload = AzureBlobUpload;
    fn config(&self) -> &Config {
        &self.config
    }
    /// Returns the HTTP response cache, when the client has one.
    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
        self.client.cache()
    }
    /// Returns the optional transfer-event channel.
    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
        &self.events
    }
    /// Determines the block size to use for uploading `file_size` bytes.
    ///
    /// An explicitly configured block size is used as-is after validation.
    /// Otherwise, the candidate block count grows in increments of 50 until a
    /// power-of-two block size at or below `BLOCK_SIZE_THRESHOLD` is found
    /// (clamped to at least `DEFAULT_BLOCK_SIZE`); very large files fall back
    /// to spreading the file evenly across the maximum block count.
    ///
    /// # Errors
    ///
    /// Returns [`AzureError::InvalidBlockSize`] when the configured size
    /// exceeds the per-block limit, or [`AzureError::MaximumSizeExceeded`]
    /// when the file cannot fit in the maximum number of maximum-size blocks.
    fn block_size(&self, file_size: u64) -> Result<u64> {
        const BLOCK_COUNT_INCREMENT: u64 = 50;
        if let Some(size) = self.config.block_size() {
            if size > MAX_BLOCK_SIZE {
                return Err(AzureError::InvalidBlockSize.into());
            }
            return Ok(size);
        }
        let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
        while num_blocks < MAX_BLOCK_COUNT {
            // Round the per-block size up to a power of two for this count.
            let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
            if block_size <= BLOCK_SIZE_THRESHOLD {
                return Ok(block_size.max(DEFAULT_BLOCK_SIZE));
            }
            num_blocks += BLOCK_COUNT_INCREMENT;
        }
        // The file needs every available block; size the blocks evenly.
        let block_size: u64 = file_size.div_ceil(MAX_BLOCK_COUNT);
        if block_size > MAX_BLOCK_SIZE {
            return Err(AzureError::MaximumSizeExceeded.into());
        }
        Ok(block_size)
    }
    /// Returns `true` when `url` targets Azure Blob Storage, either via the
    /// custom `az` scheme or an `http(s)` URL on an Azure/Azurite domain.
    fn is_supported_url(config: &Config, url: &Url) -> bool {
        match url.scheme() {
            "az" => true,
            "http" | "https" => {
                let Some(domain) = url.domain() else {
                    return false;
                };
                is_azure_domain(config, domain)
            }
            _ => false,
        }
    }
    /// Rewrites an `az://<account>/<path>` URL into the equivalent
    /// `https://<account>.blob.core.windows.net/<path>` URL (or the local
    /// Azurite endpoint on port 10000 when Azurite support is enabled),
    /// preserving any query string and fragment. Non-`az` URLs are returned
    /// unchanged (borrowed).
    ///
    /// # Errors
    ///
    /// Returns [`AzureError::InvalidScheme`] when the `az` URL has no account
    /// host, an empty path, or does not parse after rewriting.
    fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
        match url.scheme() {
            "az" => {
                // The URL host is the storage account name.
                let account = url.host_str().ok_or(AzureError::InvalidScheme)?;
                if url.path() == "/" {
                    return Err(AzureError::InvalidScheme.into());
                }
                let (scheme, root, port) = if config.azure().use_azurite() {
                    ("http", AZURITE_ROOT_DOMAIN, ":10000")
                } else {
                    ("https", AZURE_BLOB_STORAGE_ROOT_DOMAIN, "")
                };
                // Reassemble the URL, carrying over query and fragment.
                match (url.query(), url.fragment()) {
                    (None, None) => {
                        format!("{scheme}://{account}.{root}{port}{path}", path = url.path())
                    }
                    (None, Some(fragment)) => {
                        format!(
                            "{scheme}://{account}.{root}{port}{path}#{fragment}",
                            path = url.path()
                        )
                    }
                    (Some(query), None) => format!(
                        "{scheme}://{account}.{root}{port}{path}?{query}",
                        path = url.path()
                    ),
                    (Some(query), Some(fragment)) => {
                        format!(
                            "{scheme}://{account}.{root}{port}{path}?{query}#{fragment}",
                            path = url.path()
                        )
                    }
                }
                .parse()
                .map(Cow::Owned)
                .map_err(|_| AzureError::InvalidScheme.into())
            }
            _ => Ok(Cow::Borrowed(url)),
        }
    }
    /// Appends `segments` to `url`'s path.
    ///
    /// # Errors
    ///
    /// Returns [`AzureError::RootDirectoryUploadNotSupported`] when `url` has
    /// exactly one non-empty path segment and more segments are being added —
    /// presumably this addresses the root container, where directory uploads
    /// are not supported (see `walk`, which maps such URLs to `$root`).
    fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
        let mut segments = segments.peekable();
        let mut existing = url.path_segments().expect("URL should have path");
        if let (Some(first), None) = (existing.next(), existing.next())
            && !first.is_empty()
            && segments.peek().is_some()
        {
            return Err(AzureError::RootDirectoryUploadNotSupported.into());
        }
        {
            let mut existing = url.path_segments_mut().expect("url should have path");
            // Drop a trailing empty segment so extension doesn't produce `//`.
            existing.pop_if_empty();
            existing.extend(segments);
        }
        Ok(url)
    }
    /// Checks whether the blob at `url` exists.
    ///
    /// When `must_exist` is `false`, a `404 Not Found` response is returned
    /// to the caller instead of being converted into an error.
    ///
    /// NOTE(review): despite the name and log message, this issues a GET
    /// request, not a HEAD — confirm this is intentional (e.g. for cache or
    /// client-API reasons), since a GET of an existing blob retrieves its
    /// content.
    async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );
        debug!("sending HEAD request for `{url}`", url = url.display());
        let mut request = self
            .client
            .get(url)
            .header(header::ACCEPT, "application/xml")
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_http_date())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .build()?;
        if let Some(auth) = self.config.azure().auth() {
            insert_authentication_header(auth, &mut request)?;
        }
        let response = self.client.execute(request).await?;
        if !response.status().is_success() {
            // A 404 is an acceptable outcome unless the caller requires the
            // blob to exist.
            if !must_exist && response.status() == StatusCode::NOT_FOUND {
                return Ok(response);
            }
            return Err(response.into_error().await);
        }
        Ok(response)
    }
    /// Fetches the blob at `url`.
    ///
    /// # Errors
    ///
    /// Any non-success status is converted into an error via `into_error`.
    async fn get(&self, url: Url) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );
        debug!("sending GET request for `{url}`", url = url.display());
        let mut request = self
            .client
            .get(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_http_date())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .build()?;
        if let Some(auth) = self.config.azure().auth() {
            insert_authentication_header(auth, &mut request)?;
        }
        let response = self.client.execute(request).await?;
        if !response.status().is_success() {
            return Err(response.into_error().await);
        }
        Ok(response)
    }
    /// Fetches a byte range of the blob at `url`, conditioned on `etag` so a
    /// concurrently modified blob is detected.
    ///
    /// # Errors
    ///
    /// Returns [`Error::RemoteContentModified`] when the server responds with
    /// `412 Precondition Failed` (the ETag no longer matches); any other
    /// non-success status is converted via `into_error`.
    async fn get_range(
        &self,
        url: Url,
        etag: &str,
        start: u64,
        exclusive_end: Option<u64>,
    ) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );
        let range = format_range_header(start, exclusive_end);
        debug!(
            "sending GET request with range `{range}` for `{url}`",
            url = url.display(),
        );
        let mut request = self
            .client
            .get(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_http_date())
            .header(header::RANGE, range)
            .header(header::IF_MATCH, etag)
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .build()?;
        if let Some(auth) = self.config.azure().auth() {
            insert_authentication_header(auth, &mut request)?;
        }
        let response = self.client.execute(request).await?;
        let status = response.status();
        if status == StatusCode::PRECONDITION_FAILED {
            return Err(Error::RemoteContentModified);
        }
        if !status.is_success() {
            return Err(response.into_error().await);
        }
        Ok(response)
    }
    /// Lists blobs under `url`, treating its path as a directory prefix.
    ///
    /// Returns blob names relative to the prefix. When `first_only` is
    /// `true`, at most one page of one result is fetched.
    async fn walk(&self, url: Url, first_only: bool) -> Result<Vec<String>> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );
        debug!("walking `{url}` as a directory", url = url.display());
        let mut container = url.clone();
        // Split the URL into the container (first path segment) and the
        // blob-name prefix (remaining segments joined with `/`).
        let mut prefix = {
            let mut container_segments = container
                .path_segments_mut()
                .expect("URL should have a path");
            container_segments.clear();
            let mut source_segments = url.path_segments().expect("URL should have a path");
            let name = source_segments.next().ok_or(AzureError::BlobNameMissing)?;
            container_segments.push(name);
            source_segments.fold(String::new(), |mut p, s| {
                if !p.is_empty() {
                    p.push('/');
                }
                p.push_str(s);
                p
            })
        };
        // A URL with a single path segment addresses blobs in the implicit
        // `$root` container; the whole path becomes the prefix instead.
        if prefix.is_empty() {
            let mut container_segments = container
                .path_segments_mut()
                .expect("URL should have a path");
            container_segments.clear();
            container_segments.push(AZURE_ROOT_CONTAINER);
            prefix = url.path_segments().expect("URL should have a path").fold(
                String::new(),
                |mut p, s| {
                    if !p.is_empty() {
                        p.push('/');
                    }
                    p.push_str(s);
                    p
                },
            );
            assert!(!prefix.is_empty());
        }
        // A trailing slash makes the prefix match whole "directory" names.
        if !prefix.ends_with('/') {
            prefix.push('/');
        }
        {
            let mut pairs = container.query_pairs_mut();
            pairs.append_pair("restype", "container");
            pairs.append_pair("comp", "list");
            pairs.append_pair("prefix", &prefix);
            if first_only {
                pairs.append_pair("maxresults", "1");
            }
        }
        let mut next = String::new();
        let mut paths = Vec::new();
        // Page through the listing, following the continuation marker.
        loop {
            let mut url = container.clone();
            if !next.is_empty() {
                url.query_pairs_mut().append_pair("marker", &next);
            }
            let mut request = self
                .client
                .get(url)
                .header(header::USER_AGENT, USER_AGENT)
                .header(header::DATE, Utc::now().to_http_date())
                .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
                .build()?;
            if let Some(auth) = self.config.azure().auth() {
                insert_authentication_header(auth, &mut request)?;
            }
            let response = self.client.execute(request).await?;
            let status = response.status();
            if !status.is_success() {
                return Err(response.into_error().await);
            }
            let text = response.text().await?;
            let results: Results = match serde_xml_rs::from_str(&text) {
                Ok(response) => response,
                Err(e) => {
                    return Err(AzureError::UnexpectedResponse { status, error: e }.into());
                }
            };
            // If the only match is a blob named exactly like the prefix, the
            // "directory" has no contents — report it as empty.
            if paths.is_empty()
                && results.blobs.items.len() == 1
                && results.next.is_none()
                && let Some("") = results.blobs.items[0].name.strip_prefix(&prefix)
            {
                return Ok(paths);
            }
            // Record names relative to the prefix (and without a leading `/`).
            paths.extend(results.blobs.items.into_iter().map(|b| {
                let name = b.name.strip_prefix(&prefix).unwrap_or(&b.name);
                name.strip_prefix('/').unwrap_or(name).into()
            }));
            next = results.next.unwrap_or_default();
            if first_only || next.is_empty() {
                break;
            }
        }
        Ok(paths)
    }
    /// Prepares a new upload to `url`, enforcing the `overwrite` setting by
    /// first checking that the destination does not already exist.
    ///
    /// # Errors
    ///
    /// Returns [`Error::RemoteDestinationExists`] when overwriting is
    /// disabled and the destination blob exists.
    async fn new_upload(&self, url: Url, digest: Option<String>) -> Result<Self::Upload> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );
        if !self.config.overwrite() {
            let response = self.head(url.clone(), false).await?;
            if response.status() != StatusCode::NOT_FOUND {
                return Err(Error::RemoteDestinationExists(url));
            }
        }
        Ok(AzureBlobUpload::new(
            self.config.clone(),
            self.client.clone(),
            url,
            // A random prefix keeps this upload's block ids distinct from any
            // concurrent upload to the same blob.
            Arc::new(Alphanumeric::new(16).to_string()),
            digest,
            self.events.clone(),
        ))
    }
}