use crate::StorageConfig;
use async_trait::async_trait;
use azure_storage::{ErrorKind, ResultExt};
use azure_storage_blobs::{blob::operations::GetMetadataResponse, prelude::ContainerClient};
use bytes::Bytes;
use futures_util::StreamExt;
use remi::{Blob, File, ListBlobsRequest, UploadRequest};
use std::{borrow::Cow, ops::Deref, path::Path, time::SystemTime};
/// Storage service backed by an Azure Blob Storage container.
///
/// Pairs the [`ContainerClient`] that the operations in this file are issued against with
/// the [`StorageConfig`] whose `container` value is reported in log and tracing output.
#[derive(Debug, Clone)]
pub struct StorageService {
/// Client for the blob container used by every storage operation in this file.
container: ContainerClient,
/// Configuration kept next to the client.
// In this file the field is only read by statements gated behind the `tracing` and `log`
// features, which is why the `unused` lint is silenced here.
#[allow(unused)]
config: StorageConfig,
}
impl StorageService {
pub fn new(config: StorageConfig) -> Result<StorageService, azure_storage::Error> {
Ok(Self {
container: config.clone().try_into()?,
config,
})
}
pub fn with_container_client(container: ContainerClient) -> StorageService {
Self {
container,
config: StorageConfig::dummy(),
}
}
fn sanitize_path<P: AsRef<Path> + Send>(&self, path: P) -> Result<String, azure_storage::Error> {
let path = path
.as_ref()
.to_str()
.ok_or_else(|| azure_storage::Error::new(ErrorKind::Other, "was not valid utf-8"))
.with_context(ErrorKind::Other, || "failed to convert path into a string")?;
let path = path.trim_start_matches("./").trim_start_matches("~/");
Ok(path.into())
}
}
/// Dereferences a [`StorageService`] to the [`ContainerClient`] it wraps.
impl Deref for StorageService {
    type Target = ContainerClient;

