alien_bindings/providers/storage/
azure_blob.rs1use crate::providers::storage::credential_bridge::AzureCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4 error::{Error, ErrorData},
5 presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6 traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16 azure::MicrosoftAzure, path::Path, GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore,
17 PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
18};
19use reqwest::Method;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use url::Url;
24
25#[derive(Debug)]
27pub struct BlobStorage {
28 url: Url,
29 base_dir: Path,
30 inner: MicrosoftAzure,
31}
32
33impl BlobStorage {
34 pub fn new(
38 container_name: String,
39 account_name: String,
40 azure_config: &alien_core::AzureClientConfig,
41 ) -> Result<Self, Error> {
42 let azure_url = format!("azure://{}", container_name);
43 let url: Url = Url::parse(&azure_url).into_alien_error().context(
44 ErrorData::InvalidConfigurationUrl {
45 url: azure_url.clone(),
46 reason: "Invalid Azure Blob URL format".to_string(),
47 },
48 )?;
49
50 let credentials = AzureCredentialBridge::new(azure_config.clone());
52 let store = object_store::azure::MicrosoftAzureBuilder::new()
53 .with_container_name(&container_name)
54 .with_account(&account_name)
55 .with_credentials(Arc::new(credentials))
56 .build()
57 .into_alien_error()
58 .context(ErrorData::BindingSetupFailed {
59 binding_type: "Azure Blob storage".to_string(),
60 reason: format!(
61 "Failed to build Azure Blob client for container: {}",
62 container_name
63 ),
64 })?;
65
66 let base_dir = match url.path_segments() {
68 Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
69 None => Path::default(), };
71
72 Ok(Self {
73 url,
74 base_dir,
75 inner: store,
76 })
77 }
78}
79
80impl Binding for BlobStorage {}
81
82#[async_trait]
83impl Storage for BlobStorage {
84 fn get_base_dir(&self) -> Path {
85 self.base_dir.clone()
86 }
87
88 fn get_url(&self) -> Url {
89 self.url.clone()
90 }
91
92 async fn presigned_put(
93 &self,
94 path: &Path,
95 expires_in: Duration,
96 ) -> crate::error::Result<PresignedRequest> {
97 let dst = prefixed_path(&self.base_dir, path);
98 let signed_url = self
99 .inner
100 .signed_url(Method::PUT, &dst, expires_in)
101 .await
102 .into_alien_error()
103 .context(ErrorData::StorageOperationFailed {
104 binding_name: "azure-blob".to_string(),
105 operation: format!("generate presigned PUT URL for {}", path),
106 })?;
107
108 let mut headers = HashMap::new();
109 headers.insert("x-ms-blob-type".to_string(), "BlockBlob".to_string());
111
112 Ok(PresignedRequest {
113 backend: PresignedRequestBackend::Http {
114 url: signed_url.to_string(),
115 method: "PUT".to_string(),
116 headers,
117 },
118 expiration: Utc::now()
119 + chrono::Duration::from_std(expires_in).map_err(|e| {
120 AlienError::new(ErrorData::Other {
121 message: format!("Invalid duration: {}", e),
122 })
123 })?,
124 operation: PresignedOperation::Put,
125 path: path.to_string(),
126 })
127 }
128
129 async fn presigned_get(
130 &self,
131 path: &Path,
132 expires_in: Duration,
133 ) -> crate::error::Result<PresignedRequest> {
134 let dst = prefixed_path(&self.base_dir, path);
135 let signed_url = self
136 .inner
137 .signed_url(Method::GET, &dst, expires_in)
138 .await
139 .into_alien_error()
140 .context(ErrorData::StorageOperationFailed {
141 binding_name: "azure-blob".to_string(),
142 operation: format!("generate presigned GET URL for {}", path),
143 })?;
144
145 let headers = HashMap::new();
146
147 Ok(PresignedRequest {
148 backend: PresignedRequestBackend::Http {
149 url: signed_url.to_string(),
150 method: "GET".to_string(),
151 headers,
152 },
153 expiration: Utc::now()
154 + chrono::Duration::from_std(expires_in).map_err(|e| {
155 AlienError::new(ErrorData::Other {
156 message: format!("Invalid duration: {}", e),
157 })
158 })?,
159 operation: PresignedOperation::Get,
160 path: path.to_string(),
161 })
162 }
163
164 async fn presigned_delete(
165 &self,
166 path: &Path,
167 expires_in: Duration,
168 ) -> crate::error::Result<PresignedRequest> {
169 let dst = prefixed_path(&self.base_dir, path);
170 let signed_url = self
171 .inner
172 .signed_url(Method::DELETE, &dst, expires_in)
173 .await
174 .into_alien_error()
175 .context(ErrorData::StorageOperationFailed {
176 binding_name: "azure-blob".to_string(),
177 operation: format!("generate presigned DELETE URL for {}", path),
178 })?;
179
180 let headers = HashMap::new();
181
182 Ok(PresignedRequest {
183 backend: PresignedRequestBackend::Http {
184 url: signed_url.to_string(),
185 method: "DELETE".to_string(),
186 headers,
187 },
188 expiration: Utc::now()
189 + chrono::Duration::from_std(expires_in).map_err(|e| {
190 AlienError::new(ErrorData::Other {
191 message: format!("Invalid duration: {}", e),
192 })
193 })?,
194 operation: PresignedOperation::Delete,
195 path: path.to_string(),
196 })
197 }
198}
199
200#[async_trait]
203impl ObjectStore for BlobStorage {
204 async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
205 let dst = prefixed_path(&self.base_dir, location);
206 self.inner.put(&dst, payload).await
207 }
208
209 async fn put_opts(
210 &self,
211 location: &Path,
212 payload: PutPayload,
213 opts: PutOptions,
214 ) -> ObjectStoreResult<PutResult> {
215 let dst = prefixed_path(&self.base_dir, location);
216 self.inner.put_opts(&dst, payload, opts).await
217 }
218
219 async fn put_multipart(
220 &self,
221 location: &Path,
222 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
223 let dst = prefixed_path(&self.base_dir, location);
224 self.inner.put_multipart(&dst).await
225 }
226
227 async fn put_multipart_opts(
228 &self,
229 location: &Path,
230 opts: PutMultipartOpts,
231 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
232 let dst = prefixed_path(&self.base_dir, location);
233 self.inner.put_multipart_opts(&dst, opts).await
234 }
235
236 async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
237 let src = prefixed_path(&self.base_dir, location);
238 self.inner.get(&src).await
239 }
240
241 async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
242 let src = prefixed_path(&self.base_dir, location);
243 self.inner.get_opts(&src, options).await
244 }
245
246 async fn get_range(
247 &self,
248 location: &Path,
249 range: std::ops::Range<u64>,
250 ) -> ObjectStoreResult<Bytes> {
251 let src = prefixed_path(&self.base_dir, location);
252 self.inner.get_range(&src, range).await
253 }
254
255 async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
256 let src = prefixed_path(&self.base_dir, location);
257 let mut meta = self.inner.head(&src).await?;
258 meta.location = relativize_path(&self.base_dir, meta.location, "AzureStorage")?;
259 Ok(meta)
260 }
261
262 async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
263 let src = prefixed_path(&self.base_dir, location);
264 self.inner.delete(&src).await
265 }
266
267 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
268 let list_prefix_for_inner = prefix
269 .map(|p| prefixed_path(&self.base_dir, p))
270 .unwrap_or_else(|| self.base_dir.clone());
271
272 let base_dir_for_stream = self.base_dir.clone();
273
274 Box::pin(
275 self.inner
276 .list(Some(&list_prefix_for_inner))
277 .and_then(move |mut meta| {
278 let captured_base_dir = base_dir_for_stream.clone();
279 async move {
280 meta.location =
281 relativize_path(&captured_base_dir, meta.location, "AzureStorage")?;
282 Ok(meta)
283 }
284 }),
285 )
286 }
287
288 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
289 let list_prefix_for_inner = prefix
290 .map(|p| prefixed_path(&self.base_dir, p))
291 .unwrap_or_else(|| self.base_dir.clone());
292 let mut result = self
293 .inner
294 .list_with_delimiter(Some(&list_prefix_for_inner))
295 .await?;
296
297 for meta_obj in &mut result.objects {
298 let original_location = std::mem::take(&mut meta_obj.location);
299 meta_obj.location = relativize_path(&self.base_dir, original_location, "AzureStorage")?;
300 }
301
302 let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
303 for cp in result.common_prefixes {
304 new_common_prefixes.push(relativize_path(&self.base_dir, cp, "AzureStorage")?);
305 }
306 result.common_prefixes = new_common_prefixes;
307
308 Ok(result)
309 }
310
311 async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
312 let src = prefixed_path(&self.base_dir, from);
313 let dst = prefixed_path(&self.base_dir, to);
314 self.inner.copy(&src, &dst).await
315 }
316
317 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
318 let src = prefixed_path(&self.base_dir, from);
319 let dst = prefixed_path(&self.base_dir, to);
320 self.inner.copy_if_not_exists(&src, &dst).await
321 }
322}
323
324impl std::fmt::Display for BlobStorage {
325 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
326 write!(f, "AzureStorage(url={})", self.url)
327 }
328}