use std::sync::Arc;

use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use chrono::Utc;
use crc64fast_nvme::Digest;
use http_cache_stream_reqwest::Cache;
use http_cache_stream_reqwest::storage::DefaultCacheStorage;
use reqwest::Body;
use reqwest::Response;
use reqwest::StatusCode;
use reqwest::header;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::broadcast;
use tracing::debug;
use url::Url;

use crate::BLOCK_SIZE_THRESHOLD;
use crate::Config;
use crate::Error;
use crate::HttpClient;
use crate::ONE_MEBIBYTE;
use crate::Result;
use crate::TransferEvent;
use crate::USER_AGENT;
use crate::UrlExt;
use crate::backend::StorageBackend;
use crate::backend::Upload;
use crate::generator::Alphanumeric;
use crate::streams::ByteStream;
use crate::streams::TransferStream;

/// The root domain for Azure Blob Storage.
const AZURE_BLOB_STORAGE_ROOT_DOMAIN: &str = "blob.core.windows.net";

/// The root domain used when targeting a local Azurite emulator.
const AZURITE_ROOT_DOMAIN: &str = "blob.core.windows.net.localhost";

/// The default block size (4 MiB) used for uploads.
const DEFAULT_BLOCK_SIZE: u64 = 4 * ONE_MEBIBYTE;

/// The maximum size of a single block (4000 MiB).
const MAX_BLOCK_SIZE: u64 = 4000 * ONE_MEBIBYTE;

/// The maximum number of blocks in a block blob.
const MAX_BLOCK_COUNT: u64 = 50000;

/// The maximum supported size of a block blob.
const MAX_BLOB_SIZE: u64 = MAX_BLOCK_SIZE * MAX_BLOCK_COUNT;

/// The header used to specify the Azure Storage API version.
const AZURE_VERSION_HEADER: &str = "x-ms-version";

/// The header used to specify the blob type.
const AZURE_BLOB_TYPE_HEADER: &str = "x-ms-blob-type";

/// The header used to supply a CRC64 checksum of the request content.
const AZURE_CONTENT_CRC_HEADER: &str = "x-ms-content-crc64";

/// The Azure Storage API version sent with each request.
const AZURE_STORAGE_VERSION: &str = "2025-05-05";

/// The blob type used for uploads.
const AZURE_BLOB_TYPE: &str = "BlockBlob";

/// The name of the root container.
const AZURE_ROOT_CONTAINER: &str = "$root";

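/// Represents an error that may occur when interacting with Azure Blob Storage.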
#[derive(Debug, thiserror::Error)]
pub enum AzureError {
    #[error("Azure blob block size cannot exceed {MAX_BLOCK_SIZE} bytes")]
    InvalidBlockSize,
    #[error("the size of the source file exceeds the supported maximum of {MAX_BLOB_SIZE} bytes")]
    MaximumSizeExceeded,
    #[error("invalid URL with `az` scheme: the URL is not in a supported format")]
    InvalidScheme,
    #[error("uploading a directory to the root container is not supported by Azure")]
    RootDirectoryUploadNotSupported,
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        status: reqwest::StatusCode,
        error: serde_xml_rs::Error,
    },
    #[error("a blob name is missing from the provided URL")]
    BlobNameMissing,
}

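/// Represents a single blob entry in a container listing response.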
#[derive(Debug, Deserialize)]
struct Blob {
    #[serde(rename = "Name")]
    name: String,
}

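/// Represents the collection of blob entries in a container listing response.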
#[derive(Default, Debug, Deserialize)]
struct Blobs {
    #[serde(default, rename = "Blob")]
    items: Vec<Blob>,
}

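/// Represents a page of container listing results, along with an optional
/// marker for retrieving the next page.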
#[derive(Debug, Deserialize)]
#[serde(rename = "EnumerationResults")]
struct Results {
    #[serde(default, rename = "Blobs")]
    blobs: Blobs,
    #[serde(rename = "NextMarker", default)]
    next: Option<String>,
}

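/// Represents the block list sent to commit the uploaded blocks of a block blob.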
#[derive(Serialize)]
#[serde(rename = "BlockList")]
struct BlockList<'a> {
    #[serde(rename = "Latest")]
    latest: &'a [String],
}

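/// An extension trait for converting an unsuccessful response into an error.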
trait ResponseExt {
    async fn into_error(self) -> Error;
}

impl ResponseExt for Response {
    async fn into_error(self) -> Error {
        #[derive(Default, Deserialize)]
        #[serde(rename = "Error")]
        struct ErrorResponse {
            #[serde(rename = "Message")]
            message: String,
        }

        let status = self.status();
        let text: String = match self.text().await {
            Ok(text) => text,
            Err(e) => return e.into(),
        };

        if text.is_empty() {
            return Error::Server {
                status,
                message: text,
            };
        }

        let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
            Ok(response) => response.message,
            Err(e) => {
                return AzureError::UnexpectedResponse { status, error: e }.into();
            }
        };

        Error::Server { status, message }
    }
}

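/// Represents an in-progress block blob upload to Azure Blob Storage.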
pub struct AzureBlobUpload {
    /// The HTTP client used to make requests.
    client: HttpClient,
    /// The URL of the blob being uploaded.
    url: Url,
    /// The prefix used to derive the IDs of uploaded blocks.
    block_id: Arc<String>,
    /// The optional channel for sending transfer events.
    events: Option<broadcast::Sender<TransferEvent>>,
}

impl AzureBlobUpload {
    /// Constructs a new Azure blob upload.
    fn new(
        client: HttpClient,
        url: Url,
        block_id: Arc<String>,
        events: Option<broadcast::Sender<TransferEvent>>,
    ) -> Self {
        Self {
            client,
            url,
            block_id,
            events,
        }
    }
}

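// An upload proceeds in two phases: each block is staged with a `Put Block`
// request (`comp=block`) in `put`, and the staged blocks are then committed
// with a single `Put Block List` request (`comp=blocklist`) in `finalize`.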
impl Upload for AzureBlobUpload {
    type Part = String;

    async fn put(&self, id: u64, block: u64, bytes: bytes::Bytes) -> Result<Self::Part> {
        let block_id =
            BASE64_STANDARD.encode(format!("{block_id}:{block:05}", block_id = self.block_id));

        debug!(
            "uploading block {block} with id `{block_id}` for `{url}`",
            url = self.url.display()
        );

        let mut url = self.url.clone();
        {
            let mut pairs = url.query_pairs_mut();
            // Stage the block with a `Put Block` operation for the given block ID.
            pairs.append_pair("comp", "block");
            pairs.append_pair("blockid", &block_id);
        }

        // Calculate a CRC64 checksum of the block so the service can verify its integrity.
        let mut crc64 = Digest::new();
        crc64.write(&bytes);
        let checksum = BASE64_STANDARD.encode(crc64.sum64().to_le_bytes());

        let length = bytes.len();
        let body = Body::wrap_stream(TransferStream::new(
            ByteStream::new(bytes),
            id,
            block,
            0,
            self.events.clone(),
        ));

        let response = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, length)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .header(AZURE_BLOB_TYPE_HEADER, AZURE_BLOB_TYPE)
            .header(AZURE_CONTENT_CRC_HEADER, checksum)
            .body(body)
            .send()
            .await?;

        if response.status() == StatusCode::CREATED {
            Ok(block_id)
        } else {
            Err(response.into_error().await)
        }
    }

    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
        debug!("uploading block list for `{url}`", url = self.url.display());

        let mut url = self.url.clone();

        {
            let mut pairs = url.query_pairs_mut();
            // Commit the uploaded blocks with a `Put Block List` operation.
            pairs.append_pair("comp", "blocklist");
        }

        let body = serde_xml_rs::to_string(&BlockList { latest: parts }).expect("should serialize");

        let response = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, body.len())
            .header(header::CONTENT_TYPE, "application/xml")
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .body(body)
            .send()
            .await?;

        if response.status() == StatusCode::CREATED {
            Ok(())
        } else {
            Err(response.into_error().await)
        }
    }
}

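/// A storage backend for Azure Blob Storage.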
#[derive(Clone)]
pub struct AzureBlobStorageBackend {
    /// The configuration for the backend.
    config: Config,
    /// The HTTP client used to make requests.
    client: HttpClient,
    /// The optional channel for sending transfer events.
    events: Option<broadcast::Sender<TransferEvent>>,
}

impl AzureBlobStorageBackend {
    /// Constructs a new Azure Blob Storage backend.
    pub fn new(
        config: Config,
        client: HttpClient,
        events: Option<broadcast::Sender<TransferEvent>>,
    ) -> Self {
        Self {
            config,
            client,
            events,
        }
    }
}

impl StorageBackend for AzureBlobStorageBackend {
    type Upload = AzureBlobUpload;

    fn config(&self) -> &Config {
        &self.config
    }

    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
        self.client.cache()
    }

    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
        &self.events
    }

    fn block_size(&self, file_size: u64) -> Result<u64> {
        const BLOCK_COUNT_INCREMENT: u64 = 50;

        // Use the explicitly configured block size if one was provided.
        if let Some(size) = self.config.block_size {
            if size > MAX_BLOCK_SIZE {
                return Err(AzureError::InvalidBlockSize.into());
            }

            return Ok(size);
        }

        // Otherwise, grow the block count in increments until a power-of-two
        // block size at or below the threshold is found.
        let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
        while num_blocks < MAX_BLOCK_COUNT {
            let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
            if block_size <= BLOCK_SIZE_THRESHOLD {
                return Ok(block_size.max(DEFAULT_BLOCK_SIZE));
            }

            num_blocks += BLOCK_COUNT_INCREMENT;
        }

        // Fall back to the smallest block size that fits within the maximum
        // block count; fail if it exceeds the maximum block size.
        let block_size: u64 = file_size.div_ceil(MAX_BLOCK_COUNT);
        if block_size > MAX_BLOCK_SIZE {
            return Err(AzureError::MaximumSizeExceeded.into());
        }

        Ok(block_size)
    }

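    // A URL is supported if it uses the `az` scheme, or if it is an HTTP(S) URL
    // whose domain (with the leading account label removed) is the Azure Blob
    // Storage root domain, or the Azurite root domain when Azurite is enabled.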
    fn is_supported_url(config: &Config, url: &Url) -> bool {
        match url.scheme() {
            "az" => true,
            "http" | "https" => {
                let Some(domain) = url.domain() else {
                    return false;
                };

                let Some((_, domain)) = domain.split_once('.') else {
                    return false;
                };

                domain.eq_ignore_ascii_case(AZURE_BLOB_STORAGE_ROOT_DOMAIN)
                    || (config.azure.use_azurite && domain.eq_ignore_ascii_case(AZURITE_ROOT_DOMAIN))
            }
            _ => false,
        }
    }

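    // Rewrites `az://<account>/<path>` URLs into the corresponding
    // `https://<account>.blob.core.windows.net/<path>` form, or the local
    // Azurite endpoint on port 10000 when Azurite is enabled; other URLs are
    // returned unchanged.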
    fn rewrite_url(&self, url: Url) -> Result<Url> {
        match url.scheme() {
            "az" => {
                let account = url.host_str().ok_or(AzureError::InvalidScheme)?;

                if url.path() == "/" {
                    return Err(AzureError::InvalidScheme.into());
                }

                let (scheme, root, port) = if self.config.azure.use_azurite {
                    ("http", AZURITE_ROOT_DOMAIN, ":10000")
                } else {
                    ("https", AZURE_BLOB_STORAGE_ROOT_DOMAIN, "")
                };

                match (url.query(), url.fragment()) {
                    (None, None) => {
                        format!("{scheme}://{account}.{root}{port}{path}", path = url.path())
                    }
                    (None, Some(fragment)) => {
                        format!(
                            "{scheme}://{account}.{root}{port}{path}#{fragment}",
                            path = url.path()
                        )
                    }
                    (Some(query), None) => format!(
                        "{scheme}://{account}.{root}{port}{path}?{query}",
                        path = url.path()
                    ),
                    (Some(query), Some(fragment)) => {
                        format!(
                            "{scheme}://{account}.{root}{port}{path}?{query}#{fragment}",
                            path = url.path()
                        )
                    }
                }
                .parse()
                .map_err(|_| AzureError::InvalidScheme.into())
            }
            _ => Ok(url),
        }
    }

    fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
        let mut segments = segments.peekable();

        // A URL with a single path segment refers to a blob in the root container;
        // appending additional segments would upload a directory to the root
        // container, which is not supported.
        let mut existing = url.path_segments().expect("URL should have a path");
        if let (Some(first), None) = (existing.next(), existing.next())
            && !first.is_empty()
            && segments.peek().is_some()
        {
            return Err(AzureError::RootDirectoryUploadNotSupported.into());
        }

        {
            let mut existing = url.path_segments_mut().expect("URL should have a path");
            existing.pop_if_empty();
            existing.extend(segments);
        }

        Ok(url)
    }

    async fn head(&self, url: Url) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!("sending HEAD request for `{url}`", url = url.display());

        let response = self
            .client
            .head(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .send()
            .await?;

        if !response.status().is_success() {
            return Err(response.into_error().await);
        }

        Ok(response)
    }

    async fn get(&self, url: Url) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!("sending GET request for `{url}`", url = url.display());

        let response = self
            .client
            .get(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .send()
            .await?;

        if !response.status().is_success() {
            return Err(response.into_error().await);
        }

        Ok(response)
    }

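    // Sends a ranged GET request starting at the given offset, using `If-Match`
    // with the provided ETag so that modification of the remote content is
    // detected rather than silently resumed.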
    async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!(
            "sending GET request at offset {offset} for `{url}`",
            url = url.display(),
        );

        let response = self
            .client
            .get(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(header::RANGE, format!("bytes={offset}-"))
            .header(header::IF_MATCH, etag)
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .send()
            .await?;

        let status = response.status();

        if status == StatusCode::PRECONDITION_FAILED {
            return Err(Error::RemoteContentModified);
        }

        if !status.is_success() {
            return Err(response.into_error().await);
        }

        Ok(response)
    }

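    // Walks a "directory" by listing the container's blobs with the directory
    // prefix (`restype=container&comp=list`), following continuation markers
    // until every page has been retrieved.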
    async fn walk(&self, url: Url) -> Result<Vec<String>> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!("walking `{url}` as a directory", url = url.display());

        let mut container = url.clone();

        // Reduce the container URL to just the container segment and build the
        // blob name prefix from the remaining path segments.
        let mut prefix = {
            let mut container_segments = container
                .path_segments_mut()
                .expect("URL should have a path");
            container_segments.clear();

            let mut source_segments = url.path_segments().expect("URL should have a path");
            let name = source_segments.next().ok_or(AzureError::BlobNameMissing)?;
            container_segments.push(name);

            source_segments.fold(String::new(), |mut p, s| {
                if !p.is_empty() {
                    p.push('/');
                }

                p.push_str(s);
                p
            })
        };

        // A URL with only a single path segment names a directory in the root
        // container; list `$root` using that segment as the prefix.
        if prefix.is_empty() {
            let mut container_segments = container
                .path_segments_mut()
                .expect("URL should have a path");
            container_segments.clear();
            container_segments.push(AZURE_ROOT_CONTAINER);

            prefix = url.path_segments().expect("URL should have a path").fold(
                String::new(),
                |mut p, s| {
                    if !p.is_empty() {
                        p.push('/');
                    }

                    p.push_str(s);
                    p
                },
            );

            assert!(!prefix.is_empty());
        }

        prefix.push('/');

        {
            // List the container's blobs filtered by the directory prefix.
            let mut pairs = container.query_pairs_mut();
            pairs.append_pair("restype", "container");
            pairs.append_pair("comp", "list");
            pairs.append_pair("prefix", &prefix);
        }

        let mut next = String::new();
        let mut paths = Vec::new();
        loop {
            let mut url = container.clone();
            if !next.is_empty() {
                // Continue the listing from the last returned marker.
                url.query_pairs_mut().append_pair("marker", &next);
            }

            let response = self
                .client
                .get(url)
                .header(header::USER_AGENT, USER_AGENT)
                .header(header::DATE, Utc::now().to_rfc2822())
                .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
                .send()
                .await?;

            let status = response.status();
            if !status.is_success() {
                return Err(response.into_error().await);
            }

            let text = response.text().await?;
            let results: Results = match serde_xml_rs::from_str(&text) {
                Ok(response) => response,
                Err(e) => {
                    return Err(AzureError::UnexpectedResponse { status, error: e }.into());
                }
            };

            // If the only result is a blob whose name matches the prefix itself
            // and there are no further pages, treat the directory as empty.
            if paths.is_empty()
                && results.blobs.items.len() == 1
                && results.next.is_none()
                && let Some("") = results.blobs.items[0].name.strip_prefix(&prefix)
            {
                return Ok(paths);
            }

            paths.extend(results.blobs.items.into_iter().map(|b| {
                b.name
                    .strip_prefix(&prefix)
                    .map(Into::into)
                    .unwrap_or(b.name)
            }));

            next = results.next.unwrap_or_default();
            if next.is_empty() {
                break;
            }
        }

        Ok(paths)
    }

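    // Creates a new upload for the given blob URL, generating a random
    // alphanumeric prefix from which block IDs are derived.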
    async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        Ok(AzureBlobUpload::new(
            self.client.clone(),
            url,
            Arc::new(Alphanumeric::new(16).to_string()),
            self.events.clone(),
        ))
    }
}