use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use once_cell::sync::OnceCell;
use crate::metastore::file_backed_metastore::FileBackedMetastoreFactory;
#[cfg(feature = "postgres")]
use crate::metastore::postgresql_metastore::PostgresqlMetastoreFactory;
use crate::{Metastore, MetastoreResolverError};
/// A factory that turns a metastore URI into a shared [`Metastore`] handle.
///
/// Implementations are registered per protocol on a
/// [`MetastoreUriResolverBuilder`]. A `mockall` mock is generated when compiling
/// tests or when the `testsuite` feature is enabled.
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait]
pub trait MetastoreFactory: Send + Sync + 'static {
/// Resolves `uri` into a metastore, or returns a [`MetastoreResolverError`].
async fn resolve(&self, uri: &str) -> Result<Arc<dyn Metastore>, MetastoreResolverError>;
}
/// Builder that collects protocol-to-factory registrations before producing a
/// [`MetastoreUriResolver`].
#[derive(Default)]
pub struct MetastoreUriResolverBuilder {
// Keyed by the protocol string passed to `register`.
per_protocol_resolver: HashMap<String, Arc<dyn MetastoreFactory>>,
}
impl MetastoreUriResolverBuilder {
    /// Associates `resolver` with `protocol` and hands the builder back for chaining.
    ///
    /// A later registration for the same protocol replaces the earlier one,
    /// because the registrations live in a `HashMap` keyed by protocol.
    pub fn register<S: MetastoreFactory>(mut self, protocol: &str, resolver: S) -> Self {
        let factory: Arc<dyn MetastoreFactory> = Arc::new(resolver);
        self.per_protocol_resolver
            .insert(protocol.to_owned(), factory);
        self
    }

    /// Consumes the builder and produces the resolver holding all registrations.
    pub fn build(self) -> MetastoreUriResolver {
        let Self {
            per_protocol_resolver: registrations,
        } = self;
        MetastoreUriResolver {
            per_protocol_resolver: Arc::new(registrations),
        }
    }
}
/// Resolves metastore URIs by dispatching to the [`MetastoreFactory`]
/// registered for the URI's protocol.
pub struct MetastoreUriResolver {
// Protocol string -> factory; wrapped in an `Arc` by the builder's `build`.
per_protocol_resolver: Arc<HashMap<String, Arc<dyn MetastoreFactory>>>,
}
/// Returns the shared [`MetastoreUriResolver`], building it on first use.
///
/// The `ram`, `file` and `s3` protocols are served by
/// [`FileBackedMetastoreFactory`]. The `postgres` and `postgresql` protocols are
/// served by the PostgreSQL factory when the `postgres` feature is enabled;
/// otherwise they are registered with an [`UnsuportedMetastore`] that reports
/// the missing feature flag.
pub fn quickwit_metastore_uri_resolver() -> &'static MetastoreUriResolver {
    static METASTORE_URI_RESOLVER: OnceCell<MetastoreUriResolver> = OnceCell::new();
    METASTORE_URI_RESOLVER.get_or_init(|| {
        let builder = MetastoreUriResolver::builder()
            .register("ram", FileBackedMetastoreFactory::default())
            .register("file", FileBackedMetastoreFactory::default())
            .register("s3", FileBackedMetastoreFactory::default());

        // Exactly one of the two bindings below is compiled in, so the builder
        // is shadowed rather than mutated.
        #[cfg(feature = "postgres")]
        let builder = builder
            .register("postgres", PostgresqlMetastoreFactory::default())
            .register("postgresql", PostgresqlMetastoreFactory::default());

        #[cfg(not(feature = "postgres"))]
        let builder = builder
            .register(
                "postgres",
                UnsuportedMetastore {
                    message: "postgres unsupported, quickwit was compiled without the \
                              'postgres' feature flag"
                        .to_string(),
                },
            )
            .register(
                "postgresql",
                UnsuportedMetastore {
                    message: "postgresql unsupported, quickwit was compiled without the \
                              'postgres' feature flag"
                        .to_string(),
                },
            );

        builder.build()
    })
}
/// A [`MetastoreFactory`] placeholder for protocols that cannot be served.
///
/// Its `resolve` implementation always returns
/// `MetastoreResolverError::ProtocolUnsupported` carrying `message`.
#[derive(Clone, Default)]
pub struct UnsuportedMetastore {
// Text reported inside the `ProtocolUnsupported` error.
message: String,
}
#[async_trait]
impl MetastoreFactory for UnsuportedMetastore {
    /// Always fails with `ProtocolUnsupported`, carrying a copy of the stored
    /// message. The URI itself is ignored.
    async fn resolve(&self, _uri: &str) -> Result<Arc<dyn Metastore>, MetastoreResolverError> {
        let reason = self.message.clone();
        Err(MetastoreResolverError::ProtocolUnsupported(reason))
    }
}
impl MetastoreUriResolver {
    /// Creates an empty builder on which protocol factories can be registered.
    pub fn builder() -> MetastoreUriResolverBuilder {
        MetastoreUriResolverBuilder::default()
    }

