paimon_datafusion/
relation_planner.rs1use std::collections::HashMap;
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use datafusion::catalog::default_table_source::{provider_as_source, source_as_provider};
25use datafusion::common::TableReference;
26use datafusion::error::Result as DFResult;
27use datafusion::logical_expr::builder::LogicalPlanBuilder;
28use datafusion::logical_expr::planner::{
29 PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
30};
31use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
32use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION};
33
34use crate::table::PaimonTableProvider;
35
/// Stateless relation planner for Paimon tables.
///
/// The type carries no fields; all behaviour lives in its
/// `RelationPlanner` implementation.
#[derive(Debug, Default)]
pub struct PaimonRelationPlanner;

impl PaimonRelationPlanner {
    /// Creates a new planner.
    ///
    /// Equivalent to [`PaimonRelationPlanner::default`], since the planner
    /// holds no state.
    pub fn new() -> Self {
        PaimonRelationPlanner
    }
}
56
57impl RelationPlanner for PaimonRelationPlanner {
58 fn plan_relation(
59 &self,
60 relation: TableFactor,
61 context: &mut dyn RelationPlannerContext,
62 ) -> DFResult<RelationPlanning> {
63 let TableFactor::Table {
65 ref name,
66 ref version,
67 ..
68 } = relation
69 else {
70 return Ok(RelationPlanning::Original(Box::new(relation)));
71 };
72
73 let extra_options = match version {
74 Some(TableVersion::VersionAsOf(expr)) => resolve_version_as_of(expr)?,
75 Some(TableVersion::TimestampAsOf(expr)) => resolve_timestamp_as_of(expr)?,
76 _ => return Ok(RelationPlanning::Original(Box::new(relation))),
77 };
78
79 let table_ref = object_name_to_table_reference(name, context)?;
81 let source = context
82 .context_provider()
83 .get_table_source(table_ref.clone())?;
84 let provider = source_as_provider(&source)?;
85
86 let Some(paimon_provider) = provider.as_any().downcast_ref::<PaimonTableProvider>() else {
88 return Ok(RelationPlanning::Original(Box::new(relation)));
89 };
90
91 let new_table = paimon_provider.table().copy_with_options(extra_options);
92 let new_provider = PaimonTableProvider::try_new(new_table)?;
93 let new_source = provider_as_source(Arc::new(new_provider));
94
95 let TableFactor::Table { alias, .. } = relation else {
97 unreachable!()
98 };
99
100 let plan = LogicalPlanBuilder::scan(table_ref, new_source, None)?.build()?;
101 Ok(RelationPlanning::Planned(Box::new(PlannedRelation::new(
102 plan, alias,
103 ))))
104 }
105}
106
107fn object_name_to_table_reference(
109 name: &ast::ObjectName,
110 context: &mut dyn RelationPlannerContext,
111) -> DFResult<TableReference> {
112 let idents: Vec<String> = name
113 .0
114 .iter()
115 .map(|part| {
116 let ident = part.as_ident().ok_or_else(|| {
117 datafusion::error::DataFusionError::Plan(format!(
118 "Expected simple identifier in table reference, got: {part}"
119 ))
120 })?;
121 Ok(context.normalize_ident(ident.clone()))
122 })
123 .collect::<DFResult<_>>()?;
124 match idents.len() {
125 1 => Ok(TableReference::bare(idents[0].clone())),
126 2 => Ok(TableReference::partial(
127 idents[0].clone(),
128 idents[1].clone(),
129 )),
130 3 => Ok(TableReference::full(
131 idents[0].clone(),
132 idents[1].clone(),
133 idents[2].clone(),
134 )),
135 _ => Err(datafusion::error::DataFusionError::Plan(format!(
136 "Unsupported table reference: {name}"
137 ))),
138 }
139}
140
141fn resolve_version_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
146 let version = match expr {
147 ast::Expr::Value(v) => match &v.value {
148 ast::Value::Number(n, _) => n.clone(),
149 ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => s.clone(),
150 _ => {
151 return Err(datafusion::error::DataFusionError::Plan(format!(
152 "Unsupported VERSION AS OF expression: {expr}"
153 )))
154 }
155 },
156 _ => {
157 return Err(datafusion::error::DataFusionError::Plan(format!(
158 "Unsupported VERSION AS OF expression: {expr}. Expected an integer snapshot id or a tag name."
159 )))
160 }
161 };
162 Ok(HashMap::from([(SCAN_VERSION_OPTION.to_string(), version)]))
163}
164
165fn resolve_timestamp_as_of(expr: &ast::Expr) -> DFResult<HashMap<String, String>> {
167 match expr {
168 ast::Expr::Value(v) => match &v.value {
169 ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
170 let millis = parse_timestamp_to_millis(s)?;
171 Ok(HashMap::from([(
172 SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
173 millis.to_string(),
174 )]))
175 }
176 _ => Err(datafusion::error::DataFusionError::Plan(format!(
177 "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
178 ))),
179 },
180 _ => Err(datafusion::error::DataFusionError::Plan(format!(
181 "Unsupported TIMESTAMP AS OF expression: {expr}. Expected a timestamp string."
182 ))),
183 }
184}
185
186fn parse_timestamp_to_millis(ts: &str) -> DFResult<i64> {
190 use chrono::{Local, NaiveDateTime, TimeZone};
191
192 let naive = NaiveDateTime::parse_from_str(ts, "%Y-%m-%d %H:%M:%S").map_err(|e| {
193 datafusion::error::DataFusionError::Plan(format!(
194 "Cannot parse time travel timestamp '{ts}': {e}. Expected format: YYYY-MM-DD HH:MM:SS"
195 ))
196 })?;
197 let local = Local.from_local_datetime(&naive).single().ok_or_else(|| {
198 datafusion::error::DataFusionError::Plan(format!("Ambiguous or invalid local time: '{ts}'"))
199 })?;
200 Ok(local.timestamp_millis())
201}