Skip to main content

krishiv_sql/lakehouse/
as_of.rs

1//! Time travel SQL preprocessing (R18 S4, ADR-18.3).
2
3use krishiv_connectors::lakehouse::AsOfSpec;
4use sqlparser::ast::{
5    Expr, Select, SetExpr, Statement, TableFactor, TableVersion, TableWithJoins, Value,
6};
7use sqlparser::dialect::DatabricksDialect;
8use sqlparser::parser::Parser;
9
10/// Parsed `AS OF` qualifier attached to a table reference.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct AsOfTableRef {
13    pub table: String,
14    pub spec: AsOfSpec,
15}
16
17/// Strip `AS OF` clauses and return rewritten SQL plus qualifiers.
18pub fn preprocess_as_of_sql(sql: &str) -> Result<(String, Vec<AsOfTableRef>), String> {
19    let dialect = DatabricksDialect {};
20    let mut stmts =
21        Parser::parse_sql(&dialect, sql).map_err(|e| format!("SQL parse error: {e}"))?;
22    if stmts.len() != 1 {
23        return Err("expected a single SQL statement".into());
24    }
25    let mut refs = Vec::new();
26    if let Some(stmt) = stmts.first_mut() {
27        process_statement(stmt, &mut refs);
28    }
29    let clean_sql = stmts.first().map(|s| s.to_string()).unwrap_or_default();
30    Ok((clean_sql, refs))
31}
32
33fn process_statement(stmt: &mut Statement, refs: &mut Vec<AsOfTableRef>) {
34    if let Statement::Query(query) = stmt {
35        process_query(query, refs);
36    }
37}
38
39fn process_query(query: &mut sqlparser::ast::Query, refs: &mut Vec<AsOfTableRef>) {
40    if let Some(with) = &mut query.with {
41        for cte in &mut with.cte_tables {
42            process_query(&mut cte.query, refs);
43        }
44    }
45    process_set_expr(&mut query.body, refs);
46}
47
48fn process_set_expr(set_expr: &mut SetExpr, refs: &mut Vec<AsOfTableRef>) {
49    match set_expr {
50        SetExpr::Select(select) => process_select(select, refs),
51        SetExpr::Query(query) => process_query(query, refs),
52        SetExpr::SetOperation { left, right, .. } => {
53            process_set_expr(left, refs);
54            process_set_expr(right, refs);
55        }
56        _ => {}
57    }
58}
59
60fn process_select(select: &mut Select, refs: &mut Vec<AsOfTableRef>) {
61    for twj in &mut select.from {
62        process_table_with_joins(twj, refs);
63    }
64}
65
66fn process_table_with_joins(twj: &mut TableWithJoins, refs: &mut Vec<AsOfTableRef>) {
67    process_table_factor(&mut twj.relation, refs);
68    for join in &mut twj.joins {
69        process_table_factor(&mut join.relation, refs);
70    }
71}
72
73fn process_table_factor(tf: &mut TableFactor, refs: &mut Vec<AsOfTableRef>) {
74    match tf {
75        TableFactor::Table { name, version, .. } => {
76            if let Some(ver) = version.take() {
77                let table_name = name.to_string();
78                if let Some(spec) = table_version_to_spec(ver) {
79                    refs.push(AsOfTableRef {
80                        table: table_name,
81                        spec,
82                    });
83                }
84            }
85        }
86        TableFactor::Derived { subquery, .. } => {
87            process_query(subquery, refs);
88        }
89        _ => {}
90    }
91}
92
93fn table_version_to_spec(ver: TableVersion) -> Option<AsOfSpec> {
94    match ver {
95        TableVersion::VersionAsOf(Expr::Value(vws)) => match vws.value {
96            Value::Number(n, _) => {
97                let v = n.parse::<i64>().ok()?;
98                Some(AsOfSpec::Version(v))
99            }
100            Value::SingleQuotedString(s) => {
101                let v = s.parse::<i64>().ok()?;
102                Some(AsOfSpec::Version(v))
103            }
104            _ => None,
105        },
106        TableVersion::TimestampAsOf(Expr::Value(vws)) => match vws.value {
107            Value::SingleQuotedString(s) => AsOfSpec::parse(&s).ok(),
108            _ => None,
109        },
110        TableVersion::ForSystemTimeAsOf(Expr::TypedString(ts)) => {
111            let s = ts.value.value.into_string()?;
112            AsOfSpec::parse(&s).ok()
113        }
114        TableVersion::ForSystemTimeAsOf(Expr::Value(vws)) => match vws.value {
115            Value::SingleQuotedString(s) => AsOfSpec::parse(&s).ok(),
116            _ => None,
117        },
118        _ => None,
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use krishiv_connectors::lakehouse::AsOfSpec;

    #[test]
    fn parses_version_as_of() {
        let (sql, refs) = preprocess_as_of_sql("SELECT * FROM orders VERSION AS OF 3").unwrap();
        assert!(sql.contains("FROM orders"));
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].spec, AsOfSpec::Version(3));
    }

    #[test]
    fn parses_timestamp_as_of() {
        let (sql, refs) =
            preprocess_as_of_sql("SELECT * FROM events TIMESTAMP AS OF '2024-01-15T10:30:00Z'")
                .unwrap();
        assert!(!sql.contains("TIMESTAMP AS OF"));
        assert_eq!(refs.len(), 1);
    }

    #[test]
    fn parses_system_time_as_of() {
        let (sql, refs) = preprocess_as_of_sql(
            "SELECT * FROM tbl FOR SYSTEM_TIME AS OF TIMESTAMP '2024-06-01T00:00:00Z'",
        )
        .unwrap();
        assert!(!sql.contains("FOR SYSTEM_TIME AS OF"));
        assert_eq!(refs.len(), 1);
    }

    #[test]
    fn handles_join_as_of() {
        let (sql, refs) = preprocess_as_of_sql(
            "SELECT * FROM a VERSION AS OF 1 JOIN b VERSION AS OF 2 ON a.id = b.id",
        )
        .unwrap();
        assert_eq!(refs.len(), 2);
        assert_eq!(refs[0].spec, AsOfSpec::Version(1));
        assert_eq!(refs[1].spec, AsOfSpec::Version(2));
        assert!(!sql.contains("VERSION AS OF"));
    }

    #[test]
    fn handles_subquery_as_of() {
        let (sql, refs) =
            preprocess_as_of_sql("SELECT * FROM (SELECT * FROM inner_tbl VERSION AS OF 42) AS sub")
                .unwrap();
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].spec, AsOfSpec::Version(42));
        assert!(!sql.contains("VERSION AS OF"));
    }

    #[test]
    fn handles_cte_as_of() {
        let (sql, refs) = preprocess_as_of_sql(
            "WITH cte AS (SELECT * FROM inner_tbl VERSION AS OF 99) SELECT * FROM cte",
        )
        .unwrap();
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].spec, AsOfSpec::Version(99));
        assert!(!sql.contains("VERSION AS OF"));
    }

    #[test]
    fn ignores_string_literals() {
        let (sql, refs) =
            preprocess_as_of_sql("SELECT * FROM t WHERE name = 'VERSION AS OF 123'").unwrap();
        assert_eq!(refs.len(), 0);
        assert!(sql.contains("VERSION AS OF 123"));
    }

    #[test]
    fn no_as_of_passes_through() {
        let input = "SELECT id, name FROM users WHERE age > 21";
        let (sql, refs) = preprocess_as_of_sql(input).unwrap();
        assert_eq!(refs.len(), 0);
        assert_eq!(sql, input);
    }

    #[test]
    fn handles_union_as_of() {
        let (sql, refs) = preprocess_as_of_sql(
            "SELECT * FROM a VERSION AS OF 1 UNION ALL SELECT * FROM b VERSION AS OF 2",
        )
        .unwrap();
        assert_eq!(refs.len(), 2);
        assert!(!sql.contains("VERSION AS OF"));
    }

    #[test]
    fn records_table_names() {
        let (_, refs) = preprocess_as_of_sql(
            "SELECT * FROM a VERSION AS OF 1 JOIN b VERSION AS OF 2 ON a.id = b.id",
        )
        .unwrap();
        assert_eq!(refs[0].table, "a");
        assert_eq!(refs[1].table, "b");
    }

    #[test]
    fn records_qualified_table_name() {
        let (_, refs) = preprocess_as_of_sql("SELECT * FROM db.orders VERSION AS OF 5").unwrap();
        assert_eq!(refs.len(), 1);
        assert_eq!(refs[0].table, "db.orders");
        assert_eq!(refs[0].spec, AsOfSpec::Version(5));
    }

    #[test]
    fn rejects_multiple_statements() {
        let err = preprocess_as_of_sql("SELECT 1; SELECT 2").unwrap_err();
        assert_eq!(err, "expected a single SQL statement");
    }

    #[test]
    fn rejects_empty_input() {
        let err = preprocess_as_of_sql("").unwrap_err();
        assert_eq!(err, "expected a single SQL statement");
    }

    #[test]
    fn reports_parse_errors() {
        let err = preprocess_as_of_sql("SELECT * FROM").unwrap_err();
        assert!(err.starts_with("SQL parse error:"), "{err}");
    }
}