1use std::borrow::Cow;
4use std::sync::Arc;
5
6use base64::Engine;
7use base64::prelude::BASE64_STANDARD;
8use chrono::Utc;
9use crc64fast_nvme::Digest;
10use http_cache_stream_reqwest::Cache;
11use http_cache_stream_reqwest::storage::DefaultCacheStorage;
12use reqwest::Body;
13use reqwest::Response;
14use reqwest::StatusCode;
15use reqwest::header;
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::sync::broadcast;
19use tracing::debug;
20use url::Url;
21
22use crate::BLOCK_SIZE_THRESHOLD;
23use crate::Config;
24use crate::Error;
25use crate::HttpClient;
26use crate::ONE_MEBIBYTE;
27use crate::Result;
28use crate::TransferEvent;
29use crate::USER_AGENT;
30use crate::UrlExt;
31use crate::backend::StorageBackend;
32use crate::backend::Upload;
33use crate::generator::Alphanumeric;
34use crate::streams::ByteStream;
35use crate::streams::TransferStream;
36
/// The root domain of Azure Blob Storage service URLs.
const AZURE_BLOB_STORAGE_ROOT_DOMAIN: &str = "blob.core.windows.net";

/// The root domain used when targeting a local Azurite emulator.
const AZURITE_ROOT_DOMAIN: &str = "blob.core.windows.net.localhost";

/// The default (and minimum) block size used for uploads.
const DEFAULT_BLOCK_SIZE: u64 = 4 * ONE_MEBIBYTE;

/// The maximum size of a single uploaded block (4000 MiB); enforced by
/// `block_size` when a block size is explicitly configured.
const MAX_BLOCK_SIZE: u64 = 4000 * ONE_MEBIBYTE;

/// The maximum number of blocks a single block blob may contain.
const MAX_BLOCK_COUNT: u64 = 50000;

/// The maximum supported size of a blob (maximum block size times maximum
/// block count).
const MAX_BLOB_SIZE: u64 = MAX_BLOCK_SIZE * MAX_BLOCK_COUNT;

/// The request header that carries the Azure storage API version.
const AZURE_VERSION_HEADER: &str = "x-ms-version";

/// The request header that specifies the blob type on upload.
const AZURE_BLOB_TYPE_HEADER: &str = "x-ms-blob-type";

/// The request header that carries the CRC64 checksum of the request content.
const AZURE_CONTENT_CRC_HEADER: &str = "x-ms-content-crc64";

/// The Azure storage API version sent with every request.
const AZURE_STORAGE_VERSION: &str = "2025-05-05";

/// The blob type used for uploads; this backend only writes block blobs.
const AZURE_BLOB_TYPE: &str = "BlockBlob";

