krishiv_sql/lakehouse/
as_of.rs1use krishiv_connectors::lakehouse::AsOfSpec;
4use sqlparser::ast::{
5 Expr, Select, SetExpr, Statement, TableFactor, TableVersion, TableWithJoins, Value,
6};
7use sqlparser::dialect::DatabricksDialect;
8use sqlparser::parser::Parser;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct AsOfTableRef {
13 pub table: String,
14 pub spec: AsOfSpec,
15}
16
17pub fn preprocess_as_of_sql(sql: &str) -> Result<(String, Vec<AsOfTableRef>), String> {
19 let dialect = DatabricksDialect {};
20 let mut stmts =
21 Parser::parse_sql(&dialect, sql).map_err(|e| format!("SQL parse error: {e}"))?;
22 if stmts.len() != 1 {
23 return Err("expected a single SQL statement".into());
24 }
25 let mut refs = Vec::new();
26 if let Some(stmt) = stmts.first_mut() {
27 process_statement(stmt, &mut refs);
28 }
29 let clean_sql = stmts.first().map(|s| s.to_string()).unwrap_or_default();
30 Ok((clean_sql, refs))
31}
32
33fn process_statement(stmt: &mut Statement, refs: &mut Vec<AsOfTableRef>) {
34 if let Statement::Query(query) = stmt {
35 process_query(query, refs);
36 }
37}
38
39fn process_query(query: &mut sqlparser::ast::Query, refs: &mut Vec<AsOfTableRef>) {
40 if let Some(with) = &mut query.with {
41 for cte in &mut with.cte_tables {
42 process_query(&mut cte.query, refs);
43 }
44 }
45 process_set_expr(&mut query.body, refs);
46}
47
48fn process_set_expr(set_expr: &mut SetExpr, refs: &mut Vec<AsOfTableRef>) {
49 match set_expr {
50 SetExpr::Select(select) => process_select(select, refs),
51 SetExpr::Query(query) => process_query(query, refs),
52 SetExpr::SetOperation { left, right, .. } => {
53 process_set_expr(left, refs);
54 process_set_expr(right, refs);
55 }
56 _ => {}
57 }
58}
59
60fn process_select(select: &mut Select, refs: &mut Vec<AsOfTableRef>) {
61 for twj in &mut select.from {
62 process_table_with_joins(twj, refs);
63 }
64}
65
66fn process_table_with_joins(twj: &mut TableWithJoins, refs: &mut Vec<AsOfTableRef>) {
67 process_table_factor(&mut twj.relation, refs);
68 for join in &mut twj.joins {
69 process_table_factor(&mut join.relation, refs);
70 }
71}
72
73fn process_table_factor(tf: &mut TableFactor, refs: &mut Vec<AsOfTableRef>) {
74 match tf {
75 TableFactor::Table { name, version, .. } => {
76 if let Some(ver) = version.take() {
77 let table_name = name.to_string();
78 if let Some(spec) = table_version_to_spec(ver) {
79 refs.push(AsOfTableRef {
80 table: table_name,
81 spec,
82 });
83 }
84 }
85 }
86 TableFactor::Derived { subquery, .. } => {
87 process_query(subquery, refs);
88 }
89 _ => {}
90 }
91}
92
93fn table_version_to_spec(ver: TableVersion) -> Option<AsOfSpec> {
94 match ver {
95 TableVersion::VersionAsOf(Expr::Value(vws)) => match vws.value {
96 Value::Number(n, _) => {
97 let v = n.parse::<i64>().ok()?;
98 Some(AsOfSpec::Version(v))
99 }
100 Value::SingleQuotedString(s) => {
101 let v = s.parse::<i64>().ok()?;
102 Some(AsOfSpec::Version(v))
103 }
104 _ => None,
105 },
106 TableVersion::TimestampAsOf(Expr::Value(vws)) => match vws.value {
107 Value::SingleQuotedString(s) => AsOfSpec::parse(&s).ok(),
108 _ => None,
109 },
110 TableVersion::ForSystemTimeAsOf(Expr::TypedString(ts)) => {
111 let s = ts.value.value.into_string()?;
112 AsOfSpec::parse(&s).ok()
113 }
114 TableVersion::ForSystemTimeAsOf(Expr::Value(vws)) => match vws.value {
115 Value::SingleQuotedString(s) => AsOfSpec::parse(&s).ok(),
116 _ => None,
117 },
118 _ => None,
119 }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use krishiv_connectors::lakehouse::AsOfSpec;

    /// Runs the preprocessor on `sql`, failing the test if it returns an error.
    fn rewrite(sql: &str) -> (String, Vec<AsOfTableRef>) {
        preprocess_as_of_sql(sql).unwrap()
    }

    /// The extracted specs, in traversal order.
    fn specs(refs: &[AsOfTableRef]) -> Vec<AsOfSpec> {
        refs.iter().map(|r| r.spec.clone()).collect()
    }

    #[test]
    fn parses_version_as_of() {
        let (sql, refs) = rewrite("SELECT * FROM orders VERSION AS OF 3");
        assert!(sql.contains("FROM orders"));
        assert_eq!(specs(&refs), vec![AsOfSpec::Version(3)]);
    }

    #[test]
    fn parses_timestamp_as_of() {
        let (sql, refs) = rewrite("SELECT * FROM events TIMESTAMP AS OF '2024-01-15T10:30:00Z'");
        assert!(!sql.contains("TIMESTAMP AS OF"));
        assert_eq!(refs.len(), 1);
    }

    #[test]
    fn parses_system_time_as_of() {
        let (sql, refs) =
            rewrite("SELECT * FROM tbl FOR SYSTEM_TIME AS OF TIMESTAMP '2024-06-01T00:00:00Z'");
        assert!(!sql.contains("FOR SYSTEM_TIME AS OF"));
        assert_eq!(refs.len(), 1);
    }

    #[test]
    fn handles_join_as_of() {
        let (sql, refs) =
            rewrite("SELECT * FROM a VERSION AS OF 1 JOIN b VERSION AS OF 2 ON a.id = b.id");
        assert_eq!(
            specs(&refs),
            vec![AsOfSpec::Version(1), AsOfSpec::Version(2)]
        );
        assert!(!sql.contains("VERSION AS OF"));
    }

    #[test]
    fn handles_subquery_as_of() {
        let (sql, refs) =
            rewrite("SELECT * FROM (SELECT * FROM inner_tbl VERSION AS OF 42) AS sub");
        assert_eq!(specs(&refs), vec![AsOfSpec::Version(42)]);
        assert!(!sql.contains("VERSION AS OF"));
    }

    #[test]
    fn handles_cte_as_of() {
        let (sql, refs) =
            rewrite("WITH cte AS (SELECT * FROM inner_tbl VERSION AS OF 99) SELECT * FROM cte");
        assert_eq!(specs(&refs), vec![AsOfSpec::Version(99)]);
        assert!(!sql.contains("VERSION AS OF"));
    }

    #[test]
    fn ignores_string_literals() {
        let (sql, refs) = rewrite("SELECT * FROM t WHERE name = 'VERSION AS OF 123'");
        assert!(refs.is_empty());
        assert!(sql.contains("VERSION AS OF 123"));
    }

    #[test]
    fn no_as_of_passes_through() {
        let input = "SELECT id, name FROM users WHERE age > 21";
        let (sql, refs) = rewrite(input);
        assert!(refs.is_empty());
        assert_eq!(sql, input);
    }

    #[test]
    fn handles_union_as_of() {
        let (sql, refs) =
            rewrite("SELECT * FROM a VERSION AS OF 1 UNION ALL SELECT * FROM b VERSION AS OF 2");
        assert_eq!(refs.len(), 2);
        assert!(!sql.contains("VERSION AS OF"));
    }
}