use crate::StorageBackend;
use async_trait::async_trait;
use azure_storage::prelude::*;
use azure_storage::CloudLocation;
use azure_storage_blobs::prelude::*;
use futures::TryStreamExt;
use std::fmt;
use std::sync::Arc;
/// Size in bytes (4 MiB) of each block staged by `put_chunked`.
const CHUNK_SIZE: usize = 4 * 1024 * 1024;
/// Payload size in bytes (4 MiB) above which `put` switches from a single
/// direct upload to the chunked block upload.
const AZURE_BLOCK_SIZE: usize = 4 * 1024 * 1024;
/// Storage backend that keeps objects as blobs inside one Azure Blob Storage
/// container.
///
/// Clones share the same underlying container client through the `Arc`.
#[derive(Clone)]
pub struct AzureBackend {
/// Storage account name; shown in the `Debug` output.
account_name: String,
/// Name of the container used in log messages and `Debug` output.
container_name: String,
/// Shared Azure SDK container client used for every blob operation.
client: Arc<ContainerClient>,
}
impl fmt::Debug for AzureBackend {
    /// Formats the backend as `AzureBackend { account_name, container_name }`.
    ///
    /// The SDK client is deliberately left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure so the omission of `client` is explicit.
        let Self {
            account_name,
            container_name,
            client: _,
        } = self;
        let mut out = f.debug_struct("AzureBackend");
        out.field("account_name", account_name);
        out.field("container_name", container_name);
        out.finish()
    }
}
impl AzureBackend {
pub async fn with_sas_token(
account_name: impl Into<String>,
container_name: impl Into<String>,
sas_token: impl Into<String>,
) -> anyhow::Result<Self> {
let account_name = account_name.into();
let container_name = container_name.into();
let sas_token = sas_token.into();
if account_name.is_empty() {
return Err(anyhow::anyhow!("account_name cannot be empty"));
}
if container_name.is_empty() {
return Err(anyhow::anyhow!("container_name cannot be empty"));
}
if sas_token.is_empty() {
return Err(anyhow::anyhow!("sas_token cannot be empty"));
}
let storage_credentials = StorageCredentials::sas_token(sas_token)?;
let container_client = ClientBuilder::new(account_name.clone(), storage_credentials)
.container_client(container_name.clone());
tracing::info!(
"Created Azure Blob Storage backend with SAS token for {}/{}",
account_name,
container_name
);
let backend = AzureBackend {
account_name: account_name.clone(),
container_name: container_name.clone(),
client: Arc::new(container_client),
};
backend.ensure_container_exists().await?;
Ok(backend)
}
pub async fn with_account_key(
account_name: impl Into<String>,
container_name: impl Into<String>,
account_key: impl Into<String>,
) -> anyhow::Result<Self> {
let account_name = account_name.into();
let container_name = container_name.into();
let account_key = account_key.into();
if account_name.is_empty() {
return Err(anyhow::anyhow!("account_name cannot be empty"));
}
if container_name.is_empty() {
return Err(anyhow::anyhow!("container_name cannot be empty"));
}
if account_key.is_empty() {
return Err(anyhow::anyhow!("account_key cannot be empty"));
}
let storage_credentials = StorageCredentials::access_key(account_name.clone(), account_key);
let container_client = ClientBuilder::new(account_name.clone(), storage_credentials)
.container_client(container_name.clone());
tracing::info!(
"Created Azure Blob Storage backend with account key for {}/{}",
account_name,
container_name
);
let backend = AzureBackend {
account_name: account_name.clone(),
container_name: container_name.clone(),
client: Arc::new(container_client),
};
backend.ensure_container_exists().await?;
Ok(backend)
}
pub async fn with_connection_string(
container_name: impl Into<String>,
connection_string: impl Into<String>,
) -> anyhow::Result<Self> {
let container_name = container_name.into();
let connection_string = connection_string.into();
if container_name.is_empty() {
return Err(anyhow::anyhow!("container_name cannot be empty"));
}
if connection_string.is_empty() {
return Err(anyhow::anyhow!("connection_string cannot be empty"));
}
let account_name = connection_string
.split(';')
.find(|s| s.starts_with("AccountName="))
.and_then(|s| s.strip_prefix("AccountName="))
.ok_or_else(|| anyhow::anyhow!("Invalid connection string: missing AccountName"))?
.to_string();
let account_key = connection_string
.split(';')
.find(|s| s.starts_with("AccountKey="))
.and_then(|s| s.strip_prefix("AccountKey="))
.ok_or_else(|| anyhow::anyhow!("Invalid connection string: missing AccountKey"))?
.to_string();
let storage_credentials = StorageCredentials::access_key(account_name.clone(), account_key);
let container_client = if let Some(blob_endpoint) = connection_string
.split(';')
.find(|s| s.starts_with("BlobEndpoint="))
.and_then(|s| s.strip_prefix("BlobEndpoint="))
{
let url = azure_core::Url::parse(blob_endpoint)
.map_err(|e| anyhow::anyhow!("Invalid BlobEndpoint: {}", e))?;
let host = url
.host_str()
.ok_or_else(|| anyhow::anyhow!("Invalid BlobEndpoint: missing host"))?;
let port = url.port().unwrap_or(10000);
let cloud_location = CloudLocation::Emulator {
address: host.to_string(),
port,
};
tracing::debug!("Using custom blob endpoint: {}:{}", host, port);
ClientBuilder::with_location(cloud_location, storage_credentials)
.container_client(container_name.clone())
} else {
ClientBuilder::new(account_name.clone(), storage_credentials)
.container_client(container_name.clone())
};
tracing::info!(
"Created Azure Blob Storage backend with connection string for {}/{}",
account_name,
container_name
);
let backend = AzureBackend {
account_name: account_name.clone(),
container_name: container_name.clone(),
client: Arc::new(container_client),
};
backend.ensure_container_exists().await?;
Ok(backend)
}
/// Rejects the empty key; every other key is accepted.
fn validate_key(key: &str) -> anyhow::Result<()> {
    match key {
        "" => Err(anyhow::anyhow!("key cannot be empty")),
        _ => Ok(()),
    }
}
/// Translates a storage error into one of the backend's uniform messages.
///
/// Classification is a heuristic based on substrings of the error's display
/// text:
///
/// * `ContainerNotFound` becomes `container not found: <context>`
/// * `404` / `BlobNotFound` becomes `object not found: <context>`
/// * `403` / `PermissionDenied` becomes `permission denied: <context>`
///
/// Anything else, including HTTP 409 conflicts, is returned unchanged so the
/// original diagnostic is not lost.
fn map_error(err: impl Into<anyhow::Error>, context: &str) -> anyhow::Error {
    let err = err.into();
    let error_msg = err.to_string();
    // A missing container is itself reported with status 404, so it has to be
    // recognised before the generic 404 check or it would be reported as a
    // missing object.
    if error_msg.contains("ContainerNotFound") {
        anyhow::anyhow!("container not found: {}", context)
    } else if error_msg.contains("404") || error_msg.contains("BlobNotFound") {
        anyhow::anyhow!("object not found: {}", context)
    } else if error_msg.contains("403") || error_msg.contains("PermissionDenied") {
        anyhow::anyhow!("permission denied: {}", context)
    } else {
        err
    }
}
/// Makes sure the backing container exists, creating it when necessary.
///
/// If the existence check itself fails, creation is still attempted as a
/// best effort. A creation conflict (the container already exists) is
/// treated as success.
///
/// # Errors
///
/// Returns an error when the container has to be created and creation fails
/// for any reason other than the container already existing.
async fn ensure_container_exists(&self) -> anyhow::Result<()> {
    // Decide whether creation is needed and which suffix the "already exists"
    // log line should carry if creation hits a conflict.
    let conflict_note = match self.client.exists().await {
        Ok(true) => {
            tracing::debug!("Container {} already exists", self.container_name);
            return Ok(());
        }
        Ok(false) => {
            tracing::info!("Creating container: {}", self.container_name);
            " (concurrent creation)"
        }
        Err(e) => {
            tracing::warn!(
                "Could not check container existence: {}, attempting to create",
                e
            );
            ""
        }
    };
    self.create_container_tolerating_conflict(conflict_note).await
}

/// Creates the container, treating an "already exists" conflict as success.
///
/// `conflict_note` is appended to the debug message logged on a conflict.
async fn create_container_tolerating_conflict(&self, conflict_note: &str) -> anyhow::Result<()> {
    match self.client.create().await {
        Ok(_) => {
            tracing::info!("Successfully created container: {}", self.container_name);
            Ok(())
        }
        Err(e) => {
            let err_msg = e.to_string();
            if err_msg.contains("ContainerAlreadyExists") || err_msg.contains("409") {
                tracing::debug!(
                    "Container {} already exists{}",
                    self.container_name,
                    conflict_note
                );
                Ok(())
            } else {
                Err(anyhow::anyhow!(
                    "Failed to create container {}: {}",
                    self.container_name,
                    e
                ))
            }
        }
    }
}
}
#[async_trait]
impl StorageBackend for AzureBackend {
/// Downloads the full content of the blob stored under `key`.
///
/// # Errors
///
/// Returns an error when `key` is empty or when the download fails. Failures
/// are classified by `map_error`, so a missing blob is reported as
/// `object not found: <key>`.
async fn get(&self, key: &str) -> anyhow::Result<Vec<u8>> {
    Self::validate_key(key)?;
    tracing::debug!(
        "Getting object from Azure Blob Storage: {}/{}",
        self.container_name,
        key
    );
    let blob_client = self.client.blob_client(key);
    // `map_error` is the single place where failures are classified; no
    // separate not-found check is needed here.
    let data = blob_client
        .get_content()
        .await
        .map_err(|e| Self::map_error(e, key))?;
    tracing::debug!("Successfully retrieved {} ({} bytes)", key, data.len());
    Ok(data)
}
/// Stores `data` under `key`.
///
/// Payloads of at most `AZURE_BLOCK_SIZE` bytes go through `put_direct`;
/// larger payloads go through `put_chunked`.
async fn put(&self, key: &str, data: &[u8]) -> anyhow::Result<()> {
    Self::validate_key(key)?;
    let size = data.len();
    tracing::debug!(
        "Putting object to Azure Blob Storage: {} (size: {} bytes)",
        key,
        size
    );
    let fits_in_one_request = size <= AZURE_BLOCK_SIZE;
    let outcome = if fits_in_one_request {
        self.put_direct(key, data).await
    } else {
        self.put_chunked(key, data).await
    };
    outcome?;
    tracing::debug!("Successfully uploaded {}", key);
    Ok(())
}
/// Reports whether a blob is stored under `key`.
///
/// A failed existence check whose message looks like a not-found response is
/// reported as `Ok(false)`; any other failure goes through `map_error`.
async fn exists(&self, key: &str) -> anyhow::Result<bool> {
    // Lower-case fragments that mark an error as "the blob is not there".
    const MISSING_MARKERS: [&str; 6] = [
        "404",
        "not found",
        "notfound",
        "blobnotfound",
        "does not exist",
        "containernotfound",
    ];

    Self::validate_key(key)?;
    tracing::debug!(
        "Checking existence of object in Azure Blob Storage: {}",
        key
    );
    let blob_client = self.client.blob_client(key);
    let failure = match blob_client.exists().await {
        Ok(found) => {
            tracing::debug!("Blob {} exists: {}", key, found);
            return Ok(found);
        }
        Err(e) => e,
    };

    let lowered = failure.to_string().to_lowercase();
    let looks_missing = MISSING_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker));
    if looks_missing {
        Ok(false)
    } else {
        Err(Self::map_error(failure, key))
    }
}
/// Deletes the blob stored under `key`.
///
/// A failure whose message contains `404` counts as success, so deleting a
/// blob that is already gone does not fail.
async fn delete(&self, key: &str) -> anyhow::Result<()> {
    Self::validate_key(key)?;
    tracing::debug!("Deleting object from Azure Blob Storage: {}", key);
    let blob_client = self.client.blob_client(key);
    let Err(failure) = blob_client.delete().await else {
        tracing::debug!("Successfully deleted {}", key);
        return Ok(());
    };

    let already_gone = failure.to_string().contains("404");
    if !already_gone {
        return Err(Self::map_error(failure, key));
    }
    tracing::debug!("Blob {} doesn't exist, delete is idempotent", key);
    Ok(())
}
/// Lists the names of all blobs whose name starts with `prefix`, sorted.
///
/// An empty `prefix` lists every blob in the container.
async fn list_objects(&self, prefix: &str) -> anyhow::Result<Vec<String>> {
    tracing::debug!(
        "Listing objects in Azure Blob Storage with prefix: '{}'",
        prefix
    );
    let mut pages = if !prefix.is_empty() {
        self.client
            .list_blobs()
            .prefix(prefix.to_string())
            .into_stream()
    } else {
        self.client.list_blobs().into_stream()
    };

    let mut names = Vec::new();
    loop {
        match pages.try_next().await {
            Ok(Some(page)) => {
                names.extend(page.blobs.blobs().map(|blob| blob.name.clone()));
            }
            Ok(None) => break,
            Err(e) => {
                return Err(Self::map_error(
                    e,
                    &format!("listing with prefix '{}'", prefix),
                ));
            }
        }
    }
    names.sort();

    tracing::debug!(
        "Found {} objects with prefix '{}' in container {}",
        names.len(),
        prefix,
        self.container_name
    );
    Ok(names)
}
}
impl AzureBackend {
/// Uploads `data` to `key` as a block blob in a single request.
async fn put_direct(&self, key: &str, data: &[u8]) -> anyhow::Result<()> {
    tracing::debug!(
        "Uploading {} bytes directly to {} in container {}",
        data.len(),
        key,
        self.container_name
    );
    let blob_client = self.client.blob_client(key);
    // The upload call is given an owned copy of the payload.
    let payload = data.to_vec();
    match blob_client.put_block_blob(payload).await {
        Ok(_) => Ok(()),
        Err(e) => Err(Self::map_error(e, key)),
    }
}
/// Uploads `data` to `key` as a sequence of `CHUNK_SIZE` blocks, then commits
/// the block list.
///
/// Block IDs are the base64 encoding of the zero-padded, eight-digit chunk
/// index, so every ID has the same length.
async fn put_chunked(&self, key: &str, data: &[u8]) -> anyhow::Result<()> {
    let total_bytes = data.len();
    let chunk_count = total_bytes.div_ceil(CHUNK_SIZE);
    tracing::debug!(
        "Uploading {} bytes in {} chunks to {} in container {}",
        total_bytes,
        chunk_count,
        key,
        self.container_name
    );

    let blob_client = self.client.blob_client(key);
    let mut staged_ids = Vec::with_capacity(chunk_count);
    for (index, piece) in data.chunks(CHUNK_SIZE).enumerate() {
        let ordinal = index + 1;
        let encoded_id = azure_core::base64::encode(format!("{:08}", index).as_bytes());
        tracing::trace!(
            "Uploading chunk {}/{} ({} bytes) with block ID {}",
            ordinal,
            chunk_count,
            piece.len(),
            encoded_id
        );
        let staged = blob_client
            .put_block(encoded_id.clone(), piece.to_vec())
            .await;
        if let Err(e) = staged {
            return Err(Self::map_error(
                e,
                &format!("uploading chunk {} of {}", ordinal, chunk_count),
            ));
        }
        staged_ids.push(encoded_id);
    }

    // Commit the staged blocks in upload order.
    let manifest = BlockList {
        blocks: staged_ids
            .into_iter()
            .map(BlobBlockType::new_uncommitted)
            .collect(),
    };
    if let Err(e) = blob_client.put_block_list(manifest).await {
        return Err(Self::map_error(
            e,
            &format!("committing {} blocks", chunk_count),
        ));
    }

    tracing::debug!(
        "Successfully uploaded {} bytes in {} chunks to {}",
        total_bytes,
        chunk_count,
        key
    );
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Placeholder account key: the same base64 key used in the Azurite
    /// connection string below. Tests that need a key-shaped value use it so
    /// they do not pass an unrelated string as the key.
    const PLACEHOLDER_ACCOUNT_KEY: &str =
        "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";

    #[test]
    fn test_validate_key_empty() {
        assert!(AzureBackend::validate_key("").is_err());
    }

    #[test]
    fn test_validate_key_valid() {
        assert!(AzureBackend::validate_key("valid/key/path").is_ok());
    }

    #[test]
    fn test_validate_key_with_special_chars() {
        assert!(AzureBackend::validate_key("key-with_special.chars").is_ok());
    }

    #[tokio::test]
    #[ignore = "requires live Azure credentials - not available in CI"]
    async fn test_sas_token_backend_creation() {
        let result = AzureBackend::with_sas_token(
            "testaccount",
            "testcontainer",
            "sv=2021-06-08&ss=bfqt&srt=sco&sp=rwdlacupitfx",
        )
        .await;
        assert!(result.is_ok());
        let backend = result.unwrap();
        assert_eq!(backend.account_name, "testaccount");
        assert_eq!(backend.container_name, "testcontainer");
    }

    #[tokio::test]
    #[ignore = "requires live Azure credentials - not available in CI"]
    async fn test_account_key_backend_creation() {
        // The third argument is an account key, not a connection string.
        let result =
            AzureBackend::with_account_key("testaccount", "testcontainer", PLACEHOLDER_ACCOUNT_KEY)
                .await;
        assert!(result.is_ok());
        let backend = result.unwrap();
        assert_eq!(backend.account_name, "testaccount");
        assert_eq!(backend.container_name, "testcontainer");
    }

    #[tokio::test]
    #[ignore = "requires Azurite emulator"]
    async fn test_connection_string_backend_creation() {
        let conn_str = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;";
        let result = AzureBackend::with_connection_string("mediagit-test", conn_str).await;
        assert!(result.is_ok());
        let backend = result.unwrap();
        assert_eq!(backend.account_name, "devstoreaccount1");
        assert_eq!(backend.container_name, "mediagit-test");
    }

    #[tokio::test]
    async fn test_empty_account_name_fails() {
        let result = AzureBackend::with_sas_token("", "testcontainer", "token").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_empty_container_name_fails() {
        let result = AzureBackend::with_account_key("testaccount", "", "key").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_empty_sas_token_fails() {
        let result = AzureBackend::with_sas_token("testaccount", "testcontainer", "").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_empty_account_key_fails() {
        let result = AzureBackend::with_account_key("testaccount", "testcontainer", "").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_empty_connection_string_fails() {
        let result = AzureBackend::with_connection_string("testcontainer", "").await;
        assert!(result.is_err());
    }

    // The following connection-string tests fail during argument validation,
    // before any request is sent, so they run without network access.

    #[tokio::test]
    async fn test_connection_string_empty_container_fails() {
        let result =
            AzureBackend::with_connection_string("", "AccountName=a;AccountKey=dGVzdA==").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_connection_string_missing_account_name_fails() {
        let result =
            AzureBackend::with_connection_string("testcontainer", "AccountKey=dGVzdA==").await;
        let err = result.unwrap_err();
        assert!(err.to_string().contains("AccountName"));
    }

    #[tokio::test]
    async fn test_connection_string_missing_account_key_fails() {
        let result =
            AzureBackend::with_connection_string("testcontainer", "AccountName=testaccount").await;
        let err = result.unwrap_err();
        assert!(err.to_string().contains("AccountKey"));
    }

    #[tokio::test]
    async fn test_connection_string_invalid_blob_endpoint_fails() {
        let result = AzureBackend::with_connection_string(
            "testcontainer",
            "AccountName=testaccount;AccountKey=dGVzdA==;BlobEndpoint=not a url",
        )
        .await;
        let err = result.unwrap_err();
        assert!(err.to_string().contains("BlobEndpoint"));
    }

    #[test]
    fn test_chunk_size_constant() {
        assert_eq!(CHUNK_SIZE, 4 * 1024 * 1024);
    }

    #[test]
    fn test_azure_block_size_constant() {
        assert_eq!(AZURE_BLOCK_SIZE, 4 * 1024 * 1024);
    }

    #[test]
    fn test_chunk_size_alignment() {
        const { assert!(CHUNK_SIZE >= 1024 * 1024) };
        const { assert!(CHUNK_SIZE <= 100 * 1024 * 1024) };
    }
}