use std::any::Any;
use std::sync::Arc;
use ahash::{HashMap, HashSet};
use async_trait::async_trait;
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use datafusion::common::{DataFusionError, Result as DataFusionResult, TableReference, exec_err};
use datafusion::logical_expr::TableType;
use parking_lot::Mutex;
use re_log_types::EntryName;
use re_protos::cloud::v1alpha1::EntryKind;
use re_redap_client::ConnectionClient;
use re_uri::Origin;
use tokio::runtime::Handle as RuntimeHandle;
use crate::IntoDfError as _;
use crate::{TableEntryTableProvider, TableQueryCaller};
/// Name of the catalog that `RedapCatalogProviderList::catalog_names` always reports.
///
/// A `RedapCatalogProvider` created with this name stores no catalog name (`None`).
pub(crate) const DEFAULT_CATALOG_NAME: &str = "datafusion";
/// Name of the schema that `RedapCatalogProvider::schema_names` reports alongside
/// the schemas found on the server.
///
/// Requesting this schema yields a schema provider whose schema name is `None`.
const DEFAULT_SCHEMA_NAME: &str = "public";
/// Catalog list that combines explicitly registered catalogs with catalogs that
/// are created on demand from the table names reported by the server.
#[derive(Debug)]
pub struct RedapCatalogProviderList {
/// Client used to query the server; cloned into every catalog created on demand.
client: ConnectionClient,
/// Runtime handle used to block on the asynchronous client calls.
runtime: RuntimeHandle,
/// Optional origin that is forwarded unchanged to every catalog created on demand.
analytics_origin: Option<Origin>,
/// Catalogs added through `register_catalog`; these take precedence in `catalog`.
registered: Mutex<HashMap<String, Arc<dyn CatalogProvider>>>,
/// Catalogs created on demand by `catalog`, keyed by name. An entry is removed
/// when a catalog with the same name gets registered explicitly.
lazy_cache: Mutex<HashMap<String, Arc<dyn CatalogProvider>>>,
}
impl RedapCatalogProviderList {
    /// Builds a catalog list that talks to the server through `client`.
    ///
    /// `runtime` is used to drive the asynchronous client calls and
    /// `analytics_origin` is handed to every catalog created later on.
    /// Both catalog maps start out empty.
    pub fn new(
        client: ConnectionClient,
        runtime: RuntimeHandle,
        analytics_origin: Option<Origin>,
    ) -> Self {
        let registered = Mutex::new(HashMap::default());
        let lazy_cache = Mutex::new(HashMap::default());

        Self {
            client,
            runtime,
            analytics_origin,
            registered,
            lazy_cache,
        }
    }
}
impl CatalogProviderList for RedapCatalogProviderList {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Registers `catalog` under `name` and returns the catalog that was
    /// previously registered under that name, if any.
    ///
    /// Any provider that was created on demand for the same name is discarded.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        // Release the lazy-cache lock before taking the registry lock so the
        // two locks are never held at the same time.
        {
            let mut lazy = self.lazy_cache.lock();
            lazy.remove(&name);
        }

