//! ClickHouse SQL dialect — activecube_rs/sql/clickhouse.rs
1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::CompileResult;
5use crate::sql::dialect::SqlDialect;
6
/// SQL dialect targeting ClickHouse.
///
/// Zero-sized marker type; all compilation logic lives in the
/// `SqlDialect` impl and the inherent `compile_inner` below.
pub struct ClickHouseDialect;
8
impl ClickHouseDialect {
    /// Construct the dialect. The type carries no state, so this is free.
    pub fn new() -> Self {
        Self
    }
}
14
15impl Default for ClickHouseDialect {
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl SqlDialect for ClickHouseDialect {
22    fn compile(&self, ir: &QueryIR) -> CompileResult {
23        let mut bindings = Vec::new();
24        let mut alias_remap: Vec<(String, String)> = Vec::new();
25
26        let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
27
28        if ir.joins.is_empty() {
29            return CompileResult { sql: inner_sql, bindings, alias_remap };
30        }
31
32        // Wrap the main query as _main and append LEFT JOINs.
33        // Explicitly list _main columns (not _main.*) to avoid ClickHouse
34        // auto-prefixing ambiguous column names with `_main.` when multiple
35        // JOINed tables share column names (e.g. `ts`, `slot`).
36        let main_cols: Vec<String> = ir.selects.iter().map(|s| {
37            let col = match s {
38                SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
39                SelectExpr::Aggregate { alias, .. } => alias.clone(),
40            };
41            format!("_main.{} AS {}", quote_col(&col), quote_col(&col))
42        }).collect();
43
44        let mut sql = String::from("SELECT ");
45        sql.push_str(&main_cols.join(", "));
46
47        // Collect joined column aliases for the outer SELECT
48        for join in &ir.joins {
49            for sel in &join.selects {
50                let col_name = match sel {
51                    SelectExpr::Column { column, .. } => column.clone(),
52                    SelectExpr::Aggregate { alias, .. } => alias.clone(),
53                };
54                let outer_alias = format!("{}.{}", join.alias, col_name);
55                sql.push_str(&format!(", {}.{} AS `{}`",
56                    join.alias, quote_col(&col_name), outer_alias));
57            }
58        }
59
60        sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
61
62        for join in &ir.joins {
63            if join.is_aggregate {
64                // Mode B: subquery JOIN for AggregatingMergeTree targets
65                sql.push_str(" LEFT JOIN (SELECT ");
66                let mut sub_parts: Vec<String> = Vec::new();
67                for gb_col in &join.group_by {
68                    sub_parts.push(quote_col(gb_col));
69                }
70                for sel in &join.selects {
71                    match sel {
72                        SelectExpr::Column { column, alias } => {
73                            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
74                            if let Some(a) = alias {
75                                sub_parts.push(format!("{col} AS `{a}`"));
76                            } else if column.contains('(') || !join.group_by.contains(column) {
77                                sub_parts.push(col);
78                            }
79                        }
80                        SelectExpr::Aggregate { function, column, alias, condition } => {
81                            let func = function.to_lowercase();
82                            let expr = match (func.as_str(), column.as_str(), condition) {
83                                ("count", "*", None) => format!("count() AS `{alias}`"),
84                                ("uniq", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
85                                (f, col, None) => format!("{f}(`{col}`) AS `{alias}`"),
86                                (f, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`"),
87                            };
88                            sub_parts.push(expr);
89                        }
90                    }
91                }
92                sql.push_str(&sub_parts.join(", "));
93                sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
94                if !join.group_by.is_empty() {
95                    sql.push_str(" GROUP BY ");
96                    let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
97                    sql.push_str(&gb.join(", "));
98                }
99                sql.push_str(&format!(") AS {}", join.alias));
100            } else {
101                // Mode A: direct JOIN for MergeTree / ReplacingMergeTree targets
102                sql.push_str(&format!(" LEFT JOIN `{}`.`{}`", join.schema, join.table));
103                if join.use_final {
104                    sql.push_str(" FINAL");
105                }
106                sql.push_str(&format!(" AS {}", join.alias));
107            }
108
109            // ON conditions
110            let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
111                format!("_main.{} = {}.{}", quote_col(local), join.alias, quote_col(remote))
112            }).collect();
113            sql.push_str(" ON ");
114            sql.push_str(&on_parts.join(" AND "));
115        }
116
117        CompileResult { sql, bindings, alias_remap }
118    }
119
120    fn quote_identifier(&self, name: &str) -> String {
121        format!("`{name}`")
122    }
123
124    fn name(&self) -> &str {
125        "ClickHouse"
126    }
127}
128
129impl ClickHouseDialect {
130    fn compile_inner(
131        &self,
132        ir: &QueryIR,
133        bindings: &mut Vec<SqlValue>,
134        alias_remap: &mut Vec<(String, String)>,
135    ) -> String {
136        let mut sql = String::new();
137
138        let mut augmented_selects = ir.selects.clone();
139        let mut agg_alias_map: HashMap<String, String> = HashMap::new();
140        let mut alias_counter = 0u32;
141
142        let having_cols: std::collections::HashSet<String> =
143            collect_filter_columns(&ir.having).into_iter().collect();
144        let has_having_agg = having_cols.iter().any(|c| c.contains('('));
145
146        if has_having_agg {
147            for sel in &mut augmented_selects {
148                if let SelectExpr::Column { column, alias } = sel {
149                    if column.contains('(') && having_cols.contains(column.as_str()) {
150                        if alias.is_none() {
151                            let a = format!("__f_{alias_counter}");
152                            alias_counter += 1;
153                            alias_remap.push((a.clone(), column.clone()));
154                            agg_alias_map.insert(column.clone(), a.clone());
155                            *alias = Some(a);
156                        } else if let Some(existing) = alias {
157                            agg_alias_map.insert(column.clone(), existing.clone());
158                        }
159                    }
160                }
161            }
162            for col in &having_cols {
163                if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
164                    let a = format!("__f_{alias_counter}");
165                    alias_counter += 1;
166                    agg_alias_map.insert(col.clone(), a.clone());
167                    augmented_selects.push(SelectExpr::Column {
168                        column: col.clone(),
169                        alias: Some(a),
170                    });
171                }
172            }
173        }
174
175        sql.push_str("SELECT ");
176        let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
177            SelectExpr::Column { column, alias } => {
178                let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
179                match alias {
180                    Some(a) => format!("{col} AS `{a}`"),
181                    None => col,
182                }
183            },
184            SelectExpr::Aggregate { function, column, alias, condition } => {
185                let func = function.to_uppercase();
186                match (func.as_str(), column.as_str(), condition) {
187                    ("COUNT", "*", None) => format!("count() AS `{alias}`"),
188                    ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
189                    ("UNIQ", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
190                    ("UNIQ", col, Some(cond)) => format!("uniqIf(`{col}`, {cond}) AS `{alias}`"),
191                    (_, col, None) => format!("{f}(`{col}`) AS `{alias}`", f = func.to_lowercase()),
192                    (_, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`", f = func.to_lowercase()),
193                }
194            }
195        }).collect();
196        sql.push_str(&select_parts.join(", "));
197
198        sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
199        if ir.use_final {
200            sql.push_str(" FINAL");
201        }
202
203        let where_clause = compile_filter(&ir.filters, bindings);
204        if !where_clause.is_empty() {
205            sql.push_str(" WHERE ");
206            sql.push_str(&where_clause);
207        }
208
209        let effective_group_by = if !ir.group_by.is_empty() {
210            ir.group_by.clone()
211        } else {
212            let has_merge_cols = augmented_selects.iter().any(|s| match s {
213                SelectExpr::Column { column, .. } => column.contains("Merge("),
214                SelectExpr::Aggregate { .. } => true,
215            });
216            if has_merge_cols {
217                augmented_selects.iter().filter_map(|s| match s {
218                    SelectExpr::Column { column, .. } if !column.contains("Merge(") && !column.contains('(') => {
219                        Some(column.clone())
220                    }
221                    _ => None,
222                }).collect()
223            } else {
224                vec![]
225            }
226        };
227
228        if !effective_group_by.is_empty() {
229            sql.push_str(" GROUP BY ");
230            let cols: Vec<String> = effective_group_by.iter().map(|c| format!("`{c}`")).collect();
231            sql.push_str(&cols.join(", "));
232        }
233
234        if has_having_agg {
235            let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
236            if !having_clause.is_empty() {
237                sql.push_str(" HAVING ");
238                sql.push_str(&having_clause);
239            }
240        } else {
241            let having_clause = compile_filter(&ir.having, bindings);
242            if !having_clause.is_empty() {
243                sql.push_str(" HAVING ");
244                sql.push_str(&having_clause);
245            }
246        }
247
248        if !ir.order_by.is_empty() {
249            sql.push_str(" ORDER BY ");
250            let parts: Vec<String> = ir.order_by.iter().map(|o| {
251                let col = if o.column.contains('(') {
252                    agg_alias_map.get(&o.column)
253                        .map(|a| format!("`{a}`"))
254                        .unwrap_or_else(|| o.column.clone())
255                } else {
256                    format!("`{}`", o.column)
257                };
258                let dir = if o.descending { "DESC" } else { "ASC" };
259                format!("{col} {dir}")
260            }).collect();
261            sql.push_str(&parts.join(", "));
262        }
263
264        if let Some(ref lb) = ir.limit_by {
265            let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
266            sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
267            if lb.offset > 0 {
268                sql.push_str(&format!(" OFFSET {}", lb.offset));
269            }
270        }
271
272        sql.push_str(&format!(" LIMIT {}", ir.limit));
273        if ir.offset > 0 {
274            sql.push_str(&format!(" OFFSET {}", ir.offset));
275        }
276
277        sql
278    }
279}
280
281/// Collect all column names referenced in a filter tree.
282fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
283    match node {
284        FilterNode::Empty => vec![],
285        FilterNode::Condition { column, .. } => vec![column.clone()],
286        FilterNode::And(children) | FilterNode::Or(children) => {
287            children.iter().flat_map(collect_filter_columns).collect()
288        }
289    }
290}
291
292/// Like `compile_filter` but replaces aggregate expression columns with their
293/// SELECT aliases so ClickHouse can resolve them in HAVING scope.
294fn compile_filter_with_aliases(
295    node: &FilterNode,
296    bindings: &mut Vec<SqlValue>,
297    aliases: &HashMap<String, String>,
298) -> String {
299    match node {
300        FilterNode::Empty => String::new(),
301        FilterNode::Condition { column, op, value } => {
302            let effective_col = aliases.get(column)
303                .map(|a| a.as_str())
304                .unwrap_or(column.as_str());
305            compile_condition(effective_col, op, value, bindings)
306        }
307        FilterNode::And(children) => {
308            let parts: Vec<String> = children.iter()
309                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
310                .filter(|s| !s.is_empty())
311                .collect();
312            match parts.len() {
313                0 => String::new(),
314                1 => parts.into_iter().next().unwrap(),
315                _ => format!("({})", parts.join(" AND ")),
316            }
317        }
318        FilterNode::Or(children) => {
319            let parts: Vec<String> = children.iter()
320                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
321                .filter(|s| !s.is_empty())
322                .collect();
323            match parts.len() {
324                0 => String::new(),
325                1 => parts.into_iter().next().unwrap(),
326                _ => format!("({})", parts.join(" OR ")),
327            }
328        }
329    }
330}
331
332fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
333    match node {
334        FilterNode::Empty => String::new(),
335        FilterNode::Condition { column, op, value } => {
336            compile_condition(column, op, value, bindings)
337        }
338        FilterNode::And(children) => {
339            let parts: Vec<String> = children.iter()
340                .map(|c| compile_filter(c, bindings))
341                .filter(|s| !s.is_empty())
342                .collect();
343            match parts.len() {
344                0 => String::new(),
345                1 => parts.into_iter().next().unwrap(),
346                _ => format!("({})", parts.join(" AND ")),
347            }
348        }
349        FilterNode::Or(children) => {
350            let parts: Vec<String> = children.iter()
351                .map(|c| compile_filter(c, bindings))
352                .filter(|s| !s.is_empty())
353                .collect();
354            match parts.len() {
355                0 => String::new(),
356                1 => parts.into_iter().next().unwrap(),
357                _ => format!("({})", parts.join(" OR ")),
358            }
359        }
360    }
361}
362
/// Backtick-quote a plain column name; pass expressions (anything containing
/// a '(') through unchanged so function calls are not mangled.
fn quote_col(column: &str) -> String {
    if column.contains('(') {
        return column.to_string();
    }
    format!("`{column}`")
}
370
371fn compile_condition(
372    column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
373) -> String {
374    let col = quote_col(column);
375    match op {
376        CompareOp::In | CompareOp::NotIn => {
377            if let SqlValue::String(csv) = value {
378                let items: Vec<&str> = csv.split(',').collect();
379                let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
380                for item in &items {
381                    bindings.push(SqlValue::String(item.trim().to_string()));
382                }
383                format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
384            } else {
385                bindings.push(value.clone());
386                format!("{col} {} (?)", op.sql_op())
387            }
388        }
389        CompareOp::Includes => {
390            if let SqlValue::String(s) = value {
391                bindings.push(SqlValue::String(format!("%{s}%")));
392            } else {
393                bindings.push(value.clone());
394            }
395            format!("{col} LIKE ?")
396        }
397        CompareOp::IsNull | CompareOp::IsNotNull => {
398            format!("{col} {}", op.sql_op())
399        }
400        _ => {
401            bindings.push(value.clone());
402            format!("{col} {} ?", op.sql_op())
403        }
404    }
405}
406
#[cfg(test)]
mod tests {
    use super::*;

    fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }

    #[test]
    fn test_simple_select() {
        let ir = QueryIR {
            cube: "DEXTrades".into(), schema: "default".into(),
            table: "dwd_dex_trades".into(),
            selects: vec![
                SelectExpr::Column { column: "tx_hash".into(), alias: None },
                SelectExpr::Column { column: "token_a_amount".into(), alias: None },
            ],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
            limit_by: None,
            use_final: false,
            joins: vec![],
        };
        let r = ch().compile(&ir);
        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
        assert!(r.bindings.is_empty());
    }

    #[test]
    fn test_final_keyword() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "tokens".into(),
            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
            limit_by: None,
            use_final: true,
            joins: vec![],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
    }

    #[test]
    fn test_uniq_uses_native_function() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![
                SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
            ],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
            limit_by: None, use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
    }

    #[test]
    fn test_count_star() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![
                SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
            ],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
            limit_by: None, use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
    }

    #[test]
    fn test_aggregate_lowercase() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![
                SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
                SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
            ],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
            limit_by: None, use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
    }

    #[test]
    fn test_where_and_order() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
            filters: FilterNode::And(vec![
                FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
                FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
            ]),
            having: FilterNode::Empty, group_by: vec![],
            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
            limit: 25, offset: 0,
            limit_by: None, use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
        assert_eq!(r.bindings.len(), 2);
    }

    #[test]
    fn test_having_with_aggregate_expr() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![
                SelectExpr::Column { column: "token_address".into(), alias: None },
                SelectExpr::Aggregate { function: "SUM".into(), column: "amount_usd".into(), alias: "__sum".into(), condition: None },
            ],
            filters: FilterNode::Empty,
            having: FilterNode::Condition {
                column: "sum(`amount_usd`)".into(), op: CompareOp::Gt, value: SqlValue::Float(1000000.0),
            },
            group_by: vec!["token_address".into()], order_by: vec![], limit: 25, offset: 0,
            limit_by: None, use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("GROUP BY `token_address`"));
        assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
        assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
        assert_eq!(r.bindings.len(), 1);
    }

    #[test]
    fn test_having_appends_missing_agg_column() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![
                SelectExpr::Column { column: "pool_address".into(), alias: None },
                SelectExpr::Column { column: "argMaxMerge(latest_liquidity_usd_state)".into(), alias: None },
            ],
            filters: FilterNode::Empty,
            having: FilterNode::And(vec![
                FilterNode::Condition {
                    column: "argMaxMerge(latest_liquidity_usd_state)".into(),
                    op: CompareOp::Gt, value: SqlValue::Float(2.0),
                },
                FilterNode::Condition {
                    column: "argMaxMerge(latest_token_a_amount_state)".into(),
                    op: CompareOp::Gt, value: SqlValue::Float(3.0),
                },
            ]),
            group_by: vec!["pool_address".into()], order_by: vec![], limit: 25, offset: 0,
            limit_by: None, use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
            "existing HAVING col should be aliased, got: {}", r.sql);
        assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
            "missing agg col should be appended, got: {}", r.sql);
        assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
            "HAVING should use aliases, got: {}", r.sql);
        assert_eq!(r.bindings.len(), 2);
        assert_eq!(r.alias_remap.len(), 1);
        assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
    }

    #[test]
    fn test_limit_by() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![
                SelectExpr::Column { column: "owner".into(), alias: None },
                SelectExpr::Column { column: "amount".into(), alias: None },
            ],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![],
            order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
            limit: 100, offset: 0,
            limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
            use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        let sql = &r.sql;
        assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
        assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
        assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
        let order_by_pos = sql.find("ORDER BY").unwrap();
        let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
        let limit_pos = sql.rfind("LIMIT 100").unwrap();
        assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
        assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
    }

    #[test]
    fn test_limit_by_with_offset() {
        let ir = QueryIR {
            cube: "T".into(), schema: "db".into(), table: "t".into(),
            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
            limit_by: Some(LimitByExpr { count: 5, offset: 2, columns: vec!["token".into(), "wallet".into()] }),
            use_final: false, joins: vec![],
        };
        let r = ch().compile(&ir);
        // ClickHouse LIMIT BY grammar places OFFSET before BY:
        // `LIMIT n OFFSET m BY expressions`.
        assert!(r.sql.contains("LIMIT 5 OFFSET 2 BY `token`, `wallet`"), "multi-column LIMIT BY with offset, got: {}", r.sql);
    }

    #[test]
    fn test_join_direct() {
        let ir = QueryIR {
            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
            table: "sol_dex_trades".into(),
            selects: vec![
                SelectExpr::Column { column: "tx_hash".into(), alias: None },
                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
            ],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 25, offset: 0,
            limit_by: None, use_final: false,
            joins: vec![JoinExpr {
                schema: "dexes_dim".into(), table: "sol_tokens".into(),
                alias: "_j0".into(),
                conditions: vec![("buy_token_address".into(), "token_address".into())],
                selects: vec![
                    SelectExpr::Column { column: "name".into(), alias: None },
                    SelectExpr::Column { column: "symbol".into(), alias: None },
                ],
                group_by: vec![], use_final: true, is_aggregate: false,
                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
            }],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("FROM (SELECT"), "main query should be wrapped, got: {}", r.sql);
        assert!(r.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` FINAL AS _j0"),
            "direct JOIN with FINAL, got: {}", r.sql);
        assert!(r.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
            "ON condition, got: {}", r.sql);
        assert!(r.sql.contains("_j0.`name` AS `_j0.name`"), "joined col alias, got: {}", r.sql);
    }

    #[test]
    fn test_join_aggregate_subquery() {
        let ir = QueryIR {
            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
            table: "sol_dex_trades".into(),
            selects: vec![
                SelectExpr::Column { column: "tx_hash".into(), alias: None },
                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
            ],
            filters: FilterNode::Empty, having: FilterNode::Empty,
            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
            limit_by: None, use_final: false,
            joins: vec![JoinExpr {
                schema: "dexes_dws".into(), table: "sol_token_market_cap".into(),
                alias: "_j0".into(),
                conditions: vec![("buy_token_address".into(), "token_address".into())],
                selects: vec![
                    SelectExpr::Column { column: "argMaxMerge(latest_market_cap_usd_state)".into(), alias: None },
                ],
                group_by: vec!["token_address".into()],
                use_final: false, is_aggregate: true,
                target_cube: "TokenMarketCap".into(), join_field: "joinBuyTokenMarketCap".into(),
            }],
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("LEFT JOIN (SELECT"), "aggregate should use subquery, got: {}", r.sql);
        assert!(r.sql.contains("GROUP BY `token_address`"), "subquery GROUP BY, got: {}", r.sql);
        assert!(r.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"), "subquery FROM, got: {}", r.sql);
    }
}