// alien_bindings/providers/storage/azure_blob.rs

1use crate::providers::storage::credential_bridge::AzureCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4    error::{Error, ErrorData},
5    presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6    traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16    azure::MicrosoftAzure, path::Path, GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore,
17    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
18};
19use reqwest::Method;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use url::Url;
24
25/// Azure Blob Storage implementation.
26#[derive(Debug)]
27pub struct BlobStorage {
28    url: Url,
29    base_dir: Path,
30    inner: MicrosoftAzure,
31}
32
33impl BlobStorage {
34    /// Creates a new `BlobStorage` instance from container configuration.
35    ///
36    /// Uses Azure config for credentials.
37    pub fn new(
38        container_name: String,
39        account_name: String,
40        azure_config: &alien_core::AzureClientConfig,
41    ) -> Result<Self, Error> {
42        let azure_url = format!("azure://{}", container_name);
43        let url: Url = Url::parse(&azure_url).into_alien_error().context(
44            ErrorData::InvalidConfigurationUrl {
45                url: azure_url.clone(),
46                reason: "Invalid Azure Blob URL format".to_string(),
47            },
48        )?;
49
50        // Build the store with credentials bridged from AzureClientConfig
51        let credentials = AzureCredentialBridge::new(azure_config.clone());
52        let store = object_store::azure::MicrosoftAzureBuilder::new()
53            .with_container_name(&container_name)
54            .with_account(&account_name)
55            .with_credentials(Arc::new(credentials))
56            .build()
57            .into_alien_error()
58            .context(ErrorData::BindingSetupFailed {
59                binding_type: "Azure Blob storage".to_string(),
60                reason: format!(
61                    "Failed to build Azure Blob client for container: {}",
62                    container_name
63                ),
64            })?;
65
66        // Extract the base path from the URL path segments, handling the None case.
67        let base_dir = match url.path_segments() {
68            Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
69            None => Path::default(), // Use an empty path if there are no segments
70        };
71
72        Ok(Self {
73            url,
74            base_dir,
75            inner: store,
76        })
77    }
78}
79
80impl Binding for BlobStorage {}
81
82#[async_trait]
83impl Storage for BlobStorage {
84    fn get_base_dir(&self) -> Path {
85        self.base_dir.clone()
86    }
87
88    fn get_url(&self) -> Url {
89        self.url.clone()
90    }
91
92    async fn presigned_put(
93        &self,
94        path: &Path,
95        expires_in: Duration,
96    ) -> crate::error::Result<PresignedRequest> {
97        let dst = prefixed_path(&self.base_dir, path);
98        let signed_url = self
99            .inner
100            .signed_url(Method::PUT, &dst, expires_in)
101            .await
102            .into_alien_error()
103            .context(ErrorData::StorageOperationFailed {
104                binding_name: "azure-blob".to_string(),
105                operation: format!("generate presigned PUT URL for {}", path),
106            })?;
107
108        let mut headers = HashMap::new();
109        // Azure Blob Storage requires the blob type header for PUT operations
110        headers.insert("x-ms-blob-type".to_string(), "BlockBlob".to_string());
111
112        Ok(PresignedRequest {
113            backend: PresignedRequestBackend::Http {
114                url: signed_url.to_string(),
115                method: "PUT".to_string(),
116                headers,
117            },
118            expiration: Utc::now()
119                + chrono::Duration::from_std(expires_in).map_err(|e| {
120                    AlienError::new(ErrorData::Other {
121                        message: format!("Invalid duration: {}", e),
122                    })
123                })?,
124            operation: PresignedOperation::Put,
125            path: path.to_string(),
126        })
127    }
128
129    async fn presigned_get(
130        &self,
131        path: &Path,
132        expires_in: Duration,
133    ) -> crate::error::Result<PresignedRequest> {
134        let dst = prefixed_path(&self.base_dir, path);
135        let signed_url = self
136            .inner
137            .signed_url(Method::GET, &dst, expires_in)
138            .await
139            .into_alien_error()
140            .context(ErrorData::StorageOperationFailed {
141                binding_name: "azure-blob".to_string(),
142                operation: format!("generate presigned GET URL for {}", path),
143            })?;
144
145        let headers = HashMap::new();
146
147        Ok(PresignedRequest {
148            backend: PresignedRequestBackend::Http {
149                url: signed_url.to_string(),
150                method: "GET".to_string(),
151                headers,
152            },
153            expiration: Utc::now()
154                + chrono::Duration::from_std(expires_in).map_err(|e| {
155                    AlienError::new(ErrorData::Other {
156                        message: format!("Invalid duration: {}", e),
157                    })
158                })?,
159            operation: PresignedOperation::Get,
160            path: path.to_string(),
161        })
162    }
163
164    async fn presigned_delete(
165        &self,
166        path: &Path,
167        expires_in: Duration,
168    ) -> crate::error::Result<PresignedRequest> {
169        let dst = prefixed_path(&self.base_dir, path);
170        let signed_url = self
171            .inner
172            .signed_url(Method::DELETE, &dst, expires_in)
173            .await
174            .into_alien_error()
175            .context(ErrorData::StorageOperationFailed {
176                binding_name: "azure-blob".to_string(),
177                operation: format!("generate presigned DELETE URL for {}", path),
178            })?;
179
180        let headers = HashMap::new();
181
182        Ok(PresignedRequest {
183            backend: PresignedRequestBackend::Http {
184                url: signed_url.to_string(),
185                method: "DELETE".to_string(),
186                headers,
187            },
188            expiration: Utc::now()
189                + chrono::Duration::from_std(expires_in).map_err(|e| {
190                    AlienError::new(ErrorData::Other {
191                        message: format!("Invalid duration: {}", e),
192                    })
193                })?,
194            operation: PresignedOperation::Delete,
195            path: path.to_string(),
196        })
197    }
198}
199
200// Delegate ObjectStore trait implementation to the inner store,
201// prefixing paths with the base_dir.
202#[async_trait]
203impl ObjectStore for BlobStorage {
204    async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
205        let dst = prefixed_path(&self.base_dir, location);
206        self.inner.put(&dst, payload).await
207    }
208
209    async fn put_opts(
210        &self,
211        location: &Path,
212        payload: PutPayload,
213        opts: PutOptions,
214    ) -> ObjectStoreResult<PutResult> {
215        let dst = prefixed_path(&self.base_dir, location);
216        self.inner.put_opts(&dst, payload, opts).await
217    }
218
219    async fn put_multipart(
220        &self,
221        location: &Path,
222    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
223        let dst = prefixed_path(&self.base_dir, location);
224        self.inner.put_multipart(&dst).await
225    }
226
227    async fn put_multipart_opts(
228        &self,
229        location: &Path,
230        opts: PutMultipartOpts,
231    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
232        let dst = prefixed_path(&self.base_dir, location);
233        self.inner.put_multipart_opts(&dst, opts).await
234    }
235
236    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
237        let src = prefixed_path(&self.base_dir, location);
238        self.inner.get(&src).await
239    }
240
241    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
242        let src = prefixed_path(&self.base_dir, location);
243        self.inner.get_opts(&src, options).await
244    }
245
246    async fn get_range(
247        &self,
248        location: &Path,
249        range: std::ops::Range<u64>,
250    ) -> ObjectStoreResult<Bytes> {
251        let src = prefixed_path(&self.base_dir, location);
252        self.inner.get_range(&src, range).await
253    }
254
255    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
256        let src = prefixed_path(&self.base_dir, location);
257        let mut meta = self.inner.head(&src).await?;
258        meta.location = relativize_path(&self.base_dir, meta.location, "AzureStorage")?;
259        Ok(meta)
260    }
261
262    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
263        let src = prefixed_path(&self.base_dir, location);
264        self.inner.delete(&src).await
265    }
266
267    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
268        let list_prefix_for_inner = prefix
269            .map(|p| prefixed_path(&self.base_dir, p))
270            .unwrap_or_else(|| self.base_dir.clone());
271
272        let base_dir_for_stream = self.base_dir.clone();
273
274        Box::pin(
275            self.inner
276                .list(Some(&list_prefix_for_inner))
277                .and_then(move |mut meta| {
278                    let captured_base_dir = base_dir_for_stream.clone();
279                    async move {
280                        meta.location =
281                            relativize_path(&captured_base_dir, meta.location, "AzureStorage")?;
282                        Ok(meta)
283                    }
284                }),
285        )
286    }
287
288    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
289        let list_prefix_for_inner = prefix
290            .map(|p| prefixed_path(&self.base_dir, p))
291            .unwrap_or_else(|| self.base_dir.clone());
292        let mut result = self
293            .inner
294            .list_with_delimiter(Some(&list_prefix_for_inner))
295            .await?;
296
297        for meta_obj in &mut result.objects {
298            let original_location = std::mem::take(&mut meta_obj.location);
299            meta_obj.location = relativize_path(&self.base_dir, original_location, "AzureStorage")?;
300        }
301
302        let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
303        for cp in result.common_prefixes {
304            new_common_prefixes.push(relativize_path(&self.base_dir, cp, "AzureStorage")?);
305        }
306        result.common_prefixes = new_common_prefixes;
307
308        Ok(result)
309    }
310
311    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
312        let src = prefixed_path(&self.base_dir, from);
313        let dst = prefixed_path(&self.base_dir, to);
314        self.inner.copy(&src, &dst).await
315    }
316
317    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
318        let src = prefixed_path(&self.base_dir, from);
319        let dst = prefixed_path(&self.base_dir, to);
320        self.inner.copy_if_not_exists(&src, &dst).await
321    }
322}
323
324impl std::fmt::Display for BlobStorage {
325    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326        write!(f, "AzureStorage(url={})", self.url)
327    }
328}