Skip to main content

activecube_rs/sql/
clickhouse.rs

1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::{CompileResult, JoinType};
5use crate::sql::dialect::SqlDialect;
6
7pub struct ClickHouseDialect;
8
9impl ClickHouseDialect {
10    pub fn new() -> Self {
11        Self
12    }
13}
14
15impl Default for ClickHouseDialect {
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl SqlDialect for ClickHouseDialect {
22    fn compile(&self, ir: &QueryIR) -> CompileResult {
23        if let Some(ref builder) = ir.custom_query_builder {
24            return (builder.0)(ir);
25        }
26
27        let mut bindings = Vec::new();
28        let mut alias_remap: Vec<(String, String)> = Vec::new();
29
30        if ir.joins.is_empty() {
31            let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
32            return CompileResult { sql: inner_sql, bindings, alias_remap };
33        }
34
35        // When JOINs are present, force aliases on function expression columns
36        // (e.g. `argMaxMerge(latest_balance)`) so the outer SELECT can reference
37        // them by simple identifier names. Without aliases, `_main.argMaxMerge(x)`
38        // is parsed as a function call — a ClickHouse syntax error.
39        let mut ir_mod = ir.clone();
40        let mut mc_counter = 0u32;
41        for sel in &mut ir_mod.selects {
42            if let SelectExpr::Column { column, alias } = sel {
43                if column.contains('(') && alias.is_none() {
44                    let a = format!("__mc_{mc_counter}");
45                    mc_counter += 1;
46                    alias_remap.push((a.clone(), column.clone()));
47                    *alias = Some(a);
48                }
49            }
50        }
51
52        let mut jc_counter = 0u32;
53        for join in &mut ir_mod.joins {
54            for sel in &mut join.selects {
55                if let SelectExpr::Column { column, alias } = sel {
56                    if column.contains('(') && alias.is_none() {
57                        let a = format!("__jc_{jc_counter}");
58                        jc_counter += 1;
59                        *alias = Some(a);
60                    }
61                }
62            }
63        }
64
65        let inner_sql = self.compile_inner(&ir_mod, &mut bindings, &mut alias_remap);
66
67        // Build outer SELECT with explicit column listing.
68        // Always backtick-quote to prevent ClickHouse auto-prefixing ambiguous
69        // names when multiple JOINed tables share column names.
70        let main_cols: Vec<String> = ir_mod.selects.iter().map(|s| {
71            let col = match s {
72                SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
73                SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
74            };
75            format!("_main.`{}` AS `{}`", col, col)
76        }).collect();
77
78        let mut sql = String::from("SELECT ");
79        sql.push_str(&main_cols.join(", "));
80
81        // Collect joined column aliases for the outer SELECT
82        for join in &ir_mod.joins {
83            for sel in &join.selects {
84                let col_name = match sel {
85                    SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
86                    SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
87                };
88                let outer_alias = format!("{}.{}", join.alias, col_name);
89                if let SelectExpr::Column { column, alias: Some(_) } = sel {
90                    if column.contains('(') {
91                        let outer_original = format!("{}.{}", join.alias, column);
92                        alias_remap.push((outer_alias.clone(), outer_original));
93                    }
94                }
95                sql.push_str(&format!(", {}.`{}` AS `{}`",
96                    join.alias, col_name, outer_alias));
97            }
98        }
99
100        sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
101
102        // Build a lookup for main query aliases to resolve ON condition columns
103        // that might be function expressions (aliased above).
104        let main_alias_map: HashMap<String, String> = ir_mod.selects.iter()
105            .filter_map(|s| {
106                if let SelectExpr::Column { column, alias: Some(a) } = s {
107                    if column.contains('(') { Some((column.clone(), a.clone())) } else { None }
108                } else { None }
109            })
110            .collect();
111
112        for join in &ir_mod.joins {
113            let join_kw = join.join_type.sql_keyword();
114
115            if join.is_aggregate {
116                // Mode B: subquery JOIN for AggregatingMergeTree targets
117                sql.push_str(&format!(" {} (SELECT ", join_kw));
118                let mut sub_parts: Vec<String> = Vec::new();
119                for gb_col in &join.group_by {
120                    sub_parts.push(quote_col(gb_col));
121                }
122                for sel in &join.selects {
123                    match sel {
124                        SelectExpr::Column { column, alias } => {
125                            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
126                            if let Some(a) = alias {
127                                sub_parts.push(format!("{col} AS `{a}`"));
128                            } else if column.contains('(') || !join.group_by.contains(column) {
129                                sub_parts.push(col);
130                            }
131                        }
132                        SelectExpr::Aggregate { function, column, alias, condition } => {
133                            let func = function.to_lowercase();
134                            let qcol = quote_col(column);
135                            let expr = match (func.as_str(), column.as_str(), condition) {
136                                ("count", "*", None) => format!("count() AS `{alias}`"),
137                                ("count", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
138                                ("count", _, None) => format!("uniqExact({qcol}) AS `{alias}`"),
139                                ("count", _, Some(cond)) => format!("uniqExactIf({qcol}, {cond}) AS `{alias}`"),
140                                ("uniq", _, None) => format!("uniq({qcol}) AS `{alias}`"),
141                                ("uniq", _, Some(cond)) => format!("uniqIf({qcol}, {cond}) AS `{alias}`"),
142                                (f, _, None) => format!("{f}({qcol}) AS `{alias}`"),
143                                (f, _, Some(cond)) => format!("{f}If({qcol}, {cond}) AS `{alias}`"),
144                            };
145                            sub_parts.push(expr);
146                        }
147                        SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
148                            let func = match agg_type {
149                                DimAggType::ArgMax => "argMax",
150                                DimAggType::ArgMin => "argMin",
151                            };
152                            let expr = match condition {
153                                None => format!("{func}(`{value_column}`, `{compare_column}`) AS `{alias}`"),
154                                Some(cond) => format!("{func}If(`{value_column}`, `{compare_column}`, {cond}) AS `{alias}`"),
155                            };
156                            sub_parts.push(expr);
157                        }
158                    }
159                }
160                sql.push_str(&sub_parts.join(", "));
161                sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
162                if !join.group_by.is_empty() {
163                    sql.push_str(" GROUP BY ");
164                    let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
165                    sql.push_str(&gb.join(", "));
166                }
167                sql.push_str(&format!(") AS {}", join.alias));
168            } else {
169                // Mode A: direct JOIN for MergeTree / ReplacingMergeTree targets
170                sql.push_str(&format!(" {} `{}`.`{}` AS {}",
171                    join_kw, join.schema, join.table, join.alias));
172                if join.use_final {
173                    sql.push_str(" FINAL");
174                }
175            }
176
177            if join.join_type == JoinType::Cross {
178                // CROSS JOIN has no ON clause
179                continue;
180            }
181
182            // ON conditions — use alias if the local column was a func expression
183            let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
184                let local_ref = main_alias_map.get(local).unwrap_or(local);
185                format!("_main.`{}` = {}.`{}`", local_ref, join.alias, remote)
186            }).collect();
187            sql.push_str(" ON ");
188            sql.push_str(&on_parts.join(" AND "));
189        }
190
191        CompileResult { sql, bindings, alias_remap }
192    }
193
194    fn quote_identifier(&self, name: &str) -> String {
195        format!("`{name}`")
196    }
197
198    fn name(&self) -> &str {
199        "ClickHouse"
200    }
201}
202
203impl ClickHouseDialect {
204    fn compile_inner(
205        &self,
206        ir: &QueryIR,
207        bindings: &mut Vec<SqlValue>,
208        alias_remap: &mut Vec<(String, String)>,
209    ) -> String {
210        let mut sql = String::new();
211
212        let mut augmented_selects = ir.selects.clone();
213        let mut agg_alias_map: HashMap<String, String> = HashMap::new();
214        let mut alias_counter = 0u32;
215
216        let having_cols: std::collections::HashSet<String> =
217            collect_filter_columns(&ir.having).into_iter().collect();
218        let has_having_agg = having_cols.iter().any(|c| c.contains('('));
219
220        if has_having_agg {
221            for sel in &mut augmented_selects {
222                if let SelectExpr::Column { column, alias } = sel {
223                    if column.contains('(') && having_cols.contains(column.as_str()) {
224                        if alias.is_none() {
225                            let a = format!("__f_{alias_counter}");
226                            alias_counter += 1;
227                            alias_remap.push((a.clone(), column.clone()));
228                            agg_alias_map.insert(column.clone(), a.clone());
229                            *alias = Some(a);
230                        } else if let Some(existing) = alias {
231                            agg_alias_map.insert(column.clone(), existing.clone());
232                        }
233                    }
234                }
235            }
236            for col in &having_cols {
237                if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
238                    let a = format!("__f_{alias_counter}");
239                    alias_counter += 1;
240                    agg_alias_map.insert(col.clone(), a.clone());
241                    augmented_selects.push(SelectExpr::Column {
242                        column: col.clone(),
243                        alias: Some(a),
244                    });
245                }
246            }
247        }
248
249        for sel in &augmented_selects {
250            if let SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, .. } = sel {
251                let func = match agg_type {
252                    DimAggType::ArgMax => "argMax",
253                    DimAggType::ArgMin => "argMin",
254                };
255                let expr = format!("{func}(`{value_column}`, `{compare_column}`)");
256                agg_alias_map.insert(expr, alias.clone());
257            }
258        }
259
260        sql.push_str("SELECT ");
261        let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
262            SelectExpr::Column { column, alias } => {
263                let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
264                match alias {
265                    Some(a) => format!("{col} AS `{a}`"),
266                    None => col,
267                }
268            },
269            SelectExpr::Aggregate { function, column, alias, condition } => {
270                let func = function.to_uppercase();
271                let qcol = quote_col(column);
272                match (func.as_str(), column.as_str(), condition) {
273                    ("COUNT", "*", None) => format!("count() AS `{alias}`"),
274                    ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
275                    ("COUNT", _, None) => format!("uniqExact({qcol}) AS `{alias}`"),
276                    ("COUNT", _, Some(cond)) => format!("uniqExactIf({qcol}, {cond}) AS `{alias}`"),
277                    ("UNIQ", _, None) => format!("uniq({qcol}) AS `{alias}`"),
278                    ("UNIQ", _, Some(cond)) => format!("uniqIf({qcol}, {cond}) AS `{alias}`"),
279                    (_, _, None) => format!("{f}({qcol}) AS `{alias}`", f = func.to_lowercase()),
280                    (_, _, Some(cond)) => format!("{f}If({qcol}, {cond}) AS `{alias}`", f = func.to_lowercase()),
281                }
282            }
283            SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
284                let func = match agg_type {
285                    DimAggType::ArgMax => "argMax",
286                    DimAggType::ArgMin => "argMin",
287                };
288                match condition {
289                    None => format!("{func}(`{value_column}`, `{compare_column}`) AS `{alias}`"),
290                    Some(cond) => format!("{func}If(`{value_column}`, `{compare_column}`, {cond}) AS `{alias}`"),
291                }
292            }
293        }).collect();
294        sql.push_str(&select_parts.join(", "));
295
296        if let Some(ref subquery) = ir.from_subquery {
297            sql.push_str(&format!(" FROM ({}) AS _t", subquery));
298        } else {
299            sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
300            if ir.use_final {
301                sql.push_str(" FINAL");
302            }
303        }
304
305        let where_clause = compile_filter(&ir.filters, bindings);
306        if !where_clause.is_empty() {
307            sql.push_str(" WHERE ");
308            sql.push_str(&where_clause);
309        }
310
311        let effective_group_by = if !ir.group_by.is_empty() {
312            ir.group_by.clone()
313        } else {
314            let has_agg_cols = augmented_selects.iter().any(|s| match s {
315                SelectExpr::Column { column, .. } => column.contains("Merge("),
316                SelectExpr::Aggregate { .. } | SelectExpr::DimAggregate { .. } => true,
317            });
318            if has_agg_cols {
319                augmented_selects.iter().filter_map(|s| match s {
320                    SelectExpr::Column { column, .. } if !column.contains("Merge(") && !column.contains('(') => {
321                        Some(column.clone())
322                    }
323                    _ => None,
324                }).collect()
325            } else {
326                vec![]
327            }
328        };
329
330        if !effective_group_by.is_empty() {
331            sql.push_str(" GROUP BY ");
332            let cols: Vec<String> = effective_group_by.iter().map(|c| quote_col(c)).collect();
333            sql.push_str(&cols.join(", "));
334        }
335
336        if has_having_agg {
337            let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
338            if !having_clause.is_empty() {
339                sql.push_str(" HAVING ");
340                sql.push_str(&having_clause);
341            }
342        } else {
343            let having_clause = compile_filter(&ir.having, bindings);
344            if !having_clause.is_empty() {
345                sql.push_str(" HAVING ");
346                sql.push_str(&having_clause);
347            }
348        }
349
350        if !ir.order_by.is_empty() {
351            sql.push_str(" ORDER BY ");
352            let parts: Vec<String> = ir.order_by.iter().map(|o| {
353                let col = if o.column.contains('(') {
354                    agg_alias_map.get(&o.column)
355                        .map(|a| format!("`{a}`"))
356                        .unwrap_or_else(|| o.column.clone())
357                } else {
358                    format!("`{}`", o.column)
359                };
360                let dir = if o.descending { "DESC" } else { "ASC" };
361                format!("{col} {dir}")
362            }).collect();
363            sql.push_str(&parts.join(", "));
364        }
365
366        if let Some(ref lb) = ir.limit_by {
367            let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
368            sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
369            if lb.offset > 0 {
370                sql.push_str(&format!(" OFFSET {}", lb.offset));
371            }
372        }
373
374        sql.push_str(&format!(" LIMIT {}", ir.limit));
375        if ir.offset > 0 {
376            sql.push_str(&format!(" OFFSET {}", ir.offset));
377        }
378
379        sql
380    }
381}
382
383/// Collect all column names referenced in a filter tree.
384fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
385    match node {
386        FilterNode::Empty => vec![],
387        FilterNode::Condition { column, .. } => vec![column.clone()],
388        FilterNode::And(children) | FilterNode::Or(children) => {
389            children.iter().flat_map(collect_filter_columns).collect()
390        }
391        FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
392    }
393}
394
395/// Like `compile_filter` but replaces aggregate expression columns with their
396/// SELECT aliases so ClickHouse can resolve them in HAVING scope.
397fn compile_filter_with_aliases(
398    node: &FilterNode,
399    bindings: &mut Vec<SqlValue>,
400    aliases: &HashMap<String, String>,
401) -> String {
402    match node {
403        FilterNode::Empty => String::new(),
404        FilterNode::Condition { column, op, value } => {
405            let effective_col = aliases.get(column)
406                .map(|a| a.as_str())
407                .unwrap_or(column.as_str());
408            compile_condition(effective_col, op, value, bindings)
409        }
410        FilterNode::And(children) => {
411            let parts: Vec<String> = children.iter()
412                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
413                .filter(|s| !s.is_empty())
414                .collect();
415            match parts.len() {
416                0 => String::new(),
417                1 => parts.into_iter().next().unwrap(),
418                _ => format!("({})", parts.join(" AND ")),
419            }
420        }
421        FilterNode::Or(children) => {
422            let parts: Vec<String> = children.iter()
423                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
424                .filter(|s| !s.is_empty())
425                .collect();
426            match parts.len() {
427                0 => String::new(),
428                1 => parts.into_iter().next().unwrap(),
429                _ => format!("({})", parts.join(" OR ")),
430            }
431        }
432        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
433            compile_array_includes(array_columns, element_conditions, bindings)
434        }
435    }
436}
437
438fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
439    match node {
440        FilterNode::Empty => String::new(),
441        FilterNode::Condition { column, op, value } => {
442            compile_condition(column, op, value, bindings)
443        }
444        FilterNode::And(children) => {
445            let parts: Vec<String> = children.iter()
446                .map(|c| compile_filter(c, bindings))
447                .filter(|s| !s.is_empty())
448                .collect();
449            match parts.len() {
450                0 => String::new(),
451                1 => parts.into_iter().next().unwrap(),
452                _ => format!("({})", parts.join(" AND ")),
453            }
454        }
455        FilterNode::Or(children) => {
456            let parts: Vec<String> = children.iter()
457                .map(|c| compile_filter(c, bindings))
458                .filter(|s| !s.is_empty())
459                .collect();
460            match parts.len() {
461                0 => String::new(),
462                1 => parts.into_iter().next().unwrap(),
463                _ => format!("({})", parts.join(" OR ")),
464            }
465        }
466        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
467            compile_array_includes(array_columns, element_conditions, bindings)
468        }
469    }
470}
471
472/// Compile ArrayIncludes into one or more `arrayExists(lambda, arrays)` expressions.
473fn compile_array_includes(
474    array_columns: &[String],
475    element_conditions: &[Vec<FilterNode>],
476    bindings: &mut Vec<SqlValue>,
477) -> String {
478    let params: Vec<String> = (0..array_columns.len())
479        .map(|i| format!("_p{i}"))
480        .collect();
481    let arrays_sql: Vec<String> = array_columns.iter()
482        .map(|c| quote_col(c))
483        .collect();
484    let arrays_ref = arrays_sql.join(", ");
485    let params_ref = params.join(", ");
486
487    let col_to_param: std::collections::HashMap<&str, &str> = array_columns.iter()
488        .zip(params.iter())
489        .map(|(c, p)| (c.as_str(), p.as_str()))
490        .collect();
491
492    let exists_parts: Vec<String> = element_conditions.iter().map(|conds| {
493        let cond_parts: Vec<String> = conds.iter()
494            .map(|c| compile_filter_with_param_remap(c, bindings, &col_to_param))
495            .filter(|s| !s.is_empty())
496            .collect();
497        let cond_sql = match cond_parts.len() {
498            0 => "1".to_string(),
499            1 => cond_parts.into_iter().next().unwrap(),
500            _ => format!("({})", cond_parts.join(" AND ")),
501        };
502        format!("arrayExists(({params_ref}) -> {cond_sql}, {arrays_ref})")
503    }).collect();
504
505    match exists_parts.len() {
506        0 => String::new(),
507        1 => exists_parts.into_iter().next().unwrap(),
508        _ => format!("({})", exists_parts.join(" AND ")),
509    }
510}
511
512/// Compile a filter node but remap column names to lambda parameter names.
513/// Lambda parameters must NOT be backtick-quoted in ClickHouse.
514fn compile_filter_with_param_remap(
515    node: &FilterNode,
516    bindings: &mut Vec<SqlValue>,
517    col_to_param: &std::collections::HashMap<&str, &str>,
518) -> String {
519    match node {
520        FilterNode::Empty => String::new(),
521        FilterNode::Condition { column, op, value } => {
522            if let Some(&param) = col_to_param.get(column.as_str()) {
523                compile_condition_raw(param, op, value, bindings)
524            } else {
525                compile_condition(column, op, value, bindings)
526            }
527        }
528        FilterNode::And(children) => {
529            let parts: Vec<String> = children.iter()
530                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
531                .filter(|s| !s.is_empty())
532                .collect();
533            match parts.len() {
534                0 => String::new(),
535                1 => parts.into_iter().next().unwrap(),
536                _ => format!("({})", parts.join(" AND ")),
537            }
538        }
539        FilterNode::Or(children) => {
540            let parts: Vec<String> = children.iter()
541                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
542                .filter(|s| !s.is_empty())
543                .collect();
544            match parts.len() {
545                0 => String::new(),
546                1 => parts.into_iter().next().unwrap(),
547                _ => format!("({})", parts.join(" OR ")),
548            }
549        }
550        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
551            compile_array_includes(array_columns, element_conditions, bindings)
552        }
553    }
554}
555
556fn quote_col(column: &str) -> String {
557    if column.contains('(') {
558        column.to_string()
559    } else {
560        format!("`{column}`")
561    }
562}
563
564/// Like compile_condition but uses the column name as-is (no backtick quoting).
565/// Used for lambda parameters in arrayExists.
566fn compile_condition_raw(
567    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
568) -> String {
569    compile_condition_inner(col, op, value, bindings)
570}
571
572fn compile_condition(
573    column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
574) -> String {
575    let col = quote_col(column);
576    compile_condition_inner(&col, op, value, bindings)
577}
578
579fn compile_condition_inner(
580    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
581) -> String {
582    match op {
583        CompareOp::In | CompareOp::NotIn => {
584            if let SqlValue::String(csv) = value {
585                let items: Vec<&str> = csv.split(',').collect();
586                let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
587                for item in &items {
588                    bindings.push(SqlValue::String(item.trim().to_string()));
589                }
590                format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
591            } else {
592                bindings.push(value.clone());
593                format!("{col} {} (?)", op.sql_op())
594            }
595        }
596        CompareOp::Includes => {
597            if let SqlValue::String(s) = value {
598                bindings.push(SqlValue::String(format!("%{s}%")));
599            } else {
600                bindings.push(value.clone());
601            }
602            format!("{col} LIKE ?")
603        }
604        CompareOp::IsNull | CompareOp::IsNotNull => {
605            format!("{col} {}", op.sql_op())
606        }
607        _ => {
608            if let SqlValue::Expression(expr) = value {
609                format!("{col} {} {expr}", op.sql_op())
610            } else {
611                bindings.push(value.clone());
612                format!("{col} {} ?", op.sql_op())
613            }
614        }
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621
622    fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }
623
624    #[test]
625    fn test_simple_select() {
626        let ir = QueryIR {
627            cube: "DEXTrades".into(), schema: "default".into(),
628            table: "dwd_dex_trades".into(),
629            selects: vec![
630                SelectExpr::Column { column: "tx_hash".into(), alias: None },
631                SelectExpr::Column { column: "token_a_amount".into(), alias: None },
632            ],
633            filters: FilterNode::Empty, having: FilterNode::Empty,
634            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
635            limit_by: None,
636            use_final: false,
637            joins: vec![],
638            custom_query_builder: None,
639            from_subquery: None,
640        };
641        let r = ch().compile(&ir);
642        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
643        assert!(r.bindings.is_empty());
644    }
645
646    #[test]
647    fn test_final_keyword() {
648        let ir = QueryIR {
649            cube: "T".into(), schema: "db".into(), table: "tokens".into(),
650            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
651            filters: FilterNode::Empty, having: FilterNode::Empty,
652            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
653            limit_by: None,
654            use_final: true,
655            joins: vec![],
656            custom_query_builder: None,
657            from_subquery: None,
658        };
659        let r = ch().compile(&ir);
660        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
661    }
662
663    #[test]
664    fn test_uniq_uses_native_function() {
665        let ir = QueryIR {
666            cube: "T".into(), schema: "db".into(), table: "t".into(),
667            selects: vec![
668                SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
669            ],
670            filters: FilterNode::Empty, having: FilterNode::Empty,
671            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
672            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
673        };
674        let r = ch().compile(&ir);
675        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
676    }
677
678    #[test]
679    fn test_count_star() {
680        let ir = QueryIR {
681            cube: "T".into(), schema: "db".into(), table: "t".into(),
682            selects: vec![
683                SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
684            ],
685            filters: FilterNode::Empty, having: FilterNode::Empty,
686            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
687            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
688        };
689        let r = ch().compile(&ir);
690        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
691    }
692
693    #[test]
694    fn test_aggregate_lowercase() {
695        let ir = QueryIR {
696            cube: "T".into(), schema: "db".into(), table: "t".into(),
697            selects: vec![
698                SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
699                SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
700            ],
701            filters: FilterNode::Empty, having: FilterNode::Empty,
702            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
703            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
704        };
705        let r = ch().compile(&ir);
706        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
707        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
708    }
709
710    #[test]
711    fn test_where_and_order() {
712        let ir = QueryIR {
713            cube: "T".into(), schema: "db".into(), table: "t".into(),
714            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
715            filters: FilterNode::And(vec![
716                FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
717                FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
718            ]),
719            having: FilterNode::Empty, group_by: vec![],
720            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
721            limit: 25, offset: 0,
722            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
723        };
724        let r = ch().compile(&ir);
725        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
726        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
727        assert_eq!(r.bindings.len(), 2);
728    }
729
730    #[test]
731    fn test_having_with_aggregate_expr() {
732        let ir = QueryIR {
733            cube: "T".into(), schema: "db".into(), table: "t".into(),
734            selects: vec![
735                SelectExpr::Column { column: "token_address".into(), alias: None },
736                SelectExpr::Aggregate { function: "SUM".into(), column: "amount_usd".into(), alias: "__sum".into(), condition: None },
737            ],
738            filters: FilterNode::Empty,
739            having: FilterNode::Condition {
740                column: "sum(`amount_usd`)".into(), op: CompareOp::Gt, value: SqlValue::Float(1000000.0),
741            },
742            group_by: vec!["token_address".into()], order_by: vec![], limit: 25, offset: 0,
743            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
744        };
745        let r = ch().compile(&ir);
746        assert!(r.sql.contains("GROUP BY `token_address`"));
747        assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
748        assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
749        assert_eq!(r.bindings.len(), 1);
750    }
751
752    #[test]
753    fn test_having_appends_missing_agg_column() {
754        let ir = QueryIR {
755            cube: "T".into(), schema: "db".into(), table: "t".into(),
756            selects: vec![
757                SelectExpr::Column { column: "pool_address".into(), alias: None },
758                SelectExpr::Column { column: "argMaxMerge(latest_liquidity_usd_state)".into(), alias: None },
759            ],
760            filters: FilterNode::Empty,
761            having: FilterNode::And(vec![
762                FilterNode::Condition {
763                    column: "argMaxMerge(latest_liquidity_usd_state)".into(),
764                    op: CompareOp::Gt, value: SqlValue::Float(2.0),
765                },
766                FilterNode::Condition {
767                    column: "argMaxMerge(latest_token_a_amount_state)".into(),
768                    op: CompareOp::Gt, value: SqlValue::Float(3.0),
769                },
770            ]),
771            group_by: vec!["pool_address".into()], order_by: vec![], limit: 25, offset: 0,
772            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
773        };
774        let r = ch().compile(&ir);
775        assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
776            "existing HAVING col should be aliased, got: {}", r.sql);
777        assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
778            "missing agg col should be appended, got: {}", r.sql);
779        assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
780            "HAVING should use aliases, got: {}", r.sql);
781        assert_eq!(r.bindings.len(), 2);
782        assert_eq!(r.alias_remap.len(), 1);
783        assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
784    }
785
786    #[test]
787    fn test_limit_by() {
788        let ir = QueryIR {
789            cube: "T".into(), schema: "db".into(), table: "t".into(),
790            selects: vec![
791                SelectExpr::Column { column: "owner".into(), alias: None },
792                SelectExpr::Column { column: "amount".into(), alias: None },
793            ],
794            filters: FilterNode::Empty, having: FilterNode::Empty,
795            group_by: vec![], 
796            order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
797            limit: 100, offset: 0,
798            limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
799            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
800        };
801        let r = ch().compile(&ir);
802        let sql = &r.sql;
803        assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
804        assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
805        assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
806        let order_by_pos = sql.find("ORDER BY").unwrap();
807        let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
808        let limit_pos = sql.rfind("LIMIT 100").unwrap();
809        assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
810        assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
811    }
812
813    #[test]
814    fn test_limit_by_with_offset() {
815        let ir = QueryIR {
816            cube: "T".into(), schema: "db".into(), table: "t".into(),
817            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
818            filters: FilterNode::Empty, having: FilterNode::Empty,
819            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
820            limit_by: Some(LimitByExpr { count: 5, offset: 2, columns: vec!["token".into(), "wallet".into()] }),
821            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
822        };
823        let r = ch().compile(&ir);
824        assert!(r.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"), "multi-column LIMIT BY with offset, got: {}", r.sql);
825    }
826
827    #[test]
828    fn test_join_direct() {
829        let ir = QueryIR {
830            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
831            table: "sol_dex_trades".into(),
832            selects: vec![
833                SelectExpr::Column { column: "tx_hash".into(), alias: None },
834                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
835            ],
836            filters: FilterNode::Empty, having: FilterNode::Empty,
837            group_by: vec![], order_by: vec![], limit: 25, offset: 0,
838            limit_by: None, use_final: false,
839            joins: vec![JoinExpr {
840                schema: "dexes_dim".into(), table: "sol_tokens".into(),
841                alias: "_j0".into(),
842                conditions: vec![("buy_token_address".into(), "token_address".into())],
843                selects: vec![
844                    SelectExpr::Column { column: "name".into(), alias: None },
845                    SelectExpr::Column { column: "symbol".into(), alias: None },
846                ],
847                group_by: vec![], use_final: true, is_aggregate: false,
848                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
849                join_type: JoinType::Left,
850            }],
851            custom_query_builder: None,
852            from_subquery: None,
853        };
854        let r = ch().compile(&ir);
855        assert!(r.sql.contains("FROM (SELECT"), "main query should be wrapped, got: {}", r.sql);
856        assert!(r.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` AS _j0 FINAL"),
857            "direct JOIN with FINAL after alias, got: {}", r.sql);
858        assert!(r.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
859            "ON condition, got: {}", r.sql);
860        assert!(r.sql.contains("_j0.`name` AS `_j0.name`"), "joined col alias, got: {}", r.sql);
861    }
862
863    #[test]
864    fn test_join_aggregate_subquery() {
865        let ir = QueryIR {
866            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
867            table: "sol_dex_trades".into(),
868            selects: vec![
869                SelectExpr::Column { column: "tx_hash".into(), alias: None },
870                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
871            ],
872            filters: FilterNode::Empty, having: FilterNode::Empty,
873            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
874            limit_by: None, use_final: false,
875            joins: vec![JoinExpr {
876                schema: "dexes_dws".into(), table: "sol_token_market_cap".into(),
877                alias: "_j0".into(),
878                conditions: vec![("buy_token_address".into(), "token_address".into())],
879                selects: vec![
880                    SelectExpr::Column { column: "argMaxMerge(latest_market_cap_usd_state)".into(), alias: None },
881                ],
882                group_by: vec!["token_address".into()],
883                use_final: false, is_aggregate: true,
884                target_cube: "TokenMarketCap".into(), join_field: "joinBuyTokenMarketCap".into(),
885                join_type: JoinType::Left,
886            }],
887            custom_query_builder: None,
888            from_subquery: None,
889        };
890        let r = ch().compile(&ir);
891        assert!(r.sql.contains("LEFT JOIN (SELECT"), "aggregate should use subquery, got: {}", r.sql);
892        assert!(r.sql.contains("GROUP BY `token_address`"), "subquery GROUP BY, got: {}", r.sql);
893        assert!(r.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"), "subquery FROM, got: {}", r.sql);
894        assert!(r.sql.contains("argMaxMerge(latest_market_cap_usd_state) AS `__jc_0`"),
895            "join func expr should be aliased in subquery, got: {}", r.sql);
896        assert!(r.sql.contains("_j0.`__jc_0` AS `_j0.__jc_0`"),
897            "outer SELECT should use alias for join func col, got: {}", r.sql);
898    }
899
900    #[test]
901    fn test_join_main_query_func_expression_columns() {
902        let ir = QueryIR {
903            cube: "TokenHolders".into(), schema: "dws".into(),
904            table: "sol_token_holders".into(),
905            selects: vec![
906                SelectExpr::Column { column: "token".into(), alias: None },
907                SelectExpr::Column { column: "holder".into(), alias: None },
908                SelectExpr::Column { column: "argMaxMerge(latest_balance)".into(), alias: None },
909                SelectExpr::Column { column: "argMaxMerge(latest_balance_usd)".into(), alias: None },
910                SelectExpr::Column { column: "minMerge(first_seen)".into(), alias: None },
911                SelectExpr::Column { column: "maxMerge(last_seen)".into(), alias: None },
912            ],
913            filters: FilterNode::Empty, having: FilterNode::Empty,
914            group_by: vec![], order_by: vec![
915                OrderExpr { column: "argMaxMerge(latest_balance_usd)".into(), descending: true },
916            ],
917            limit: 100, offset: 0,
918            limit_by: None, use_final: false,
919            joins: vec![JoinExpr {
920                schema: "dim".into(), table: "sol_tokens".into(),
921                alias: "_j0".into(),
922                conditions: vec![("token".into(), "token_address".into())],
923                selects: vec![
924                    SelectExpr::Column { column: "name".into(), alias: None },
925                    SelectExpr::Column { column: "symbol".into(), alias: None },
926                ],
927                group_by: vec![], use_final: true, is_aggregate: false,
928                target_cube: "TokenSearch".into(), join_field: "joinToken".into(),
929                join_type: JoinType::Left,
930            }],
931            custom_query_builder: None,
932            from_subquery: None,
933        };
934        let r = ch().compile(&ir);
935        let sql = &r.sql;
936
937        assert!(sql.contains("_main.`__mc_0` AS `__mc_0`"),
938            "func expr should use alias __mc_0 in outer SELECT, got: {sql}");
939        assert!(sql.contains("_main.`__mc_1` AS `__mc_1`"),
940            "func expr should use alias __mc_1, got: {sql}");
941        assert!(sql.contains("_main.`token` AS `token`"),
942            "simple col should be backtick-quoted, got: {sql}");
943
944        assert!(!sql.contains("_main.argMaxMerge("),
945            "outer SELECT must NOT have bare _main.argMaxMerge(...), got: {sql}");
946
947        assert!(sql.contains("argMaxMerge(latest_balance) AS `__mc_0`"),
948            "inner query should alias func expr, got: {sql}");
949
950        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_0" && o == "argMaxMerge(latest_balance)"),
951            "alias_remap should map __mc_0 → original, got: {:?}", r.alias_remap);
952        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_1" && o == "argMaxMerge(latest_balance_usd)"),
953            "alias_remap should map __mc_1, got: {:?}", r.alias_remap);
954    }
955
956    #[test]
957    fn test_join_inner_type() {
958        let ir = QueryIR {
959            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
960            table: "sol_dex_trades".into(),
961            selects: vec![
962                SelectExpr::Column { column: "tx_hash".into(), alias: None },
963            ],
964            filters: FilterNode::Empty, having: FilterNode::Empty,
965            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
966            limit_by: None, use_final: false,
967            joins: vec![JoinExpr {
968                schema: "dexes_dim".into(), table: "sol_tokens".into(),
969                alias: "_j0".into(),
970                conditions: vec![("buy_token_address".into(), "token_address".into())],
971                selects: vec![
972                    SelectExpr::Column { column: "name".into(), alias: None },
973                ],
974                group_by: vec![], use_final: false, is_aggregate: false,
975                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
976                join_type: JoinType::Inner,
977            }],
978            custom_query_builder: None,
979            from_subquery: None,
980        };
981        let r = ch().compile(&ir);
982        assert!(r.sql.contains("INNER JOIN `dexes_dim`.`sol_tokens` AS _j0"),
983            "should use INNER JOIN, got: {}", r.sql);
984    }
985
986    #[test]
987    fn test_join_full_outer_type() {
988        let ir = QueryIR {
989            cube: "T".into(), schema: "db".into(), table: "t".into(),
990            selects: vec![
991                SelectExpr::Column { column: "id".into(), alias: None },
992            ],
993            filters: FilterNode::Empty, having: FilterNode::Empty,
994            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
995            limit_by: None, use_final: false,
996            joins: vec![JoinExpr {
997                schema: "db2".into(), table: "t2".into(),
998                alias: "_j0".into(),
999                conditions: vec![("id".into(), "ref_id".into())],
1000                selects: vec![
1001                    SelectExpr::Column { column: "val".into(), alias: None },
1002                ],
1003                group_by: vec![], use_final: false, is_aggregate: false,
1004                target_cube: "Other".into(), join_field: "joinOther".into(),
1005                join_type: JoinType::Full,
1006            }],
1007            custom_query_builder: None,
1008            from_subquery: None,
1009        };
1010        let r = ch().compile(&ir);
1011        assert!(r.sql.contains("FULL OUTER JOIN `db2`.`t2` AS _j0"),
1012            "should use FULL OUTER JOIN, got: {}", r.sql);
1013    }
1014
1015    #[test]
1016    fn test_custom_query_builder() {
1017        let ir = QueryIR {
1018            cube: "Custom".into(), schema: "db".into(), table: "t".into(),
1019            selects: vec![
1020                SelectExpr::Column { column: "id".into(), alias: None },
1021            ],
1022            filters: FilterNode::Empty, having: FilterNode::Empty,
1023            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1024            limit_by: None, use_final: false, joins: vec![],
1025            custom_query_builder: Some(QueryBuilderFn(std::sync::Arc::new(|_ir| {
1026                CompileResult {
1027                    sql: "SELECT 1 FROM custom_view".into(),
1028                    bindings: vec![],
1029                    alias_remap: vec![],
1030                }
1031            }))),
1032            from_subquery: None,
1033        };
1034        let r = ch().compile(&ir);
1035        assert_eq!(r.sql, "SELECT 1 FROM custom_view",
1036            "custom builder should bypass standard compilation, got: {}", r.sql);
1037    }
1038
1039    #[test]
1040    fn test_from_subquery() {
1041        let ir = QueryIR {
1042            cube: "DEXTradeByTokens".into(), schema: "dwd".into(),
1043            table: "sol_trades".into(),
1044            selects: vec![
1045                SelectExpr::Column { column: "amount".into(), alias: None },
1046                SelectExpr::Column { column: "side_type".into(), alias: None },
1047            ],
1048            filters: FilterNode::Condition {
1049                column: "token".into(), op: CompareOp::Eq,
1050                value: SqlValue::String("SOL".into()),
1051            },
1052            having: FilterNode::Empty,
1053            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1054            limit_by: None, use_final: false, joins: vec![],
1055            custom_query_builder: None,
1056            from_subquery: Some(
1057                "SELECT amount, 'buy' AS side_type, token FROM dwd.sol_a UNION ALL SELECT amount, 'sell' AS side_type, token FROM dwd.sol_b".into()
1058            ),
1059        };
1060        let r = ch().compile(&ir);
1061        assert!(r.sql.starts_with("SELECT `amount`, `side_type` FROM (SELECT"),
1062            "should use subquery in FROM, got: {}", r.sql);
1063        assert!(r.sql.contains("UNION ALL"),
1064            "subquery should contain UNION ALL, got: {}", r.sql);
1065        assert!(r.sql.contains(") AS _t"),
1066            "subquery should be aliased as _t, got: {}", r.sql);
1067        assert!(r.sql.contains("WHERE `token` = ?"),
1068            "WHERE clause should be applied to subquery result, got: {}", r.sql);
1069        assert!(!r.sql.contains("FROM `dwd`.`sol_trades`"),
1070            "should NOT use schema.table when from_subquery is set, got: {}", r.sql);
1071    }
1072
1073    #[test]
1074    fn test_array_includes_single_condition() {
1075        let ir = QueryIR {
1076            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1077            table: "sol_instructions".into(),
1078            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1079            filters: FilterNode::ArrayIncludes {
1080                array_columns: vec![
1081                    "instruction_arg_names".into(),
1082                    "instruction_arg_types".into(),
1083                    "instruction_arg_values".into(),
1084                ],
1085                element_conditions: vec![vec![
1086                    FilterNode::Condition {
1087                        column: "instruction_arg_names".into(),
1088                        op: CompareOp::Eq,
1089                        value: SqlValue::String("amount_in".into()),
1090                    },
1091                ]],
1092            },
1093            having: FilterNode::Empty,
1094            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1095            limit_by: None, use_final: false, joins: vec![],
1096            custom_query_builder: None, from_subquery: None,
1097        };
1098        let r = ch().compile(&ir);
1099        assert!(r.sql.contains("arrayExists((_p0, _p1, _p2) -> _p0 = ?"),
1100            "should generate arrayExists with lambda params, got: {}", r.sql);
1101        assert!(r.sql.contains("`instruction_arg_names`, `instruction_arg_types`, `instruction_arg_values`"),
1102            "should reference all parallel array columns, got: {}", r.sql);
1103        assert_eq!(r.bindings.len(), 1);
1104    }
1105
1106    #[test]
1107    fn test_array_includes_multiple_conditions() {
1108        let ir = QueryIR {
1109            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1110            table: "sol_instructions".into(),
1111            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1112            filters: FilterNode::ArrayIncludes {
1113                array_columns: vec![
1114                    "instruction_arg_names".into(),
1115                    "instruction_arg_values".into(),
1116                ],
1117                element_conditions: vec![
1118                    vec![
1119                        FilterNode::Condition {
1120                            column: "instruction_arg_names".into(),
1121                            op: CompareOp::Eq,
1122                            value: SqlValue::String("amount_in".into()),
1123                        },
1124                        FilterNode::Condition {
1125                            column: "instruction_arg_values".into(),
1126                            op: CompareOp::Gt,
1127                            value: SqlValue::String("10000".into()),
1128                        },
1129                    ],
1130                    vec![
1131                        FilterNode::Condition {
1132                            column: "instruction_arg_names".into(),
1133                            op: CompareOp::Eq,
1134                            value: SqlValue::String("owner".into()),
1135                        },
1136                    ],
1137                ],
1138            },
1139            having: FilterNode::Empty,
1140            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1141            limit_by: None, use_final: false, joins: vec![],
1142            custom_query_builder: None, from_subquery: None,
1143        };
1144        let r = ch().compile(&ir);
1145        let sql = &r.sql;
1146        let count = sql.matches("arrayExists").count();
1147        assert_eq!(count, 2, "should have two arrayExists calls (AND-ed), got: {sql}");
1148        assert!(sql.contains(" AND arrayExists("),
1149            "two arrayExists should be AND-ed, got: {sql}");
1150        assert_eq!(r.bindings.len(), 3);
1151    }
1152
1153    #[test]
1154    fn test_array_includes_with_in_operator() {
1155        let ir = QueryIR {
1156            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1157            table: "sol_instructions".into(),
1158            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1159            filters: FilterNode::ArrayIncludes {
1160                array_columns: vec![
1161                    "instruction_arg_names".into(),
1162                    "instruction_arg_values".into(),
1163                ],
1164                element_conditions: vec![vec![
1165                    FilterNode::Condition {
1166                        column: "instruction_arg_names".into(),
1167                        op: CompareOp::Eq,
1168                        value: SqlValue::String("authorityType".into()),
1169                    },
1170                    FilterNode::Condition {
1171                        column: "instruction_arg_values".into(),
1172                        op: CompareOp::In,
1173                        value: SqlValue::String("0,1".into()),
1174                    },
1175                ]],
1176            },
1177            having: FilterNode::Empty,
1178            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1179            limit_by: None, use_final: false, joins: vec![],
1180            custom_query_builder: None, from_subquery: None,
1181        };
1182        let r = ch().compile(&ir);
1183        assert!(r.sql.contains("arrayExists((_p0, _p1) -> (_p0 = ? AND _p1 IN (?, ?))"),
1184            "should generate arrayExists with AND-ed conditions, got: {}", r.sql);
1185        assert_eq!(r.bindings.len(), 3);
1186    }
1187
1188    #[test]
1189    fn test_array_includes_combined_with_regular_filter() {
1190        let ir = QueryIR {
1191            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1192            table: "sol_instructions".into(),
1193            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1194            filters: FilterNode::And(vec![
1195                FilterNode::Condition {
1196                    column: "instruction_program_address".into(),
1197                    op: CompareOp::Eq,
1198                    value: SqlValue::String("pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA".into()),
1199                },
1200                FilterNode::ArrayIncludes {
1201                    array_columns: vec!["instruction_arg_names".into(), "instruction_arg_values".into()],
1202                    element_conditions: vec![vec![
1203                        FilterNode::Condition {
1204                            column: "instruction_arg_names".into(),
1205                            op: CompareOp::Eq,
1206                            value: SqlValue::String("amount".into()),
1207                        },
1208                    ]],
1209                },
1210            ]),
1211            having: FilterNode::Empty,
1212            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1213            limit_by: None, use_final: false, joins: vec![],
1214            custom_query_builder: None, from_subquery: None,
1215        };
1216        let r = ch().compile(&ir);
1217        let sql = &r.sql;
1218        assert!(sql.contains("`instruction_program_address` = ?"),
1219            "should have regular condition, got: {sql}");
1220        assert!(sql.contains("arrayExists("),
1221            "should have arrayExists, got: {sql}");
1222        assert!(sql.contains(" AND "),
1223            "regular + array conditions should be AND-ed, got: {sql}");
1224    }
1225}