        let mut registered = self.registered.lock();
        registered.insert(name, catalog)
    }

    /// Returns the de-duplicated union of the catalogs referenced by server
    /// tables, the default catalog and all explicitly registered catalogs.
    ///
    /// A failure to query the server is logged; the remaining names are still
    /// returned.
    fn catalog_names(&self) -> Vec<String> {
        let mut unique: HashSet<String> = HashSet::default();

        match get_table_refs(&self.client, &self.runtime) {
            Ok(table_refs) => {
                for table_ref in &table_refs {
                    if let Some(catalog) = table_ref.catalog() {
                        unique.insert(catalog.to_owned());
                    }
                }
            }
            Err(err) => {
                re_log::error!("Error attempting to get catalog names from server: {err}");
            }
        }

        unique.insert(DEFAULT_CATALOG_NAME.to_owned());
        for registered_name in self.registered.lock().keys() {
            unique.insert(registered_name.clone());
        }

        unique.into_iter().collect()
    }

    /// Returns the catalog called `name`.
    ///
    /// Explicitly registered catalogs win. Otherwise a server-backed provider
    /// is created on first use and cached, so this always returns `Some`.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        let explicit = self.registered.lock().get(name).map(Arc::clone);
        if explicit.is_some() {
            return explicit;
        }

        let mut cache = self.lazy_cache.lock();
        if let Some(cached) = cache.get(name) {
            return Some(Arc::clone(cached));
        }

        let created: Arc<dyn CatalogProvider> = Arc::new(RedapCatalogProvider::new(
            Some(name),
            self.client.clone(),
            self.runtime.clone(),
            self.analytics_origin.clone(),
        ));
        cache.insert(name.to_owned(), Arc::clone(&created));
        Some(created)
    }
}
/// Catalog whose schemas are derived from the table names reported by the server.
#[derive(Debug)]
pub(crate) struct RedapCatalogProvider {
/// Catalog name used to match server tables; `None` stands for the default catalog.
catalog_name: Option<String>,
/// Client used to query the server; cloned into every schema provider.
client: ConnectionClient,
/// Schema providers created so far, keyed by schema name (`None` is the default schema).
schemas: Mutex<HashMap<Option<String>, Arc<RedapSchemaProvider>>>,
/// Runtime handle used to block on the asynchronous client calls.
runtime: RuntimeHandle,
/// Optional origin that is forwarded unchanged to every schema provider.
analytics_origin: Option<Origin>,
}
/// Fetches all table names from the server and parses each one into a
/// [`TableReference`].
///
/// The call blocks the current thread on `runtime` until the server has
/// answered. Client errors are converted into [`DataFusionError`]s.
#[tracing::instrument(skip_all)]
fn get_table_refs(
    client: &ConnectionClient,
    runtime: &RuntimeHandle,
) -> DataFusionResult<Vec<TableReference>> {
    let request = async { client.clone().get_table_names().await };
    let entry_names = runtime
        .block_on(request)
        .map_err(|err| err.into_df_error())?;

    let table_refs = entry_names
        .into_iter()
        .map(|name| TableReference::from(name.to_string()))
        .collect();
    Ok(table_refs)
}
impl RedapCatalogProvider {
    /// Creates a catalog provider for the catalog called `name`.
    ///
    /// Both `None` and [`DEFAULT_CATALOG_NAME`] select the default catalog,
    /// which is stored as the absence of a catalog name.
    pub(crate) fn new(
        name: Option<&str>,
        client: ConnectionClient,
        runtime: RuntimeHandle,
        analytics_origin: Option<Origin>,
    ) -> Self {
        let catalog_name = match name {
            Some(explicit) if explicit != DEFAULT_CATALOG_NAME => Some(explicit.to_owned()),
            _ => None,
        };
        let schemas = Mutex::new(HashMap::default());

        Self {
            catalog_name,
            client,
            schemas,
            runtime,
            analytics_origin,
        }
    }
}
impl CatalogProvider for RedapCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the schemas of this catalog: every schema referenced by a server
    /// table that belongs to this catalog, plus the default schema.
    ///
    /// A failure to query the server is logged and treated as "no server
    /// tables". The default schema is still reported in that case, because
    /// [`Self::schema`] always serves it; this matches how the catalog and
    /// table listings fall back to their local entries.
    fn schema_names(&self) -> Vec<String> {
        let table_refs = get_table_refs(&self.client, &self.runtime).unwrap_or_else(|err| {
            re_log::error!("Error attempting to get table references from server: {err}");
            vec![]
        });

        let mut schema_keys: HashSet<&str> = table_refs
            .iter()
            .filter(|table_ref| table_ref.catalog() == self.catalog_name.as_deref())
            .filter_map(|table_ref| table_ref.schema())
            .collect();
        schema_keys.insert(DEFAULT_SCHEMA_NAME);

        schema_keys.into_iter().map(str::to_owned).collect()
    }

    /// Returns the schema provider for `name`, creating and caching it on
    /// first use. This always returns `Some`.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        // The default schema is represented by the absence of a schema name.
        let schema_name: Option<String> = if name == DEFAULT_SCHEMA_NAME {
            None
        } else {
            Some(name.to_owned())
        };

        let mut schemas = self.schemas.lock();
        let provider = schemas.entry(schema_name.clone()).or_insert_with(|| {
            Arc::new(RedapSchemaProvider {
                catalog_name: self.catalog_name.clone(),
                schema_name,
                client: self.client.clone(),
                runtime: self.runtime.clone(),
                in_memory_tables: Default::default(),
                analytics_origin: self.analytics_origin.clone(),
            })
        });

        Some(Arc::clone(provider) as Arc<dyn SchemaProvider>)
    }
}
/// Schema that serves server tables together with locally registered in-memory tables.
#[derive(Debug)]
struct RedapSchemaProvider {
/// Catalog this schema belongs to; `None` stands for the default catalog.
catalog_name: Option<String>,
/// Name of this schema; `None` stands for the default schema.
schema_name: Option<String>,
/// Client used to query the server.
client: ConnectionClient,
/// Runtime handle used to block on the asynchronous client calls.
runtime: RuntimeHandle,
/// Tables added through `register_table`; they are consulted before the server.
in_memory_tables: Mutex<HashMap<String, Arc<dyn TableProvider>>>,
/// When set, passed to `with_analytics` on the table providers created by `table`.
analytics_origin: Option<Origin>,
}
/// Builds the dot-separated name under which a table is looked up.
///
/// * catalog and schema present: `catalog.schema.table`
/// * only schema present: `schema.table`
/// * no schema: the bare `table_name`; a catalog without a schema is ignored
fn full_table_name(catalog: Option<&str>, schema: Option<&str>, table_name: &str) -> String {
    let Some(schema) = schema else {
        // Without a schema the catalog cannot be expressed in the name.
        return table_name.to_owned();
    };

    match catalog {
        Some(catalog) => [catalog, schema, table_name].join("."),
        None => [schema, table_name].join("."),
    }
}
impl RedapSchemaProvider {
    /// Qualifies `table_name` with this provider's catalog and schema names.
    fn full_table_name(&self, table_name: &str) -> String {
        let catalog = self.catalog_name.as_deref();
        let schema = self.schema_name.as_deref();
        full_table_name(catalog, schema, table_name)
    }

