Skip to main content

activecube_rs/sql/
clickhouse.rs

1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::{CompileResult, JoinType};
5use crate::sql::dialect::SqlDialect;
6
/// SQL dialect that renders a `QueryIR` as ClickHouse SQL.
///
/// The type carries no state, so it is a zero-sized unit struct; all
/// instances are interchangeable.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClickHouseDialect;

impl ClickHouseDialect {
    /// Creates a dialect instance. Equivalent to `ClickHouseDialect::default()`.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
20
21impl SqlDialect for ClickHouseDialect {
22    fn compile(&self, ir: &QueryIR) -> CompileResult {
23        if let Some(ref builder) = ir.custom_query_builder {
24            return (builder.0)(ir);
25        }
26
27        let mut bindings = Vec::new();
28        let mut alias_remap: Vec<(String, String)> = Vec::new();
29
30        if ir.joins.is_empty() {
31            let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
32            return CompileResult { sql: inner_sql, bindings, alias_remap };
33        }
34
35        // When JOINs are present, force aliases on function expression columns
36        // (e.g. `argMaxMerge(latest_balance)`) so the outer SELECT can reference
37        // them by simple identifier names. Without aliases, `_main.argMaxMerge(x)`
38        // is parsed as a function call — a ClickHouse syntax error.
39        let mut ir_mod = ir.clone();
40        let mut mc_counter = 0u32;
41        for sel in &mut ir_mod.selects {
42            if let SelectExpr::Column { column, alias } = sel {
43                if column.contains('(') && alias.is_none() {
44                    let a = format!("__mc_{mc_counter}");
45                    mc_counter += 1;
46                    alias_remap.push((a.clone(), column.clone()));
47                    *alias = Some(a);
48                }
49            }
50        }
51
52        let mut jc_counter = 0u32;
53        for join in &mut ir_mod.joins {
54            for sel in &mut join.selects {
55                if let SelectExpr::Column { column, alias } = sel {
56                    if column.contains('(') && alias.is_none() {
57                        let a = format!("__jc_{jc_counter}");
58                        jc_counter += 1;
59                        *alias = Some(a);
60                    }
61                }
62            }
63        }
64
65        let inner_sql = self.compile_inner(&ir_mod, &mut bindings, &mut alias_remap);
66
67        // Build outer SELECT with explicit column listing.
68        // Always backtick-quote to prevent ClickHouse auto-prefixing ambiguous
69        // names when multiple JOINed tables share column names.
70        let main_cols: Vec<String> = ir_mod.selects.iter().map(|s| {
71            let col = match s {
72                SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
73                SelectExpr::Aggregate { alias, .. } => alias.clone(),
74            };
75            format!("_main.`{}` AS `{}`", col, col)
76        }).collect();
77
78        let mut sql = String::from("SELECT ");
79        sql.push_str(&main_cols.join(", "));
80
81        // Collect joined column aliases for the outer SELECT
82        for join in &ir_mod.joins {
83            for sel in &join.selects {
84                let col_name = match sel {
85                    SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
86                    SelectExpr::Aggregate { alias, .. } => alias.clone(),
87                };
88                let outer_alias = format!("{}.{}", join.alias, col_name);
89                if let SelectExpr::Column { column, alias: Some(_) } = sel {
90                    if column.contains('(') {
91                        let outer_original = format!("{}.{}", join.alias, column);
92                        alias_remap.push((outer_alias.clone(), outer_original));
93                    }
94                }
95                sql.push_str(&format!(", {}.`{}` AS `{}`",
96                    join.alias, col_name, outer_alias));
97            }
98        }
99
100        sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
101
102        // Build a lookup for main query aliases to resolve ON condition columns
103        // that might be function expressions (aliased above).
104        let main_alias_map: HashMap<String, String> = ir_mod.selects.iter()
105            .filter_map(|s| {
106                if let SelectExpr::Column { column, alias: Some(a) } = s {
107                    if column.contains('(') { Some((column.clone(), a.clone())) } else { None }
108                } else { None }
109            })
110            .collect();
111
112        for join in &ir_mod.joins {
113            let join_kw = join.join_type.sql_keyword();
114
115            if join.is_aggregate {
116                // Mode B: subquery JOIN for AggregatingMergeTree targets
117                sql.push_str(&format!(" {} (SELECT ", join_kw));
118                let mut sub_parts: Vec<String> = Vec::new();
119                for gb_col in &join.group_by {
120                    sub_parts.push(quote_col(gb_col));
121                }
122                for sel in &join.selects {
123                    match sel {
124                        SelectExpr::Column { column, alias } => {
125                            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
126                            if let Some(a) = alias {
127                                sub_parts.push(format!("{col} AS `{a}`"));
128                            } else if column.contains('(') || !join.group_by.contains(column) {
129                                sub_parts.push(col);
130                            }
131                        }
132                        SelectExpr::Aggregate { function, column, alias, condition } => {
133                            let func = function.to_lowercase();
134                            let expr = match (func.as_str(), column.as_str(), condition) {
135                                ("count", "*", None) => format!("count() AS `{alias}`"),
136                                ("uniq", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
137                                (f, col, None) => format!("{f}(`{col}`) AS `{alias}`"),
138                                (f, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`"),
139                            };
140                            sub_parts.push(expr);
141                        }
142                    }
143                }
144                sql.push_str(&sub_parts.join(", "));
145                sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
146                if !join.group_by.is_empty() {
147                    sql.push_str(" GROUP BY ");
148                    let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
149                    sql.push_str(&gb.join(", "));
150                }
151                sql.push_str(&format!(") AS {}", join.alias));
152            } else {
153                // Mode A: direct JOIN for MergeTree / ReplacingMergeTree targets
154                sql.push_str(&format!(" {} `{}`.`{}` AS {}",
155                    join_kw, join.schema, join.table, join.alias));
156                if join.use_final {
157                    sql.push_str(" FINAL");
158                }
159            }
160
161            if join.join_type == JoinType::Cross {
162                // CROSS JOIN has no ON clause
163                continue;
164            }
165
166            // ON conditions — use alias if the local column was a func expression
167            let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
168                let local_ref = main_alias_map.get(local).unwrap_or(local);
169                format!("_main.`{}` = {}.`{}`", local_ref, join.alias, remote)
170            }).collect();
171            sql.push_str(" ON ");
172            sql.push_str(&on_parts.join(" AND "));
173        }
174
175        CompileResult { sql, bindings, alias_remap }
176    }
177
178    fn quote_identifier(&self, name: &str) -> String {
179        format!("`{name}`")
180    }
181
182    fn name(&self) -> &str {
183        "ClickHouse"
184    }
185}
186
187impl ClickHouseDialect {
188    fn compile_inner(
189        &self,
190        ir: &QueryIR,
191        bindings: &mut Vec<SqlValue>,
192        alias_remap: &mut Vec<(String, String)>,
193    ) -> String {
194        let mut sql = String::new();
195
196        let mut augmented_selects = ir.selects.clone();
197        let mut agg_alias_map: HashMap<String, String> = HashMap::new();
198        let mut alias_counter = 0u32;
199
200        let having_cols: std::collections::HashSet<String> =
201            collect_filter_columns(&ir.having).into_iter().collect();
202        let has_having_agg = having_cols.iter().any(|c| c.contains('('));
203
204        if has_having_agg {
205            for sel in &mut augmented_selects {
206                if let SelectExpr::Column { column, alias } = sel {
207                    if column.contains('(') && having_cols.contains(column.as_str()) {
208                        if alias.is_none() {
209                            let a = format!("__f_{alias_counter}");
210                            alias_counter += 1;
211                            alias_remap.push((a.clone(), column.clone()));
212                            agg_alias_map.insert(column.clone(), a.clone());
213                            *alias = Some(a);
214                        } else if let Some(existing) = alias {
215                            agg_alias_map.insert(column.clone(), existing.clone());
216                        }
217                    }
218                }
219            }
220            for col in &having_cols {
221                if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
222                    let a = format!("__f_{alias_counter}");
223                    alias_counter += 1;
224                    agg_alias_map.insert(col.clone(), a.clone());
225                    augmented_selects.push(SelectExpr::Column {
226                        column: col.clone(),
227                        alias: Some(a),
228                    });
229                }
230            }
231        }
232
233        sql.push_str("SELECT ");
234        let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
235            SelectExpr::Column { column, alias } => {
236                let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
237                match alias {
238                    Some(a) => format!("{col} AS `{a}`"),
239                    None => col,
240                }
241            },
242            SelectExpr::Aggregate { function, column, alias, condition } => {
243                let func = function.to_uppercase();
244                match (func.as_str(), column.as_str(), condition) {
245                    ("COUNT", "*", None) => format!("count() AS `{alias}`"),
246                    ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
247                    ("UNIQ", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
248                    ("UNIQ", col, Some(cond)) => format!("uniqIf(`{col}`, {cond}) AS `{alias}`"),
249                    (_, col, None) => format!("{f}(`{col}`) AS `{alias}`", f = func.to_lowercase()),
250                    (_, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`", f = func.to_lowercase()),
251                }
252            }
253        }).collect();
254        sql.push_str(&select_parts.join(", "));
255
256        if let Some(ref subquery) = ir.from_subquery {
257            sql.push_str(&format!(" FROM ({}) AS _t", subquery));
258        } else {
259            sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
260            if ir.use_final {
261                sql.push_str(" FINAL");
262            }
263        }
264
265        let where_clause = compile_filter(&ir.filters, bindings);
266        if !where_clause.is_empty() {
267            sql.push_str(" WHERE ");
268            sql.push_str(&where_clause);
269        }
270
271        let effective_group_by = if !ir.group_by.is_empty() {
272            ir.group_by.clone()
273        } else {
274            let has_merge_cols = augmented_selects.iter().any(|s| match s {
275                SelectExpr::Column { column, .. } => column.contains("Merge("),
276                SelectExpr::Aggregate { .. } => true,
277            });
278            if has_merge_cols {
279                augmented_selects.iter().filter_map(|s| match s {
280                    SelectExpr::Column { column, .. } if !column.contains("Merge(") && !column.contains('(') => {
281                        Some(column.clone())
282                    }
283                    _ => None,
284                }).collect()
285            } else {
286                vec![]
287            }
288        };
289
290        if !effective_group_by.is_empty() {
291            sql.push_str(" GROUP BY ");
292            let cols: Vec<String> = effective_group_by.iter().map(|c| format!("`{c}`")).collect();
293            sql.push_str(&cols.join(", "));
294        }
295
296        if has_having_agg {
297            let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
298            if !having_clause.is_empty() {
299                sql.push_str(" HAVING ");
300                sql.push_str(&having_clause);
301            }
302        } else {
303            let having_clause = compile_filter(&ir.having, bindings);
304            if !having_clause.is_empty() {
305                sql.push_str(" HAVING ");
306                sql.push_str(&having_clause);
307            }
308        }
309
310        if !ir.order_by.is_empty() {
311            sql.push_str(" ORDER BY ");
312            let parts: Vec<String> = ir.order_by.iter().map(|o| {
313                let col = if o.column.contains('(') {
314                    agg_alias_map.get(&o.column)
315                        .map(|a| format!("`{a}`"))
316                        .unwrap_or_else(|| o.column.clone())
317                } else {
318                    format!("`{}`", o.column)
319                };
320                let dir = if o.descending { "DESC" } else { "ASC" };
321                format!("{col} {dir}")
322            }).collect();
323            sql.push_str(&parts.join(", "));
324        }
325
326        if let Some(ref lb) = ir.limit_by {
327            let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
328            sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
329            if lb.offset > 0 {
330                sql.push_str(&format!(" OFFSET {}", lb.offset));
331            }
332        }
333
334        sql.push_str(&format!(" LIMIT {}", ir.limit));
335        if ir.offset > 0 {
336            sql.push_str(&format!(" OFFSET {}", ir.offset));
337        }
338
339        sql
340    }
341}
342
343/// Collect all column names referenced in a filter tree.
344fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
345    match node {
346        FilterNode::Empty => vec![],
347        FilterNode::Condition { column, .. } => vec![column.clone()],
348        FilterNode::And(children) | FilterNode::Or(children) => {
349            children.iter().flat_map(collect_filter_columns).collect()
350        }
351    }
352}
353
354/// Like `compile_filter` but replaces aggregate expression columns with their
355/// SELECT aliases so ClickHouse can resolve them in HAVING scope.
356fn compile_filter_with_aliases(
357    node: &FilterNode,
358    bindings: &mut Vec<SqlValue>,
359    aliases: &HashMap<String, String>,
360) -> String {
361    match node {
362        FilterNode::Empty => String::new(),
363        FilterNode::Condition { column, op, value } => {
364            let effective_col = aliases.get(column)
365                .map(|a| a.as_str())
366                .unwrap_or(column.as_str());
367            compile_condition(effective_col, op, value, bindings)
368        }
369        FilterNode::And(children) => {
370            let parts: Vec<String> = children.iter()
371                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
372                .filter(|s| !s.is_empty())
373                .collect();
374            match parts.len() {
375                0 => String::new(),
376                1 => parts.into_iter().next().unwrap(),
377                _ => format!("({})", parts.join(" AND ")),
378            }
379        }
380        FilterNode::Or(children) => {
381            let parts: Vec<String> = children.iter()
382                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
383                .filter(|s| !s.is_empty())
384                .collect();
385            match parts.len() {
386                0 => String::new(),
387                1 => parts.into_iter().next().unwrap(),
388                _ => format!("({})", parts.join(" OR ")),
389            }
390        }
391    }
392}
393
394fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
395    match node {
396        FilterNode::Empty => String::new(),
397        FilterNode::Condition { column, op, value } => {
398            compile_condition(column, op, value, bindings)
399        }
400        FilterNode::And(children) => {
401            let parts: Vec<String> = children.iter()
402                .map(|c| compile_filter(c, bindings))
403                .filter(|s| !s.is_empty())
404                .collect();
405            match parts.len() {
406                0 => String::new(),
407                1 => parts.into_iter().next().unwrap(),
408                _ => format!("({})", parts.join(" AND ")),
409            }
410        }
411        FilterNode::Or(children) => {
412            let parts: Vec<String> = children.iter()
413                .map(|c| compile_filter(c, bindings))
414                .filter(|s| !s.is_empty())
415                .collect();
416            match parts.len() {
417                0 => String::new(),
418                1 => parts.into_iter().next().unwrap(),
419                _ => format!("({})", parts.join(" OR ")),
420            }
421        }
422    }
423}
424
/// Backtick-quotes a plain column name. Anything containing `(` is treated as
/// a function expression and returned unchanged.
fn quote_col(column: &str) -> String {
    match column.find('(') {
        Some(_) => column.to_owned(),
        None => ["`", column, "`"].concat(),
    }
}
432
433fn compile_condition(
434    column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
435) -> String {
436    let col = quote_col(column);
437    match op {
438        CompareOp::In | CompareOp::NotIn => {
439            if let SqlValue::String(csv) = value {
440                let items: Vec<&str> = csv.split(',').collect();
441                let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
442                for item in &items {
443                    bindings.push(SqlValue::String(item.trim().to_string()));
444                }
445                format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
446            } else {
447                bindings.push(value.clone());
448                format!("{col} {} (?)", op.sql_op())
449            }
450        }
451        CompareOp::Includes => {
452            if let SqlValue::String(s) = value {
453                bindings.push(SqlValue::String(format!("%{s}%")));
454            } else {
455                bindings.push(value.clone());
456            }
457            format!("{col} LIKE ?")
458        }
459        CompareOp::IsNull | CompareOp::IsNotNull => {
460            format!("{col} {}", op.sql_op())
461        }
462        _ => {
463            bindings.push(value.clone());
464            format!("{col} {} ?", op.sql_op())
465        }
466    }
467}
468
469#[cfg(test)]
470mod tests {
471    use super::*;
472
473    fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }
474
475    #[test]
476    fn test_simple_select() {
477        let ir = QueryIR {
478            cube: "DEXTrades".into(), schema: "default".into(),
479            table: "dwd_dex_trades".into(),
480            selects: vec![
481                SelectExpr::Column { column: "tx_hash".into(), alias: None },
482                SelectExpr::Column { column: "token_a_amount".into(), alias: None },
483            ],
484            filters: FilterNode::Empty, having: FilterNode::Empty,
485            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
486            limit_by: None,
487            use_final: false,
488            joins: vec![],
489            custom_query_builder: None,
490            from_subquery: None,
491        };
492        let r = ch().compile(&ir);
493        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
494        assert!(r.bindings.is_empty());
495    }
496
497    #[test]
498    fn test_final_keyword() {
499        let ir = QueryIR {
500            cube: "T".into(), schema: "db".into(), table: "tokens".into(),
501            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
502            filters: FilterNode::Empty, having: FilterNode::Empty,
503            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
504            limit_by: None,
505            use_final: true,
506            joins: vec![],
507            custom_query_builder: None,
508            from_subquery: None,
509        };
510        let r = ch().compile(&ir);
511        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
512    }
513
514    #[test]
515    fn test_uniq_uses_native_function() {
516        let ir = QueryIR {
517            cube: "T".into(), schema: "db".into(), table: "t".into(),
518            selects: vec![
519                SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
520            ],
521            filters: FilterNode::Empty, having: FilterNode::Empty,
522            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
523            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
524        };
525        let r = ch().compile(&ir);
526        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
527    }
528
529    #[test]
530    fn test_count_star() {
531        let ir = QueryIR {
532            cube: "T".into(), schema: "db".into(), table: "t".into(),
533            selects: vec![
534                SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
535            ],
536            filters: FilterNode::Empty, having: FilterNode::Empty,
537            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
538            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
539        };
540        let r = ch().compile(&ir);
541        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
542    }
543
544    #[test]
545    fn test_aggregate_lowercase() {
546        let ir = QueryIR {
547            cube: "T".into(), schema: "db".into(), table: "t".into(),
548            selects: vec![
549                SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
550                SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
551            ],
552            filters: FilterNode::Empty, having: FilterNode::Empty,
553            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
554            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
555        };
556        let r = ch().compile(&ir);
557        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
558        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
559    }
560
561    #[test]
562    fn test_where_and_order() {
563        let ir = QueryIR {
564            cube: "T".into(), schema: "db".into(), table: "t".into(),
565            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
566            filters: FilterNode::And(vec![
567                FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
568                FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
569            ]),
570            having: FilterNode::Empty, group_by: vec![],
571            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
572            limit: 25, offset: 0,
573            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
574        };
575        let r = ch().compile(&ir);
576        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
577        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
578        assert_eq!(r.bindings.len(), 2);
579    }
580
581    #[test]
582    fn test_having_with_aggregate_expr() {
583        let ir = QueryIR {
584            cube: "T".into(), schema: "db".into(), table: "t".into(),
585            selects: vec![
586                SelectExpr::Column { column: "token_address".into(), alias: None },
587                SelectExpr::Aggregate { function: "SUM".into(), column: "amount_usd".into(), alias: "__sum".into(), condition: None },
588            ],
589            filters: FilterNode::Empty,
590            having: FilterNode::Condition {
591                column: "sum(`amount_usd`)".into(), op: CompareOp::Gt, value: SqlValue::Float(1000000.0),
592            },
593            group_by: vec!["token_address".into()], order_by: vec![], limit: 25, offset: 0,
594            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
595        };
596        let r = ch().compile(&ir);
597        assert!(r.sql.contains("GROUP BY `token_address`"));
598        assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
599        assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
600        assert_eq!(r.bindings.len(), 1);
601    }
602
603    #[test]
604    fn test_having_appends_missing_agg_column() {
605        let ir = QueryIR {
606            cube: "T".into(), schema: "db".into(), table: "t".into(),
607            selects: vec![
608                SelectExpr::Column { column: "pool_address".into(), alias: None },
609                SelectExpr::Column { column: "argMaxMerge(latest_liquidity_usd_state)".into(), alias: None },
610            ],
611            filters: FilterNode::Empty,
612            having: FilterNode::And(vec![
613                FilterNode::Condition {
614                    column: "argMaxMerge(latest_liquidity_usd_state)".into(),
615                    op: CompareOp::Gt, value: SqlValue::Float(2.0),
616                },
617                FilterNode::Condition {
618                    column: "argMaxMerge(latest_token_a_amount_state)".into(),
619                    op: CompareOp::Gt, value: SqlValue::Float(3.0),
620                },
621            ]),
622            group_by: vec!["pool_address".into()], order_by: vec![], limit: 25, offset: 0,
623            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
624        };
625        let r = ch().compile(&ir);
626        assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
627            "existing HAVING col should be aliased, got: {}", r.sql);
628        assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
629            "missing agg col should be appended, got: {}", r.sql);
630        assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
631            "HAVING should use aliases, got: {}", r.sql);
632        assert_eq!(r.bindings.len(), 2);
633        assert_eq!(r.alias_remap.len(), 1);
634        assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
635    }
636
637    #[test]
638    fn test_limit_by() {
639        let ir = QueryIR {
640            cube: "T".into(), schema: "db".into(), table: "t".into(),
641            selects: vec![
642                SelectExpr::Column { column: "owner".into(), alias: None },
643                SelectExpr::Column { column: "amount".into(), alias: None },
644            ],
645            filters: FilterNode::Empty, having: FilterNode::Empty,
646            group_by: vec![], 
647            order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
648            limit: 100, offset: 0,
649            limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
650            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
651        };
652        let r = ch().compile(&ir);
653        let sql = &r.sql;
654        assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
655        assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
656        assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
657        let order_by_pos = sql.find("ORDER BY").unwrap();
658        let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
659        let limit_pos = sql.rfind("LIMIT 100").unwrap();
660        assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
661        assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
662    }
663
664    #[test]
665    fn test_limit_by_with_offset() {
666        let ir = QueryIR {
667            cube: "T".into(), schema: "db".into(), table: "t".into(),
668            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
669            filters: FilterNode::Empty, having: FilterNode::Empty,
670            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
671            limit_by: Some(LimitByExpr { count: 5, offset: 2, columns: vec!["token".into(), "wallet".into()] }),
672            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
673        };
674        let r = ch().compile(&ir);
675        assert!(r.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"), "multi-column LIMIT BY with offset, got: {}", r.sql);
676    }
677
678    #[test]
679    fn test_join_direct() {
680        let ir = QueryIR {
681            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
682            table: "sol_dex_trades".into(),
683            selects: vec![
684                SelectExpr::Column { column: "tx_hash".into(), alias: None },
685                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
686            ],
687            filters: FilterNode::Empty, having: FilterNode::Empty,
688            group_by: vec![], order_by: vec![], limit: 25, offset: 0,
689            limit_by: None, use_final: false,
690            joins: vec![JoinExpr {
691                schema: "dexes_dim".into(), table: "sol_tokens".into(),
692                alias: "_j0".into(),
693                conditions: vec![("buy_token_address".into(), "token_address".into())],
694                selects: vec![
695                    SelectExpr::Column { column: "name".into(), alias: None },
696                    SelectExpr::Column { column: "symbol".into(), alias: None },
697                ],
698                group_by: vec![], use_final: true, is_aggregate: false,
699                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
700                join_type: JoinType::Left,
701            }],
702            custom_query_builder: None,
703            from_subquery: None,
704        };
705        let r = ch().compile(&ir);
706        assert!(r.sql.contains("FROM (SELECT"), "main query should be wrapped, got: {}", r.sql);
707        assert!(r.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` AS _j0 FINAL"),
708            "direct JOIN with FINAL after alias, got: {}", r.sql);
709        assert!(r.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
710            "ON condition, got: {}", r.sql);
711        assert!(r.sql.contains("_j0.`name` AS `_j0.name`"), "joined col alias, got: {}", r.sql);
712    }
713
714    #[test]
715    fn test_join_aggregate_subquery() {
716        let ir = QueryIR {
717            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
718            table: "sol_dex_trades".into(),
719            selects: vec![
720                SelectExpr::Column { column: "tx_hash".into(), alias: None },
721                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
722            ],
723            filters: FilterNode::Empty, having: FilterNode::Empty,
724            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
725            limit_by: None, use_final: false,
726            joins: vec![JoinExpr {
727                schema: "dexes_dws".into(), table: "sol_token_market_cap".into(),
728                alias: "_j0".into(),
729                conditions: vec![("buy_token_address".into(), "token_address".into())],
730                selects: vec![
731                    SelectExpr::Column { column: "argMaxMerge(latest_market_cap_usd_state)".into(), alias: None },
732                ],
733                group_by: vec!["token_address".into()],
734                use_final: false, is_aggregate: true,
735                target_cube: "TokenMarketCap".into(), join_field: "joinBuyTokenMarketCap".into(),
736                join_type: JoinType::Left,
737            }],
738            custom_query_builder: None,
739            from_subquery: None,
740        };
741        let r = ch().compile(&ir);
742        assert!(r.sql.contains("LEFT JOIN (SELECT"), "aggregate should use subquery, got: {}", r.sql);
743        assert!(r.sql.contains("GROUP BY `token_address`"), "subquery GROUP BY, got: {}", r.sql);
744        assert!(r.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"), "subquery FROM, got: {}", r.sql);
745        assert!(r.sql.contains("argMaxMerge(latest_market_cap_usd_state) AS `__jc_0`"),
746            "join func expr should be aliased in subquery, got: {}", r.sql);
747        assert!(r.sql.contains("_j0.`__jc_0` AS `_j0.__jc_0`"),
748            "outer SELECT should use alias for join func col, got: {}", r.sql);
749    }
750
751    #[test]
752    fn test_join_main_query_func_expression_columns() {
753        let ir = QueryIR {
754            cube: "TokenHolders".into(), schema: "dws".into(),
755            table: "sol_token_holders".into(),
756            selects: vec![
757                SelectExpr::Column { column: "token".into(), alias: None },
758                SelectExpr::Column { column: "holder".into(), alias: None },
759                SelectExpr::Column { column: "argMaxMerge(latest_balance)".into(), alias: None },
760                SelectExpr::Column { column: "argMaxMerge(latest_balance_usd)".into(), alias: None },
761                SelectExpr::Column { column: "minMerge(first_seen)".into(), alias: None },
762                SelectExpr::Column { column: "maxMerge(last_seen)".into(), alias: None },
763            ],
764            filters: FilterNode::Empty, having: FilterNode::Empty,
765            group_by: vec![], order_by: vec![
766                OrderExpr { column: "argMaxMerge(latest_balance_usd)".into(), descending: true },
767            ],
768            limit: 100, offset: 0,
769            limit_by: None, use_final: false,
770            joins: vec![JoinExpr {
771                schema: "dim".into(), table: "sol_tokens".into(),
772                alias: "_j0".into(),
773                conditions: vec![("token".into(), "token_address".into())],
774                selects: vec![
775                    SelectExpr::Column { column: "name".into(), alias: None },
776                    SelectExpr::Column { column: "symbol".into(), alias: None },
777                ],
778                group_by: vec![], use_final: true, is_aggregate: false,
779                target_cube: "TokenSearch".into(), join_field: "joinToken".into(),
780                join_type: JoinType::Left,
781            }],
782            custom_query_builder: None,
783            from_subquery: None,
784        };
785        let r = ch().compile(&ir);
786        let sql = &r.sql;
787
788        assert!(sql.contains("_main.`__mc_0` AS `__mc_0`"),
789            "func expr should use alias __mc_0 in outer SELECT, got: {sql}");
790        assert!(sql.contains("_main.`__mc_1` AS `__mc_1`"),
791            "func expr should use alias __mc_1, got: {sql}");
792        assert!(sql.contains("_main.`token` AS `token`"),
793            "simple col should be backtick-quoted, got: {sql}");
794
795        assert!(!sql.contains("_main.argMaxMerge("),
796            "outer SELECT must NOT have bare _main.argMaxMerge(...), got: {sql}");
797
798        assert!(sql.contains("argMaxMerge(latest_balance) AS `__mc_0`"),
799            "inner query should alias func expr, got: {sql}");
800
801        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_0" && o == "argMaxMerge(latest_balance)"),
802            "alias_remap should map __mc_0 → original, got: {:?}", r.alias_remap);
803        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_1" && o == "argMaxMerge(latest_balance_usd)"),
804            "alias_remap should map __mc_1, got: {:?}", r.alias_remap);
805    }
806
807    #[test]
808    fn test_join_inner_type() {
809        let ir = QueryIR {
810            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
811            table: "sol_dex_trades".into(),
812            selects: vec![
813                SelectExpr::Column { column: "tx_hash".into(), alias: None },
814            ],
815            filters: FilterNode::Empty, having: FilterNode::Empty,
816            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
817            limit_by: None, use_final: false,
818            joins: vec![JoinExpr {
819                schema: "dexes_dim".into(), table: "sol_tokens".into(),
820                alias: "_j0".into(),
821                conditions: vec![("buy_token_address".into(), "token_address".into())],
822                selects: vec![
823                    SelectExpr::Column { column: "name".into(), alias: None },
824                ],
825                group_by: vec![], use_final: false, is_aggregate: false,
826                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
827                join_type: JoinType::Inner,
828            }],
829            custom_query_builder: None,
830            from_subquery: None,
831        };
832        let r = ch().compile(&ir);
833        assert!(r.sql.contains("INNER JOIN `dexes_dim`.`sol_tokens` AS _j0"),
834            "should use INNER JOIN, got: {}", r.sql);
835    }
836
837    #[test]
838    fn test_join_full_outer_type() {
839        let ir = QueryIR {
840            cube: "T".into(), schema: "db".into(), table: "t".into(),
841            selects: vec![
842                SelectExpr::Column { column: "id".into(), alias: None },
843            ],
844            filters: FilterNode::Empty, having: FilterNode::Empty,
845            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
846            limit_by: None, use_final: false,
847            joins: vec![JoinExpr {
848                schema: "db2".into(), table: "t2".into(),
849                alias: "_j0".into(),
850                conditions: vec![("id".into(), "ref_id".into())],
851                selects: vec![
852                    SelectExpr::Column { column: "val".into(), alias: None },
853                ],
854                group_by: vec![], use_final: false, is_aggregate: false,
855                target_cube: "Other".into(), join_field: "joinOther".into(),
856                join_type: JoinType::Full,
857            }],
858            custom_query_builder: None,
859            from_subquery: None,
860        };
861        let r = ch().compile(&ir);
862        assert!(r.sql.contains("FULL OUTER JOIN `db2`.`t2` AS _j0"),
863            "should use FULL OUTER JOIN, got: {}", r.sql);
864    }
865
866    #[test]
867    fn test_custom_query_builder() {
868        let ir = QueryIR {
869            cube: "Custom".into(), schema: "db".into(), table: "t".into(),
870            selects: vec![
871                SelectExpr::Column { column: "id".into(), alias: None },
872            ],
873            filters: FilterNode::Empty, having: FilterNode::Empty,
874            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
875            limit_by: None, use_final: false, joins: vec![],
876            custom_query_builder: Some(QueryBuilderFn(std::sync::Arc::new(|_ir| {
877                CompileResult {
878                    sql: "SELECT 1 FROM custom_view".into(),
879                    bindings: vec![],
880                    alias_remap: vec![],
881                }
882            }))),
883            from_subquery: None,
884        };
885        let r = ch().compile(&ir);
886        assert_eq!(r.sql, "SELECT 1 FROM custom_view",
887            "custom builder should bypass standard compilation, got: {}", r.sql);
888    }
889
890    #[test]
891    fn test_from_subquery() {
892        let ir = QueryIR {
893            cube: "DEXTradeByTokens".into(), schema: "dwd".into(),
894            table: "sol_trades".into(),
895            selects: vec![
896                SelectExpr::Column { column: "amount".into(), alias: None },
897                SelectExpr::Column { column: "side_type".into(), alias: None },
898            ],
899            filters: FilterNode::Condition {
900                column: "token".into(), op: CompareOp::Eq,
901                value: SqlValue::String("SOL".into()),
902            },
903            having: FilterNode::Empty,
904            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
905            limit_by: None, use_final: false, joins: vec![],
906            custom_query_builder: None,
907            from_subquery: Some(
908                "SELECT amount, 'buy' AS side_type, token FROM dwd.sol_a UNION ALL SELECT amount, 'sell' AS side_type, token FROM dwd.sol_b".into()
909            ),
910        };
911        let r = ch().compile(&ir);
912        assert!(r.sql.starts_with("SELECT `amount`, `side_type` FROM (SELECT"),
913            "should use subquery in FROM, got: {}", r.sql);
914        assert!(r.sql.contains("UNION ALL"),
915            "subquery should contain UNION ALL, got: {}", r.sql);
916        assert!(r.sql.contains(") AS _t"),
917            "subquery should be aliased as _t, got: {}", r.sql);
918        assert!(r.sql.contains("WHERE `token` = ?"),
919            "WHERE clause should be applied to subquery result, got: {}", r.sql);
920        assert!(!r.sql.contains("FROM `dwd`.`sol_trades`"),
921            "should NOT use schema.table when from_subquery is set, got: {}", r.sql);
922    }
923}