/// The name of Azure's implicit root container, used when a URL has no
/// container segment.
const AZURE_ROOT_CONTAINER: &str = "$root";
72
/// Errors specific to the Azure Blob Storage backend.
#[derive(Debug, thiserror::Error)]
pub enum AzureError {
    /// A configured block size exceeds [`MAX_BLOCK_SIZE`].
    #[error("Azure blob block size cannot exceed {MAX_BLOCK_SIZE} bytes")]
    InvalidBlockSize,
    /// The source file cannot fit within [`MAX_BLOB_SIZE`] even at the
    /// maximum block size.
    #[error("the size of the source file exceeds the supported maximum of {MAX_BLOB_SIZE} bytes")]
    MaximumSizeExceeded,
    /// An `az://` URL could not be rewritten into a service URL (e.g. it is
    /// missing a host or a path).
    #[error("invalid URL with `az` scheme: the URL is not in a supported format")]
    InvalidScheme,
    /// Directory uploads target the `$root` container, which Azure does not
    /// support for this operation.
    #[error("uploading a directory to the root container is not supported by Azure")]
    RootDirectoryUploadNotSupported,
    /// The server responded with a body that could not be deserialized as the
    /// expected XML document.
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        /// The HTTP status code of the response.
        status: reqwest::StatusCode,
        /// The underlying XML deserialization error.
        error: serde_xml_rs::Error,
    },
    /// The provided URL contains no blob name segment.
    #[error("a blob name is missing from the provided URL")]
    BlobNameMissing,
}
100
/// A single blob entry within a container listing response.
#[derive(Debug, Deserialize)]
struct Blob {
    /// The full name (path within the container) of the blob.
    #[serde(rename = "Name")]
    name: String,
}
108
/// The collection of blob entries within a container listing response.
#[derive(Default, Debug, Deserialize)]
struct Blobs {
    /// The individual `<Blob>` elements; defaults to empty when absent.
    #[serde(default, rename = "Blob")]
    items: Vec<Blob>,
}
116
/// The top-level `<EnumerationResults>` document returned by a container
/// listing request.
#[derive(Debug, Deserialize)]
#[serde(rename = "EnumerationResults")]
struct Results {
    /// The blobs returned by this page of the listing.
    #[serde(default, rename = "Blobs")]
    blobs: Blobs,
    /// The continuation marker for the next page, if any; passed back to the
    /// service via the `marker` query parameter.
    #[serde(rename = "NextMarker", default)]
    next: Option<String>,
}
128
/// The `<BlockList>` XML document committed when finalizing an upload.
#[derive(Serialize)]
#[serde(rename = "BlockList")]
struct BlockList<'a> {
    /// The block ids to commit, each serialized as a `<Latest>` element.
    #[serde(rename = "Latest")]
    latest: &'a [String],
}
137
/// Extension trait for converting an HTTP response into an [`Error`].
trait ResponseExt {
    /// Consumes the response and converts it into an error, attempting to
    /// extract a message from an Azure XML error body.
    async fn into_error(self) -> Error;
}
143
144impl ResponseExt for Response {
145 async fn into_error(self) -> Error {
146 #[derive(Default, Deserialize)]
148 #[serde(rename = "Error")]
149 struct ErrorResponse {
150 #[serde(rename = "Message")]
152 message: String,
153 }
154
155 let status = self.status();
156 let text: String = match self.text().await {
157 Ok(text) => text,
158 Err(e) => return e.into(),
159 };
160
161 if text.is_empty() {
162 return Error::Server {
163 status,
164 message: text,
165 };
166 }
167
168 let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
169 Ok(response) => response.message,
170 Err(e) => {
171 return AzureError::UnexpectedResponse { status, error: e }.into();
172 }
173 };
174
175 Error::Server { status, message }
176 }
177}
178
/// An in-progress block blob upload to Azure Blob Storage.
pub struct AzureBlobUpload {
    /// The HTTP client used to issue upload requests.
    client: HttpClient,
    /// The URL of the destination blob.
    url: Url,
    /// The per-upload prefix used when generating block ids.
    block_id: Arc<String>,
    /// Optional channel for broadcasting transfer progress events.
    events: Option<broadcast::Sender<TransferEvent>>,
}
190
191impl AzureBlobUpload {
192 fn new(
194 client: HttpClient,
195 url: Url,
196 block_id: Arc<String>,
197 events: Option<broadcast::Sender<TransferEvent>>,
198 ) -> Self {
199 Self {
200 client,
201 url,
202 block_id,
203 events,
204 }
205 }
206}
207
impl Upload for AzureBlobUpload {
    /// A part is the base64-encoded block id produced by `put`; the collected
    /// ids are committed as a block list by `finalize`.
    type Part = String;

    /// Uploads a single block (`comp=block`) of the destination blob.
    ///
    /// Returns `Ok(None)` for an empty block (nothing is sent) and
    /// `Ok(Some(block_id))` on a `201 Created` response; any other status is
    /// converted into an error from the response body.
    async fn put(&self, id: u64, block: u64, bytes: bytes::Bytes) -> Result<Option<Self::Part>> {
        // There is nothing to upload for an empty block.
        if bytes.is_empty() {
            return Ok(None);
        }

        // Block ids are sent base64 encoded. Zero-padding the block number to
        // five digits keeps every id for this upload the same length, since
        // `self.block_id` is a fixed prefix.
        let block_id =
            BASE64_STANDARD.encode(format!("{block_id}:{block:05}", block_id = self.block_id));

        debug!(
            "uploading block {block} with id `{block_id}` for `{url}`",
            url = self.url.display()
        );

        // Append the Put Block query parameters to the blob URL.
        let mut url = self.url.clone();
        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("comp", "block");
            pairs.append_pair("blockid", &block_id);
        }