    /// Returns a reference to the wrapped container client.
    fn deref(&self) -> &ContainerClient {
        let Self { container, .. } = self;
        container
    }
}
#[async_trait]
impl remi::StorageService for StorageService {
type Error = azure_storage::Error;
/// Returns the static identifier of this storage service, `remi:azure`.
fn name(&self) -> Cow<'static, str> {
    const NAME: &str = "remi:azure";
    Cow::Borrowed(NAME)
}
/// Makes sure the blob container exists, creating it when it does not.
///
/// # Errors
/// Propagates any error from the existence check or from the container creation request.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(
        name = "remi.azure.init",
        skip_all,
        fields(
            remi.service = "azure",
            container = self.config.container,
        )
    )
)]
async fn init(&self) -> Result<(), Self::Error> {
    let already_exists = self.container.exists().await?;
    if already_exists {
        Ok(())
    } else {
        #[cfg(feature = "tracing")]
        ::tracing::info!("creating blob container as it doesn't exist");

        #[cfg(feature = "log")]
        ::log::info!(
            "creating blob container [{}] as it doesn't exist",
            self.config.container
        );

        self.container.create().await
    }
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "remi.azure.open",
skip_all,
fields(
remi.service = "azure",
path = %path.as_ref().display()
)
)
)]
async fn open<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Bytes>, Self::Error> {
let path = path.as_ref();
#[cfg(feature = "tracing")]
::tracing::info!(
container = self.config.container,
path = %path.display(),
"opening blob in container"
);
#[cfg(feature = "log")]
::log::info!(
"opening blob [{}] in container [{}]",
path.display(),
self.config.container
);
let client = self.container.blob_client(self.sanitize_path(path)?);
if !client.exists().await? {
return Ok(None);
}
client.get_content().await.map(|content| Some(From::from(content)))
}
/// Fetches the blob at `path` together with its properties and content.
///
/// Returns `Ok(None)` when no blob exists at the sanitized path. Otherwise the result is a
/// [`Blob::File`] built from the blob's properties (name, metadata, content type, size,
/// timestamps) and its downloaded content.
///
/// `last_modified_at` and `created_at` are milliseconds since the Unix epoch. A timestamp
/// that precedes the epoch is reported as `None` instead of panicking.
///
/// # Errors
/// Fails when `path` is not valid UTF-8, when any of the requests to the service fails, or
/// when the reported content length does not fit into `usize`.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(
        name = "remi.azure.blob",
        skip_all,
        fields(
            remi.service = "azure",
            path = %path.as_ref().display()
        )
    )
)]
async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Blob>, Self::Error> {
    // Milliseconds since the Unix epoch, or `None` for a timestamp that precedes it.
    fn unix_millis(time: SystemTime) -> Option<u128> {
        time.duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .map(|elapsed| elapsed.as_millis())
    }

    let path = path.as_ref();

    #[cfg(feature = "tracing")]
    ::tracing::info!(
        container = self.config.container,
        path = %path.display(),
        "opening blob in container"
    );

    #[cfg(feature = "log")]
    ::log::info!(
        "opening blob [{}] in container [{}]",
        path.display(),
        self.config.container
    );

    let client = self.container.blob_client(self.sanitize_path(path)?);
    if !client.exists().await? {
        return Ok(None);
    }

    let props = client.get_properties().await?;
    let data = Bytes::from(client.get_content().await?);

    Ok(Some(Blob::File(File {
        last_modified_at: unix_millis(props.blob.properties.last_modified.into()),
        metadata: props.blob.metadata.unwrap_or_default(),
        content_type: Some(props.blob.properties.content_type),
        created_at: unix_millis(props.blob.properties.creation_time.into()),
        is_symlink: false,
        data,
        // `path` borrows the name, so it has to be built before `name` takes ownership.
        path: format!("azure://{}", props.blob.name),
        name: props.blob.name,
        size: props.blob.properties.content_length.try_into().map_err(|e| {
            azure_storage::Error::new(
                ErrorKind::Other,
                format!("expected content length to fit into `usize`: {e}"),
            )
        })?,
    })))
}
/// Lists the blobs in the container and downloads the content of each one.
///
/// When `path` is given, its sanitized form is used as the listing prefix and the prefix in
/// `request` is ignored; otherwise the prefix from `request` (if any) is used.
///
/// A blob that is listed but no longer exists when its content is fetched is skipped
/// instead of causing a panic. Timestamps are milliseconds since the Unix epoch; one that
/// precedes the epoch is reported as `None`.
///
/// # Errors
/// Fails when `path` is not valid UTF-8, when listing or downloading fails, or when a
/// reported content length does not fit into `usize`.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(
        name = "remi.azure.blobs",
        skip_all,
        fields(
            remi.service = "azure"
        )
    )
)]
async fn blobs<P: AsRef<Path> + Send>(
    &self,
    path: Option<P>,
    request: Option<ListBlobsRequest>,
) -> Result<Vec<Blob>, Self::Error> {
    // Milliseconds since the Unix epoch, or `None` for a timestamp that precedes it.
    fn unix_millis(time: SystemTime) -> Option<u128> {
        time.duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .map(|elapsed| elapsed.as_millis())
    }

    let options = request.unwrap_or_default();
    let mut listing = self.container.list_blobs();
    match path {
        Some(path) => {
            let path = self.sanitize_path(path)?;
            listing = listing.prefix(path);
        }
        None => {
            if let Some(prefix) = options.prefix {
                listing = listing.prefix(prefix);
            }
        }
    }

    let mut stream = listing.into_stream();
    let mut blobs = vec![];
    while let Some(page) = stream.next().await {
        let page = page?;
        for blob in page.blobs.blobs() {
            // The blob may have been removed between the listing and this download.
            let data = match self.open(&blob.name).await? {
                Some(data) => data,
                None => {
                    #[cfg(feature = "tracing")]
                    ::tracing::warn!(
                        name = %blob.name,
                        "blob no longer exists in container, skipping"
                    );

                    #[cfg(feature = "log")]
                    ::log::warn!("blob [{}] no longer exists in container, skipping", blob.name);

                    continue;
                }
            };

            blobs.push(Blob::File(File {
                last_modified_at: unix_millis(blob.properties.last_modified.into()),
                metadata: blob.metadata.clone().unwrap_or_default(),
                content_type: Some(blob.properties.content_type.clone()),
                created_at: unix_millis(blob.properties.creation_time.into()),
                is_symlink: false,
                data,
                path: format!("azure://{}", blob.name),
                name: blob.name.clone(),
                size: blob.properties.content_length.try_into().map_err(|e| {
                    azure_storage::Error::new(
                        ErrorKind::Other,
                        format!("expected content length to fit into `usize`: {e}"),
                    )
                })?,
            }));
        }
    }

    Ok(blobs)
}
/// Deletes the blob at `path`.
///
/// Succeeds without issuing a delete request when no blob exists at the sanitized path.
///
/// # Errors
/// Fails when `path` is not valid UTF-8, or when the existence check or the delete request
/// fails.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(
        name = "remi.azure.delete",
        skip_all,
        fields(
            remi.service = "azure",
            path = %path.as_ref().display()
        )
    )
)]
async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> Result<(), Self::Error> {
    let path = path.as_ref();

    #[cfg(feature = "tracing")]
    ::tracing::info!(
        container = self.config.container,
        path = %path.display(),
        "deleting blob in container"
    );

    #[cfg(feature = "log")]
    ::log::info!(
        "deleting blob [{}] in container [{}]",
        path.display(),
        self.config.container
    );

    let blob_name = self.sanitize_path(path)?;
    let client = self.container.blob_client(blob_name);
    if client.exists().await? {
        client.delete().await?;
    }

    Ok(())
}
/// Reports whether a blob exists at `path` in the container.
///
/// # Errors
/// Fails when `path` is not valid UTF-8 or when the existence check fails.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(
        name = "remi.azure.exists",
        skip_all,
        fields(
            remi.service = "azure",
            path = %path.as_ref().display()
        )
    )
)]
async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> Result<bool, Self::Error> {
    let path = path.as_ref();

    #[cfg(feature = "tracing")]
    ::tracing::info!(
        container = self.config.container,
        path = %path.display(),
        "checking if blob is in container"
    );

    #[cfg(feature = "log")]
    ::log::info!(
        "checking if blob [{}] is in container [{}]",
        path.display(),
        self.config.container
    );

    let blob_name = self.sanitize_path(path)?;
    let client = self.container.blob_client(blob_name);
    client.exists().await
}
/// Uploads `options.data` as a block blob at `path`.
///
/// When a blob already exists at the sanitized path, nothing is uploaded: a warning is
/// logged and `Ok(())` is returned. Otherwise the blob is created with the content type
/// from `options` (when set) and with every entry of `options.metadata` attached.
///
/// # Errors
/// Fails when `path` is not valid UTF-8, or when the existence check or the upload fails.
#[cfg_attr(
    feature = "tracing",
    tracing::instrument(
        name = "remi.azure.upload",
        skip_all,
        fields(
            remi.service = "azure",
            path = %path.as_ref().display()
        )
    )
)]
async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> Result<(), Self::Error> {
    let path = path.as_ref();

    #[cfg(feature = "tracing")]
    ::tracing::info!(
        container = self.config.container,
        path = %path.display(),
        "uploading blob to container"
    );

    #[cfg(feature = "log")]
    ::log::info!(
        "uploading blob [{}] into container [{}]",
        path.display(),
        self.config.container
    );

    let client = self.container.blob_client(self.sanitize_path(path)?);
    if client.exists().await? {
        #[cfg(feature = "tracing")]
        ::tracing::warn!(
            container = self.config.container,
            path = %path.display(),
            "blob with path already exists in container, skipping"
        );

        // Same severity as the `tracing` branch above.
        #[cfg(feature = "log")]
        ::log::warn!(
            "blob with path [{}] already exist in container [{}], skipping",
            path.display(),
            self.config.container
        );

        return Ok(());
    }

    let mut blob = client.put_block_blob(options.data);
    if let Some(ct) = options.content_type {
        blob = blob.content_type(ct);
    }

    // A throwaway response is built only to obtain an empty metadata value from its
    // `metadata` field; none of the other fields are used.
    // NOTE(review): presumably done because the metadata type is not imported here — confirm.
    let dummy_response = GetMetadataResponse {
        request_id: Default::default(),
        etag: Default::default(),
        server: Default::default(),
        date: std::time::SystemTime::now().into(),
        metadata: Default::default(),
    };

    let mut metadata = dummy_response.metadata;
    // `options` is owned and not used afterwards, so its metadata is consumed directly.
    for (key, value) in options.metadata {
        metadata.insert(key.as_str(), remi::Bytes::from(value));
    }

    blob.metadata(metadata).await.map(|_| ())
}
}
#[cfg(test)]
mod tests {
    use crate::{StorageConfig, StorageService};

    /// Checks that leading `./` and `~/` are stripped and that other paths pass through
    /// unchanged.
    #[test]
    fn sanitize_paths() {
        let storage = StorageService::new(StorageConfig::dummy()).unwrap();

        // (input path, expected sanitized path)
        let cases = [
            ("./weow.txt", "weow.txt"),
            ("~/weow.txt", "weow.txt"),
            ("weow.txt", "weow.txt"),
            ("~/weow/fluff/mooo.exe", "weow/fluff/mooo.exe"),
        ];

        for (input, expected) in cases {
            assert_eq!(storage.sanitize_path(input).unwrap(), String::from(expected));
        }
    }
}