1use crate::{
26 multipart::{MultipartStore, PartId},
27 path::Path,
28 signer::Signer,
29 GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
30 PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
31};
32use async_trait::async_trait;
33use futures::stream::{BoxStream, StreamExt, TryStreamExt};
34use reqwest::Method;
35use std::fmt::Debug;
36use std::sync::Arc;
37use std::time::Duration;
38use url::Url;
39
40use crate::client::get::GetClientExt;
41use crate::client::list::{ListClient, ListClientExt};
42use crate::client::CredentialProvider;
43pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer};
44
45mod builder;
46mod client;
47mod credential;
48
/// Shared, dynamically dispatched [`CredentialProvider`] whose credential type
/// is [`AzureCredential`].
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
51use crate::azure::client::AzureClient;
52use crate::client::parts::Parts;
53use crate::list::{PaginatedListOptions, PaginatedListResult, PaginatedListStore};
54pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
55pub use credential::AzureCredential;
56
// Name of this store implementation. It is not referenced in the visible part
// of this file — presumably consumed by submodules for error reporting
// (TODO confirm against `builder` / `client` / `credential`).
const STORE: &str = "MicrosoftAzure";
58
/// Object store handle wrapping a shared [`AzureClient`].
///
/// The trait implementations in this file (`ObjectStore`, `Signer`,
/// `MultipartStore`, `PaginatedListStore`) forward their work to this client.
#[derive(Debug)]
pub struct MicrosoftAzure {
    // Reference-counted so multipart uploads can hold their own handle.
    client: Arc<AzureClient>,
}
64
65impl MicrosoftAzure {
66 pub fn credentials(&self) -> &AzureCredentialProvider {
68 &self.client.config().credentials
69 }
70
71 fn path_url(&self, path: &Path) -> Url {
73 self.client.config().path_url(path)
74 }
75}
76
77impl std::fmt::Display for MicrosoftAzure {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 write!(
80 f,
81 "MicrosoftAzure {{ account: {}, container: {} }}",
82 self.client.config().account,
83 self.client.config().container
84 )
85 }
86}
87
88#[async_trait]
89impl ObjectStore for MicrosoftAzure {
90 async fn put_opts(
91 &self,
92 location: &Path,
93 payload: PutPayload,
94 opts: PutOptions,
95 ) -> Result<PutResult> {
96 self.client.put_blob(location, payload, opts).await
97 }
98
99 async fn put_multipart_opts(
100 &self,
101 location: &Path,
102 opts: PutMultipartOptions,
103 ) -> Result<Box<dyn MultipartUpload>> {
104 Ok(Box::new(AzureMultiPartUpload {
105 part_idx: 0,
106 opts,
107 state: Arc::new(UploadState {
108 client: Arc::clone(&self.client),
109 location: location.clone(),
110 parts: Default::default(),
111 }),
112 }))
113 }
114
115 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
116 self.client.get_opts(location, options).await
117 }
118
119 async fn delete(&self, location: &Path) -> Result<()> {
120 self.client.delete_request(location, &()).await
121 }
122
123 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
124 self.client.list(prefix)
125 }
126 fn delete_stream<'a>(
127 &'a self,
128 locations: BoxStream<'a, Result<Path>>,
129 ) -> BoxStream<'a, Result<Path>> {
130 locations
131 .try_chunks(256)
132 .map(move |locations| async {
133 let locations = locations.map_err(|e| e.1)?;
136 self.client
137 .bulk_delete_request(locations)
138 .await
139 .map(futures::stream::iter)
140 })
141 .buffered(20)
142 .try_flatten()
143 .boxed()
144 }
145
146 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
147 self.client.list_with_delimiter(prefix).await
148 }
149
150 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
151 self.client.copy_request(from, to, true).await
152 }
153
154 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
155 self.client.copy_request(from, to, false).await
156 }
157}
158
159#[async_trait]
160impl Signer for MicrosoftAzure {
161 async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
192 let mut url = self.path_url(path);
193 let signer = self.client.signer(expires_in).await?;
194 signer.sign(&method, &mut url)?;
195 Ok(url)
196 }
197
198 async fn signed_urls(
199 &self,
200 method: Method,
201 paths: &[Path],
202 expires_in: Duration,
203 ) -> Result<Vec<Url>> {
204 let mut urls = Vec::with_capacity(paths.len());
205 let signer = self.client.signer(expires_in).await?;
206 for path in paths {
207 let mut url = self.path_url(path);
208 signer.sign(&method, &mut url)?;
209 urls.push(url);
210 }
211 Ok(urls)
212 }
213}
214
/// In-progress multipart upload handed out by `put_multipart_opts`.
#[derive(Debug)]
struct AzureMultiPartUpload {
    // Index assigned to the next part; incremented on every `put_part` call.
    part_idx: usize,
    // Destination, collected parts and client, shared with the part futures.
    state: Arc<UploadState>,
    // Options passed to `put_block_list` on completion (taken out of `self` there).
    opts: PutMultipartOptions,
}
226
/// State shared between an [`AzureMultiPartUpload`] and the futures returned
/// by its `put_part`.
#[derive(Debug)]
struct UploadState {
    // Destination path of the upload.
    location: Path,
    // Collects the result of each `put_block` call, keyed by part index.
    parts: Parts,
    // Client used to issue the block and block-list requests.
    client: Arc<AzureClient>,
}
233
234#[async_trait]
235impl MultipartUpload for AzureMultiPartUpload {
236 fn put_part(&mut self, data: PutPayload) -> UploadPart {
237 let idx = self.part_idx;
238 self.part_idx += 1;
239 let state = Arc::clone(&self.state);
240 Box::pin(async move {
241 let part = state.client.put_block(&state.location, idx, data).await?;
242 state.parts.put(idx, part);
243 Ok(())
244 })
245 }
246
247 async fn complete(&mut self) -> Result<PutResult> {
248 let parts = self.state.parts.finish(self.part_idx)?;
249
250 self.state
251 .client
252 .put_block_list(&self.state.location, parts, std::mem::take(&mut self.opts))
253 .await
254 }
255
256 async fn abort(&mut self) -> Result<()> {
257 Ok(())
259 }
260}
261
262#[async_trait]
263impl MultipartStore for MicrosoftAzure {
264 async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
265 Ok(String::new())
266 }
267
268 async fn put_part(
269 &self,
270 path: &Path,
271 _: &MultipartId,
272 part_idx: usize,
273 data: PutPayload,
274 ) -> Result<PartId> {
275 self.client.put_block(path, part_idx, data).await
276 }
277
278 async fn complete_multipart(
279 &self,
280 path: &Path,
281 _: &MultipartId,
282 parts: Vec<PartId>,
283 ) -> Result<PutResult> {
284 self.client
285 .put_block_list(path, parts, Default::default())
286 .await
287 }
288
289 async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
290 Ok(())
293 }
294}
295
296#[async_trait]
297impl PaginatedListStore for MicrosoftAzure {
298 async fn list_paginated(
299 &self,
300 prefix: Option<&str>,
301 opts: PaginatedListOptions,
302 ) -> Result<PaginatedListResult> {
303 self.client.list_request(prefix, opts).await
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::integration::*;
    use crate::tests::*;
    use bytes::Bytes;

    /// Runs the shared integration checks against a store configured from the
    /// environment. Skipped when `maybe_skip_integration!` decides so.
    #[tokio::test]
    async fn azure_blob_test() {
        maybe_skip_integration!();
        let store = MicrosoftAzureBuilder::from_env().build().unwrap();

        put_get_delete_list(&store).await;
        get_opts(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        stream_get(&store).await;
        put_opts(&store, true).await;
        multipart(&store, &store).await;
        multipart_race_condition(&store, false).await;
        multipart_out_of_order(&store).await;
        signing(&store).await;
        list_paginated(&store, &store).await;

        // Tag validation is requested only when tagging is not disabled in the config.
        let validate = !store.client.config().disable_tagging;
        let tagged_store = Arc::new(MicrosoftAzure {
            client: Arc::clone(&store.client),
        });
        tagging(tagged_store, validate, |p| {
            let client = Arc::clone(&store.client);
            async move { client.get_blob_tagging(&p).await }
        })
        .await;

        // The attribute round-trip is exercised only when not running against an emulator.
        if !store.client.config().is_emulator {
            put_get_attributes(&store).await;
        }
    }

    /// Manual check: writes an object, signs a GET URL for it using
    /// client-id / client-secret / tenant-id credentials, and downloads it
    /// back through the signed URL.
    #[ignore = "Used for manual testing against a real storage account."]
    #[tokio::test]
    async fn test_user_delegation_key() {
        // Every variable is required; a missing one panics via `unwrap`.
        let required = |name: &str| std::env::var(name).unwrap();

        let account = required("AZURE_ACCOUNT_NAME");
        let container = required("AZURE_CONTAINER_NAME");
        let client_id = required("AZURE_CLIENT_ID");
        let client_secret = required("AZURE_CLIENT_SECRET");
        let tenant_id = required("AZURE_TENANT_ID");

        let store = MicrosoftAzureBuilder::new()
            .with_account(account)
            .with_container_name(container)
            .with_client_id(client_id)
            .with_client_secret(client_secret)
            .with_tenant_id(&tenant_id)
            .build()
            .unwrap();

        let expected = Bytes::from("hello world");
        let path = Path::from("file.txt");
        store.put(&path, expected.clone().into()).await.unwrap();

        let signed = store
            .signed_url(Method::GET, &path, Duration::from_secs(60))
            .await
            .unwrap();

        let fetched = reqwest::get(signed)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();

        assert_eq!(expected, fetched);
    }

    /// Values set through `with_config` are returned unchanged by
    /// `get_config_value` for the same key.
    #[test]
    fn azure_test_config_get_value() {
        let client_id = "object_store:fake_access_key_id".to_string();
        let account_name = "object_store:fake_secret_key".to_string();
        let token = "object_store:fake_default_region".to_string();

        let builder = MicrosoftAzureBuilder::new()
            .with_config(AzureConfigKey::ClientId, &client_id)
            .with_config(AzureConfigKey::AccountName, &account_name)
            .with_config(AzureConfigKey::Token, &token);

        let lookup = |key: AzureConfigKey| builder.get_config_value(&key).unwrap();

        assert_eq!(lookup(AzureConfigKey::ClientId), client_id);
        assert_eq!(lookup(AzureConfigKey::AccountName), account_name);
        assert_eq!(lookup(AzureConfigKey::Token), token);
    }
}