cloud_copy/backend/azure.rs

//! Implementation of the Azure Blob Storage backend.

use std::sync::Arc;

use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use chrono::Utc;
use crc64fast_nvme::Digest;
use http_cache_stream_reqwest::Cache;
use http_cache_stream_reqwest::storage::DefaultCacheStorage;
use reqwest::Body;
use reqwest::Response;
use reqwest::StatusCode;
use reqwest::header;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::broadcast;
use tracing::debug;
use url::Url;

use crate::BLOCK_SIZE_THRESHOLD;
use crate::Config;
use crate::Error;
use crate::HttpClient;
use crate::ONE_MEBIBYTE;
use crate::Result;
use crate::TransferEvent;
use crate::USER_AGENT;
use crate::UrlExt;
use crate::backend::StorageBackend;
use crate::backend::Upload;
use crate::generator::Alphanumeric;
use crate::streams::ByteStream;
use crate::streams::TransferStream;

/// The Azure Blob Storage domain suffix.
const AZURE_BLOB_STORAGE_ROOT_DOMAIN: &str = "blob.core.windows.net";

/// The Azurite root domain suffix.
const AZURITE_ROOT_DOMAIN: &str = "blob.core.windows.net.localhost";

/// The default block size in bytes (4 MiB).
const DEFAULT_BLOCK_SIZE: u64 = 4 * ONE_MEBIBYTE;

/// The maximum block size in bytes (4000 MiB).
const MAX_BLOCK_SIZE: u64 = 4000 * ONE_MEBIBYTE;

/// The maximum number of blocks for any blob.
const MAX_BLOCK_COUNT: u64 = 50000;

/// The maximum supported blob size.
const MAX_BLOB_SIZE: u64 = MAX_BLOCK_SIZE * MAX_BLOCK_COUNT;

/// The header for the Azure storage version supported.
const AZURE_VERSION_HEADER: &str = "x-ms-version";

/// The header for the blob type.
const AZURE_BLOB_TYPE_HEADER: &str = "x-ms-blob-type";

/// The header for the CRC64 checksum.
const AZURE_CONTENT_CRC_HEADER: &str = "x-ms-content-crc64";

/// The current supported Azure storage version.
const AZURE_STORAGE_VERSION: &str = "2025-05-05";

/// The Azure blob type uploaded by this tool.
const AZURE_BLOB_TYPE: &str = "BlockBlob";

/// The name of the root container.
const AZURE_ROOT_CONTAINER: &str = "$root";

/// Represents an Azure-specific copy operation error.
#[derive(Debug, thiserror::Error)]
pub enum AzureError {
    /// The specified Azure blob block size exceeds the maximum.
    #[error("Azure blob block size cannot exceed {MAX_BLOCK_SIZE} bytes")]
    InvalidBlockSize,
    /// The source size exceeds the supported maximum size.
    #[error("the size of the source file exceeds the supported maximum of {MAX_BLOB_SIZE} bytes")]
    MaximumSizeExceeded,
    /// Invalid URL with an `az` scheme.
    #[error("invalid URL with `az` scheme: the URL is not in a supported format")]
    InvalidScheme,
    /// Cannot upload a directory to the root container.
    #[error("uploading a directory to the root container is not supported by Azure")]
    RootDirectoryUploadNotSupported,
    /// Unexpected response from server.
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        /// The response status code.
        status: reqwest::StatusCode,
        /// The deserialization error.
        error: serde_xml_rs::Error,
    },
    /// The blob name is missing in the URL.
    #[error("a blob name is missing from the provided URL")]
    BlobNameMissing,
}

/// Represents information about a blob.
#[derive(Debug, Deserialize)]
struct Blob {
    /// The name of the blob.
    #[serde(rename = "Name")]
    name: String,
}

/// Represents a list of blobs.
#[derive(Default, Debug, Deserialize)]
struct Blobs {
    /// The blob names.
    #[serde(default, rename = "Blob")]
    items: Vec<Blob>,
}

/// Represents results of a list operation.
#[derive(Debug, Deserialize)]
#[serde(rename = "EnumerationResults")]
struct Results {
    /// The blobs returned by the list operation.
    #[serde(default, rename = "Blobs")]
    blobs: Blobs,
    /// The next marker to use to query for more blobs.
    #[serde(rename = "NextMarker", default)]
    next: Option<String>,
}

/// Represents a block list that comprises an Azure blob.
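///
/// Serializing `BlockList { latest: &[id_a, id_b] }` is intended to produce XML roughly of
/// the shape expected by the Put Block List operation (illustrative; the exact output
/// depends on `serde_xml_rs`):
///
/// ```xml
/// <BlockList>
///   <Latest>{id_a}</Latest>
///   <Latest>{id_b}</Latest>
/// </BlockList>
/// ```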
#[derive(Serialize)]
#[serde(rename = "BlockList")]
struct BlockList<'a> {
    /// The IDs of the blocks to commit, using the latest uploaded version of each block.
    #[serde(rename = "Latest")]
    latest: &'a [String],
}

/// Extension trait for response.
trait ResponseExt {
    /// Converts an error response from Azure into an `Error`.
    async fn into_error(self) -> Error;
}

impl ResponseExt for Response {
    async fn into_error(self) -> Error {
        /// Represents an error response.
        #[derive(Default, Deserialize)]
        #[serde(rename = "Error")]
        struct ErrorResponse {
            /// The error message.
            #[serde(rename = "Message")]
            message: String,
        }

        let status = self.status();
        let text: String = match self.text().await {
            Ok(text) => text,
            Err(e) => return e.into(),
        };

        if text.is_empty() {
            return Error::Server {
                status,
                message: text,
            };
        }

        let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
            Ok(response) => response.message,
            Err(e) => {
                return AzureError::UnexpectedResponse { status, error: e }.into();
            }
        };

        Error::Server { status, message }
    }
}

/// Represents an upload of a blob to Azure Blob Storage.
pub struct AzureBlobUpload {
    /// The HTTP client to use for the upload.
    client: HttpClient,
    /// The blob URL.
    url: Url,
    /// The Azure block id.
    block_id: Arc<String>,
    /// The channel for sending progress updates.
    events: Option<broadcast::Sender<TransferEvent>>,
}

impl AzureBlobUpload {
    /// Constructs a new blob upload.
    fn new(
        client: HttpClient,
        url: Url,
        block_id: Arc<String>,
        events: Option<broadcast::Sender<TransferEvent>>,
    ) -> Self {
        Self {
            client,
            url,
            block_id,
            events,
        }
    }
}

impl Upload for AzureBlobUpload {
    type Part = String;

    async fn put(&self, id: u64, block: u64, bytes: bytes::Bytes) -> Result<Self::Part> {
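        // Azure requires block IDs to be base64-encoded and of equal length for every
        // block in a blob, so the ID combines this upload's fixed random prefix with a
        // zero-padded block number, e.g. (illustrative) `aB3xY9Qk7Lm2Rt8Z:00042` before
        // encoding.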
        let block_id =
            BASE64_STANDARD.encode(format!("{block_id}:{block:05}", block_id = self.block_id));

        debug!(
            "uploading block {block} with id `{block_id}` for `{url}`",
            url = self.url.display()
        );

        let mut url = self.url.clone();
        {
            // Append the operation and block id to the URL
            // These parameters are documented here: https://learn.microsoft.com/en-us/rest/api/storageservices/put-block
            let mut pairs = url.query_pairs_mut();
            // The component being created (a block)
            pairs.append_pair("comp", "block");
            // The id of the block being created
            pairs.append_pair("blockid", &block_id);
        }

        // Calculate the CRC64 checksum
        let mut crc64 = Digest::new();
        crc64.write(&bytes);
        let checksum = BASE64_STANDARD.encode(crc64.sum64().to_le_bytes());

        let length = bytes.len();
        let body = Body::wrap_stream(TransferStream::new(
            ByteStream::new(bytes),
            id,
            block,
            0,
            self.events.clone(),
        ));

        let response = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, length)
            .header(header::CONTENT_TYPE, "application/octet-stream")
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .header(AZURE_BLOB_TYPE_HEADER, AZURE_BLOB_TYPE)
            .header(AZURE_CONTENT_CRC_HEADER, checksum)
            .body(body)
            .send()
            .await?;

        if response.status() == StatusCode::CREATED {
            Ok(block_id)
        } else {
            Err(response.into_error().await)
        }
    }

    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
        debug!("uploading block list for `{url}`", url = self.url.display());

        let mut url = self.url.clone();

        {
            // Append the operation to the URL
            // These parameters are documented here: https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list
            let mut pairs = url.query_pairs_mut();
            // The component being created (a block list)
            pairs.append_pair("comp", "blocklist");
        }

        let body = serde_xml_rs::to_string(&BlockList { latest: parts }).expect("should serialize");

        let response = self
            .client
            .put(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::CONTENT_LENGTH, body.len())
            .header(header::CONTENT_TYPE, "application/xml")
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .body(body)
            .send()
            .await?;

        if response.status() == StatusCode::CREATED {
            Ok(())
        } else {
            Err(response.into_error().await)
        }
    }
}

/// Represents a storage backend for Azure Blob Storage.
#[derive(Clone)]
pub struct AzureBlobStorageBackend {
    /// The config to use for transferring files.
    config: Config,
    /// The HTTP client to use for transferring files.
    client: HttpClient,
    /// The channel for sending transfer events.
    events: Option<broadcast::Sender<TransferEvent>>,
}

impl AzureBlobStorageBackend {
    /// Constructs a new Azure Blob Storage backend with the given configuration
    /// and events channel.
    pub fn new(
        config: Config,
        client: HttpClient,
        events: Option<broadcast::Sender<TransferEvent>>,
    ) -> Self {
        Self {
            config,
            client,
            events,
        }
    }
}

impl StorageBackend for AzureBlobStorageBackend {
    type Upload = AzureBlobUpload;

    fn config(&self) -> &Config {
        &self.config
    }

    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
        self.client.cache()
    }

    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
        &self.events
    }

    fn block_size(&self, file_size: u64) -> Result<u64> {
        /// The number of blocks to increment by in search of a block size
        const BLOCK_COUNT_INCREMENT: u64 = 50;

        // Return the block size if one was specified
        if let Some(size) = self.config.block_size {
            if size > MAX_BLOCK_SIZE {
                return Err(AzureError::InvalidBlockSize.into());
            }

            return Ok(size);
        }

        // Try to balance the number of blocks with the size of the blocks
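        // Illustrative example (assuming `BLOCK_SIZE_THRESHOLD` is at least 256 MiB; the
        // actual value is defined in the crate root): for a 10 GiB file the first iteration
        // tries 50 blocks, and ceil(10 GiB / 50) rounded up to the next power of two is
        // 256 MiB, which is returned as the block size if it falls within the threshold.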
        let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
        while num_blocks < MAX_BLOCK_COUNT {
            let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
            if block_size <= BLOCK_SIZE_THRESHOLD {
                return Ok(block_size.max(DEFAULT_BLOCK_SIZE));
            }

            num_blocks += BLOCK_COUNT_INCREMENT;
        }

        // Couldn't fit the number of blocks within the size threshold; fallback to
        // whatever will fit
        let block_size: u64 = file_size.div_ceil(MAX_BLOCK_COUNT);
        if block_size > MAX_BLOCK_SIZE {
            return Err(AzureError::MaximumSizeExceeded.into());
        }

        Ok(block_size)
    }

    fn is_supported_url(config: &Config, url: &Url) -> bool {
        match url.scheme() {
            "az" => true,
            "http" | "https" => {
                let Some(domain) = url.domain() else {
                    return false;
                };

                // Virtual host style URL of the form https://<account>.blob.core.windows.net/<container>/<path>
                let Some((_, domain)) = domain.split_once('.') else {
                    return false;
                };

                domain.eq_ignore_ascii_case(AZURE_BLOB_STORAGE_ROOT_DOMAIN)
                    | (config.azure.use_azurite && domain.eq_ignore_ascii_case(AZURITE_ROOT_DOMAIN))
            }
            _ => false,
        }
    }

    fn rewrite_url(&self, url: Url) -> Result<Url> {
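        // Rewrites `az://<account>/<container>/<blob>` into a virtual-host-style URL; for
        // example (illustrative names), `az://myaccount/mycontainer/dir/file.txt` becomes
        // `https://myaccount.blob.core.windows.net/mycontainer/dir/file.txt`, or an
        // `http://myaccount.blob.core.windows.net.localhost:10000/...` URL when Azurite is
        // enabled. URLs with any other scheme are passed through unchanged.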
        match url.scheme() {
            "az" => {
                let account = url.host_str().ok_or(AzureError::InvalidScheme)?;

                if url.path() == "/" {
                    return Err(AzureError::InvalidScheme.into());
                }

                let (scheme, root, port) = if self.config.azure.use_azurite {
                    ("http", AZURITE_ROOT_DOMAIN, ":10000")
                } else {
                    ("https", AZURE_BLOB_STORAGE_ROOT_DOMAIN, "")
                };

                match (url.query(), url.fragment()) {
                    (None, None) => {
                        format!("{scheme}://{account}.{root}{port}{path}", path = url.path())
                    }
                    (None, Some(fragment)) => {
                        format!(
                            "{scheme}://{account}.{root}{port}{path}#{fragment}",
                            path = url.path()
                        )
                    }
                    (Some(query), None) => format!(
                        "{scheme}://{account}.{root}{port}{path}?{query}",
                        path = url.path()
                    ),
                    (Some(query), Some(fragment)) => {
                        format!(
                            "{scheme}://{account}.{root}{port}{path}?{query}#{fragment}",
                            path = url.path()
                        )
                    }
                }
                .parse()
                .map_err(|_| AzureError::InvalidScheme.into())
            }
            _ => Ok(url),
        }
    }

    fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
        let mut segments = segments.peekable();

        // Check to see if we're joining a path to the root container; that's not
        // supported
        let mut existing = url.path_segments().expect("URL should have path");
        if let (Some(first), None) = (existing.next(), existing.next())
            && !first.is_empty()
            && segments.peek().is_some()
        {
            return Err(AzureError::RootDirectoryUploadNotSupported.into());
        }

        // Append on the segments
        {
            let mut existing = url.path_segments_mut().expect("url should have path");
            existing.pop_if_empty();
            existing.extend(segments);
        }

        Ok(url)
    }

    async fn head(&self, url: Url) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!("sending HEAD request for `{url}`", url = url.display());

        let response = self
            .client
            .head(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .send()
            .await?;

        if !response.status().is_success() {
            return Err(response.into_error().await);
        }

        Ok(response)
    }

    async fn get(&self, url: Url) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!("sending GET request for `{url}`", url = url.display());

        let response = self
            .client
            .get(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .send()
            .await?;

        if !response.status().is_success() {
            return Err(response.into_error().await);
        }

        Ok(response)
    }

    async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!(
            "sending GET request at offset {offset} for `{url}`",
            url = url.display(),
        );

        let response = self
            .client
            .get(url)
            .header(header::USER_AGENT, USER_AGENT)
            .header(header::DATE, Utc::now().to_rfc2822())
            .header(header::RANGE, format!("bytes={offset}-"))
            .header(header::IF_MATCH, etag)
            .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
            .send()
            .await?;

        let status = response.status();

        // Handle precondition failed as remote content modified
        if status == StatusCode::PRECONDITION_FAILED {
            return Err(Error::RemoteContentModified);
        }

        // Handle other error responses
        if !status.is_success() {
            return Err(response.into_error().await);
        }

        Ok(response)
    }

    async fn walk(&self, url: Url) -> Result<Vec<String>> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        debug!("walking `{url}` as a directory", url = url.display());

        let mut container = url.clone();

        // Clear the path segments for the list request; we only want the container name
        let mut prefix = {
            let mut container_segments = container
                .path_segments_mut()
                .expect("URL should have a path");
            container_segments.clear();

            // Start by treating the first path segment as the container to list the
            // contents of
            let mut source_segments = url.path_segments().expect("URL should have a path");
            let name = source_segments.next().ok_or(AzureError::BlobNameMissing)?;
            container_segments.push(name);

            // The remainder is the prefix we're going to search for
            source_segments.fold(String::new(), |mut p, s| {
                if !p.is_empty() {
                    p.push('/');
                }

                p.push_str(s);
                p
            })
        };

        // If there's no prefix, then we need to use the implicit root container
        if prefix.is_empty() {
            let mut container_segments = container
                .path_segments_mut()
                .expect("URL should have a path");
            container_segments.clear();
            container_segments.push(AZURE_ROOT_CONTAINER);

            prefix = url.path_segments().expect("URL should have a path").fold(
                String::new(),
                |mut p, s| {
                    if !p.is_empty() {
                        p.push('/');
                    }

                    p.push_str(s);
                    p
                },
            );

            assert!(!prefix.is_empty());
        }

        // The prefix should end with `/` to signify a directory.
        prefix.push('/');

        {
            // Append the list operation parameters to the URL
            // These parameters are documented here: https://learn.microsoft.com/en-us/rest/api/storageservices/list-blobs
            let mut pairs = container.query_pairs_mut();
            // The resource operation is on the container
            pairs.append_pair("restype", "container");
            // The operation is a list
            pairs.append_pair("comp", "list");
            // The prefix to use for listing blobs in the container.
            pairs.append_pair("prefix", &prefix);
        }
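        // The container URL now looks roughly like the following (illustrative names):
        //   https://myaccount.blob.core.windows.net/mycontainer?restype=container&comp=list&prefix=dir%2F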

        let mut next = String::new();
        let mut paths = Vec::new();
        loop {
            let mut url = container.clone();
            if !next.is_empty() {
                // The marker to start listing from, returned by the previous query
                url.query_pairs_mut().append_pair("marker", &next);
            }

            // List the blobs with the prefix
            let response = self
                .client
                .get(url)
                .header(header::USER_AGENT, USER_AGENT)
                .header(header::DATE, Utc::now().to_rfc2822())
                .header(AZURE_VERSION_HEADER, AZURE_STORAGE_VERSION)
                .send()
                .await?;

            let status = response.status();
            if !status.is_success() {
                return Err(response.into_error().await);
            }

            let text = response.text().await?;
            let results: Results = match serde_xml_rs::from_str(&text) {
                Ok(response) => response,
                Err(e) => {
                    return Err(AzureError::UnexpectedResponse { status, error: e }.into());
                }
            };

            // If there is only one result and the result is an empty path, then the given
            // URL was to a file and not a "directory"
            if paths.is_empty()
                && results.blobs.items.len() == 1
                && results.next.is_none()
                && let Some("") = results.blobs.items[0].name.strip_prefix(&prefix)
            {
                return Ok(paths);
            }

            paths.extend(results.blobs.items.into_iter().map(|b| {
                b.name
                    .strip_prefix(&prefix)
                    .map(Into::into)
                    .unwrap_or(b.name)
            }));

            next = results.next.unwrap_or_default();
            if next.is_empty() {
                break;
            }
        }

        Ok(paths)
    }

    async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
        debug_assert!(
            Self::is_supported_url(&self.config, &url),
            "{url} is not a supported Azure URL",
            url = url.as_str()
        );

        Ok(AzureBlobUpload::new(
            self.client.clone(),
            url,
            Arc::new(Alphanumeric::new(16).to_string()),
            self.events.clone(),
        ))
    }
}
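
// NOTE: illustrative test sketch only (not part of the original module). It assumes that
// `Config` implements `Default` and that the default configuration does not enable Azurite;
// adjust the construction to the crate's actual `Config` API as needed.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn detects_supported_urls() {
        let config = Config::default();

        // `az` scheme URLs are always treated as Azure URLs.
        assert!(AzureBlobStorageBackend::is_supported_url(
            &config,
            &"az://account/container/blob.txt".parse().unwrap()
        ));

        // Virtual-host-style HTTPS URLs under the Azure Blob Storage domain are supported.
        assert!(AzureBlobStorageBackend::is_supported_url(
            &config,
            &"https://account.blob.core.windows.net/container/blob.txt"
                .parse()
                .unwrap()
        ));

        // Unrelated domains are not.
        assert!(!AzureBlobStorageBackend::is_supported_url(
            &config,
            &"https://example.com/container/blob.txt".parse().unwrap()
        ));
    }
}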