// activecube_rs/sql/clickhouse.rs

1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::{CompileResult, JoinType};
5use crate::sql::dialect::SqlDialect;
6
/// Stateless marker type selecting the ClickHouse flavour of SQL generation.
///
/// The type carries no data, so it is free to copy, and `new()` and
/// `default()` produce the same value.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClickHouseDialect;

impl ClickHouseDialect {
    /// Creates the dialect; equivalent to `ClickHouseDialect::default()`.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
20
21impl SqlDialect for ClickHouseDialect {
22    fn compile(&self, ir: &QueryIR) -> CompileResult {
23        if let Some(ref builder) = ir.custom_query_builder {
24            return (builder.0)(ir);
25        }
26
27        let mut bindings = Vec::new();
28        let mut alias_remap: Vec<(String, String)> = Vec::new();
29
30        if ir.joins.is_empty() {
31            let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
32            return CompileResult { sql: inner_sql, bindings, alias_remap };
33        }
34
35        // When JOINs are present, force aliases on function expression columns
36        // (e.g. `argMaxMerge(latest_balance)`) so the outer SELECT can reference
37        // them by simple identifier names. Without aliases, `_main.argMaxMerge(x)`
38        // is parsed as a function call — a ClickHouse syntax error.
39        let mut ir_mod = ir.clone();
40        let mut mc_counter = 0u32;
41        for sel in &mut ir_mod.selects {
42            if let SelectExpr::Column { column, alias } = sel {
43                if column.contains('(') && alias.is_none() {
44                    let a = format!("__mc_{mc_counter}");
45                    mc_counter += 1;
46                    alias_remap.push((a.clone(), column.clone()));
47                    *alias = Some(a);
48                }
49            }
50        }
51
52        let mut jc_counter = 0u32;
53        for join in &mut ir_mod.joins {
54            for sel in &mut join.selects {
55                if let SelectExpr::Column { column, alias } = sel {
56                    if column.contains('(') && alias.is_none() {
57                        let a = format!("__jc_{jc_counter}");
58                        jc_counter += 1;
59                        *alias = Some(a);
60                    }
61                }
62            }
63        }
64
65        let inner_sql = self.compile_inner(&ir_mod, &mut bindings, &mut alias_remap);
66
67        // Build outer SELECT with explicit column listing.
68        // Always backtick-quote to prevent ClickHouse auto-prefixing ambiguous
69        // names when multiple JOINed tables share column names.
70        let main_cols: Vec<String> = ir_mod.selects.iter().map(|s| {
71            let col = match s {
72                SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
73                SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
74            };
75            format!("_main.`{}` AS `{}`", col, col)
76        }).collect();
77
78        let mut sql = String::from("SELECT ");
79        sql.push_str(&main_cols.join(", "));
80
81        // Collect joined column aliases for the outer SELECT
82        for join in &ir_mod.joins {
83            for sel in &join.selects {
84                let col_name = match sel {
85                    SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
86                    SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
87                };
88                let outer_alias = format!("{}.{}", join.alias, col_name);
89                if let SelectExpr::Column { column, alias: Some(_) } = sel {
90                    if column.contains('(') {
91                        let outer_original = format!("{}.{}", join.alias, column);
92                        alias_remap.push((outer_alias.clone(), outer_original));
93                    }
94                }
95                sql.push_str(&format!(", {}.`{}` AS `{}`",
96                    join.alias, col_name, outer_alias));
97            }
98        }
99
100        sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
101
102        // Build a lookup for main query aliases to resolve ON condition columns
103        // that might be function expressions (aliased above).
104        let main_alias_map: HashMap<String, String> = ir_mod.selects.iter()
105            .filter_map(|s| {
106                if let SelectExpr::Column { column, alias: Some(a) } = s {
107                    if column.contains('(') { Some((column.clone(), a.clone())) } else { None }
108                } else { None }
109            })
110            .collect();
111
112        for join in &ir_mod.joins {
113            let join_kw = join.join_type.sql_keyword();
114
115            if join.is_aggregate {
116                // Mode B: subquery JOIN for AggregatingMergeTree targets
117                sql.push_str(&format!(" {} (SELECT ", join_kw));
118                let mut sub_parts: Vec<String> = Vec::new();
119                for gb_col in &join.group_by {
120                    sub_parts.push(quote_col(gb_col));
121                }
122                for sel in &join.selects {
123                    match sel {
124                        SelectExpr::Column { column, alias } => {
125                            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
126                            if let Some(a) = alias {
127                                sub_parts.push(format!("{col} AS `{a}`"));
128                            } else if column.contains('(') || !join.group_by.contains(column) {
129                                sub_parts.push(col);
130                            }
131                        }
132                        SelectExpr::Aggregate { function, column, alias, condition } => {
133                            let func = function.to_lowercase();
134                            let expr = match (func.as_str(), column.as_str(), condition) {
135                                ("count", "*", None) => format!("count() AS `{alias}`"),
136                                ("uniq", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
137                                (f, col, None) => format!("{f}(`{col}`) AS `{alias}`"),
138                                (f, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`"),
139                            };
140                            sub_parts.push(expr);
141                        }
142                        SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
143                            let func = match agg_type {
144                                DimAggType::ArgMax => "argMax",
145                                DimAggType::ArgMin => "argMin",
146                            };
147                            let expr = match condition {
148                                None => format!("{func}(`{value_column}`, `{compare_column}`) AS `{alias}`"),
149                                Some(cond) => format!("{func}If(`{value_column}`, `{compare_column}`, {cond}) AS `{alias}`"),
150                            };
151                            sub_parts.push(expr);
152                        }
153                    }
154                }
155                sql.push_str(&sub_parts.join(", "));
156                sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
157                if !join.group_by.is_empty() {
158                    sql.push_str(" GROUP BY ");
159                    let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
160                    sql.push_str(&gb.join(", "));
161                }
162                sql.push_str(&format!(") AS {}", join.alias));
163            } else {
164                // Mode A: direct JOIN for MergeTree / ReplacingMergeTree targets
165                sql.push_str(&format!(" {} `{}`.`{}` AS {}",
166                    join_kw, join.schema, join.table, join.alias));
167                if join.use_final {
168                    sql.push_str(" FINAL");
169                }
170            }
171
172            if join.join_type == JoinType::Cross {
173                // CROSS JOIN has no ON clause
174                continue;
175            }
176
177            // ON conditions — use alias if the local column was a func expression
178            let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
179                let local_ref = main_alias_map.get(local).unwrap_or(local);
180                format!("_main.`{}` = {}.`{}`", local_ref, join.alias, remote)
181            }).collect();
182            sql.push_str(" ON ");
183            sql.push_str(&on_parts.join(" AND "));
184        }
185
186        CompileResult { sql, bindings, alias_remap }
187    }
188
189    fn quote_identifier(&self, name: &str) -> String {
190        format!("`{name}`")
191    }
192
193    fn name(&self) -> &str {
194        "ClickHouse"
195    }
196}
197
198impl ClickHouseDialect {
199    fn compile_inner(
200        &self,
201        ir: &QueryIR,
202        bindings: &mut Vec<SqlValue>,
203        alias_remap: &mut Vec<(String, String)>,
204    ) -> String {
205        let mut sql = String::new();
206
207        let mut augmented_selects = ir.selects.clone();
208        let mut agg_alias_map: HashMap<String, String> = HashMap::new();
209        let mut alias_counter = 0u32;
210
211        let having_cols: std::collections::HashSet<String> =
212            collect_filter_columns(&ir.having).into_iter().collect();
213        let has_having_agg = having_cols.iter().any(|c| c.contains('('));
214
215        if has_having_agg {
216            for sel in &mut augmented_selects {
217                if let SelectExpr::Column { column, alias } = sel {
218                    if column.contains('(') && having_cols.contains(column.as_str()) {
219                        if alias.is_none() {
220                            let a = format!("__f_{alias_counter}");
221                            alias_counter += 1;
222                            alias_remap.push((a.clone(), column.clone()));
223                            agg_alias_map.insert(column.clone(), a.clone());
224                            *alias = Some(a);
225                        } else if let Some(existing) = alias {
226                            agg_alias_map.insert(column.clone(), existing.clone());
227                        }
228                    }
229                }
230            }
231            for col in &having_cols {
232                if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
233                    let a = format!("__f_{alias_counter}");
234                    alias_counter += 1;
235                    agg_alias_map.insert(col.clone(), a.clone());
236                    augmented_selects.push(SelectExpr::Column {
237                        column: col.clone(),
238                        alias: Some(a),
239                    });
240                }
241            }
242        }
243
244        for sel in &augmented_selects {
245            if let SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, .. } = sel {
246                let func = match agg_type {
247                    DimAggType::ArgMax => "argMax",
248                    DimAggType::ArgMin => "argMin",
249                };
250                let expr = format!("{func}(`{value_column}`, `{compare_column}`)");
251                agg_alias_map.insert(expr, alias.clone());
252            }
253        }
254
255        sql.push_str("SELECT ");
256        let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
257            SelectExpr::Column { column, alias } => {
258                let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
259                match alias {
260                    Some(a) => format!("{col} AS `{a}`"),
261                    None => col,
262                }
263            },
264            SelectExpr::Aggregate { function, column, alias, condition } => {
265                let func = function.to_uppercase();
266                match (func.as_str(), column.as_str(), condition) {
267                    ("COUNT", "*", None) => format!("count() AS `{alias}`"),
268                    ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
269                    ("UNIQ", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
270                    ("UNIQ", col, Some(cond)) => format!("uniqIf(`{col}`, {cond}) AS `{alias}`"),
271                    (_, col, None) => format!("{f}(`{col}`) AS `{alias}`", f = func.to_lowercase()),
272                    (_, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`", f = func.to_lowercase()),
273                }
274            }
275            SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
276                let func = match agg_type {
277                    DimAggType::ArgMax => "argMax",
278                    DimAggType::ArgMin => "argMin",
279                };
280                match condition {
281                    None => format!("{func}(`{value_column}`, `{compare_column}`) AS `{alias}`"),
282                    Some(cond) => format!("{func}If(`{value_column}`, `{compare_column}`, {cond}) AS `{alias}`"),
283                }
284            }
285        }).collect();
286        sql.push_str(&select_parts.join(", "));
287
288        if let Some(ref subquery) = ir.from_subquery {
289            sql.push_str(&format!(" FROM ({}) AS _t", subquery));
290        } else {
291            sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
292            if ir.use_final {
293                sql.push_str(" FINAL");
294            }
295        }
296
297        let where_clause = compile_filter(&ir.filters, bindings);
298        if !where_clause.is_empty() {
299            sql.push_str(" WHERE ");
300            sql.push_str(&where_clause);
301        }
302
303        let effective_group_by = if !ir.group_by.is_empty() {
304            ir.group_by.clone()
305        } else {
306            let has_agg_cols = augmented_selects.iter().any(|s| match s {
307                SelectExpr::Column { column, .. } => column.contains("Merge("),
308                SelectExpr::Aggregate { .. } | SelectExpr::DimAggregate { .. } => true,
309            });
310            if has_agg_cols {
311                augmented_selects.iter().filter_map(|s| match s {
312                    SelectExpr::Column { column, .. } if !column.contains("Merge(") && !column.contains('(') => {
313                        Some(column.clone())
314                    }
315                    _ => None,
316                }).collect()
317            } else {
318                vec![]
319            }
320        };
321
322        if !effective_group_by.is_empty() {
323            sql.push_str(" GROUP BY ");
324            let cols: Vec<String> = effective_group_by.iter().map(|c| quote_col(c)).collect();
325            sql.push_str(&cols.join(", "));
326        }
327
328        if has_having_agg {
329            let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
330            if !having_clause.is_empty() {
331                sql.push_str(" HAVING ");
332                sql.push_str(&having_clause);
333            }
334        } else {
335            let having_clause = compile_filter(&ir.having, bindings);
336            if !having_clause.is_empty() {
337                sql.push_str(" HAVING ");
338                sql.push_str(&having_clause);
339            }
340        }
341
342        if !ir.order_by.is_empty() {
343            sql.push_str(" ORDER BY ");
344            let parts: Vec<String> = ir.order_by.iter().map(|o| {
345                let col = if o.column.contains('(') {
346                    agg_alias_map.get(&o.column)
347                        .map(|a| format!("`{a}`"))
348                        .unwrap_or_else(|| o.column.clone())
349                } else {
350                    format!("`{}`", o.column)
351                };
352                let dir = if o.descending { "DESC" } else { "ASC" };
353                format!("{col} {dir}")
354            }).collect();
355            sql.push_str(&parts.join(", "));
356        }
357
358        if let Some(ref lb) = ir.limit_by {
359            let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
360            sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
361            if lb.offset > 0 {
362                sql.push_str(&format!(" OFFSET {}", lb.offset));
363            }
364        }
365
366        sql.push_str(&format!(" LIMIT {}", ir.limit));
367        if ir.offset > 0 {
368            sql.push_str(&format!(" OFFSET {}", ir.offset));
369        }
370
371        sql
372    }
373}
374
375/// Collect all column names referenced in a filter tree.
376fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
377    match node {
378        FilterNode::Empty => vec![],
379        FilterNode::Condition { column, .. } => vec![column.clone()],
380        FilterNode::And(children) | FilterNode::Or(children) => {
381            children.iter().flat_map(collect_filter_columns).collect()
382        }
383        FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
384    }
385}
386
387/// Like `compile_filter` but replaces aggregate expression columns with their
388/// SELECT aliases so ClickHouse can resolve them in HAVING scope.
389fn compile_filter_with_aliases(
390    node: &FilterNode,
391    bindings: &mut Vec<SqlValue>,
392    aliases: &HashMap<String, String>,
393) -> String {
394    match node {
395        FilterNode::Empty => String::new(),
396        FilterNode::Condition { column, op, value } => {
397            let effective_col = aliases.get(column)
398                .map(|a| a.as_str())
399                .unwrap_or(column.as_str());
400            compile_condition(effective_col, op, value, bindings)
401        }
402        FilterNode::And(children) => {
403            let parts: Vec<String> = children.iter()
404                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
405                .filter(|s| !s.is_empty())
406                .collect();
407            match parts.len() {
408                0 => String::new(),
409                1 => parts.into_iter().next().unwrap(),
410                _ => format!("({})", parts.join(" AND ")),
411            }
412        }
413        FilterNode::Or(children) => {
414            let parts: Vec<String> = children.iter()
415                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
416                .filter(|s| !s.is_empty())
417                .collect();
418            match parts.len() {
419                0 => String::new(),
420                1 => parts.into_iter().next().unwrap(),
421                _ => format!("({})", parts.join(" OR ")),
422            }
423        }
424        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
425            compile_array_includes(array_columns, element_conditions, bindings)
426        }
427    }
428}
429
430fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
431    match node {
432        FilterNode::Empty => String::new(),
433        FilterNode::Condition { column, op, value } => {
434            compile_condition(column, op, value, bindings)
435        }
436        FilterNode::And(children) => {
437            let parts: Vec<String> = children.iter()
438                .map(|c| compile_filter(c, bindings))
439                .filter(|s| !s.is_empty())
440                .collect();
441            match parts.len() {
442                0 => String::new(),
443                1 => parts.into_iter().next().unwrap(),
444                _ => format!("({})", parts.join(" AND ")),
445            }
446        }
447        FilterNode::Or(children) => {
448            let parts: Vec<String> = children.iter()
449                .map(|c| compile_filter(c, bindings))
450                .filter(|s| !s.is_empty())
451                .collect();
452            match parts.len() {
453                0 => String::new(),
454                1 => parts.into_iter().next().unwrap(),
455                _ => format!("({})", parts.join(" OR ")),
456            }
457        }
458        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
459            compile_array_includes(array_columns, element_conditions, bindings)
460        }
461    }
462}
463
464/// Compile ArrayIncludes into one or more `arrayExists(lambda, arrays)` expressions.
465fn compile_array_includes(
466    array_columns: &[String],
467    element_conditions: &[Vec<FilterNode>],
468    bindings: &mut Vec<SqlValue>,
469) -> String {
470    let params: Vec<String> = (0..array_columns.len())
471        .map(|i| format!("_p{i}"))
472        .collect();
473    let arrays_sql: Vec<String> = array_columns.iter()
474        .map(|c| quote_col(c))
475        .collect();
476    let arrays_ref = arrays_sql.join(", ");
477    let params_ref = params.join(", ");
478
479    let col_to_param: std::collections::HashMap<&str, &str> = array_columns.iter()
480        .zip(params.iter())
481        .map(|(c, p)| (c.as_str(), p.as_str()))
482        .collect();
483
484    let exists_parts: Vec<String> = element_conditions.iter().map(|conds| {
485        let cond_parts: Vec<String> = conds.iter()
486            .map(|c| compile_filter_with_param_remap(c, bindings, &col_to_param))
487            .filter(|s| !s.is_empty())
488            .collect();
489        let cond_sql = match cond_parts.len() {
490            0 => "1".to_string(),
491            1 => cond_parts.into_iter().next().unwrap(),
492            _ => format!("({})", cond_parts.join(" AND ")),
493        };
494        format!("arrayExists(({params_ref}) -> {cond_sql}, {arrays_ref})")
495    }).collect();
496
497    match exists_parts.len() {
498        0 => String::new(),
499        1 => exists_parts.into_iter().next().unwrap(),
500        _ => format!("({})", exists_parts.join(" AND ")),
501    }
502}
503
504/// Compile a filter node but remap column names to lambda parameter names.
505/// Lambda parameters must NOT be backtick-quoted in ClickHouse.
506fn compile_filter_with_param_remap(
507    node: &FilterNode,
508    bindings: &mut Vec<SqlValue>,
509    col_to_param: &std::collections::HashMap<&str, &str>,
510) -> String {
511    match node {
512        FilterNode::Empty => String::new(),
513        FilterNode::Condition { column, op, value } => {
514            if let Some(&param) = col_to_param.get(column.as_str()) {
515                compile_condition_raw(param, op, value, bindings)
516            } else {
517                compile_condition(column, op, value, bindings)
518            }
519        }
520        FilterNode::And(children) => {
521            let parts: Vec<String> = children.iter()
522                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
523                .filter(|s| !s.is_empty())
524                .collect();
525            match parts.len() {
526                0 => String::new(),
527                1 => parts.into_iter().next().unwrap(),
528                _ => format!("({})", parts.join(" AND ")),
529            }
530        }
531        FilterNode::Or(children) => {
532            let parts: Vec<String> = children.iter()
533                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
534                .filter(|s| !s.is_empty())
535                .collect();
536            match parts.len() {
537                0 => String::new(),
538                1 => parts.into_iter().next().unwrap(),
539                _ => format!("({})", parts.join(" OR ")),
540            }
541        }
542        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
543            compile_array_includes(array_columns, element_conditions, bindings)
544        }
545    }
546}
547
/// Backtick-quotes a plain column name. Anything containing `(` is treated as
/// an expression and returned unchanged.
fn quote_col(column: &str) -> String {
    match column.find('(') {
        Some(_) => column.to_owned(),
        None => ["`", column, "`"].concat(),
    }
}
555
556/// Like compile_condition but uses the column name as-is (no backtick quoting).
557/// Used for lambda parameters in arrayExists.
558fn compile_condition_raw(
559    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
560) -> String {
561    compile_condition_inner(col, op, value, bindings)
562}
563
564fn compile_condition(
565    column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
566) -> String {
567    let col = quote_col(column);
568    compile_condition_inner(&col, op, value, bindings)
569}
570
571fn compile_condition_inner(
572    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
573) -> String {
574    match op {
575        CompareOp::In | CompareOp::NotIn => {
576            if let SqlValue::String(csv) = value {
577                let items: Vec<&str> = csv.split(',').collect();
578                let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
579                for item in &items {
580                    bindings.push(SqlValue::String(item.trim().to_string()));
581                }
582                format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
583            } else {
584                bindings.push(value.clone());
585                format!("{col} {} (?)", op.sql_op())
586            }
587        }
588        CompareOp::Includes => {
589            if let SqlValue::String(s) = value {
590                bindings.push(SqlValue::String(format!("%{s}%")));
591            } else {
592                bindings.push(value.clone());
593            }
594            format!("{col} LIKE ?")
595        }
596        CompareOp::IsNull | CompareOp::IsNotNull => {
597            format!("{col} {}", op.sql_op())
598        }
599        _ => {
600            if let SqlValue::Expression(expr) = value {
601                format!("{col} {} {expr}", op.sql_op())
602            } else {
603                bindings.push(value.clone());
604                format!("{col} {} ?", op.sql_op())
605            }
606        }
607    }
608}
609
610#[cfg(test)]
611mod tests {
612    use super::*;
613
614    fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }
615
616    #[test]
617    fn test_simple_select() {
618        let ir = QueryIR {
619            cube: "DEXTrades".into(), schema: "default".into(),
620            table: "dwd_dex_trades".into(),
621            selects: vec![
622                SelectExpr::Column { column: "tx_hash".into(), alias: None },
623                SelectExpr::Column { column: "token_a_amount".into(), alias: None },
624            ],
625            filters: FilterNode::Empty, having: FilterNode::Empty,
626            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
627            limit_by: None,
628            use_final: false,
629            joins: vec![],
630            custom_query_builder: None,
631            from_subquery: None,
632        };
633        let r = ch().compile(&ir);
634        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
635        assert!(r.bindings.is_empty());
636    }
637
638    #[test]
639    fn test_final_keyword() {
640        let ir = QueryIR {
641            cube: "T".into(), schema: "db".into(), table: "tokens".into(),
642            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
643            filters: FilterNode::Empty, having: FilterNode::Empty,
644            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
645            limit_by: None,
646            use_final: true,
647            joins: vec![],
648            custom_query_builder: None,
649            from_subquery: None,
650        };
651        let r = ch().compile(&ir);
652        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
653    }
654
655    #[test]
656    fn test_uniq_uses_native_function() {
657        let ir = QueryIR {
658            cube: "T".into(), schema: "db".into(), table: "t".into(),
659            selects: vec![
660                SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
661            ],
662            filters: FilterNode::Empty, having: FilterNode::Empty,
663            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
664            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
665        };
666        let r = ch().compile(&ir);
667        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
668    }
669
670    #[test]
671    fn test_count_star() {
672        let ir = QueryIR {
673            cube: "T".into(), schema: "db".into(), table: "t".into(),
674            selects: vec![
675                SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
676            ],
677            filters: FilterNode::Empty, having: FilterNode::Empty,
678            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
679            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
680        };
681        let r = ch().compile(&ir);
682        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
683    }
684
685    #[test]
686    fn test_aggregate_lowercase() {
687        let ir = QueryIR {
688            cube: "T".into(), schema: "db".into(), table: "t".into(),
689            selects: vec![
690                SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
691                SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
692            ],
693            filters: FilterNode::Empty, having: FilterNode::Empty,
694            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
695            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
696        };
697        let r = ch().compile(&ir);
698        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
699        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
700    }
701
702    #[test]
703    fn test_where_and_order() {
704        let ir = QueryIR {
705            cube: "T".into(), schema: "db".into(), table: "t".into(),
706            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
707            filters: FilterNode::And(vec![
708                FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
709                FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
710            ]),
711            having: FilterNode::Empty, group_by: vec![],
712            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
713            limit: 25, offset: 0,
714            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
715        };
716        let r = ch().compile(&ir);
717        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
718        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
719        assert_eq!(r.bindings.len(), 2);
720    }
721
/// A `HAVING` condition written against an aggregate expression must be
/// emitted through a generated `__f_0` alias: the expression is selected
/// `AS __f_0` and the `HAVING` clause compares the alias, with one binding.
#[test]
fn test_having_with_aggregate_expr() {
    let having_clause = FilterNode::Condition {
        column: "sum(`amount_usd`)".into(),
        op: CompareOp::Gt,
        value: SqlValue::Float(1000000.0),
    };

    let query = QueryIR {
        cube: "T".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![
            SelectExpr::Column { column: "token_address".into(), alias: None },
            SelectExpr::Aggregate {
                function: "SUM".into(),
                column: "amount_usd".into(),
                alias: "__sum".into(),
                condition: None,
            },
        ],
        filters: FilterNode::Empty,
        having: having_clause,
        group_by: vec!["token_address".into()],
        order_by: vec![],
        limit: 25,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(compiled.sql.contains("GROUP BY `token_address`"));
    assert!(
        compiled.sql.contains("HAVING `__f_0` > ?"),
        "expected alias in HAVING, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("sum(`amount_usd`) AS `__f_0`"),
        "expected alias in SELECT, got: {}",
        compiled.sql
    );
    assert_eq!(compiled.bindings.len(), 1);
}
743
/// `HAVING` references two function expressions, only the first of which is
/// in the select list. The test expects both to appear in the SQL under the
/// aliases `__f_0` / `__f_1`, the `HAVING` clause to use those aliases, two
/// bindings, and exactly one `alias_remap` entry (for `__f_0`).
#[test]
fn test_having_appends_missing_agg_column() {
    let liquidity_expr = "argMaxMerge(latest_liquidity_usd_state)";
    let token_a_expr = "argMaxMerge(latest_token_a_amount_state)";

    let having_tree = FilterNode::And(vec![
        FilterNode::Condition {
            column: liquidity_expr.into(),
            op: CompareOp::Gt,
            value: SqlValue::Float(2.0),
        },
        FilterNode::Condition {
            column: token_a_expr.into(),
            op: CompareOp::Gt,
            value: SqlValue::Float(3.0),
        },
    ]);

    let query = QueryIR {
        cube: "T".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![
            SelectExpr::Column { column: "pool_address".into(), alias: None },
            SelectExpr::Column { column: liquidity_expr.into(), alias: None },
        ],
        filters: FilterNode::Empty,
        having: having_tree,
        group_by: vec!["pool_address".into()],
        order_by: vec![],
        limit: 25,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
        "existing HAVING col should be aliased, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
        "missing agg col should be appended, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
        "HAVING should use aliases, got: {}",
        compiled.sql
    );
    assert_eq!(compiled.bindings.len(), 2);
    assert_eq!(compiled.alias_remap.len(), 1);
    assert_eq!(
        compiled.alias_remap[0],
        ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string())
    );
}
777
/// With `limit_by` set, the SQL must contain `ORDER BY`, `LIMIT n BY cols`
/// and the outer `LIMIT`, in that textual order.
#[test]
fn test_limit_by() {
    let per_owner = LimitByExpr {
        count: 3,
        offset: 0,
        columns: vec!["owner".into()],
    };

    let query = QueryIR {
        cube: "T".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![
            SelectExpr::Column { column: "owner".into(), alias: None },
            SelectExpr::Column { column: "amount".into(), alias: None },
        ],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
        limit: 100,
        offset: 0,
        limit_by: Some(per_owner),
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;

    // Presence checks first, so a missing clause fails with a readable message
    // before the position lookups below would panic on `unwrap`.
    assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
    assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
    assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");

    let at_order_by = sql.find("ORDER BY").unwrap();
    let at_limit_by = sql.find("LIMIT 3 BY").unwrap();
    let at_outer_limit = sql.rfind("LIMIT 100").unwrap();
    assert!(at_order_by < at_limit_by, "ORDER BY should come before LIMIT BY in ClickHouse");
    assert!(at_limit_by < at_outer_limit, "LIMIT BY should come before outer LIMIT");
}
804
/// A multi-column `limit_by` with a non-zero offset must render as
/// `LIMIT 5 BY <cols> OFFSET 2`.
#[test]
fn test_limit_by_with_offset() {
    let per_pair = LimitByExpr {
        count: 5,
        offset: 2,
        columns: vec!["token".into(), "wallet".into()],
    };

    let query = QueryIR {
        cube: "T".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: Some(per_pair),
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"),
        "multi-column LIMIT BY with offset, got: {}",
        compiled.sql
    );
}
818
/// A non-aggregate LEFT join with `use_final` set: the main query must be
/// wrapped in a subquery, the joined table referenced directly with `FINAL`
/// after its alias, the ON condition qualified with `_main` / `_j0`, and the
/// joined column exposed under an alias prefixed with the join alias.
#[test]
fn test_join_direct() {
    let col = |name: &str| SelectExpr::Column { column: name.into(), alias: None };

    let token_join = JoinExpr {
        schema: "dexes_dim".into(),
        table: "sol_tokens".into(),
        alias: "_j0".into(),
        conditions: vec![("buy_token_address".into(), "token_address".into())],
        selects: vec![col("name"), col("symbol")],
        group_by: vec![],
        use_final: true,
        is_aggregate: false,
        target_cube: "TokenSearch".into(),
        join_field: "joinBuyToken".into(),
        join_type: JoinType::Left,
    };

    let query = QueryIR {
        cube: "DEXTrades".into(),
        schema: "dexes_dwd".into(),
        table: "sol_dex_trades".into(),
        selects: vec![col("tx_hash"), col("buy_token_address")],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 25,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![token_join],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("FROM (SELECT"),
        "main query should be wrapped, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` AS _j0 FINAL"),
        "direct JOIN with FINAL after alias, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
        "ON condition, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("_j0.`name` AS `_j0.name`"),
        "joined col alias, got: {}",
        compiled.sql
    );
}
854
/// An aggregate join (`is_aggregate: true`) must be emitted as a
/// `LEFT JOIN (SELECT ... GROUP BY ...)` subquery, with the function
/// expression aliased `__jc_0` inside the subquery and referenced through
/// that alias in the outer SELECT.
#[test]
fn test_join_aggregate_subquery() {
    let market_cap_join = JoinExpr {
        schema: "dexes_dws".into(),
        table: "sol_token_market_cap".into(),
        alias: "_j0".into(),
        conditions: vec![("buy_token_address".into(), "token_address".into())],
        selects: vec![SelectExpr::Column {
            column: "argMaxMerge(latest_market_cap_usd_state)".into(),
            alias: None,
        }],
        group_by: vec!["token_address".into()],
        use_final: false,
        is_aggregate: true,
        target_cube: "TokenMarketCap".into(),
        join_field: "joinBuyTokenMarketCap".into(),
        join_type: JoinType::Left,
    };

    let query = QueryIR {
        cube: "DEXTrades".into(),
        schema: "dexes_dwd".into(),
        table: "sol_dex_trades".into(),
        selects: vec![
            SelectExpr::Column { column: "tx_hash".into(), alias: None },
            SelectExpr::Column { column: "buy_token_address".into(), alias: None },
        ],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![market_cap_join],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("LEFT JOIN (SELECT"),
        "aggregate should use subquery, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("GROUP BY `token_address`"),
        "subquery GROUP BY, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"),
        "subquery FROM, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("argMaxMerge(latest_market_cap_usd_state) AS `__jc_0`"),
        "join func expr should be aliased in subquery, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("_j0.`__jc_0` AS `_j0.__jc_0`"),
        "outer SELECT should use alias for join func col, got: {}",
        compiled.sql
    );
}
891
/// When a join is present and the main query selects function expressions,
/// those expressions must be aliased (`__mc_N`) in the inner query, referenced
/// via `_main.<alias>` in the outer SELECT (never as `_main.func(...)`), and
/// reported in `alias_remap` with their original text.
#[test]
fn test_join_main_query_func_expression_columns() {
    let col = |name: &str| SelectExpr::Column { column: name.into(), alias: None };

    let token_join = JoinExpr {
        schema: "dim".into(),
        table: "sol_tokens".into(),
        alias: "_j0".into(),
        conditions: vec![("token".into(), "token_address".into())],
        selects: vec![col("name"), col("symbol")],
        group_by: vec![],
        use_final: true,
        is_aggregate: false,
        target_cube: "TokenSearch".into(),
        join_field: "joinToken".into(),
        join_type: JoinType::Left,
    };

    let query = QueryIR {
        cube: "TokenHolders".into(),
        schema: "dws".into(),
        table: "sol_token_holders".into(),
        selects: vec![
            col("token"),
            col("holder"),
            col("argMaxMerge(latest_balance)"),
            col("argMaxMerge(latest_balance_usd)"),
            col("minMerge(first_seen)"),
            col("maxMerge(last_seen)"),
        ],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![OrderExpr {
            column: "argMaxMerge(latest_balance_usd)".into(),
            descending: true,
        }],
        limit: 100,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![token_join],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;

    // Outer SELECT: aliases for function expressions, quoted names for plain columns.
    assert!(
        sql.contains("_main.`__mc_0` AS `__mc_0`"),
        "func expr should use alias __mc_0 in outer SELECT, got: {sql}"
    );
    assert!(
        sql.contains("_main.`__mc_1` AS `__mc_1`"),
        "func expr should use alias __mc_1, got: {sql}"
    );
    assert!(
        sql.contains("_main.`token` AS `token`"),
        "simple col should be backtick-quoted, got: {sql}"
    );

    assert!(
        !sql.contains("_main.argMaxMerge("),
        "outer SELECT must NOT have bare _main.argMaxMerge(...), got: {sql}"
    );

    // Inner query: the expression itself carries the generated alias.
    assert!(
        sql.contains("argMaxMerge(latest_balance) AS `__mc_0`"),
        "inner query should alias func expr, got: {sql}"
    );

    // alias_remap: generated alias -> original expression text.
    let remapped = |alias: &str, original: &str| {
        compiled.alias_remap.iter().any(|(a, o)| a == alias && o == original)
    };
    assert!(
        remapped("__mc_0", "argMaxMerge(latest_balance)"),
        "alias_remap should map __mc_0 → original, got: {:?}",
        compiled.alias_remap
    );
    assert!(
        remapped("__mc_1", "argMaxMerge(latest_balance_usd)"),
        "alias_remap should map __mc_1, got: {:?}",
        compiled.alias_remap
    );
}
947
/// `JoinType::Inner` must be rendered with the `INNER JOIN` keyword.
#[test]
fn test_join_inner_type() {
    let inner_join = JoinExpr {
        schema: "dexes_dim".into(),
        table: "sol_tokens".into(),
        alias: "_j0".into(),
        conditions: vec![("buy_token_address".into(), "token_address".into())],
        selects: vec![SelectExpr::Column { column: "name".into(), alias: None }],
        group_by: vec![],
        use_final: false,
        is_aggregate: false,
        target_cube: "TokenSearch".into(),
        join_field: "joinBuyToken".into(),
        join_type: JoinType::Inner,
    };

    let query = QueryIR {
        cube: "DEXTrades".into(),
        schema: "dexes_dwd".into(),
        table: "sol_dex_trades".into(),
        selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![inner_join],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("INNER JOIN `dexes_dim`.`sol_tokens` AS _j0"),
        "should use INNER JOIN, got: {}",
        compiled.sql
    );
}
977
/// `JoinType::Full` must be rendered with the `FULL OUTER JOIN` keyword.
#[test]
fn test_join_full_outer_type() {
    let full_join = JoinExpr {
        schema: "db2".into(),
        table: "t2".into(),
        alias: "_j0".into(),
        conditions: vec![("id".into(), "ref_id".into())],
        selects: vec![SelectExpr::Column { column: "val".into(), alias: None }],
        group_by: vec![],
        use_final: false,
        is_aggregate: false,
        target_cube: "Other".into(),
        join_field: "joinOther".into(),
        join_type: JoinType::Full,
    };

    let query = QueryIR {
        cube: "T".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![full_join],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("FULL OUTER JOIN `db2`.`t2` AS _j0"),
        "should use FULL OUTER JOIN, got: {}",
        compiled.sql
    );
}
1006
/// When `custom_query_builder` is set, `compile` must return exactly what the
/// builder produces instead of running the standard compilation.
#[test]
fn test_custom_query_builder() {
    let fixed_builder = QueryBuilderFn(std::sync::Arc::new(|_ir| {
        CompileResult {
            sql: "SELECT 1 FROM custom_view".into(),
            bindings: vec![],
            alias_remap: vec![],
        }
    }));

    let query = QueryIR {
        cube: "Custom".into(),
        schema: "db".into(),
        table: "t".into(),
        selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
        filters: FilterNode::Empty,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: Some(fixed_builder),
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert_eq!(
        compiled.sql, "SELECT 1 FROM custom_view",
        "custom builder should bypass standard compilation, got: {}",
        compiled.sql
    );
}
1030
/// With `from_subquery` set, the FROM clause must be the supplied subquery
/// aliased `_t` rather than `schema`.`table`, and the WHERE filter must still
/// be emitted.
#[test]
fn test_from_subquery() {
    let union_sql = "SELECT amount, 'buy' AS side_type, token FROM dwd.sol_a UNION ALL SELECT amount, 'sell' AS side_type, token FROM dwd.sol_b";

    let token_filter = FilterNode::Condition {
        column: "token".into(),
        op: CompareOp::Eq,
        value: SqlValue::String("SOL".into()),
    };

    let query = QueryIR {
        cube: "DEXTradeByTokens".into(),
        schema: "dwd".into(),
        table: "sol_trades".into(),
        selects: vec![
            SelectExpr::Column { column: "amount".into(), alias: None },
            SelectExpr::Column { column: "side_type".into(), alias: None },
        ],
        filters: token_filter,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: Some(union_sql.into()),
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.starts_with("SELECT `amount`, `side_type` FROM (SELECT"),
        "should use subquery in FROM, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("UNION ALL"),
        "subquery should contain UNION ALL, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains(") AS _t"),
        "subquery should be aliased as _t, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("WHERE `token` = ?"),
        "WHERE clause should be applied to subquery result, got: {}",
        compiled.sql
    );
    assert!(
        !compiled.sql.contains("FROM `dwd`.`sol_trades`"),
        "should NOT use schema.table when from_subquery is set, got: {}",
        compiled.sql
    );
}
1064
/// A single-condition `ArrayIncludes` over three parallel array columns must
/// compile to one `arrayExists` call whose lambda takes `_p0, _p1, _p2`, lists
/// all three columns as arguments, and produces one binding.
#[test]
fn test_array_includes_single_condition() {
    let name_is_amount_in = FilterNode::Condition {
        column: "instruction_arg_names".into(),
        op: CompareOp::Eq,
        value: SqlValue::String("amount_in".into()),
    };

    let array_filter = FilterNode::ArrayIncludes {
        array_columns: vec![
            "instruction_arg_names".into(),
            "instruction_arg_types".into(),
            "instruction_arg_values".into(),
        ],
        element_conditions: vec![vec![name_is_amount_in]],
    };

    let query = QueryIR {
        cube: "Instructions".into(),
        schema: "dexes_dwd2".into(),
        table: "sol_instructions".into(),
        selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
        filters: array_filter,
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("arrayExists((_p0, _p1, _p2) -> _p0 = ?"),
        "should generate arrayExists with lambda params, got: {}",
        compiled.sql
    );
    assert!(
        compiled.sql.contains("`instruction_arg_names`, `instruction_arg_types`, `instruction_arg_values`"),
        "should reference all parallel array columns, got: {}",
        compiled.sql
    );
    assert_eq!(compiled.bindings.len(), 1);
}
1097
/// Two element-condition groups in one `ArrayIncludes` must yield two
/// `arrayExists` calls joined by `AND`, with three bindings in total
/// (two from the first group, one from the second).
#[test]
fn test_array_includes_multiple_conditions() {
    let amount_in_over_threshold = vec![
        FilterNode::Condition {
            column: "instruction_arg_names".into(),
            op: CompareOp::Eq,
            value: SqlValue::String("amount_in".into()),
        },
        FilterNode::Condition {
            column: "instruction_arg_values".into(),
            op: CompareOp::Gt,
            value: SqlValue::String("10000".into()),
        },
    ];
    let has_owner_arg = vec![FilterNode::Condition {
        column: "instruction_arg_names".into(),
        op: CompareOp::Eq,
        value: SqlValue::String("owner".into()),
    }];

    let query = QueryIR {
        cube: "Instructions".into(),
        schema: "dexes_dwd2".into(),
        table: "sol_instructions".into(),
        selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
        filters: FilterNode::ArrayIncludes {
            array_columns: vec![
                "instruction_arg_names".into(),
                "instruction_arg_values".into(),
            ],
            element_conditions: vec![amount_in_over_threshold, has_owner_arg],
        },
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;
    assert_eq!(
        sql.matches("arrayExists").count(),
        2,
        "should have two arrayExists calls (AND-ed), got: {sql}"
    );
    assert!(
        sql.contains(" AND arrayExists("),
        "two arrayExists should be AND-ed, got: {sql}"
    );
    assert_eq!(compiled.bindings.len(), 3);
}
1144
/// An `In` condition inside an `ArrayIncludes` group, given the string
/// `"0,1"`, must expand to `IN (?, ?)` inside the lambda and be AND-ed with
/// the sibling equality, for three bindings in total.
#[test]
fn test_array_includes_with_in_operator() {
    let authority_type_in_set = vec![
        FilterNode::Condition {
            column: "instruction_arg_names".into(),
            op: CompareOp::Eq,
            value: SqlValue::String("authorityType".into()),
        },
        FilterNode::Condition {
            column: "instruction_arg_values".into(),
            op: CompareOp::In,
            value: SqlValue::String("0,1".into()),
        },
    ];

    let query = QueryIR {
        cube: "Instructions".into(),
        schema: "dexes_dwd2".into(),
        table: "sol_instructions".into(),
        selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
        filters: FilterNode::ArrayIncludes {
            array_columns: vec![
                "instruction_arg_names".into(),
                "instruction_arg_values".into(),
            ],
            element_conditions: vec![authority_type_in_set],
        },
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    assert!(
        compiled.sql.contains("arrayExists((_p0, _p1) -> (_p0 = ? AND _p1 IN (?, ?))"),
        "should generate arrayExists with AND-ed conditions, got: {}",
        compiled.sql
    );
    assert_eq!(compiled.bindings.len(), 3);
}
1179
/// An ordinary column condition AND-ed with an `ArrayIncludes` node: the SQL
/// must contain the plain comparison, an `arrayExists(` call, and an ` AND `.
#[test]
fn test_array_includes_combined_with_regular_filter() {
    let program_filter = FilterNode::Condition {
        column: "instruction_program_address".into(),
        op: CompareOp::Eq,
        value: SqlValue::String("pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA".into()),
    };
    let arg_filter = FilterNode::ArrayIncludes {
        array_columns: vec!["instruction_arg_names".into(), "instruction_arg_values".into()],
        element_conditions: vec![vec![FilterNode::Condition {
            column: "instruction_arg_names".into(),
            op: CompareOp::Eq,
            value: SqlValue::String("amount".into()),
        }]],
    };

    let query = QueryIR {
        cube: "Instructions".into(),
        schema: "dexes_dwd2".into(),
        table: "sol_instructions".into(),
        selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
        filters: FilterNode::And(vec![program_filter, arg_filter]),
        having: FilterNode::Empty,
        group_by: vec![],
        order_by: vec![],
        limit: 10,
        offset: 0,
        limit_by: None,
        use_final: false,
        joins: vec![],
        custom_query_builder: None,
        from_subquery: None,
    };

    let compiled = ch().compile(&query);
    let sql = &compiled.sql;
    assert!(
        sql.contains("`instruction_program_address` = ?"),
        "should have regular condition, got: {sql}"
    );
    assert!(
        sql.contains("arrayExists("),
        "should have arrayExists, got: {sql}"
    );
    assert!(
        sql.contains(" AND "),
        "regular + array conditions should be AND-ed, got: {sql}"
    );
}
1217}