    /// Asks the server whether a table entry with the qualified name exists.
    ///
    /// A "not found" response counts as `Ok(false)`; any other client error is
    /// converted into a [`DataFusionError`]. A name that is not a valid entry
    /// name results in a planning error.
    async fn lookup_table_on_server_async(&self, table_name: &str) -> DataFusionResult<bool> {
        let qualified = self.full_table_name(table_name);
        let entry_name = EntryName::new(qualified)
            .map_err(|err| DataFusionError::Plan(format!("invalid entry name: {err}")))?;

        let mut client = self.client.clone();
        let lookup = client
            .get_entry_id(&entry_name, Some(EntryKind::Table))
            .await;

        match lookup {
            Ok(entry_id) => Ok(entry_id.is_some()),
            Err(err) => {
                if err.kind == re_redap_client::ApiErrorKind::NotFound {
                    Ok(false)
                } else {
                    Err(err.into_df_error())
                }
            }
        }
    }

    /// Blocking variant of [`Self::lookup_table_on_server_async`], driven by
    /// this provider's runtime handle.
    fn lookup_table_on_server(&self, table_name: &str) -> DataFusionResult<bool> {
        let lookup = self.lookup_table_on_server_async(table_name);
        self.runtime.block_on(lookup)
    }
}
#[async_trait]
impl SchemaProvider for RedapSchemaProvider {
    /// Returns the name of the owning catalog, or `None` for the default catalog.
    fn owner_name(&self) -> Option<&str> {
        self.catalog_name.as_deref()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists the server tables whose catalog and schema match this provider,
    /// followed by the locally registered in-memory tables.
    ///
    /// A failure to query the server is logged; the in-memory tables are
    /// still returned.
    fn table_names(&self) -> Vec<String> {
        let table_refs = get_table_refs(&self.client, &self.runtime).unwrap_or_else(|err| {
            re_log::error!("Error getting table references: {err}");
            vec![]
        });

        let mut table_names = table_refs
            .into_iter()
            .filter(|table_ref| {
                table_ref.catalog() == self.catalog_name.as_deref()
                    && table_ref.schema() == self.schema_name.as_deref()
            })
            .map(|table_ref| table_ref.table().to_owned())
            .collect::<Vec<_>>();
        table_names.extend(self.in_memory_tables.lock().keys().cloned());
        table_names
    }

    /// Returns the provider for `table_name`.
    ///
    /// In-memory tables take precedence. Otherwise a server-backed provider is
    /// built for the fully qualified name; any error from building it is
    /// returned to the caller.
    async fn table(
        &self,
        table_name: &str,
    ) -> DataFusionResult<Option<Arc<dyn TableProvider>>, DataFusionError> {
        // The lock guard is released at the end of this statement, i.e.
        // before the `.await` below.
        if let Some(table) = self.in_memory_tables.lock().get(table_name) {
            return Ok(Some(Arc::clone(table)));
        }

        let mut provider = TableEntryTableProvider::new(
            self.client.clone(),
            self.full_table_name(table_name),
            Some(self.runtime.clone()),
        )
        .with_caller(TableQueryCaller::CatalogResolver);
        if let Some(origin) = self.analytics_origin.clone() {
            provider = provider.with_analytics(origin);
        }

        provider.into_provider().await.map(Some)
    }

    /// Registers `table` as an in-memory table called `name`.
    ///
    /// Returns the in-memory provider that was previously registered under
    /// `name`, if any, so that a replacement is visible to the caller.
    ///
    /// # Errors
    ///
    /// Fails if a table with the same qualified name already exists on the
    /// server, or if the server lookup itself fails.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {
        if self.lookup_table_on_server(&name)? {
            return exec_err!("{name} already exists on the server catalog");
        }

        // Hand the replaced provider back instead of silently dropping it.
        Ok(self.in_memory_tables.lock().insert(name, table))
    }

