cloud_copy/backend/
azure.rs

1//! Implementation of the Azure Blob Storage backend.
2
3use std::borrow::Cow;
4use std::sync::Arc;
5
6use base64::Engine;
7use base64::prelude::BASE64_STANDARD;
8use chrono::Utc;
9use crc64fast_nvme::Digest;
10use http_cache_stream_reqwest::Cache;
11use http_cache_stream_reqwest::storage::DefaultCacheStorage;
12use reqwest::Body;
13use reqwest::Response;
14use reqwest::StatusCode;
15use reqwest::header;
16use serde::Deserialize;
17use serde::Serialize;
18use tokio::sync::broadcast;
19use tracing::debug;
20use url::Url;
21
22use crate::BLOCK_SIZE_THRESHOLD;
23use crate::Config;
24use crate::Error;
25use crate::HttpClient;
26use crate::ONE_MEBIBYTE;
27use crate::Result;
28use crate::TransferEvent;
29use crate::USER_AGENT;
30use crate::UrlExt;
31use crate::backend::StorageBackend;
32use crate::backend::Upload;
33use crate::generator::Alphanumeric;
34use crate::streams::ByteStream;
35use crate::streams::TransferStream;
36
37/// The Azure Blob Storage domain suffix.
38const AZURE_BLOB_STORAGE_ROOT_DOMAIN: &str = "blob.core.windows.net";
39
40/// The Azurite root domain suffix.
41const AZURITE_ROOT_DOMAIN: &str = "blob.core.windows.net.localhost";
42
43/// The default block size in bytes (4 MiB).
44const DEFAULT_BLOCK_SIZE: u64 = 4 * ONE_MEBIBYTE;
45
46/// The maximum block size in bytes (4000 MiB).
47const MAX_BLOCK_SIZE: u64 = 4000 * ONE_MEBIBYTE;
48
49/// The maximum number of blocks for any blob.
50const MAX_BLOCK_COUNT: u64 = 50000;
51
52/// The maximum supported blob size.
53const MAX_BLOB_SIZE: u64 = MAX_BLOCK_SIZE * MAX_BLOCK_COUNT;
54
55/// The header for the Azure storage version supported.
56const AZURE_VERSION_HEADER: &str = "x-ms-version";
57
58/// The header for the blob type.
59const AZURE_BLOB_TYPE_HEADER: &str = "x-ms-blob-type";
60
61/// The header for the CRC64 checksum.
62const AZURE_CONTENT_CRC_HEADER: &str = "x-ms-content-crc64";
63
64/// The current supported Azure storage version.
65const AZURE_STORAGE_VERSION: &str = "2025-05-05";
66
67/// The Azure blob type uploaded by this tool.
68const AZURE_BLOB_TYPE: &str = "BlockBlob";
69
70/// The name of the root container.
71const AZURE_ROOT_CONTAINER: &str = "$root";
72
73/// Represents an Azure-specific copy operation error.
74#[derive(Debug, thiserror::Error)]
75pub enum AzureError {
76    /// The specified Azure blob block size exceeds the maximum.
77    #[error("Azure blob block size cannot exceed {MAX_BLOCK_SIZE} bytes")]
78    InvalidBlockSize,
79    /// The source size exceeds the supported maximum size.
80    #[error("the size of the source file exceeds the supported maximum of {MAX_BLOB_SIZE} bytes")]
81    MaximumSizeExceeded,
82    /// Invalid URL with an `az` scheme.
83    #[error("invalid URL with `az` scheme: the URL is not in a supported format")]
84    InvalidScheme,
85    /// Cannot upload a directory to the root container.
86    #[error("uploading a directory to the root container is not supported by Azure")]
87    RootDirectoryUploadNotSupported,
88    /// Unexpected response from server.
89    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
90    UnexpectedResponse {
91        /// The response status code.
92        status: reqwest::StatusCode,
93        /// The deserialization error.
94        error: serde_xml_rs::Error,
95    },
96    /// The blob name is missing in the URL.
97    #[error("a blob name is missing from the provided URL")]
98    BlobNameMissing,
99}
100
101/// Represents information about a blob.
102#[derive(Debug, Deserialize)]
103struct Blob {
104    /// The name of the blob.
105    #[serde(rename = "Name")]
106    name: String,
107}
108
109/// Represents a list of blobs.
110#[derive(Default, Debug, Deserialize)]
111struct Blobs {
112    /// The blob names.
113    #[serde(default, rename = "Blob")]
114    items: Vec<Blob>,
115}
116
117/// Represents results of a list operation.
118#[derive(Debug, Deserialize)]
119#[serde(rename = "EnumerationResults")]
120struct Results {
121    /// The error message.
122    #[serde(default, rename = "Blobs")]
123    blobs: Blobs,
124    /// The next marker to use to query for more blobs.
125    #[serde(rename = "NextMarker", default)]
126    next: Option<String>,
127}
128
129/// Represents a block list that comprises an Azure blob.
130#[derive(Serialize)]
131#[serde(rename = "BlockList")]
132struct BlockList<'a> {
133    /// Use the latest block.
134    #[serde(rename = "Latest")]
135    latest: &'a [String],
136}
137
138/// Extension trait for response.
139trait ResponseExt {
140    /// Converts an error response from Azure into an `Error`.
141    async fn into_error(self) -> Error;
142}
143
144impl ResponseExt for Response {
145    async fn into_error(self) -> Error {
146        /// Represents an error response.
147        #[derive(Default, Deserialize)]
148        #[serde(rename = "Error")]
149        struct ErrorResponse {
150            /// The error message.
151            #[serde(rename = "Message")]
152            message: String,
153        }
154
155        let status = self.status();
156        let text: String = match self.text().await {
157            Ok(text) => text,
158            Err(e) => return e.into(),
159        };
160
161        if text.is_empty() {
162            return Error::Server {
163                status,
164                message: text,
165            };
166        }
167
168        let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
169            Ok(response) => response.message,
170            Err(e) => {
171                return AzureError::UnexpectedResponse { status, error: e }.into();
172            }
173        };
174
175        Error::Server { status, message }
176    }
177}
178
179/// Represents an upload of a blob to Azure Blob Storage.
180pub struct AzureBlobUpload {
181    /// The HTTP client to use for the upload.
182    client: HttpClient,
183    /// The blob URL.
184    url: Url,
185    /// The Azure block id.
186    block_id: Arc<String>,
187    /// The channel for sending progress updates.
188    events: Option<broadcast::Sender<TransferEvent>>,
189}
190
191impl AzureBlobUpload {
192    /// Constructs a new blob upload.
193    fn new(
194        client: HttpClient,
195        url: Url,
196        block_id: Arc<String>,
197        events: Option<broadcast::Sender<TransferEvent>>,
198    ) -> Self {
199        Self {
200            client,
201            url,
202            block_id,
203            events,
204        }
205    }
206}
207
208impl Upload for AzureBlobUpload {
209    type Part = String;
210
211    async fn put(&self, id: u64, block: u64, bytes: bytes::Bytes) -> Result<Option<Self::Part>> {
212        // Azure doesn't support uploading blocks of size 0
213        if bytes.is_empty() {
214            return Ok(None);
215        }
216
217        let block_id =
218            BASE64_STANDARD.encode(format!("{block_id}:{block:05}", block_id = self.block_id));
219
220        debug!(
221            "uploading block {block} with id `{block_id}` for `{url}`",
222            url = self.url.display()
223        );
224
225        let mut url = self.url.clone();
226        {
227            // Append the operation and block id to the URL
228            // These parameters are documented here: https://learn.microsoft.com/en-us/rest/api/storageservices/put-block
229            let mut pairs = url.query_pairs_mut();
230            // The component being created (a block)
231            pairs.append_pair("comp", "block");
232            // The id of the block being created
233            pairs.append_pair("blockid", &block_id);
234        }
235
236        // Calculate the CRC64 checksum
237        let mut crc64 = Digest::new();
238        crc64.write(&bytes);
239        let checksum = BASE64_STANDARD.encode(crc64.sum64().to_le_bytes());
240
241        let length = bytes.len();
242        let body = Body::wrap_stream(TransferStream::new(
243            ByteStream::new(bytes),
244            id,
245            block,
246            0,
247            self.events.clone(),
248        ));
249
250        let response = self
251            .client
252            .put(url)
253            .header(header::USER_AGENT, USER_AGENT)
254            .header(header::CONTENT_LENGTH, length)
255            .header(header::CONTENT_TYPE, "application/octet-stream")
256            .header(header::DATE, Utc::now().to_rfc2822())
257            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
258            .header(AZURE_BLOB_TYPE_HEADER, AZURE_BLOB_TYPE)
259            .header(AZURE_CONTENT_CRC_HEADER, checksum)
260            .body(body)
261            .send()
262            .await?;
263
264        if response.status() == StatusCode::CREATED {
265            Ok(Some(block_id))
266        } else {
267            Err(response.into_error().await)
268        }
269    }
270
271    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
272        debug!("uploading block list for `{url}`", url = self.url.display());
273
274        let mut url = self.url.clone();
275
276        {
277            // Append the operation to the URL
278            // These parameter are documented here: https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list
279            let mut pairs = url.query_pairs_mut();
280            // The component being created (a block list)
281            pairs.append_pair("comp", "blocklist");
282        }
283
284        let body = serde_xml_rs::to_string(&BlockList { latest: parts }).expect("should serialize");
285
286        let response = self
287            .client
288            .put(url)
289            .header(header::USER_AGENT, USER_AGENT)
290            .header(header::CONTENT_LENGTH, body.len())
291            .header(header::CONTENT_TYPE, "application/xml")
292            .header(header::DATE, Utc::now().to_rfc2822())
293            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
294            .body(body)
295            .send()
296            .await?;
297
298        if response.status() == StatusCode::CREATED {
299            Ok(())
300        } else {
301            Err(response.into_error().await)
302        }
303    }
304}
305
306/// Represents a storage backend for Azure Blob Storage.
307#[derive(Clone)]
308pub struct AzureBlobStorageBackend {
309    /// The config to use for transferring files.
310    config: Config,
311    /// The HTTP client to use for transferring files.
312    client: HttpClient,
313    /// The channel for sending transfer events.
314    events: Option<broadcast::Sender<TransferEvent>>,
315}
316
317impl AzureBlobStorageBackend {
318    /// Constructs a new Azure Blob Storage backend with the given configuration
319    /// and events channel.
320    pub fn new(
321        config: Config,
322        client: HttpClient,
323        events: Option<broadcast::Sender<TransferEvent>>,
324    ) -> Self {
325        Self {
326            config,
327            client,
328            events,
329        }
330    }
331}
332
333impl StorageBackend for AzureBlobStorageBackend {
334    type Upload = AzureBlobUpload;
335
336    fn config(&self) -> &Config {
337        &self.config
338    }
339
340    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
341        self.client.cache()
342    }
343
344    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
345        &self.events
346    }
347
348    fn block_size(&self, file_size: u64) -> Result<u64> {
349        /// The number of blocks to increment by in search of a block size
350        const BLOCK_COUNT_INCREMENT: u64 = 50;
351
352        // Return the block size if one was specified
353        if let Some(size) = self.config.block_size {
354            if size > MAX_BLOCK_SIZE {
355                return Err(AzureError::InvalidBlockSize.into());
356            }
357
358            return Ok(size);
359        }
360
361        // Try to balance the number of blocks with the size of the blocks
362        let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
363        while num_blocks < MAX_BLOCK_COUNT {
364            let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
365            if block_size <= BLOCK_SIZE_THRESHOLD {
366                return Ok(block_size.max(DEFAULT_BLOCK_SIZE));
367            }
368
369            num_blocks += BLOCK_COUNT_INCREMENT;
370        }
371
372        // Couldn't fit the number of blocks within the size threshold; fallback to
373        // whatever will fit
374        let block_size: u64 = file_size.div_ceil(MAX_BLOCK_COUNT);
375        if block_size > MAX_BLOCK_SIZE {
376            return Err(AzureError::MaximumSizeExceeded.into());
377        }
378
379        Ok(block_size)
380    }
381
382    fn is_supported_url(config: &Config, url: &Url) -> bool {
383        match url.scheme() {
384            "az" => true,
385            "http" | "https" => {
386                let Some(domain) = url.domain() else {
387                    return false;
388                };
389
390                // Virtual host style URL of the form https://<account>.blob.core.windows.net/<container>/<path>
391                let Some((_, domain)) = domain.split_once('.') else {
392                    return false;
393                };
394
395                domain.eq_ignore_ascii_case(AZURE_BLOB_STORAGE_ROOT_DOMAIN)
396                    | (config.azure.use_azurite && domain.eq_ignore_ascii_case(AZURITE_ROOT_DOMAIN))
397            }
398            _ => false,
399        }
400    }
401
402    fn rewrite_url<'a>(config: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
403        match url.scheme() {
404            "az" => {
405                let account = url.host_str().ok_or(AzureError::InvalidScheme)?;
406
407                if url.path() == "/" {
408                    return Err(AzureError::InvalidScheme.into());
409                }
410
411                let (scheme, root, port) = if config.azure.use_azurite {
412                    ("http", AZURITE_ROOT_DOMAIN, ":10000")
413                } else {
414                    ("https", AZURE_BLOB_STORAGE_ROOT_DOMAIN, "")
415                };
416
417                match (url.query(), url.fragment()) {
418                    (None, None) => {
419                        format!("{scheme}://{account}.{root}{port}{path}", path = url.path())
420                    }
421                    (None, Some(fragment)) => {
422                        format!(
423                            "{scheme}://{account}.{root}{port}{path}#{fragment}",
424                            path = url.path()
425                        )
426                    }
427                    (Some(query), None) => format!(
428                        "{scheme}://{account}.{root}{port}{path}?{query}",
429                        path = url.path()
430                    ),
431                    (Some(query), Some(fragment)) => {
432                        format!(
433                            "{scheme}://{account}.{root}{port}{path}?{query}#{fragment}",
434                            path = url.path()
435                        )
436                    }
437                }
438                .parse()
439                .map(Cow::Owned)
440                .map_err(|_| AzureError::InvalidScheme.into())
441            }
442            _ => Ok(Cow::Borrowed(url)),
443        }
444    }
445
446    fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
447        let mut segments = segments.peekable();
448
449        // Check to see if we're joining a path to the root container; that's not
450        // supported
451        let mut existing = url.path_segments().expect("URL should have path");
452        if let (Some(first), None) = (existing.next(), existing.next())
453            && !first.is_empty()
454            && segments.peek().is_some()
455        {
456            return Err(AzureError::RootDirectoryUploadNotSupported.into());
457        }
458
459        // Append on the segments
460        {
461            let mut existing = url.path_segments_mut().expect("url should have path");
462            existing.pop_if_empty();
463            existing.extend(segments);
464        }
465
466        Ok(url)
467    }
468
469    async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
470        debug_assert!(
471            Self::is_supported_url(&self.config, &url),
472            "{url} is not a supported Azure URL",
473            url = url.as_str()
474        );
475
476        debug!("sending HEAD request for `{url}`", url = url.display());
477
478        let response = self
479            .client
480            .head(url)
481            .header(header::USER_AGENT, USER_AGENT)
482            .header(header::DATE, Utc::now().to_rfc2822())
483            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
484            .send()
485            .await?;
486
487        if !response.status().is_success() {
488            // If the resource isn't required to exist and it's a 404, return the response.
489            if !must_exist && response.status() == StatusCode::NOT_FOUND {
490                return Ok(response);
491            }
492
493            return Err(response.into_error().await);
494        }
495
496        Ok(response)
497    }
498
499    async fn get(&self, url: Url) -> Result<Response> {
500        debug_assert!(
501            Self::is_supported_url(&self.config, &url),
502            "{url} is not a supported Azure URL",
503            url = url.as_str()
504        );
505
506        debug!("sending GET request for `{url}`", url = url.display());
507
508        let response = self
509            .client
510            .get(url)
511            .header(header::USER_AGENT, USER_AGENT)
512            .header(header::DATE, Utc::now().to_rfc2822())
513            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
514            .send()
515            .await?;
516
517        if !response.status().is_success() {
518            return Err(response.into_error().await);
519        }
520
521        Ok(response)
522    }
523
524    async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
525        debug_assert!(
526            Self::is_supported_url(&self.config, &url),
527            "{url} is not a supported Azure URL",
528            url = url.as_str()
529        );
530
531        debug!(
532            "sending GET request at offset {offset} for `{url}`",
533            url = url.display(),
534        );
535
536        let response = self
537            .client
538            .get(url)
539            .header(header::USER_AGENT, USER_AGENT)
540            .header(header::DATE, Utc::now().to_rfc2822())
541            .header(header::RANGE, format!("bytes={offset}-"))
542            .header(header::IF_MATCH, etag)
543            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
544            .send()
545            .await?;
546
547        let status = response.status();
548
549        // Handle precondition failed as remote content modified
550        if status == StatusCode::PRECONDITION_FAILED {
551            return Err(Error::RemoteContentModified);
552        }
553
554        // Handle other error responses
555        if !status.is_success() {
556            return Err(response.into_error().await);
557        }
558
559        Ok(response)
560    }
561
562    async fn walk(&self, url: Url) -> Result<Vec<String>> {
563        debug_assert!(
564            Self::is_supported_url(&self.config, &url),
565            "{url} is not a supported Azure URL",
566            url = url.as_str()
567        );
568
569        debug!("walking `{url}` as a directory", url = url.display());
570
571        let mut container = url.clone();
572
573        // Clear the path segments for the list request; we only want the container name
574        let mut prefix = {
575            let mut container_segments = container
576                .path_segments_mut()
577                .expect("URL should have a path");
578            container_segments.clear();
579
580            // Start by treating the first path segment as the container to list the
581            // contents of
582            let mut source_segments = url.path_segments().expect("URL should have a path");
583            let name = source_segments.next().ok_or(AzureError::BlobNameMissing)?;
584            container_segments.push(name);
585
586            // The remainder is the prefix we're going to search for
587            source_segments.fold(String::new(), |mut p, s| {
588                if !p.is_empty() {
589                    p.push('/');
590                }
591
592                p.push_str(s);
593                p
594            })
595        };
596
597        // If there's no prefix, then we need to use the implicit root container
598        if prefix.is_empty() {
599            let mut container_segments = container
600                .path_segments_mut()
601                .expect("URL should have a path");
602            container_segments.clear();
603            container_segments.push(AZURE_ROOT_CONTAINER);
604
605            prefix = url.path_segments().expect("URL should have a path").fold(
606                String::new(),
607                |mut p, s| {
608                    if !p.is_empty() {
609                        p.push('/');
610                    }
611
612                    p.push_str(s);
613                    p
614                },
615            );
616
617            assert!(!prefix.is_empty());
618        }
619
620        // The prefix should end with `/` to signify a directory.
621        prefix.push('/');
622
623        {
624            // Append the operation and block id to the URL
625            // These parameters are documented here: https://learn.microsoft.com/en-us/rest/api/storageservices/list-blobs
626            let mut pairs = container.query_pairs_mut();
627            // The resource operation is on the container
628            pairs.append_pair("restype", "container");
629            // The operation is a list
630            pairs.append_pair("comp", "list");
631            // The prefix to use for listing blobs in the container.
632            pairs.append_pair("prefix", &prefix);
633        }
634
635        let mut next = String::new();
636        let mut paths = Vec::new();
637        loop {
638            let mut url = container.clone();
639            if !next.is_empty() {
640                // The marker to start listing from, returned by the previous query
641                url.query_pairs_mut().append_pair("marker", &next);
642            }
643
644            // List the blobs with the prefix
645            let response = self
646                .client
647                .get(url)
648                .header(header::USER_AGENT, USER_AGENT)
649                .header(header::DATE, Utc::now().to_rfc2822())
650                .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
651                .send()
652                .await?;
653
654            let status = response.status();
655            if !status.is_success() {
656                return Err(response.into_error().await);
657            }
658
659            let text = response.text().await?;
660            let results: Results = match serde_xml_rs::from_str(&text) {
661                Ok(response) => response,
662                Err(e) => {
663                    return Err(AzureError::UnexpectedResponse { status, error: e }.into());
664                }
665            };
666
667            // If there is only one result and the result is an empty path, then the given
668            // URL was to a file and not a "directory"
669            if paths.is_empty()
670                && results.blobs.items.len() == 1
671                && results.next.is_none()
672                && let Some("") = results.blobs.items[0].name.strip_prefix(&prefix)
673            {
674                return Ok(paths);
675            }
676
677            paths.extend(results.blobs.items.into_iter().map(|b| {
678                b.name
679                    .strip_prefix(&prefix)
680                    .map(Into::into)
681                    .unwrap_or(b.name)
682            }));
683
684            next = results.next.unwrap_or_default();
685            if next.is_empty() {
686                break;
687            }
688        }
689
690        Ok(paths)
691    }
692
693    async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
694        debug_assert!(
695            Self::is_supported_url(&self.config, &url),
696            "{url} is not a supported Azure URL",
697            url = url.as_str()
698        );
699
700        // Azure doesn't support conditional requests for `Put Block`.
701        // Therefore, we must issue a HEAD request for the blob if not overwriting.
702        // See: https://learn.microsoft.com/en-us/rest/api/storageservices/specifying-conditional-headers-for-blob-service-operations
703        if !self.config.overwrite {
704            let response = self.head(url.clone(), false).await?;
705            if response.status() != StatusCode::NOT_FOUND {
706                return Err(Error::RemoteDestinationExists(url));
707            }
708        }
709
710        Ok(AzureBlobUpload::new(
711            self.client.clone(),
712            url,
713            Arc::new(Alphanumeric::new(16).to_string()),
714            self.events.clone(),
715        ))
716    }
717}