use std::collections::BTreeMap;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, LazyLock};
use async_trait::async_trait;
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use futures_util::TryStreamExt;
use object_store::ObjectStore as ObjectStoreClient;
use object_store::aws::AmazonS3Builder;
use object_store::azure::MicrosoftAzureBuilder;
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::local::LocalFileSystem;
use object_store::path::Path as ObjectStorePath;
use serde::{Deserialize, Serialize};
use tracing::{debug, error};
use crate::backend::{Backend, BackendMetadata, Capability, ConnectionKind, ConnectionMetadata};
use crate::error::{Error, Result};
use crate::response::{ListSummary, TableSummary};
/// Filename prefixes that are filtered out of `list` results — editor/VCS
/// droppings and env files that should never surface as tables.
const IGNORE_FILES: &[&str] = &[".DS_Store", ".git", ".env"];

/// Capability metadata shared by every object-store connection; built once
/// lazily and cloned wherever a `BackendMetadata` value is needed.
static OBJECT_STORE_METADATA: LazyLock<BackendMetadata> = LazyLock::new(|| BackendMetadata {
    kind: ConnectionKind::ObjectStore,
    capabilities: vec![Capability::ExecuteSql, Capability::List],
});
/// Top-level configuration for an object-store connection: the file format
/// to read and the concrete store (cloud provider or local directory).
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
#[schema(as = ObjectStoreConfiguration)]
pub struct Config {
    // Defaults to Parquet with no extra options (see `ObjectStoreFormat::default`).
    #[serde(default)]
    pub format: ObjectStoreFormat,
    // Defaults to a local filesystem store (see `ObjectStore::default`).
    #[serde(default)]
    pub store: ObjectStore,
}
/// File format the store's objects are interpreted as. Currently only
/// Parquet, with an optional map of format-specific options.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
#[cfg_attr(feature = "strum", derive(strum_macros::AsRefStr))]
#[schema(as = ObjectStoreFormat)]
#[serde(rename_all = "snake_case")]
pub enum ObjectStoreFormat {
    // The payload carries optional key/value reader options; absent in the
    // common case.
    #[schema(value_type = Option<BTreeMap<String, String>>)]
    Parquet(#[serde(default)] Option<BTreeMap<String, String>>),
}
impl Default for ObjectStoreFormat {
fn default() -> Self { ObjectStoreFormat::Parquet(None) }
}
/// Provider-independent settings for one store: the bucket/container name,
/// whether to read provider credentials from the environment, and a free-form
/// option map forwarded to the provider builder.
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
pub struct ObjectStoreOptions {
    // Bucket (S3/GCS), container (Azure), or directory name (local).
    #[serde(default)]
    #[schema(required)]
    pub bucket: String,
    // When true, the provider builder is seeded with `from_env()` so
    // credentials/region come from environment variables.
    #[serde(default)]
    pub from_env: bool,
    // Extra provider options; keys are parsed into the provider's typed
    // config keys in `create_object_store`.
    #[serde(default)]
    #[schema(value_type = BTreeMap<String, String>)]
    pub options: BTreeMap<String, String>,
}
impl ObjectStoreOptions {
    /// Returns the option value stored under `key`, if present.
    pub fn get(&self, key: &str) -> Option<&String> {
        self.options.get(key)
    }
}
/// Which object-store provider backs the connection. Serde aliases accept
/// common spellings (`s3`/`aws`, `gcs`/`google`/`gcp`, …); the strum
/// serialization matches each provider's URL scheme.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, utoipa::ToSchema)]
#[cfg_attr(feature = "strum", derive(strum_macros::AsRefStr))]
#[schema(as = ObjectStore)]
#[serde(rename_all = "snake_case")]
pub enum ObjectStore {
    // Amazon S3.
    #[serde(alias = "s3", alias = "aws")]
    #[cfg_attr(feature = "strum", strum(serialize = "s3"))]
    Aws(ObjectStoreOptions),
    // Google Cloud Storage.
    #[serde(alias = "gcs", alias = "google", alias = "gcp")]
    #[cfg_attr(feature = "strum", strum(serialize = "gs"))]
    Gcp(ObjectStoreOptions),
    // Azure Blob Storage.
    #[serde(alias = "azure", alias = "microsoft")]
    #[cfg_attr(feature = "strum", strum(serialize = "az"))]
    Azure(ObjectStoreOptions),
    // Local filesystem directory.
    #[serde(alias = "local", alias = "file")]
    #[cfg_attr(feature = "strum", strum(serialize = "file"))]
    Local(ObjectStoreOptions),
}
impl ObjectStore {
pub fn bucket(&self) -> String {
match self {
ObjectStore::Aws(settings)
| ObjectStore::Gcp(settings)
| ObjectStore::Azure(settings) => settings.bucket.clone(),
ObjectStore::Local(settings) => {
let root = settings
.options
.get("root")
.or_else(|| settings.options.get("path"))
.or_else(|| settings.options.get("base_path"))
.or_else(|| settings.options.get("base_dir"));
match root {
Some(path) => format!("{path}/{}", settings.bucket),
None => settings.bucket.clone(),
}
}
}
}
pub fn url(&self) -> String {
let scheme = match self {
ObjectStore::Aws(_) => "s3",
ObjectStore::Gcp(_) => "gs",
ObjectStore::Azure(_) => "az",
ObjectStore::Local(_) => "file",
};
format!("{scheme}://{}", self.bucket())
}
pub fn config(&self) -> &ObjectStoreOptions {
match self {
ObjectStore::Aws(cfg)
| ObjectStore::Gcp(cfg)
| ObjectStore::Azure(cfg)
| ObjectStore::Local(cfg) => cfg,
}
}
}
impl Default for ObjectStore {
fn default() -> Self { Self::Local(ObjectStoreOptions::default()) }
}
/// A `Backend` implementation backed by an `object_store` client.
pub struct ObjectStoreBackend {
    // Connection identity plus shared capability metadata.
    metadata: ConnectionMetadata,
    // The provider client (S3/GCS/Azure/local) used for listing and queries.
    store: Arc<dyn ObjectStoreClient>,
    // Parsed store URL used to register the store with DataFusion sessions.
    url: ObjectStoreUrl,
    // Set once the store has been registered with a session; see
    // `prepare_session`.
    registered: Arc<AtomicBool>,
}
impl ObjectStoreBackend {
    /// Builds a backend for `config`, creating the underlying provider
    /// client and validating the store URL.
    ///
    /// # Errors
    /// Fails when the provider client cannot be constructed or when the
    /// resulting bucket URL does not parse.
    pub fn try_new(
        id: impl Into<String>,
        name: impl Into<String>,
        config: &Config,
    ) -> Result<Self> {
        let registration = create_object_store(&config.store)
            .inspect_err(|error| error!(?error, "Failed to create object store"))?;
        let url = ObjectStoreUrl::parse(&registration.url)
            .map_err(|e| Error::Internal(format!("Invalid bucket URL: {e}")))?;
        Ok(Self {
            metadata: ConnectionMetadata {
                id: id.into(),
                name: name.into(),
                catalog: Some(config.store.url()),
                metadata: OBJECT_STORE_METADATA.clone(),
            },
            store: registration.object_store,
            url,
            registered: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Capability metadata shared by every object-store backend.
    pub fn metadata() -> BackendMetadata {
        OBJECT_STORE_METADATA.clone()
    }
}
#[async_trait]
impl Backend for ObjectStoreBackend {
    fn connection(&self) -> &ConnectionMetadata { &self.metadata }

    /// Registers this backend's object store with the DataFusion session so
    /// queries can resolve `self.url`. Skipped after the first call.
    ///
    /// NOTE(review): the `registered` flag lives on the backend, not on the
    /// session — if one backend instance serves multiple `SessionContext`s,
    /// only the first session ever gets the store registered. Confirm that
    /// backends are session-scoped.
    async fn prepare_session(&self, session: &SessionContext) -> Result<()> {
        if self.registered.load(Ordering::Acquire) {
            debug!("Object store already registered, skipping registration");
            return Ok(());
        }
        let url = &self.url;
        let store = Arc::clone(&self.store);
        // `register_object_store` returns the previously registered store,
        // if any; we only log whether one was replaced.
        let previous = session.register_object_store(url.as_ref(), store);
        let overwrote = previous.is_some();
        debug!(url = url.as_str(), overwrote, "Registered object store with session");
        self.registered.store(true, Ordering::Release);
        Ok(())
    }

    /// Lists objects under `path` (or the whole store when `path` is None).
    ///
    /// With no path, and when every object lives inside at least one
    /// directory, the result is the set of top-level directory names
    /// (treated as "databases"); otherwise it is the flat file list with
    /// sizes.
    async fn list(&self, path: Option<&str>) -> Result<ListSummary> {
        // Normalize the requested path into an object-store prefix; empty
        // strings and leading slashes are dropped.
        let prefix = path
            .filter(|s| !s.is_empty())
            .map(|db| ObjectStorePath::from(db.trim_start_matches('/')));
        // Collect (location, size) pairs, dropping ignored filenames
        // (.DS_Store, .git*, .env*).
        let object_metas = self
            .store
            .list(prefix.as_ref())
            .try_collect::<Vec<_>>()
            .await?
            .into_iter()
            .filter(|meta| {
                !IGNORE_FILES
                    .iter()
                    .any(|i| meta.location.filename().is_some_and(|f| f.starts_with(i)))
            })
            .map(|meta| (meta.location.to_string(), meta.size))
            .collect::<Vec<_>>();
        // Heuristic: a root listing where every object sits inside a
        // directory is treated as a "database" search.
        let database_search = path.is_none()
            && object_metas.iter().all(|(location, _)| {
                location.contains('/') && !location.starts_with('/') && !location.ends_with('/')
            });
        Ok(if database_search {
            // Report only the first path segment of each object.
            // NOTE(review): one entry is emitted per object, so directories
            // containing several files appear multiple times — confirm
            // whether callers expect deduplicated names.
            ListSummary::Paths(
                object_metas
                    .into_iter()
                    .filter_map(|(location, _)| location.split('/').next().map(ToString::to_string))
                    .collect::<Vec<_>>(),
            )
        } else {
            ListSummary::Files(
                object_metas
                    .into_iter()
                    .map(|(location, size)| {
                        // Strip the requested path prefix so names are
                        // relative to it.
                        // NOTE(review): when `path` lacks a trailing '/',
                        // the stripped name keeps a leading '/' (e.g.
                        // "db" + "db/file" -> "/file") — confirm intended.
                        let location =
                            if let Some(p) = path.as_ref().filter(|p| location.starts_with(*p)) {
                                location.strip_prefix(p).unwrap_or(&location).to_string()
                            } else {
                                location
                            };
                        (location, size)
                    })
                    .map(|(location, size)| TableSummary {
                        name: location,
                        rows: None,
                        size_bytes: Some(size),
                    })
                    .collect::<Vec<_>>(),
            )
        })
    }
}
/// Result of `create_object_store`: the constructed client, its URL, and —
/// for local stores only — the absolute directory path that was prepared.
pub struct ObjectStoreRegistration {
    pub object_store: Arc<dyn ObjectStoreClient>,
    // Store URL as produced by `ObjectStore::url` (scheme + bucket).
    pub url: String,
    // Absolute local directory; `None` for cloud providers.
    pub full_path: Option<String>,
}
/// Constructs the provider-specific `object_store` client for `store`.
///
/// For cloud providers, the builder is optionally seeded from the
/// environment (`from_env`) and each entry of the option map is applied via
/// `with_config`; unknown option keys fail at `key.parse()?`. For local
/// stores, the target directory is created (relative paths are resolved
/// against the current working directory) before building a
/// `LocalFileSystem` rooted at it.
///
/// # Errors
/// Returns an error for unparseable option keys, builder failures, or when
/// the local directory cannot be resolved/created.
pub fn create_object_store(store: &ObjectStore) -> Result<ObjectStoreRegistration> {
    // Shared builder recipe: `$b` is the provider builder type and `$bn`
    // the bucket/container setter method name.
    macro_rules! build {
        ($url:expr, $conf:expr, $b:ident, $bn:ident) => {{
            let mut builder = if $conf.from_env { $b::from_env() } else { $b::new() }.$bn($url);
            for (key, value) in &$conf.options {
                builder = builder.with_config(key.parse()?, value);
            }
            Arc::new(builder.build()?) as Arc<dyn ObjectStoreClient>
        }};
    }
    let bucket = store.bucket();
    let url = store.url();
    let mut full_path = None;
    let object_store = match store {
        ObjectStore::Aws(s) => build!(bucket, s, AmazonS3Builder, with_bucket_name),
        ObjectStore::Gcp(s) => build!(bucket, s, GoogleCloudStorageBuilder, with_bucket_name),
        ObjectStore::Azure(s) => build!(bucket, s, MicrosoftAzureBuilder, with_container_name),
        ObjectStore::Local(_) => {
            // Resolve relative directories against the current working
            // directory so the store has a stable absolute root.
            let path = PathBuf::from(bucket);
            let path = if path.is_relative() {
                std::env::current_dir()
                    .map(|c| c.join(path))
                    .map_err(|e| Error::ObjectStoreCreate(e.to_string()))?
            } else {
                path
            };
            // Ensure the directory exists before rooting the store at it.
            if let Err(e) = fs::create_dir_all(&path) {
                error!(?path, "Failed to prepare local object store directory");
                return Err(Error::ObjectStoreCreate(format!(
                    "Failed to create local store path `{}`: {e}",
                    path.display()
                )));
            }
            full_path = Some(path.to_string_lossy().to_string());
            // Coerced to `Arc<dyn ObjectStoreClient>` by the match's type.
            Arc::new(LocalFileSystem::new_with_prefix(&path)?)
        }
    };
    Ok(ObjectStoreRegistration { object_store, url, full_path })
}