use crate::error::StorageError;
use base64::prelude::*;
use datafusion::prelude::SessionContext;
use futures::TryStreamExt;
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder};
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::ObjectStore as ObjStore;
use scouter_settings::ObjectStorageSettings;
use scouter_types::StorageType;
use std::sync::Arc;
use tracing::debug;
use url::Url;
/// Decodes a standard-alphabet base64 string and returns the payload as UTF-8 text.
///
/// # Errors
/// Returns a `StorageError` (via `From`) when the input is not valid base64 or
/// when the decoded bytes are not valid UTF-8.
fn decode_base64_str(service_base64_creds: &str) -> Result<String, StorageError> {
    let raw_bytes = BASE64_STANDARD.decode(service_base64_creds)?;
    String::from_utf8(raw_bytes).map_err(Into::into)
}
/// Concrete object-store backend, one variant per supported `StorageType`.
///
/// Each variant holds its store behind an `Arc`, so cloning a provider only
/// bumps a reference count and shares the same underlying client.
#[derive(Debug, Clone)]
enum StorageProvider {
/// Google Cloud Storage backend.
Google(Arc<GoogleCloudStorage>),
/// Amazon S3 backend.
Aws(Arc<AmazonS3>),
/// Local filesystem backend.
Local(Arc<LocalFileSystem>),
/// Microsoft Azure backend.
Azure(Arc<MicrosoftAzure>),
}
impl StorageProvider {
pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
let store = match storage_settings.storage_type {
StorageType::Google => {
let mut builder = GoogleCloudStorageBuilder::from_env();
if let Ok(base64_creds) = std::env::var("GOOGLE_ACCOUNT_JSON_BASE64") {
let key = decode_base64_str(&base64_creds)?;
builder = builder.with_service_account_key(&key);
debug!("Using base64 encoded service account key for Google Cloud Storage");
}
let storage = builder
.with_bucket_name(storage_settings.storage_root())
.build()?;
StorageProvider::Google(Arc::new(storage))
}
StorageType::Aws => {
let builder = AmazonS3Builder::from_env()
.with_bucket_name(storage_settings.storage_root())
.with_region(storage_settings.region.clone())
.build()?;
StorageProvider::Aws(Arc::new(builder))
}
StorageType::Local => {
let builder = LocalFileSystem::new();
StorageProvider::Local(Arc::new(builder))
}
StorageType::Azure => {
let builder = MicrosoftAzureBuilder::from_env()
.with_container_name(storage_settings.storage_root())
.build()?;
StorageProvider::Azure(Arc::new(builder))
}
};
Ok(store)
}
pub fn get_base_url(
&self,
storage_settings: &ObjectStorageSettings,
) -> Result<Url, StorageError> {
match self {
StorageProvider::Google(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
StorageProvider::Aws(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
StorageProvider::Local(_) => Ok(Url::parse("file:///")?),
StorageProvider::Azure(_) => Ok(Url::parse(&storage_settings.storage_uri)?),
}
}
pub fn get_session(
&self,
storage_settings: &ObjectStorageSettings,
) -> Result<SessionContext, StorageError> {
let ctx = SessionContext::new();
let base_url = self.get_base_url(storage_settings)?;
match self {
StorageProvider::Google(store) => {
ctx.register_object_store(&base_url, store.clone());
}
StorageProvider::Aws(store) => {
ctx.register_object_store(&base_url, store.clone());
}
StorageProvider::Local(store) => {
ctx.register_object_store(&base_url, store.clone());
}
StorageProvider::Azure(store) => {
ctx.register_object_store(&base_url, store.clone());
}
}
Ok(ctx)
}
pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
let stream = match self {
StorageProvider::Local(store) => store.list(path),
StorageProvider::Google(store) => store.list(path),
StorageProvider::Aws(store) => store.list(path),
StorageProvider::Azure(store) => store.list(path),
};
stream
.try_fold(Vec::new(), |mut files, meta| async move {
files.push(meta.location.to_string());
Ok(files)
})
.await
.map_err(Into::into)
}
pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
match self {
StorageProvider::Local(store) => {
store.delete(path).await?;
Ok(())
}
StorageProvider::Google(store) => {
store.delete(path).await?;
Ok(())
}
StorageProvider::Aws(store) => {
store.delete(path).await?;
Ok(())
}
StorageProvider::Azure(store) => {
store.delete(path).await?;
Ok(())
}
}
}
}
/// Public handle pairing a storage backend with the settings it was built from.
#[derive(Debug, Clone)]
pub struct ObjectStore {
// Backend selected from the settings; kept private to this module.
provider: StorageProvider,
/// Copy of the settings used to build the provider.
pub storage_settings: ObjectStorageSettings,
}
impl ObjectStore {
    /// Builds the backend described by `storage_settings` and keeps a clone of
    /// the settings alongside it.
    ///
    /// # Errors
    /// Returns whatever `StorageProvider::new` reports.
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, StorageError> {
        StorageProvider::new(storage_settings).map(|provider| Self {
            provider,
            storage_settings: storage_settings.clone(),
        })
    }

    /// Creates a DataFusion `SessionContext` with the backend registered.
    ///
    /// # Errors
    /// Returns a `StorageError` when the provider fails to create the session.
    pub fn get_session(&self) -> Result<SessionContext, StorageError> {
        self.provider.get_session(&self.storage_settings)
    }

    /// Returns the base URL the backend is registered under.
    ///
    /// # Errors
    /// Returns a `StorageError` when the provider cannot produce the URL.
    pub fn get_base_url(&self) -> Result<Url, StorageError> {
        let settings = &self.storage_settings;
        self.provider.get_base_url(settings)
    }

    /// Lists object locations under `path` by delegating to the provider.
    ///
    /// # Errors
    /// Returns a `StorageError` when listing fails.
    pub async fn list(&self, path: Option<&Path>) -> Result<Vec<String>, StorageError> {
        StorageProvider::list(&self.provider, path).await
    }

    /// Deletes the object at `path` by delegating to the provider.
    ///
    /// # Errors
    /// Returns a `StorageError` when the delete fails.
    pub async fn delete(&self, path: &Path) -> Result<(), StorageError> {
        StorageProvider::delete(&self.provider, path).await
    }
}