Skip to main content

activecube_rs/sql/
clickhouse.rs

1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::{CompileResult, JoinType};
5use crate::sql::dialect::SqlDialect;
6
/// SQL dialect that renders query IR as ClickHouse SQL.
///
/// The type carries no state, so it is zero-sized and freely copyable.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClickHouseDialect;

impl ClickHouseDialect {
    /// Creates a new dialect instance; equivalent to `ClickHouseDialect::default()`.
    pub fn new() -> Self {
        Self
    }
}
20
21impl SqlDialect for ClickHouseDialect {
22    fn compile(&self, ir: &QueryIR) -> CompileResult {
23        if let Some(ref builder) = ir.custom_query_builder {
24            return (builder.0)(ir);
25        }
26
27        let mut bindings = Vec::new();
28        let mut alias_remap: Vec<(String, String)> = Vec::new();
29
30        if ir.joins.is_empty() {
31            let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
32            return CompileResult { sql: inner_sql, bindings, alias_remap };
33        }
34
35        // When JOINs are present, force aliases on function expression columns
36        // (e.g. `argMaxMerge(latest_balance)`) so the outer SELECT can reference
37        // them by simple identifier names. Without aliases, `_main.argMaxMerge(x)`
38        // is parsed as a function call — a ClickHouse syntax error.
39        let mut ir_mod = ir.clone();
40        let mut mc_counter = 0u32;
41        for sel in &mut ir_mod.selects {
42            if let SelectExpr::Column { column, alias } = sel {
43                if column.contains('(') && alias.is_none() {
44                    let a = format!("__mc_{mc_counter}");
45                    mc_counter += 1;
46                    alias_remap.push((a.clone(), column.clone()));
47                    *alias = Some(a);
48                }
49            }
50        }
51
52        let mut jc_counter = 0u32;
53        for join in &mut ir_mod.joins {
54            for sel in &mut join.selects {
55                if let SelectExpr::Column { column, alias } = sel {
56                    if column.contains('(') && alias.is_none() {
57                        let a = format!("__jc_{jc_counter}");
58                        jc_counter += 1;
59                        *alias = Some(a);
60                    }
61                }
62            }
63        }
64
65        let inner_sql = self.compile_inner(&ir_mod, &mut bindings, &mut alias_remap);
66
67        // Build outer SELECT with explicit column listing.
68        // Always backtick-quote to prevent ClickHouse auto-prefixing ambiguous
69        // names when multiple JOINed tables share column names.
70        let main_cols: Vec<String> = ir_mod.selects.iter().map(|s| {
71            let col = match s {
72                SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
73                SelectExpr::Aggregate { alias, .. } => alias.clone(),
74            };
75            format!("_main.`{}` AS `{}`", col, col)
76        }).collect();
77
78        let mut sql = String::from("SELECT ");
79        sql.push_str(&main_cols.join(", "));
80
81        // Collect joined column aliases for the outer SELECT
82        for join in &ir_mod.joins {
83            for sel in &join.selects {
84                let col_name = match sel {
85                    SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
86                    SelectExpr::Aggregate { alias, .. } => alias.clone(),
87                };
88                let outer_alias = format!("{}.{}", join.alias, col_name);
89                if let SelectExpr::Column { column, alias: Some(_) } = sel {
90                    if column.contains('(') {
91                        let outer_original = format!("{}.{}", join.alias, column);
92                        alias_remap.push((outer_alias.clone(), outer_original));
93                    }
94                }
95                sql.push_str(&format!(", {}.`{}` AS `{}`",
96                    join.alias, col_name, outer_alias));
97            }
98        }
99
100        sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
101
102        // Build a lookup for main query aliases to resolve ON condition columns
103        // that might be function expressions (aliased above).
104        let main_alias_map: HashMap<String, String> = ir_mod.selects.iter()
105            .filter_map(|s| {
106                if let SelectExpr::Column { column, alias: Some(a) } = s {
107                    if column.contains('(') { Some((column.clone(), a.clone())) } else { None }
108                } else { None }
109            })
110            .collect();
111
112        for join in &ir_mod.joins {
113            let join_kw = join.join_type.sql_keyword();
114
115            if join.is_aggregate {
116                // Mode B: subquery JOIN for AggregatingMergeTree targets
117                sql.push_str(&format!(" {} (SELECT ", join_kw));
118                let mut sub_parts: Vec<String> = Vec::new();
119                for gb_col in &join.group_by {
120                    sub_parts.push(quote_col(gb_col));
121                }
122                for sel in &join.selects {
123                    match sel {
124                        SelectExpr::Column { column, alias } => {
125                            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
126                            if let Some(a) = alias {
127                                sub_parts.push(format!("{col} AS `{a}`"));
128                            } else if column.contains('(') || !join.group_by.contains(column) {
129                                sub_parts.push(col);
130                            }
131                        }
132                        SelectExpr::Aggregate { function, column, alias, condition } => {
133                            let func = function.to_lowercase();
134                            let expr = match (func.as_str(), column.as_str(), condition) {
135                                ("count", "*", None) => format!("count() AS `{alias}`"),
136                                ("uniq", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
137                                (f, col, None) => format!("{f}(`{col}`) AS `{alias}`"),
138                                (f, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`"),
139                            };
140                            sub_parts.push(expr);
141                        }
142                    }
143                }
144                sql.push_str(&sub_parts.join(", "));
145                sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
146                if !join.group_by.is_empty() {
147                    sql.push_str(" GROUP BY ");
148                    let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
149                    sql.push_str(&gb.join(", "));
150                }
151                sql.push_str(&format!(") AS {}", join.alias));
152            } else {
153                // Mode A: direct JOIN for MergeTree / ReplacingMergeTree targets
154                sql.push_str(&format!(" {} `{}`.`{}` AS {}",
155                    join_kw, join.schema, join.table, join.alias));
156                if join.use_final {
157                    sql.push_str(" FINAL");
158                }
159            }
160
161            if join.join_type == JoinType::Cross {
162                // CROSS JOIN has no ON clause
163                continue;
164            }
165
166            // ON conditions — use alias if the local column was a func expression
167            let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
168                let local_ref = main_alias_map.get(local).unwrap_or(local);
169                format!("_main.`{}` = {}.`{}`", local_ref, join.alias, remote)
170            }).collect();
171            sql.push_str(" ON ");
172            sql.push_str(&on_parts.join(" AND "));
173        }
174
175        CompileResult { sql, bindings, alias_remap }
176    }
177
178    fn quote_identifier(&self, name: &str) -> String {
179        format!("`{name}`")
180    }
181
182    fn name(&self) -> &str {
183        "ClickHouse"
184    }
185}
186
187impl ClickHouseDialect {
188    fn compile_inner(
189        &self,
190        ir: &QueryIR,
191        bindings: &mut Vec<SqlValue>,
192        alias_remap: &mut Vec<(String, String)>,
193    ) -> String {
194        let mut sql = String::new();
195
196        let mut augmented_selects = ir.selects.clone();
197        let mut agg_alias_map: HashMap<String, String> = HashMap::new();
198        let mut alias_counter = 0u32;
199
200        let having_cols: std::collections::HashSet<String> =
201            collect_filter_columns(&ir.having).into_iter().collect();
202        let has_having_agg = having_cols.iter().any(|c| c.contains('('));
203
204        if has_having_agg {
205            for sel in &mut augmented_selects {
206                if let SelectExpr::Column { column, alias } = sel {
207                    if column.contains('(') && having_cols.contains(column.as_str()) {
208                        if alias.is_none() {
209                            let a = format!("__f_{alias_counter}");
210                            alias_counter += 1;
211                            alias_remap.push((a.clone(), column.clone()));
212                            agg_alias_map.insert(column.clone(), a.clone());
213                            *alias = Some(a);
214                        } else if let Some(existing) = alias {
215                            agg_alias_map.insert(column.clone(), existing.clone());
216                        }
217                    }
218                }
219            }
220            for col in &having_cols {
221                if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
222                    let a = format!("__f_{alias_counter}");
223                    alias_counter += 1;
224                    agg_alias_map.insert(col.clone(), a.clone());
225                    augmented_selects.push(SelectExpr::Column {
226                        column: col.clone(),
227                        alias: Some(a),
228                    });
229                }
230            }
231        }
232
233        sql.push_str("SELECT ");
234        let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
235            SelectExpr::Column { column, alias } => {
236                let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
237                match alias {
238                    Some(a) => format!("{col} AS `{a}`"),
239                    None => col,
240                }
241            },
242            SelectExpr::Aggregate { function, column, alias, condition } => {
243                let func = function.to_uppercase();
244                match (func.as_str(), column.as_str(), condition) {
245                    ("COUNT", "*", None) => format!("count() AS `{alias}`"),
246                    ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
247                    ("UNIQ", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
248                    ("UNIQ", col, Some(cond)) => format!("uniqIf(`{col}`, {cond}) AS `{alias}`"),
249                    (_, col, None) => format!("{f}(`{col}`) AS `{alias}`", f = func.to_lowercase()),
250                    (_, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`", f = func.to_lowercase()),
251                }
252            }
253        }).collect();
254        sql.push_str(&select_parts.join(", "));
255
256        sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
257        if ir.use_final {
258            sql.push_str(" FINAL");
259        }
260
261        let where_clause = compile_filter(&ir.filters, bindings);
262        if !where_clause.is_empty() {
263            sql.push_str(" WHERE ");
264            sql.push_str(&where_clause);
265        }
266
267        let effective_group_by = if !ir.group_by.is_empty() {
268            ir.group_by.clone()
269        } else {
270            let has_merge_cols = augmented_selects.iter().any(|s| match s {
271                SelectExpr::Column { column, .. } => column.contains("Merge("),
272                SelectExpr::Aggregate { .. } => true,
273            });
274            if has_merge_cols {
275                augmented_selects.iter().filter_map(|s| match s {
276                    SelectExpr::Column { column, .. } if !column.contains("Merge(") && !column.contains('(') => {
277                        Some(column.clone())
278                    }
279                    _ => None,
280                }).collect()
281            } else {
282                vec![]
283            }
284        };
285
286        if !effective_group_by.is_empty() {
287            sql.push_str(" GROUP BY ");
288            let cols: Vec<String> = effective_group_by.iter().map(|c| format!("`{c}`")).collect();
289            sql.push_str(&cols.join(", "));
290        }
291
292        if has_having_agg {
293            let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
294            if !having_clause.is_empty() {
295                sql.push_str(" HAVING ");
296                sql.push_str(&having_clause);
297            }
298        } else {
299            let having_clause = compile_filter(&ir.having, bindings);
300            if !having_clause.is_empty() {
301                sql.push_str(" HAVING ");
302                sql.push_str(&having_clause);
303            }
304        }
305
306        if !ir.order_by.is_empty() {
307            sql.push_str(" ORDER BY ");
308            let parts: Vec<String> = ir.order_by.iter().map(|o| {
309                let col = if o.column.contains('(') {
310                    agg_alias_map.get(&o.column)
311                        .map(|a| format!("`{a}`"))
312                        .unwrap_or_else(|| o.column.clone())
313                } else {
314                    format!("`{}`", o.column)
315                };
316                let dir = if o.descending { "DESC" } else { "ASC" };
317                format!("{col} {dir}")
318            }).collect();
319            sql.push_str(&parts.join(", "));
320        }
321
322        if let Some(ref lb) = ir.limit_by {
323            let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
324            sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
325            if lb.offset > 0 {
326                sql.push_str(&format!(" OFFSET {}", lb.offset));
327            }
328        }
329
330        sql.push_str(&format!(" LIMIT {}", ir.limit));
331        if ir.offset > 0 {
332            sql.push_str(&format!(" OFFSET {}", ir.offset));
333        }
334
335        sql
336    }
337}
338
339/// Collect all column names referenced in a filter tree.
340fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
341    match node {
342        FilterNode::Empty => vec![],
343        FilterNode::Condition { column, .. } => vec![column.clone()],
344        FilterNode::And(children) | FilterNode::Or(children) => {
345            children.iter().flat_map(collect_filter_columns).collect()
346        }
347    }
348}
349
350/// Like `compile_filter` but replaces aggregate expression columns with their
351/// SELECT aliases so ClickHouse can resolve them in HAVING scope.
352fn compile_filter_with_aliases(
353    node: &FilterNode,
354    bindings: &mut Vec<SqlValue>,
355    aliases: &HashMap<String, String>,
356) -> String {
357    match node {
358        FilterNode::Empty => String::new(),
359        FilterNode::Condition { column, op, value } => {
360            let effective_col = aliases.get(column)
361                .map(|a| a.as_str())
362                .unwrap_or(column.as_str());
363            compile_condition(effective_col, op, value, bindings)
364        }
365        FilterNode::And(children) => {
366            let parts: Vec<String> = children.iter()
367                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
368                .filter(|s| !s.is_empty())
369                .collect();
370            match parts.len() {
371                0 => String::new(),
372                1 => parts.into_iter().next().unwrap(),
373                _ => format!("({})", parts.join(" AND ")),
374            }
375        }
376        FilterNode::Or(children) => {
377            let parts: Vec<String> = children.iter()
378                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
379                .filter(|s| !s.is_empty())
380                .collect();
381            match parts.len() {
382                0 => String::new(),
383                1 => parts.into_iter().next().unwrap(),
384                _ => format!("({})", parts.join(" OR ")),
385            }
386        }
387    }
388}
389
390fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
391    match node {
392        FilterNode::Empty => String::new(),
393        FilterNode::Condition { column, op, value } => {
394            compile_condition(column, op, value, bindings)
395        }
396        FilterNode::And(children) => {
397            let parts: Vec<String> = children.iter()
398                .map(|c| compile_filter(c, bindings))
399                .filter(|s| !s.is_empty())
400                .collect();
401            match parts.len() {
402                0 => String::new(),
403                1 => parts.into_iter().next().unwrap(),
404                _ => format!("({})", parts.join(" AND ")),
405            }
406        }
407        FilterNode::Or(children) => {
408            let parts: Vec<String> = children.iter()
409                .map(|c| compile_filter(c, bindings))
410                .filter(|s| !s.is_empty())
411                .collect();
412            match parts.len() {
413                0 => String::new(),
414                1 => parts.into_iter().next().unwrap(),
415                _ => format!("({})", parts.join(" OR ")),
416            }
417        }
418    }
419}
420
/// Quotes a column reference for SQL output: anything containing `(` is
/// treated as a function expression and returned verbatim, everything else is
/// wrapped in backticks.
fn quote_col(column: &str) -> String {
    match column.find('(') {
        Some(_) => column.to_owned(),
        None => ["`", column, "`"].concat(),
    }
}
428
429fn compile_condition(
430    column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
431) -> String {
432    let col = quote_col(column);
433    match op {
434        CompareOp::In | CompareOp::NotIn => {
435            if let SqlValue::String(csv) = value {
436                let items: Vec<&str> = csv.split(',').collect();
437                let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
438                for item in &items {
439                    bindings.push(SqlValue::String(item.trim().to_string()));
440                }
441                format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
442            } else {
443                bindings.push(value.clone());
444                format!("{col} {} (?)", op.sql_op())
445            }
446        }
447        CompareOp::Includes => {
448            if let SqlValue::String(s) = value {
449                bindings.push(SqlValue::String(format!("%{s}%")));
450            } else {
451                bindings.push(value.clone());
452            }
453            format!("{col} LIKE ?")
454        }
455        CompareOp::IsNull | CompareOp::IsNotNull => {
456            format!("{col} {}", op.sql_op())
457        }
458        _ => {
459            bindings.push(value.clone());
460            format!("{col} {} ?", op.sql_op())
461        }
462    }
463}
464
465#[cfg(test)]
466mod tests {
467    use super::*;
468
469    fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }
470
471    #[test]
472    fn test_simple_select() {
473        let ir = QueryIR {
474            cube: "DEXTrades".into(), schema: "default".into(),
475            table: "dwd_dex_trades".into(),
476            selects: vec![
477                SelectExpr::Column { column: "tx_hash".into(), alias: None },
478                SelectExpr::Column { column: "token_a_amount".into(), alias: None },
479            ],
480            filters: FilterNode::Empty, having: FilterNode::Empty,
481            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
482            limit_by: None,
483            use_final: false,
484            joins: vec![],
485            custom_query_builder: None,
486        };
487        let r = ch().compile(&ir);
488        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
489        assert!(r.bindings.is_empty());
490    }
491
492    #[test]
493    fn test_final_keyword() {
494        let ir = QueryIR {
495            cube: "T".into(), schema: "db".into(), table: "tokens".into(),
496            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
497            filters: FilterNode::Empty, having: FilterNode::Empty,
498            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
499            limit_by: None,
500            use_final: true,
501            joins: vec![],
502            custom_query_builder: None,
503        };
504        let r = ch().compile(&ir);
505        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
506    }
507
508    #[test]
509    fn test_uniq_uses_native_function() {
510        let ir = QueryIR {
511            cube: "T".into(), schema: "db".into(), table: "t".into(),
512            selects: vec![
513                SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
514            ],
515            filters: FilterNode::Empty, having: FilterNode::Empty,
516            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
517            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None,
518        };
519        let r = ch().compile(&ir);
520        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
521    }
522
523    #[test]
524    fn test_count_star() {
525        let ir = QueryIR {
526            cube: "T".into(), schema: "db".into(), table: "t".into(),
527            selects: vec![
528                SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
529            ],
530            filters: FilterNode::Empty, having: FilterNode::Empty,
531            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
532            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None,
533        };
534        let r = ch().compile(&ir);
535        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
536    }
537
538    #[test]
539    fn test_aggregate_lowercase() {
540        let ir = QueryIR {
541            cube: "T".into(), schema: "db".into(), table: "t".into(),
542            selects: vec![
543                SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
544                SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
545            ],
546            filters: FilterNode::Empty, having: FilterNode::Empty,
547            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
548            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None,
549        };
550        let r = ch().compile(&ir);
551        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
552        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
553    }
554
555    #[test]
556    fn test_where_and_order() {
557        let ir = QueryIR {
558            cube: "T".into(), schema: "db".into(), table: "t".into(),
559            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
560            filters: FilterNode::And(vec![
561                FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
562                FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
563            ]),
564            having: FilterNode::Empty, group_by: vec![],
565            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
566            limit: 25, offset: 0,
567            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None,
568        };
569        let r = ch().compile(&ir);
570        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
571        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
572        assert_eq!(r.bindings.len(), 2);
573    }
574
575    #[test]
576    fn test_having_with_aggregate_expr() {
577        let ir = QueryIR {
578            cube: "T".into(), schema: "db".into(), table: "t".into(),
579            selects: vec![
580                SelectExpr::Column { column: "token_address".into(), alias: None },
581                SelectExpr::Aggregate { function: "SUM".into(), column: "amount_usd".into(), alias: "__sum".into(), condition: None },
582            ],
583            filters: FilterNode::Empty,
584            having: FilterNode::Condition {
585                column: "sum(`amount_usd`)".into(), op: CompareOp::Gt, value: SqlValue::Float(1000000.0),
586            },
587            group_by: vec!["token_address".into()], order_by: vec![], limit: 25, offset: 0,
588            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None,
589        };
590        let r = ch().compile(&ir);
591        assert!(r.sql.contains("GROUP BY `token_address`"));
592        assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
593        assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
594        assert_eq!(r.bindings.len(), 1);
595    }
596
597    #[test]
598    fn test_having_appends_missing_agg_column() {
599        let ir = QueryIR {
600            cube: "T".into(), schema: "db".into(), table: "t".into(),
601            selects: vec![
602                SelectExpr::Column { column: "pool_address".into(), alias: None },
603                SelectExpr::Column { column: "argMaxMerge(latest_liquidity_usd_state)".into(), alias: None },
604            ],
605            filters: FilterNode::Empty,
606            having: FilterNode::And(vec![
607                FilterNode::Condition {
608                    column: "argMaxMerge(latest_liquidity_usd_state)".into(),
609                    op: CompareOp::Gt, value: SqlValue::Float(2.0),
610                },
611                FilterNode::Condition {
612                    column: "argMaxMerge(latest_token_a_amount_state)".into(),
613                    op: CompareOp::Gt, value: SqlValue::Float(3.0),
614                },
615            ]),
616            group_by: vec!["pool_address".into()], order_by: vec![], limit: 25, offset: 0,
617            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None,
618        };
619        let r = ch().compile(&ir);
620        assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
621            "existing HAVING col should be aliased, got: {}", r.sql);
622        assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
623            "missing agg col should be appended, got: {}", r.sql);
624        assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
625            "HAVING should use aliases, got: {}", r.sql);
626        assert_eq!(r.bindings.len(), 2);
627        assert_eq!(r.alias_remap.len(), 1);
628        assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
629    }
630
631    #[test]
632    fn test_limit_by() {
633        let ir = QueryIR {
634            cube: "T".into(), schema: "db".into(), table: "t".into(),
635            selects: vec![
636                SelectExpr::Column { column: "owner".into(), alias: None },
637                SelectExpr::Column { column: "amount".into(), alias: None },
638            ],
639            filters: FilterNode::Empty, having: FilterNode::Empty,
640            group_by: vec![], 
641            order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
642            limit: 100, offset: 0,
643            limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
644            use_final: false, joins: vec![], custom_query_builder: None,
645        };
646        let r = ch().compile(&ir);
647        let sql = &r.sql;
648        assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
649        assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
650        assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
651        let order_by_pos = sql.find("ORDER BY").unwrap();
652        let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
653        let limit_pos = sql.rfind("LIMIT 100").unwrap();
654        assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
655        assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
656    }
657
658    #[test]
659    fn test_limit_by_with_offset() {
660        let ir = QueryIR {
661            cube: "T".into(), schema: "db".into(), table: "t".into(),
662            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
663            filters: FilterNode::Empty, having: FilterNode::Empty,
664            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
665            limit_by: Some(LimitByExpr { count: 5, offset: 2, columns: vec!["token".into(), "wallet".into()] }),
666            use_final: false, joins: vec![], custom_query_builder: None,
667        };
668        let r = ch().compile(&ir);
669        assert!(r.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"), "multi-column LIMIT BY with offset, got: {}", r.sql);
670    }
671
/// A non-aggregate LEFT JOIN with `use_final: true`: the compiled SQL is
/// expected to wrap the main query in a subquery, join the dimension table
/// directly with `FINAL` after the alias, and prefix joined columns with the
/// join alias.
#[test]
fn test_join_direct() {
    // Dimension table joined on the bought token's address; plain columns only.
    let token_join = JoinExpr {
        schema: "dexes_dim".into(),
        table: "sol_tokens".into(),
        alias: "_j0".into(),
        conditions: vec![("buy_token_address".into(), "token_address".into())],
        selects: vec![
            SelectExpr::Column { column: "name".into(), alias: None },
            SelectExpr::Column { column: "symbol".into(), alias: None },
        ],
        group_by: vec![],
        use_final: true,
        is_aggregate: false,
        target_cube: "TokenSearch".into(),
        join_field: "joinBuyToken".into(),
        join_type: JoinType::Left,
    };

    let query = QueryIR {
        cube: "DEXTrades".into(),
        schema: "dexes_dwd".into(),
        table: "sol_dex_trades".into(),
        selects: vec![
            SelectExpr::Column { column: "tx_hash".into(), alias: None },
            SelectExpr::Column { column: "buy_token_address".into(), alias: None },
        ],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 25,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![token_join],
        custom_query_builder: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;

    assert!(
        sql.contains("FROM (SELECT"),
        "main query should be wrapped, got: {}", sql
    );
    assert!(
        sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` AS _j0 FINAL"),
        "direct JOIN with FINAL after alias, got: {}", sql
    );
    assert!(
        sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
        "ON condition, got: {}", sql
    );
    assert!(
        sql.contains("_j0.`name` AS `_j0.name`"),
        "joined col alias, got: {}", sql
    );
}
706
/// An aggregate join (`is_aggregate: true` with a `group_by`) is expected to
/// compile to a `LEFT JOIN (SELECT ... GROUP BY ...)` subquery, with the
/// function-expression column aliased as `__jc_0` inside the subquery and
/// referenced through that alias in the outer SELECT.
#[test]
fn test_join_aggregate_subquery() {
    // Aggregated market-cap table, grouped by token address.
    let market_cap_join = JoinExpr {
        schema: "dexes_dws".into(),
        table: "sol_token_market_cap".into(),
        alias: "_j0".into(),
        conditions: vec![("buy_token_address".into(), "token_address".into())],
        selects: vec![SelectExpr::Column {
            column: "argMaxMerge(latest_market_cap_usd_state)".into(),
            alias: None,
        }],
        group_by: vec!["token_address".into()],
        use_final: false,
        is_aggregate: true,
        target_cube: "TokenMarketCap".into(),
        join_field: "joinBuyTokenMarketCap".into(),
        join_type: JoinType::Left,
    };

    let query = QueryIR {
        cube: "DEXTrades".into(),
        schema: "dexes_dwd".into(),
        table: "sol_dex_trades".into(),
        selects: vec![
            SelectExpr::Column { column: "tx_hash".into(), alias: None },
            SelectExpr::Column { column: "buy_token_address".into(), alias: None },
        ],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![market_cap_join],
        custom_query_builder: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;

    assert!(
        sql.contains("LEFT JOIN (SELECT"),
        "aggregate should use subquery, got: {}", sql
    );
    assert!(
        sql.contains("GROUP BY `token_address`"),
        "subquery GROUP BY, got: {}", sql
    );
    assert!(
        sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"),
        "subquery FROM, got: {}", sql
    );
    assert!(
        sql.contains("argMaxMerge(latest_market_cap_usd_state) AS `__jc_0`"),
        "join func expr should be aliased in subquery, got: {}", sql
    );
    assert!(
        sql.contains("_j0.`__jc_0` AS `_j0.__jc_0`"),
        "outer SELECT should use alias for join func col, got: {}", sql
    );
}
742
/// When a JOIN is present, un-aliased function-expression columns of the main
/// query must receive generated `__mc_N` aliases: the inner query aliases the
/// expression, the outer SELECT references it as `_main.`__mc_N``, and
/// `alias_remap` records the alias → original-expression mapping.
#[test]
fn test_join_main_query_func_expression_columns() {
    // Main-query columns: two plain identifiers followed by four function
    // expressions, all without explicit aliases.
    let main_selects = vec![
        SelectExpr::Column { column: "token".into(), alias: None },
        SelectExpr::Column { column: "holder".into(), alias: None },
        SelectExpr::Column { column: "argMaxMerge(latest_balance)".into(), alias: None },
        SelectExpr::Column { column: "argMaxMerge(latest_balance_usd)".into(), alias: None },
        SelectExpr::Column { column: "minMerge(first_seen)".into(), alias: None },
        SelectExpr::Column { column: "maxMerge(last_seen)".into(), alias: None },
    ];

    let token_join = JoinExpr {
        schema: "dim".into(),
        table: "sol_tokens".into(),
        alias: "_j0".into(),
        conditions: vec![("token".into(), "token_address".into())],
        selects: vec![
            SelectExpr::Column { column: "name".into(), alias: None },
            SelectExpr::Column { column: "symbol".into(), alias: None },
        ],
        group_by: vec![],
        use_final: true,
        is_aggregate: false,
        target_cube: "TokenSearch".into(),
        join_field: "joinToken".into(),
        join_type: JoinType::Left,
    };

    let query = QueryIR {
        cube: "TokenHolders".into(),
        schema: "dws".into(),
        table: "sol_token_holders".into(),
        selects: main_selects,
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![OrderExpr {
            column: "argMaxMerge(latest_balance_usd)".into(),
            descending: true,
        }],
        limit: 100,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![token_join],
        custom_query_builder: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;

    // Outer SELECT goes through the generated aliases / quoted identifiers.
    assert!(
        sql.contains("_main.`__mc_0` AS `__mc_0`"),
        "func expr should use alias __mc_0 in outer SELECT, got: {sql}"
    );
    assert!(
        sql.contains("_main.`__mc_1` AS `__mc_1`"),
        "func expr should use alias __mc_1, got: {sql}"
    );
    assert!(
        sql.contains("_main.`token` AS `token`"),
        "simple col should be backtick-quoted, got: {sql}"
    );

    // A `_main.`-prefixed function call must never appear.
    assert!(
        !sql.contains("_main.argMaxMerge("),
        "outer SELECT must NOT have bare _main.argMaxMerge(...), got: {sql}"
    );

    // Inner query carries the alias definition.
    assert!(
        sql.contains("argMaxMerge(latest_balance) AS `__mc_0`"),
        "inner query should alias func expr, got: {sql}"
    );

    // The remap table links each generated alias back to its expression.
    let remaps = |alias: &str, original: &str| {
        compiled
            .alias_remap
            .iter()
            .any(|(a, o)| a == alias && o == original)
    };
    assert!(
        remaps("__mc_0", "argMaxMerge(latest_balance)"),
        "alias_remap should map __mc_0 → original, got: {:?}", compiled.alias_remap
    );
    assert!(
        remaps("__mc_1", "argMaxMerge(latest_balance_usd)"),
        "alias_remap should map __mc_1, got: {:?}", compiled.alias_remap
    );
}
797
/// `JoinType::Inner` on a non-aggregate join is expected to produce an
/// `INNER JOIN` clause against the joined table.
#[test]
fn test_join_inner_type() {
    let token_join = JoinExpr {
        schema: "dexes_dim".into(),
        table: "sol_tokens".into(),
        alias: "_j0".into(),
        conditions: vec![("buy_token_address".into(), "token_address".into())],
        selects: vec![SelectExpr::Column { column: "name".into(), alias: None }],
        group_by: vec![],
        use_final: false,
        is_aggregate: false,
        target_cube: "TokenSearch".into(),
        join_field: "joinBuyToken".into(),
        join_type: JoinType::Inner,
    };

    let query = QueryIR {
        cube: "DEXTrades".into(),
        schema: "dexes_dwd".into(),
        table: "sol_dex_trades".into(),
        selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![token_join],
        custom_query_builder: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;

    assert!(
        sql.contains("INNER JOIN `dexes_dim`.`sol_tokens` AS _j0"),
        "should use INNER JOIN, got: {}", sql
    );
}
826
/// `JoinType::Full` on a non-aggregate join is expected to produce a
/// `FULL OUTER JOIN` clause against the joined table.
#[test]
fn test_join_full_outer_type() {
    let other_join = JoinExpr {
        schema: "db2".into(),
        table: "t2".into(),
        alias: "_j0".into(),
        conditions: vec![("id".into(), "ref_id".into())],
        selects: vec![SelectExpr::Column { column: "val".into(), alias: None }],
        group_by: vec![],
        use_final: false,
        is_aggregate: false,
        target_cube: "Other".into(),
        join_field: "joinOther".into(),
        join_type: JoinType::Full,
    };

    let query = QueryIR {
        cube: "T".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![other_join],
        custom_query_builder: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;

    assert!(
        sql.contains("FULL OUTER JOIN `db2`.`t2` AS _j0"),
        "should use FULL OUTER JOIN, got: {}", sql
    );
}
854
/// When `custom_query_builder` is set, `compile` must return exactly what the
/// builder produces instead of running the standard compilation.
#[test]
fn test_custom_query_builder() {
    // Builder that ignores the IR and returns a fixed statement.
    let fixed_builder = QueryBuilderFn(std::sync::Arc::new(|_ir| CompileResult {
        sql: "SELECT 1 FROM custom_view".into(),
        bindings: vec![],
        alias_remap: vec![],
    }));

    let query = QueryIR {
        cube: "Custom".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: Some(fixed_builder),
    };

    let compiled = ch().compile(&query);

    assert_eq!(
        compiled.sql, "SELECT 1 FROM custom_view",
        "custom builder should bypass standard compilation, got: {}", compiled.sql
    );
}
877}