        // Checksum the block contents so the service can verify the transfer;
        // the CRC64 is sent base64 encoded in little-endian byte order.
        let mut crc64 = Digest::new();
        crc64.write(&bytes);
        let checksum = BASE64_STANDARD.encode(crc64.sum64().to_le_bytes());

        // The body is a stream that emits transfer events as it is read, so
        // record the length up front to set an explicit `Content-Length`.
        let length = bytes.len();
        let body = Body::wrap_stream(TransferStream::new(
            ByteStream::new(bytes),
            id,
            block,
            0,
            self.events.clone(),
        ));

        let response = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, length)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .header(AZURE_BLOB_TYPE_HEADER, AZURE_BLOB_TYPE)
            .header(AZURE_CONTENT_CRC_HEADER, checksum)
            .body(body)
            .send()
            .await?;

        if response.status() == StatusCode::CREATED {
            Ok(Some(block_id))
        } else {
            Err(response.into_error().await)
        }
    }

    /// Commits the uploaded blocks (`comp=blocklist`) in the given order,
    /// making the blob visible.
    ///
    /// Expects a `201 Created` response; any other status is converted into
    /// an error from the response body.
    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
        debug!("uploading block list for `{url}`", url = self.url.display());

        let mut url = self.url.clone();

        {
            let mut pairs = url.query_pairs_mut();
            pairs.append_pair("comp", "blocklist");
        }

        // Serialization of a slice of strings into the `BlockList` document
        // is infallible in practice.
        let body = serde_xml_rs::to_string(&BlockList { latest: parts }).expect("should serialize");

        let response = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, body.len())
            .header(header::CONTENT_TYPE, "application/xml")
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .body(body)
            .send()
            .await?;

        if response.status() == StatusCode::CREATED {
            Ok(())
        } else {
            Err(response.into_error().await)
        }
    }
}
305
/// A storage backend for Azure Blob Storage.
#[derive(Clone)]
pub struct AzureBlobStorageBackend {
    /// The transfer configuration.
    config: Config,
    /// The HTTP client used for all requests.
    client: HttpClient,
    /// Optional channel for broadcasting transfer progress events.
    events: Option<broadcast::Sender<TransferEvent>>,
}
316
317impl AzureBlobStorageBackend {
318 pub fn new(
321 config: Config,
322 client: HttpClient,
323 events: Option<broadcast::Sender<TransferEvent>>,
324 ) -> Self {
325 Self {
326 config,
327 client,
328 events,
329 }
330 }
331}
332
333impl StorageBackend for AzureBlobStorageBackend {
334 type Upload = AzureBlobUpload;
335
336 fn config(&self) -> &Config {
337 &self.config
338 }
339
340 fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
341 self.client.cache()
342 }
343
344 fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
345 &self.events
346 }
347
348 fn block_size(&self, file_size: u64) -> Result<u64> {
349 const BLOCK_COUNT_INCREMENT: u64 = 50;
351
352 if let Some(size) = self.config.block_size {
354 if size > MAX_BLOCK_SIZE {
355 return Err(AzureError::InvalidBlockSize.into());
356 }
357
358 return Ok(size);
359 }
360
361 let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
363 while num_blocks < MAX_BLOCK_COUNT {
364 let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
365 if block_size <= BLOCK_SIZE_THRESHOLD {
366 return Ok(block_size.max(DEFAULT_BLOCK_SIZE));
367 }
368
369 num_blocks += BLOCK_COUNT_INCREMENT;
370 }
371
372 let block_size: u64 = file_size.div_ceil(MAX_BLOCK_COUNT);
375 if block_size > MAX_BLOCK_SIZE {
376 return Err(AzureError::MaximumSizeExceeded.into());
377 }
378
379 Ok(block_size)
380 }
381
382 fn is_supported_url(config: &Config, url: &Url) -> bool {
383 match url.scheme() {
384 "az" => true,
385 "http" | "https" => {
386 let Some(domain) = url.domain() else {
387 return false;
388 };
389
390 let Some((_, domain)) = domain.split_once('.') else {
392 return false;
393 };
394
395 domain.eq_ignore_ascii_case(AZURE_BLOB_STORAGE_ROOT_DOMAIN)
396 | (config.azure.use_azurite && domain.eq_ignore_ascii_case(AZURITE_ROOT_DOMAIN))
397 }
398 _ => false,
399 }
400 }
401
402 fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
403 match url.scheme() {
404 "az" => {
405 let account = url.host_str().ok_or(AzureError::InvalidScheme)?;
406
407 if url.path() == "/" {
408 return Err(AzureError::InvalidScheme.into());
409 }
410
411 let (scheme, root, port) = if config.azure.use_azurite {
412 ("http", AZURITE_ROOT_DOMAIN, ":10000")
413 } else {
414 ("https", AZURE_BLOB_STORAGE_ROOT_DOMAIN, "")
415 };
416
417 match (url.query(), url.fragment()) {
418 (None, None) => {
419 format!("{scheme}://{account}.{root}{port}{path}", path = url.path())
420 }
421 (None, Some(fragment)) => {
422 format!(
423 "{scheme}://{account}.{root}{port}{path}#{fragment}",
424 path = url.path()
425 )
426 }
427 (Some(query), None) => format!(
428 "{scheme}://{account}.{root}{port}{path}?{query}",
429 path = url.path()
430 ),
431 (Some(query), Some(fragment)) => {
432 format!(
433 "{scheme}://{account}.{root}{port}{path}?{query}#{fragment}",
434 path = url.path()
435 )
436 }
437 }
438 .parse()
439 .map(Cow::Owned)
440 .map_err(|_| AzureError::InvalidScheme.into())
441 }
442 _ => Ok(Cow::Borrowed(url)),
443 }
444 }
445
446 fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
447 let mut segments = segments.peekable();
448
449 let mut existing = url.path_segments().expect("URL should have path");
452 if let (Some(first), None) = (existing.next(), existing.next())
453 && !first.is_empty()
454 && segments.peek().is_some()
455 {
456 return Err(AzureError::RootDirectoryUploadNotSupported.into());
457 }
458
459 {
461 let mut existing = url.path_segments_mut().expect("url should have path");
462 existing.pop_if_empty();
463 existing.extend(segments);
464 }
465
466 Ok(url)
467 }
468
469 async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
470 debug_assert!(
471 Self::is_supported_url(&self.config, &url),
472 "{url} is not a supported Azure URL",
473 url = url.as_str()
474 );
475
476 debug!("sending HEAD request for `{url}`", url = url.display());
477
478 let response = self
479 .client
480 .head(url)
481 .header(header::USER_AGENT, USER_AGENT)
482 .header(header::DATE, Utc::now().to_rfc2822())
483 .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
484 .send()
485 .await?;
486
487 if !response.status().is_success() {
488 if !must_exist && response.status() == StatusCode::NOT_FOUND {
490 return Ok(response);
491 }
492
493 return Err(response.into_error().await);
494 }
495
496 Ok(response)
497 }
498
499 async fn get(&self, url: Url) -> Result<Response> {
500 debug_assert!(
501 Self::is_supported_url(&self.config, &url),
502 "{url} is not a supported Azure URL",
503 url = url.as_str()
504 );
505
506 debug!("sending GET request for `{url}`", url = url.display());
507
508 let response = self
509 .client
510 .get(url)
511 .header(header::USER_AGENT, USER_AGENT)
512 .header(header::DATE, Utc::now().to_rfc2822())
513 .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
514 .send()
515 .await?;
516
517 if !response.status().is_success() {
518 return Err(response.into_error().await);
519 }
520
521 Ok(response)
522 }
523
524 async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
525 debug_assert!(
526 Self::is_supported_url(&self.config, &url),
527 "{url} is not a supported Azure URL",
528 url = url.as_str()
529 );
530
531 debug!(
532 "sending GET request at offset {offset} for `{url}`",
533 url = url.display(),
534 );
535
536 let response = self
537 .client
538 .get(url)
539 .header(header::USER_AGENT, USER_AGENT)
540 .header(header::DATE, Utc::now().to_rfc2822())
541 .header(header::RANGE, format!("bytes={offset}-"))
542 .header(header::IF_MATCH, etag)
543 .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
544 .send()
545 .await?;
546
547 let status = response.status();
548
549 if status == StatusCode::PRECONDITION_FAILED {
551 return Err(Error::RemoteContentModified);
552 }
553
554 if !status.is_success() {
556 return Err(response.into_error().await);
557 }
558
559 Ok(response)
560 }
561
562 async fn walk(&self, url: Url) -> Result<Vec<String>> {
563 debug_assert!(
564 Self::is_supported_url(&self.config, &url),
565 "{url} is not a supported Azure URL",
566 url = url.as_str()
567 );
568
569 debug!("walking `{url}` as a directory", url = url.display());
570
571 let mut container = url.clone();
572
573 let mut prefix = {
575 let mut container_segments = container
576 .path_segments_mut()
577 .expect("URL should have a path");
578 container_segments.clear();
579
580 let mut source_segments = url.path_segments().expect("URL should have a path");
583 let name = source_segments.next().ok_or(AzureError::BlobNameMissing)?;
584 container_segments.push(name);
585
586 source_segments.fold(String::new(), |mut p, s| {
588 if !p.is_empty() {
589 p.push('/');
590 }
591
592 p.push_str(s);
593 p
594 })
595 };
596
597 if prefix.is_empty() {
599 let mut container_segments = container
600 .path_segments_mut()
601 .expect("URL should have a path");
602 container_segments.clear();
603 container_segments.push(AZURE_ROOT_CONTAINER);
604
605 prefix = url.path_segments().expect("URL should have a path").fold(
606 String::new(),
607 |mut p, s| {
608 if !p.is_empty() {
609 p.push('/');
610 }
611
612 p.push_str(s);
613 p
614 },
615 );
616
617 assert!(!prefix.is_empty());
618 }
619
620 prefix.push('/');
622
623 {
624 let mut pairs = container.query_pairs_mut();
627 pairs.append_pair("restype", "container");
629 pairs.append_pair("comp", "list");
631 pairs.append_pair("prefix", &prefix);
633 }
634
635 let mut next = String::new();
636 let mut paths = Vec::new();
637 loop {
638 let mut url = container.clone();
639 if !next.is_empty() {
640 url.query_pairs_mut().append_pair("marker", &next);
642 }
643
644 let response = self
646 .client
647 .get(url)
648 .header(header::USER_AGENT, USER_AGENT)
649 .header(header::DATE, Utc::now().to_rfc2822())
650 .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
651 .send()
652 .await?;
653
654 let status = response.status();
655 if !status.is_success() {
656 return Err(response.into_error().await);
657 }
658
659 let text = response.text().await?;
660 let results: Results = match serde_xml_rs::from_str(&text) {
661 Ok(response) => response,
662 Err(e) => {
663 return Err(AzureError::UnexpectedResponse { status, error: e }.into());
664 }
665 };
666
667 if paths.is_empty()
670 && results.blobs.items.len() == 1
671 && results.next.is_none()
672 && let Some("") = results.blobs.items[0].name.strip_prefix(&prefix)
673 {
674 return Ok(paths);
675 }
676
677 paths.extend(results.blobs.items.into_iter().map(|b| {
678 b.name
679 .strip_prefix(&prefix)
680 .map(Into::into)
681 .unwrap_or(b.name)
682 }));
683
684 next = results.next.unwrap_or_default();
685 if next.is_empty() {
686 break;
687 }
688 }
689
690 Ok(paths)
691 }
692
693 async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
694 debug_assert!(
695 Self::is_supported_url(&self.config, &url),
696 "{url} is not a supported Azure URL",
697 url = url.as_str()
698 );
699
700 if !self.config.overwrite {
704 let response = self.head(url.clone(), false).await?;
705 if response.status() != StatusCode::NOT_FOUND {
706 return Err(Error::RemoteDestinationExists(url));
707 }
708 }
709
710 Ok(AzureBlobUpload::new(
711 self.client.clone(),
712 url,
713 Arc::new(Alphanumeric::new(16).to_string()),
714 self.events.clone(),
715 ))
716 }
717}