use bytes::Bytes;
use futures::StreamExt;
use azure_storage::StorageCredentials;
use azure_storage_blobs::prelude::*;
use crate::error::Error;
use crate::traits::StorageBackend;
use crate::types::{PutOptions, RawDownloadResult};
/// Storage backend that keeps objects as blobs in a single Azure Blob Storage container.
///
/// Instances are created through [`AzureBackend::builder`].
pub struct AzureBackend {
    /// SDK client scoped to the configured container; every blob operation goes through it.
    container_client: ContainerClient,
    /// Prefix joined with `/` and the object key to form public URLs.
    public_base_url: String,
}
/// Step-by-step configuration for an [`AzureBackend`].
///
/// Every field starts unset; `account`, `container` and `access_key` must be
/// provided before `build` succeeds, while `public_base_url` is optional.
pub struct AzureBackendBuilder {
    /// Storage account name.
    account: Option<String>,
    /// Name of the blob container that holds the objects.
    container: Option<String>,
    /// Access key used to build the storage credentials.
    access_key: Option<String>,
    /// Optional override for the base of generated public URLs.
    public_base_url: Option<String>,
}
impl AzureBackend {
    /// Returns a builder with no setting filled in yet.
    pub fn builder() -> AzureBackendBuilder {
        // Every option starts out unset; the builder's setters fill them in.
        let (account, container, access_key, public_base_url) = (None, None, None, None);
        AzureBackendBuilder {
            account,
            container,
            access_key,
            public_base_url,
        }
    }
}
impl AzureBackendBuilder {
pub fn account(mut self, account: &str) -> Self {
self.account = Some(account.to_string());
self
}
pub fn container(mut self, container: &str) -> Self {
self.container = Some(container.to_string());
self
}
pub fn access_key(mut self, key: &str) -> Self {
self.access_key = Some(key.to_string());
self
}
pub fn public_base_url(mut self, url: &str) -> Self {
self.public_base_url = Some(url.trim_end_matches('/').to_string());
self
}
pub fn build(self) -> Result<AzureBackend, Error> {
let account = self
.account
.ok_or_else(|| Error::ConfigError("account is required".into()))?;
let container = self
.container
.ok_or_else(|| Error::ConfigError("container is required".into()))?;
let access_key = self
.access_key
.ok_or_else(|| Error::ConfigError("access_key is required".into()))?;
let creds = StorageCredentials::access_key(&account, access_key);
let service = BlobServiceClient::new(&account, creds);
let container_client = service.container_client(&container);
let public_base_url = self
.public_base_url
.unwrap_or_else(|| format!("https://{account}.blob.core.windows.net/{container}"));
Ok(AzureBackend {
container_client,
public_base_url,
})
}
}
impl StorageBackend for AzureBackend {
async fn put_object(
&self,
key: &str,
data: Bytes,
content_type: &str,
options: &PutOptions,
) -> Result<(), Error> {
let mut builder = self
.container_client
.blob_client(key)
.put_block_blob(data)
.content_type(content_type.to_owned());
if let Some(ref cd) = options.content_disposition {
builder = builder.content_disposition(cd.clone());
}
if options.cache_control.is_some() {
log::warn!("Azure backend does not apply cache_control for key: {key}");
}
builder
.await
.map_err(|e| Error::Backend(format!("Azure upload failed: {e}")))?;
Ok(())
}
async fn get_object(&self, key: &str) -> Result<RawDownloadResult, Error> {
let response = self
.container_client
.blob_client(key)
.get_content()
.await
.map_err(|e| Error::Backend(format!("Azure download failed: {e}")))?;
let len = response.len() as i64;
Ok(RawDownloadResult {
data: Bytes::from(response),
content_type: "application/octet-stream".to_string(),
content_length: Some(len),
})
}
async fn delete_object(&self, key: &str) -> Result<(), Error> {
self.container_client
.blob_client(key)
.delete()
.await
.map_err(|e| Error::Backend(format!("Azure delete failed: {e}")))?;
Ok(())
}
async fn exists(&self, key: &str) -> Result<bool, Error> {
match self
.container_client
.blob_client(key)
.get_properties()
.await
{
Ok(_) => Ok(true),
Err(e) => {
let msg = format!("{e:?}");
if msg.contains("BlobNotFound")
|| msg.contains("404")
|| msg.contains("ResourceNotFound")
{
Ok(false)
} else {
Err(Error::Backend(format!("Azure exists check failed: {e}")))
}
}
}
}
async fn presigned_get_url(&self, _key: &str, _expires_in_secs: u64) -> Result<String, Error> {
Err(Error::PresignNotSupported)
}
async fn presigned_put_url(
&self,
_key: &str,
_content_type: &str,
_expires_in_secs: u64,
) -> Result<String, Error> {
Err(Error::PresignNotSupported)
}
fn public_url(&self, key: &str) -> String {
format!("{}/{key}", self.public_base_url)
}
async fn test_connection(&self) -> Result<(), Error> {
self.container_client
.list_blobs()
.max_results(std::num::NonZeroU32::new(1).unwrap())
.into_stream()
.next()
.await
.transpose()
.map_err(|e| Error::Backend(format!("Azure connection test failed: {e}")))?;
Ok(())
}
}