    /// Resolves `uri` to a metastore using the factory registered for its protocol.
    ///
    /// The protocol is the part of the URI before the first `://`. The full,
    /// unmodified URI is passed on to the selected factory.
    ///
    /// # Errors
    ///
    /// - `MetastoreResolverError::InvalidUri` if the URI contains no `://`
    ///   separator, so no protocol can be extracted.
    /// - `MetastoreResolverError::ProtocolUnsupported` if no factory is
    ///   registered for the protocol.
    /// - Any error returned by the selected factory.
    pub async fn resolve<S: AsRef<str>>(
        &self,
        uri: S,
    ) -> Result<Arc<dyn Metastore>, MetastoreResolverError> {
        let uri_str = uri.as_ref();
        // `split("://").next()` can never be `None`, which made the `InvalidUri`
        // branch unreachable; `split_once` is `None` when the separator is absent.
        let (protocol, _) = uri_str.split_once("://").ok_or_else(|| {
            MetastoreResolverError::InvalidUri(format!(
                "Protocol not found in metastore URI: {}",
                uri_str
            ))
        })?;
        let resolver = self
            .per_protocol_resolver
            .get(protocol)
            .ok_or_else(|| MetastoreResolverError::ProtocolUnsupported(protocol.to_string()))?;
        resolver.resolve(uri_str).await
    }
}
#[cfg(test)]
mod tests {
    use crate::quickwit_metastore_uri_resolver;

    /// Resolving a bare `file://` URI through the default resolver succeeds.
    #[tokio::test]
    async fn test_metastore_resolver_should_not_raise_errors_on_file() -> anyhow::Result<()> {
        quickwit_metastore_uri_resolver().resolve("file://").await?;
        Ok(())
    }

    /// An unregistered protocol surfaces as `ProtocolUnsupported` naming that protocol.
    #[tokio::test]
    #[should_panic(expected = "ProtocolUnsupported(\"s4\")")]
    async fn test_metastore_resolver_should_raise_error_on_storage_error() {
        let resolver = quickwit_metastore_uri_resolver();
        let outcome = resolver.resolve("s4://bucket/path/to/object").await;
        outcome.unwrap();
    }

    /// Both the `postgres` and `postgresql` schemes resolve against the test database.
    #[cfg(feature = "postgres")]
    #[tokio::test]
    async fn test_postgres_and_postgresql_protocol_accepted() {
        let resolver = quickwit_metastore_uri_resolver();
        // Fall back to the local development database when no override is set.
        let database_url = std::env::var("TEST_DATABASE_URL").unwrap_or_else(|_| {
            "postgres://quickwit-dev:quickwit-dev@localhost/quickwit-metastore-dev".to_string()
        });
        let (_scheme, remainder) = database_url.split_once("://").unwrap();
        for scheme in ["postgres", "postgresql"].iter() {
            let candidate_uri = format!("{}://{}", scheme, remainder);
            resolver.resolve(&candidate_uri).await.unwrap();
        }
    }
}