    /// Removes the in-memory table `name` and returns it, if it was registered.
    /// Server tables are not affected.
    fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {
        Ok(self.in_memory_tables.lock().remove(name))
    }

    /// Returns whether `name` exists in memory or on the server.
    ///
    /// A failed server lookup is logged and reported as "does not exist".
    fn table_exist(&self, name: &str) -> bool {
        if self.in_memory_tables.lock().contains_key(name) {
            return true;
        }

        self.lookup_table_on_server(name).unwrap_or_else(|err| {
            re_log::error!("Error checking table existence for {name}: {err}");
            false
        })
    }

    /// Returns the type of table `name`: the provider's own type for in-memory
    /// tables, [`TableType::Base`] for tables found on the server, and `None`
    /// when the table is unknown.
    async fn table_type(&self, name: &str) -> DataFusionResult<Option<TableType>> {
        if let Some(table) = self.in_memory_tables.lock().get(name) {
            return Ok(Some(table.table_type()));
        }

        if self.lookup_table_on_server_async(name).await? {
            Ok(Some(TableType::Base))
        } else {
            Ok(None)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::full_table_name;

    /// Checks every combination of present/absent catalog and schema.
    #[test]
    fn full_table_name_combines_catalog_schema_table() {
        // (catalog, schema, expected qualified name for table "tbl")
        let cases = [
            (Some("cat"), Some("schema"), "cat.schema.tbl"),
            (None, Some("schema"), "schema.tbl"),
            (None, None, "tbl"),
            (Some("cat"), None, "tbl"),
        ];

        for (catalog, schema, expected) in cases {
            assert_eq!(full_table_name(catalog, schema, "tbl"), expected);
        }
    }
}