object_store/azure/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! An object store implementation for Azure Blob Storage
//!
//! ## Streaming uploads
//!
//! [`ObjectStore::put_multipart`] uploads data as a series of blocks and, on completion,
//! writes a single blob from those blocks.
//!
//! Blocks that were uploaded but never written into a blob are automatically dropped
//! after 7 days.
25use crate::{
26    multipart::{MultipartStore, PartId},
27    path::Path,
28    signer::Signer,
29    GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
30    PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
31};
32use async_trait::async_trait;
33use futures::stream::{BoxStream, StreamExt, TryStreamExt};
34use reqwest::Method;
35use std::fmt::Debug;
36use std::sync::Arc;
37use std::time::Duration;
38use url::Url;
39
40use crate::client::get::GetClientExt;
41use crate::client::list::{ListClient, ListClientExt};
42use crate::client::CredentialProvider;
43pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer};
44
45mod builder;
46mod client;
47mod credential;
48
49/// [`CredentialProvider`] for [`MicrosoftAzure`]
50pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
51use crate::azure::client::AzureClient;
52use crate::client::parts::Parts;
53use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
54pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
55pub use credential::AzureCredential;
56
/// Name identifying this store implementation.
///
/// NOTE(review): not referenced by the code visible in this module; presumably
/// consumed by the `builder` / `client` / `credential` submodules — confirm
/// before renaming or removing.
const STORE: &str = "MicrosoftAzure";
58
59/// Interface for [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/).
60#[derive(Debug)]
61pub struct MicrosoftAzure {
62    client: Arc<AzureClient>,
63}
64
65impl MicrosoftAzure {
66    /// Returns the [`AzureCredentialProvider`] used by [`MicrosoftAzure`]
67    pub fn credentials(&self) -> &AzureCredentialProvider {
68        &self.client.config().credentials
69    }
70
71    /// Create a full URL to the resource specified by `path` with this instance's configuration.
72    fn path_url(&self, path: &Path) -> Url {
73        self.client.config().path_url(path)
74    }
75}
76
77impl std::fmt::Display for MicrosoftAzure {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        write!(
80            f,
81            "MicrosoftAzure {{ account: {}, container: {} }}",
82            self.client.config().account,
83            self.client.config().container
84        )
85    }
86}
87
88#[async_trait]
89impl ObjectStore for MicrosoftAzure {
90    async fn put_opts(
91        &self,
92        location: &Path,
93        payload: PutPayload,
94        opts: PutOptions,
95    ) -> Result<PutResult> {
96        self.client.put_blob(location, payload, opts).await
97    }
98
99    async fn put_multipart_opts(
100        &self,
101        location: &Path,
102        opts: PutMultipartOptions,
103    ) -> Result<Box<dyn MultipartUpload>> {
104        Ok(Box::new(AzureMultiPartUpload {
105            part_idx: 0,
106            opts,
107            state: Arc::new(UploadState {
108                client: Arc::clone(&self.client),
109                location: location.clone(),
110                parts: Default::default(),
111            }),
112        }))
113    }
114
115    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
116        self.client.get_opts(location, options).await
117    }
118
119    async fn delete(&self, location: &Path) -> Result<()> {
120        self.client.delete_request(location, &()).await
121    }
122
123    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
124        self.client.list(prefix)
125    }
126    fn delete_stream<'a>(
127        &'a self,
128        locations: BoxStream<'a, Result<Path>>,
129    ) -> BoxStream<'a, Result<Path>> {
130        locations
131            .try_chunks(256)
132            .map(move |locations| async {
133                // Early return the error. We ignore the paths that have already been
134                // collected into the chunk.
135                let locations = locations.map_err(|e| e.1)?;
136                self.client
137                    .bulk_delete_request(locations)
138                    .await
139                    .map(futures::stream::iter)
140            })
141            .buffered(20)
142            .try_flatten()
143            .boxed()
144    }
145
146    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
147        self.client.list_with_delimiter(prefix).await
148    }
149
150    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
151        self.client.copy_request(from, to, true).await
152    }
153
154    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
155        self.client.copy_request(from, to, false).await
156    }
157}
158
159#[async_trait]
160impl Signer for MicrosoftAzure {
161    /// Create a URL containing the relevant [Service SAS] query parameters that authorize a request
162    /// via `method` to the resource at `path` valid for the duration specified in `expires_in`.
163    ///
164    /// [Service SAS]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas
165    ///
166    /// # Example
167    ///
168    /// This example returns a URL that will enable a user to upload a file to
169    /// "some-folder/some-file.txt" in the next hour.
170    ///
171    /// ```
172    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
173    /// # use object_store::{azure::MicrosoftAzureBuilder, path::Path, signer::Signer};
174    /// # use reqwest::Method;
175    /// # use std::time::Duration;
176    /// #
177    /// let azure = MicrosoftAzureBuilder::new()
178    ///     .with_account("my-account")
179    ///     .with_access_key("my-access-key")
180    ///     .with_container_name("my-container")
181    ///     .build()?;
182    ///
183    /// let url = azure.signed_url(
184    ///     Method::PUT,
185    ///     &Path::from("some-folder/some-file.txt"),
186    ///     Duration::from_secs(60 * 60)
187    /// ).await?;
188    /// #     Ok(())
189    /// # }
190    /// ```
191    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
192        let mut url = self.path_url(path);
193        let signer = self.client.signer(expires_in).await?;
194        signer.sign(&method, &mut url)?;
195        Ok(url)
196    }
197
198    async fn signed_urls(
199        &self,
200        method: Method,
201        paths: &[Path],
202        expires_in: Duration,
203    ) -> Result<Vec<Url>> {
204        let mut urls = Vec::with_capacity(paths.len());
205        let signer = self.client.signer(expires_in).await?;
206        for path in paths {
207            let mut url = self.path_url(path);
208            signer.sign(&method, &mut url)?;
209            urls.push(url);
210        }
211        Ok(urls)
212    }
213}
214
215/// Relevant docs: <https://azure.github.io/Storage/docs/application-and-user-data/basics/azure-blob-storage-upload-apis/>
216/// In Azure Blob Store, parts are "blocks"
217/// put_multipart_part -> PUT block
218/// complete -> PUT block list
219/// abort -> No equivalent; blocks are simply dropped after 7 days
220#[derive(Debug)]
221struct AzureMultiPartUpload {
222    part_idx: usize,
223    state: Arc<UploadState>,
224    opts: PutMultipartOptions,
225}
226
227#[derive(Debug)]
228struct UploadState {
229    location: Path,
230    parts: Parts,
231    client: Arc<AzureClient>,
232}
233
234#[async_trait]
235impl MultipartUpload for AzureMultiPartUpload {
236    fn put_part(&mut self, data: PutPayload) -> UploadPart {
237        let idx = self.part_idx;
238        self.part_idx += 1;
239        let state = Arc::clone(&self.state);
240        Box::pin(async move {
241            let part = state.client.put_block(&state.location, idx, data).await?;
242            state.parts.put(idx, part);
243            Ok(())
244        })
245    }
246
247    async fn complete(&mut self) -> Result<PutResult> {
248        let parts = self.state.parts.finish(self.part_idx)?;
249
250        self.state
251            .client
252            .put_block_list(&self.state.location, parts, std::mem::take(&mut self.opts))
253            .await
254    }
255
256    async fn abort(&mut self) -> Result<()> {
257        // Nothing to do
258        Ok(())
259    }
260}
261
262#[async_trait]
263impl MultipartStore for MicrosoftAzure {
264    async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
265        Ok(String::new())
266    }
267
268    async fn put_part(
269        &self,
270        path: &Path,
271        _: &MultipartId,
272        part_idx: usize,
273        data: PutPayload,
274    ) -> Result<PartId> {
275        self.client.put_block(path, part_idx, data).await
276    }
277
278    async fn complete_multipart(
279        &self,
280        path: &Path,
281        _: &MultipartId,
282        parts: Vec<PartId>,
283    ) -> Result<PutResult> {
284        self.client
285            .put_block_list(path, parts, Default::default())
286            .await
287    }
288
289    async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
290        // There is no way to drop blocks that have been uploaded. Instead, they simply
291        // expire in 7 days.
292        Ok(())
293    }
294}
295
296#[async_trait]
297impl PaginatedListStore for MicrosoftAzure {
298    async fn list_paginated(
299        &self,
300        prefix: Option<&str>,
301        opts: PaginatedListOptions,
302    ) -> Result<PaginatedListResult> {
303        self.client.list_request(prefix, opts).await
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::integration::*;
    use crate::tests::*;
    use bytes::Bytes;

    /// Runs the shared integration suite against a store built from the environment.
    #[tokio::test]
    async fn azure_blob_test() {
        maybe_skip_integration!();
        let store = MicrosoftAzureBuilder::from_env().build().unwrap();

        put_get_delete_list(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        put_opts(&store, true).await;
        multipart(&store, &store).await;
        multipart_race_condition(&store, false).await;
        multipart_out_of_order(&store).await;
        signing(&store).await;
        list_paginated(&store, &store).await;

        // Only validate tags when tagging has not been disabled in the configuration.
        let validate = !store.client.config().disable_tagging;
        // A second store instance sharing the same underlying client.
        let tagging_store = Arc::new(MicrosoftAzure {
            client: Arc::clone(&store.client),
        });
        tagging(tagging_store, validate, |p| {
            let client = Arc::clone(&store.client);
            async move { client.get_blob_tagging(&p).await }
        })
        .await;

        // Azurite doesn't support attributes properly
        if !store.client.config().is_emulator {
            put_get_attributes(&store).await;
        }
    }

    /// Writes an object using client-secret credentials, then reads it back
    /// through a signed URL.
    #[ignore = "Used for manual testing against a real storage account."]
    #[tokio::test]
    async fn test_user_delegation_key() {
        let env = |key: &str| std::env::var(key).unwrap();

        let account = env("AZURE_ACCOUNT_NAME");
        let container = env("AZURE_CONTAINER_NAME");
        let client_id = env("AZURE_CLIENT_ID");
        let client_secret = env("AZURE_CLIENT_SECRET");
        let tenant_id = env("AZURE_TENANT_ID");

        let store = MicrosoftAzureBuilder::new()
            .with_account(account)
            .with_container_name(container)
            .with_client_id(client_id)
            .with_client_secret(client_secret)
            .with_tenant_id(&tenant_id)
            .build()
            .unwrap();

        let expected = Bytes::from("hello world");
        let path = Path::from("file.txt");
        store.put(&path, expected.clone().into()).await.unwrap();

        let signed = store
            .signed_url(Method::GET, &path, Duration::from_secs(60))
            .await
            .unwrap();

        // Fetch via plain HTTP: the signed URL alone must be enough to read the object.
        let response = reqwest::get(signed).await.unwrap();
        let actual = response.bytes().await.unwrap();

        assert_eq!(expected, actual);
    }

    /// Values set through `with_config` must be readable via `get_config_value`.
    #[test]
    fn azure_test_config_get_value() {
        let client_id = "object_store:fake_access_key_id".to_string();
        let account_name = "object_store:fake_secret_key".to_string();
        let token = "object_store:fake_default_region".to_string();

        let builder = MicrosoftAzureBuilder::new()
            .with_config(AzureConfigKey::ClientId, &client_id)
            .with_config(AzureConfigKey::AccountName, &account_name)
            .with_config(AzureConfigKey::Token, &token);

        let stored_client_id = builder.get_config_value(&AzureConfigKey::ClientId).unwrap();
        assert_eq!(stored_client_id, client_id);

        let stored_account_name = builder
            .get_config_value(&AzureConfigKey::AccountName)
            .unwrap();
        assert_eq!(stored_account_name, account_name);

        let stored_token = builder.get_config_value(&AzureConfigKey::Token).unwrap();
        assert_eq!(stored_token, token);
    }
}