use std::sync::Arc;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result as DFResult};
use paimon::catalog::{Catalog, Identifier, SYSTEM_BRANCH_PREFIX, SYSTEM_TABLE_SPLITTER};
use paimon::table::Table;
use crate::error::to_datafusion_error;
mod options;
/// Function pointer that turns a base [`Table`] into a DataFusion table provider.
type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;
/// Registry of system tables as `(name, builder)` pairs; the lookups in this
/// module compare names ASCII-case-insensitively.
const TABLES: &[(&str, Builder)] = &[("options", options::build)];
/// Splits an object name into its base table name and an optional system-table name.
///
/// The name is cut into at most three segments on `SYSTEM_TABLE_SPLITTER`:
///
/// * one segment: a plain table, no system table;
/// * two segments: the second one is the system table, unless it starts with
///   `SYSTEM_BRANCH_PREFIX` (then it is a branch reference and there is no
///   system table);
/// * three segments: the third one is the system table, but only when the
///   second one starts with `SYSTEM_BRANCH_PREFIX`; otherwise no system table
///   is reported.
///
/// The first segment is always returned as the base name, and the system-table
/// name is returned exactly as written (no case folding).
pub(crate) fn split_object_name(name: &str) -> (&str, Option<&str>) {
    let mut segments = name.splitn(3, SYSTEM_TABLE_SPLITTER);
    let base = segments.next().unwrap_or(name);

    // No separator at all: this is just a table name.
    let Some(middle) = segments.next() else {
        return (base, None);
    };

    let middle_is_branch = middle.starts_with(SYSTEM_BRANCH_PREFIX);
    let system = match segments.next() {
        // `base<sep>middle`: the middle part names a system table unless it is a branch.
        None => (!middle_is_branch).then_some(middle),
        // `base<sep>middle<sep>tail`: only valid when the middle part is a branch.
        Some(tail) => middle_is_branch.then_some(tail),
    };
    (base, system)
}
/// Returns `true` when `name` matches an entry of `TABLES`, ignoring ASCII case.
pub(crate) fn is_registered(name: &str) -> bool {
    TABLES
        .iter()
        .map(|entry| entry.0)
        .any(|registered| registered.eq_ignore_ascii_case(name))
}
/// Looks up the builder registered under `name` (ASCII-case-insensitive) and
/// applies it to `base_table`.
///
/// Returns `None` when no entry of `TABLES` matches; otherwise `Some` with
/// whatever the builder produced, including its error.
fn wrap_to_system_table(name: &str, base_table: Table) -> Option<DFResult<Arc<dyn TableProvider>>> {
    let (_, build) = TABLES
        .iter()
        .find(|(registered, _)| registered.eq_ignore_ascii_case(name))?;
    Some(build(base_table))
}
pub(crate) async fn load(
catalog: Arc<dyn Catalog>,
database: String,
base: String,
system_name: String,
) -> DFResult<Option<Arc<dyn TableProvider>>> {
if !is_registered(&system_name) {
return Ok(None);
}
let identifier = Identifier::new(database, base.clone());
match catalog.get_table(&identifier).await {
Ok(table) => wrap_to_system_table(&system_name, table)
.expect("is_registered guarantees a builder")
.map(Some),
Err(paimon::Error::TableNotExist { .. }) => Err(DataFusionError::Plan(format!(
"Cannot read system table `${system_name}`: \
base table `{base}` does not exist"
))),
Err(e) => Err(to_datafusion_error(e)),
}
}
#[cfg(test)]
mod tests {
    use super::{is_registered, split_object_name};

    /// Asserts that `input` splits into exactly `(base, system)`.
    #[track_caller]
    fn assert_split(input: &str, base: &str, system: Option<&str>) {
        assert_eq!(split_object_name(input), (base, system));
    }

    /// Registry lookups ignore ASCII case and reject unknown names.
    #[test]
    fn is_registered_is_case_insensitive() {
        for spelling in ["options", "Options", "OPTIONS"] {
            assert!(is_registered(spelling));
        }
        assert!(!is_registered("nonsense"));
    }

    /// A name without any separator is only a base table.
    #[test]
    fn plain_table_name() {
        assert_split("orders", "orders", None);
    }

    /// `base$system` yields the system-table name.
    #[test]
    fn system_table_only() {
        assert_split("orders$options", "orders", Some("options"));
    }

    /// `base$branch_x` refers to a branch, not to a system table.
    #[test]
    fn branch_reference_is_not_a_system_table() {
        assert_split("orders$branch_main", "orders", None);
    }

    /// `base$branch_x$system` yields the trailing system-table name.
    #[test]
    fn branch_plus_system_table() {
        assert_split("orders$branch_main$options", "orders", Some("options"));
    }

    /// Three segments whose middle one is not a branch carry no system table.
    #[test]
    fn three_parts_without_branch_prefix_is_not_a_system_table() {
        assert_split("orders$foo$bar", "orders", None);
    }

    /// The system-table name is returned exactly as written.
    #[test]
    fn system_table_name_preserves_case() {
        assert_split("orders$Options", "orders", Some("Options"));
    }
}