use std::sync::Arc;
use async_trait::async_trait;
use datafusion::{error::DataFusionError, execution::runtime_env::RuntimeEnv};
use object_store::gcp::GoogleCloudStorageBuilder;
use object_store::ObjectStore;
use exon_io::build_s3_object_store;
/// Extension methods for registering Exon-supported object stores on a DataFusion
/// [`RuntimeEnv`].
///
/// Each method yields `Ok(Some(store))` when a store was already registered for the same
/// location and has been replaced. That is the value reported by
/// `RuntimeEnv::register_object_store`.
#[async_trait]
pub trait ExonRuntimeEnvExt {
    /// Builds an S3-backed object store for `url` and registers it with the runtime.
    ///
    /// # Errors
    ///
    /// Returns a [`DataFusionError`] if the S3 object store cannot be constructed.
    async fn register_s3_object_store(
        &self,
        url: &url::Url,
    ) -> Result<Option<Arc<dyn ObjectStore>>, DataFusionError>;

    /// Registers an object store selected by the scheme of `url`.
    ///
    /// The `Arc<RuntimeEnv>` implementation handles `s3`, `gs` and `file`. For `gs`, the
    /// `GOOGLE_SERVICE_ACCOUNT` environment variable must be set. Any other scheme is
    /// rejected.
    ///
    /// # Errors
    ///
    /// Returns a [`DataFusionError`] for unsupported schemes or when building the store fails.
    async fn exon_register_object_store_url(
        &self,
        url: &url::Url,
    ) -> Result<Option<Arc<dyn ObjectStore>>, DataFusionError>;

    /// Parses `uri` as a URL and registers the matching object store.
    ///
    /// For a scheme-less path, the `Arc<RuntimeEnv>` implementation registers nothing and
    /// returns `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Propagates errors from registering the parsed URL.
    async fn exon_register_object_store_uri(
        &self,
        uri: &str,
    ) -> Result<Option<Arc<dyn ObjectStore>>, DataFusionError>;
}
#[async_trait]
impl ExonRuntimeEnvExt for Arc<RuntimeEnv> {
    /// Builds an S3 object store for `url` with `build_s3_object_store` and registers it.
    ///
    /// # Returns
    ///
    /// The value returned by `RuntimeEnv::register_object_store`: the store previously
    /// registered for this URL, if any.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Execution`] carrying the builder's error message if the S3
    /// store cannot be built.
    async fn register_s3_object_store(
        &self,
        url: &url::Url,
    ) -> Result<Option<Arc<dyn ObjectStore>>, DataFusionError> {
        let object_store = build_s3_object_store(url)
            .await
            .map_err(|e| DataFusionError::Execution(e.to_string()))?;

        Ok(self.register_object_store(url, object_store))
    }

    /// Registers an object store chosen by the scheme of `url`.
    ///
    /// - `s3`: delegates to `register_s3_object_store`.
    /// - `gs`: requires the `GOOGLE_SERVICE_ACCOUNT` environment variable, then builds a
    ///   Google Cloud Storage store from the environment for `url`.
    /// - `file`: registers a default `LocalFileSystem`.
    ///
    /// # Returns
    ///
    /// The store previously registered for this URL, if any.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Execution`] for an unsupported scheme or when
    /// `GOOGLE_SERVICE_ACCOUNT` is not set for `gs://`. Builder errors are propagated.
    async fn exon_register_object_store_url(
        &self,
        url: &url::Url,
    ) -> Result<Option<Arc<dyn ObjectStore>>, DataFusionError> {
        match url.scheme() {
            "s3" => self.register_s3_object_store(url).await,
            "gs" => {
                // Checked up front so that missing credentials produce an explicit,
                // actionable error message.
                if std::env::var("GOOGLE_SERVICE_ACCOUNT").is_err() {
                    return Err(DataFusionError::Execution(
                        "GOOGLE_SERVICE_ACCOUNT env var must be set to use gs://".to_string(),
                    ));
                }

                let gcs = GoogleCloudStorageBuilder::from_env()
                    .with_url(url.to_string())
                    .build()?;

                Ok(self.register_object_store(url, Arc::new(gcs)))
            }
            "file" => {
                let local = object_store::local::LocalFileSystem::default();
                Ok(self.register_object_store(url, Arc::new(local)))
            }
            scheme => Err(DataFusionError::Execution(format!(
                "Unsupported scheme: {}",
                scheme
            ))),
        }
    }

    /// Parses `uri` as a URL and registers the matching object store.
    ///
    /// # Returns
    ///
    /// `Ok(None)` without registering anything when `uri` is a scheme-less (relative) path,
    /// such as `data/reads.fa` or `/tmp/x`. Otherwise, returns the result of
    /// `exon_register_object_store_url`.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Execution`] when `uri` has the shape of a URL but is
    /// malformed, for example an invalid port or host. Also propagates errors from
    /// `exon_register_object_store_url`.
    async fn exon_register_object_store_uri(
        &self,
        uri: &str,
    ) -> Result<Option<Arc<dyn ObjectStore>>, DataFusionError> {
        match url::Url::parse(uri) {
            Ok(url) => self.exon_register_object_store_url(&url).await,
            // Not a URL at all (no scheme): nothing to register. How the path is
            // interpreted is left to the caller.
            Err(url::ParseError::RelativeUrlWithoutBase) => Ok(None),
            // Any other failure means the input looked like a URL but is malformed.
            // Report it instead of silently treating it as "no object store".
            Err(e) => Err(DataFusionError::Execution(format!(
                "Invalid object store URI '{}': {}",
                uri, e
            ))),
        }
    }
}