use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use datafusion::catalog::default_table_source::{provider_as_source, source_as_provider};
use datafusion::common::TableReference;
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::builder::LogicalPlanBuilder;
use datafusion::logical_expr::planner::{
PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
};
use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION};
use crate::table::PaimonTableProvider;
/// A DataFusion [`RelationPlanner`] for Paimon time-travel queries.
///
/// It picks up table relations that carry a `VERSION AS OF` or
/// `TIMESTAMP AS OF` clause and plans them as a scan over a copy of the Paimon
/// table configured with the matching scan options. The planner holds no state.
#[derive(Debug, Default)]
pub struct PaimonRelationPlanner;

impl PaimonRelationPlanner {
    /// Creates a new planner; equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
impl RelationPlanner for PaimonRelationPlanner {
    /// Plans a table relation that carries a Paimon time-travel clause.
    ///
    /// The relation is handed back unchanged as [`RelationPlanning::Original`]
    /// when it is not a plain table, has no `VERSION AS OF` / `TIMESTAMP AS OF`
    /// clause, or resolves to a table whose provider is not a
    /// [`PaimonTableProvider`]. In those cases the time-travel expression is
    /// never evaluated, so an expression this planner cannot interpret does not
    /// turn into an error for non-Paimon tables.
    ///
    /// For Paimon tables the clause is translated into scan options. A copy of
    /// the table carrying those options is created, and a scan over it is planned
    /// under the original table reference, keeping the relation's alias.
    ///
    /// # Errors
    ///
    /// Propagates errors from converting the table name, looking up the table
    /// source, converting it to a provider, building the new provider and
    /// building the plan. Also returns a planning error when a Paimon table's
    /// time-travel expression is not supported.
    fn plan_relation(
        &self,
        relation: TableFactor,
        context: &mut dyn RelationPlannerContext,
    ) -> DFResult<RelationPlanning> {
        let TableFactor::Table {
            ref name,
            version: Some(ref version),
            ..
        } = relation
        else {
            return Ok(RelationPlanning::Original(Box::new(relation)));
        };
        // Cheap syntactic filter: relations without a supported time-travel
        // clause are returned before any catalog lookup happens.
        if !matches!(
            version,
            TableVersion::VersionAsOf(_) | TableVersion::TimestampAsOf(_)
        ) {
            return Ok(RelationPlanning::Original(Box::new(relation)));
        }

        let table_ref = object_name_to_table_reference(name, context)?;
        let source = context
            .context_provider()
            .get_table_source(table_ref.clone())?;
        let provider = source_as_provider(&source)?;
        let Some(paimon_provider) = provider.as_any().downcast_ref::<PaimonTableProvider>() else {
            return Ok(RelationPlanning::Original(Box::new(relation)));
        };

        // Evaluate the clause only now that the table is known to be a Paimon
        // table. Unsupported expressions on other tables stay untouched in the
        // Original relation returned above.
        let extra_options = match version {
            TableVersion::VersionAsOf(expr) => resolve_version_as_of(expr)?,
            TableVersion::TimestampAsOf(expr) => resolve_timestamp_as_of(expr)?,
            // Filtered out above; kept as a safe fallback rather than a panic.
            _ => return Ok(RelationPlanning::Original(Box::new(relation))),
        };
        let new_table = paimon_provider.table().copy_with_options(extra_options);
        let new_provider = PaimonTableProvider::try_new(new_table)?;
        let new_source = provider_as_source(Arc::new(new_provider));

        // All borrows of `relation` have ended, so it can be consumed to move its
        // alias into the planned relation.
        let TableFactor::Table { alias, .. } = relation else {
            unreachable!("relation was matched as TableFactor::Table above");
        };
        let plan = LogicalPlanBuilder::scan(table_ref, new_source, None)?.build()?;
        Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new(
            plan, alias,
        ))))
    }
}
fn object_name_to_table_reference(
name: &ast::ObjectName,
context: &mut dyn RelationPlannerContext,
) -> DFResult<TableReference> {
let idents: Vec<String> = name
.0
.iter()
.map(|part| {
let ident = part.as_ident().ok_or_else(|| {
datafusion::error::DataFusionError::Plan(format!(
"Expected simple identifier in table reference, got: {part}"
))
})?;
Ok(context.normalize_ident(ident.clone()))
})
.collect::<DFResult<_>>()?;
match idents.len() {
1 => Ok(TableReference::bare(idents[0].clone())),
2 => Ok(TableReference::partial(
idents[0].clone(),
idents[1].clone(),
)),
3 => Ok(TableReference::full(
idents[0].clone(),
idents[1].clone(),
idents[2].clone(),
)),
_ => Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported table reference: {name}"
))),
}
}
/// Translates a `VERSION AS OF` expression into Paimon scan options.
///
/// Numeric literals (snapshot ids) and single- or double-quoted string literals
/// (tag names) are forwarded verbatim under [`SCAN_VERSION_OPTION`].
///
/// # Errors
///
/// Returns a planning error for any other expression. Every unsupported case
/// reports the same message, including the hint about the accepted forms.
fn resolve_version_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
    let version = match expr {
        ast::Expr::Value(v) => match &v.value {
            ast::Value::Number(n, _) => Some(n.clone()),
            ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
                Some(s.clone())
            }
            _ => None,
        },
        _ => None,
    };
    let Some(version) = version else {
        return Err(datafusion::error::DataFusionError::Plan(format!(
            "Unsupported VERSION AS OF expression: {expr}. Expected an integer snapshot id or a tag name."
        )));
    };
    Ok(HashMap::from([(SCAN_VERSION_OPTION.to_string(), version)]))
}
fn resolve_timestamp_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
match expr {
ast::Expr::Value(v) => match &v.value {
ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
let millis = parse_timestamp_to_millis(s)?;
Ok(HashMap::from([(
SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
millis.to_string(),
)]))
}
_ => Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
))),
},
_ => Err(datafusion::error::DataFusionError::Plan(format!(
"Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
))),
}
}
/// Parses a time-travel timestamp string into milliseconds since the Unix epoch.
///
/// The accepted format is `YYYY-MM-DD HH:MM:SS`, optionally followed by a
/// fractional-second part such as `.123`. The value has no offset and is
/// interpreted in the local time zone of the running process (`chrono::Local`).
/// Precision beyond milliseconds is discarded.
///
/// # Errors
///
/// Returns a planning error when the string does not match the format, or when
/// the local time does not map to exactly one instant (for example, around a
/// daylight-saving transition).
fn parse_timestamp_to_millis(ts: &str) -> DFResult<i64> {
    use chrono::{Local, NaiveDateTime, TimeZone};
    // `%.f` reads an optional `.fraction`, so whole-second inputs still parse
    // exactly as before.
    let naive = NaiveDateTime::parse_from_str(ts, "%Y-%m-%d %H:%M:%S%.f").map_err(|e| {
        datafusion::error::DataFusionError::Plan(format!(
            "Cannot parse time travel timestamp '{ts}': {e}. Expected format: YYYY-MM-DD HH:MM:SS[.fraction]"
        ))
    })?;
    // `single()` is `None` both for ambiguous times (clock set back) and for
    // nonexistent times (clock set forward).
    let local = Local.from_local_datetime(&naive).single().ok_or_else(|| {
        datafusion::error::DataFusionError::Plan(format!("Ambiguous or invalid local time: '{ts}'"))
    })?;
    Ok(local.timestamp_millis())
}