Skip to main content

activecube_rs/sql/
clickhouse.rs

1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::{CompileResult, JoinType};
5use crate::sql::dialect::SqlDialect;
6
7pub struct ClickHouseDialect;
8
9impl ClickHouseDialect {
10    pub fn new() -> Self {
11        Self
12    }
13}
14
15impl Default for ClickHouseDialect {
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl SqlDialect for ClickHouseDialect {
22    fn compile(&self, ir: &QueryIR) -> CompileResult {
23        if let Some(ref builder) = ir.custom_query_builder {
24            return (builder.0)(ir);
25        }
26
27        let mut bindings = Vec::new();
28        let mut alias_remap: Vec<(String, String)> = Vec::new();
29
30        if ir.joins.is_empty() {
31            let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
32            return CompileResult { sql: inner_sql, bindings, alias_remap };
33        }
34
35        // When JOINs are present, force aliases on function expression columns
36        // (e.g. `argMaxMerge(latest_balance)`) so the outer SELECT can reference
37        // them by simple identifier names. Without aliases, `_main.argMaxMerge(x)`
38        // is parsed as a function call — a ClickHouse syntax error.
39        let mut ir_mod = ir.clone();
40        let mut mc_counter = 0u32;
41        for sel in &mut ir_mod.selects {
42            if let SelectExpr::Column { column, alias } = sel {
43                if column.contains('(') && alias.is_none() {
44                    let a = format!("__mc_{mc_counter}");
45                    mc_counter += 1;
46                    alias_remap.push((a.clone(), column.clone()));
47                    *alias = Some(a);
48                }
49            }
50        }
51
52        let mut jc_counter = 0u32;
53        for join in &mut ir_mod.joins {
54            for sel in &mut join.selects {
55                if let SelectExpr::Column { column, alias } = sel {
56                    if column.contains('(') && alias.is_none() {
57                        let a = format!("__jc_{jc_counter}");
58                        jc_counter += 1;
59                        *alias = Some(a);
60                    }
61                }
62            }
63        }
64
65        let inner_sql = self.compile_inner(&ir_mod, &mut bindings, &mut alias_remap);
66
67        // Build outer SELECT with explicit column listing.
68        // Always backtick-quote to prevent ClickHouse auto-prefixing ambiguous
69        // names when multiple JOINed tables share column names.
70        let main_cols: Vec<String> = ir_mod.selects.iter().map(|s| {
71            let col = match s {
72                SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
73                SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
74            };
75            format!("_main.`{}` AS `{}`", col, col)
76        }).collect();
77
78        let mut sql = String::from("SELECT ");
79        sql.push_str(&main_cols.join(", "));
80
81        // Collect joined column aliases for the outer SELECT
82        for join in &ir_mod.joins {
83            for sel in &join.selects {
84                let col_name = match sel {
85                    SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
86                    SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
87                };
88                let outer_alias = format!("{}.{}", join.alias, col_name);
89                if let SelectExpr::Column { column, alias: Some(_) } = sel {
90                    if column.contains('(') {
91                        let outer_original = format!("{}.{}", join.alias, column);
92                        alias_remap.push((outer_alias.clone(), outer_original));
93                    }
94                }
95                sql.push_str(&format!(", {}.`{}` AS `{}`",
96                    join.alias, col_name, outer_alias));
97            }
98        }
99
100        sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
101
102        // Build a lookup for main query aliases to resolve ON condition columns
103        // that might be function expressions (aliased above).
104        let main_alias_map: HashMap<String, String> = ir_mod.selects.iter()
105            .filter_map(|s| {
106                if let SelectExpr::Column { column, alias: Some(a) } = s {
107                    if column.contains('(') { Some((column.clone(), a.clone())) } else { None }
108                } else { None }
109            })
110            .collect();
111
112        for join in &ir_mod.joins {
113            let join_kw = join.join_type.sql_keyword();
114
115            if join.is_aggregate {
116                // Mode B: subquery JOIN for AggregatingMergeTree targets
117                sql.push_str(&format!(" {} (SELECT ", join_kw));
118                let mut sub_parts: Vec<String> = Vec::new();
119                for gb_col in &join.group_by {
120                    sub_parts.push(quote_col(gb_col));
121                }
122                for sel in &join.selects {
123                    match sel {
124                        SelectExpr::Column { column, alias } => {
125                            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
126                            if let Some(a) = alias {
127                                sub_parts.push(format!("{col} AS `{a}`"));
128                            } else if column.contains('(') || !join.group_by.contains(column) {
129                                sub_parts.push(col);
130                            }
131                        }
132                        SelectExpr::Aggregate { function, column, alias, condition } => {
133                            let func = function.to_lowercase();
134                            let qcol = quote_col(column);
135                            let expr = match (func.as_str(), column.as_str(), condition) {
136                                ("count", "*", None) => format!("count() AS `{alias}`"),
137                                ("count", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
138                                ("count", _, None) => format!("uniqExact({qcol}) AS `{alias}`"),
139                                ("count", _, Some(cond)) => format!("uniqExactIf({qcol}, {cond}) AS `{alias}`"),
140                                ("uniq", _, None) => format!("uniq({qcol}) AS `{alias}`"),
141                                ("uniq", _, Some(cond)) => format!("uniqIf({qcol}, {cond}) AS `{alias}`"),
142                                (f, _, None) => format!("{f}({qcol}) AS `{alias}`"),
143                                (f, _, Some(cond)) => format!("{f}If({qcol}, {cond}) AS `{alias}`"),
144                            };
145                            sub_parts.push(expr);
146                        }
147                        SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
148                            let func = match agg_type {
149                                DimAggType::ArgMax => "argMax",
150                                DimAggType::ArgMin => "argMin",
151                            };
152                            let qv = quote_col(value_column);
153                            let qc = quote_col(compare_column);
154                            let expr = match condition {
155                                None => format!("{func}({qv}, {qc}) AS `{alias}`"),
156                                Some(cond) => format!("{func}If({qv}, {qc}, {cond}) AS `{alias}`"),
157                            };
158                            sub_parts.push(expr);
159                        }
160                    }
161                }
162                sql.push_str(&sub_parts.join(", "));
163                sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
164                if !join.group_by.is_empty() {
165                    sql.push_str(" GROUP BY ");
166                    let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
167                    sql.push_str(&gb.join(", "));
168                }
169                sql.push_str(&format!(") AS {}", join.alias));
170            } else {
171                // Mode A: direct JOIN for MergeTree / ReplacingMergeTree targets
172                sql.push_str(&format!(" {} `{}`.`{}` AS {}",
173                    join_kw, join.schema, join.table, join.alias));
174                if join.use_final {
175                    sql.push_str(" FINAL");
176                }
177            }
178
179            if join.join_type == JoinType::Cross {
180                // CROSS JOIN has no ON clause
181                continue;
182            }
183
184            // ON conditions — use alias if the local column was a func expression
185            let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
186                let local_ref = main_alias_map.get(local).unwrap_or(local);
187                format!("_main.`{}` = {}.`{}`", local_ref, join.alias, remote)
188            }).collect();
189            sql.push_str(" ON ");
190            sql.push_str(&on_parts.join(" AND "));
191        }
192
193        CompileResult { sql, bindings, alias_remap }
194    }
195
196    fn quote_identifier(&self, name: &str) -> String {
197        format!("`{name}`")
198    }
199
200    fn name(&self) -> &str {
201        "ClickHouse"
202    }
203}
204
205impl ClickHouseDialect {
206    fn compile_inner(
207        &self,
208        ir: &QueryIR,
209        bindings: &mut Vec<SqlValue>,
210        alias_remap: &mut Vec<(String, String)>,
211    ) -> String {
212        let mut sql = String::new();
213
214        let mut augmented_selects = ir.selects.clone();
215        let mut agg_alias_map: HashMap<String, String> = HashMap::new();
216        let mut alias_counter = 0u32;
217
218        let having_cols: std::collections::HashSet<String> =
219            collect_filter_columns(&ir.having).into_iter().collect();
220        let has_having_agg = having_cols.iter().any(|c| c.contains('('));
221
222        if has_having_agg {
223            for sel in &mut augmented_selects {
224                if let SelectExpr::Column { column, alias } = sel {
225                    if column.contains('(') && having_cols.contains(column.as_str()) {
226                        if alias.is_none() {
227                            let a = format!("__f_{alias_counter}");
228                            alias_counter += 1;
229                            alias_remap.push((a.clone(), column.clone()));
230                            agg_alias_map.insert(column.clone(), a.clone());
231                            *alias = Some(a);
232                        } else if let Some(existing) = alias {
233                            agg_alias_map.insert(column.clone(), existing.clone());
234                        }
235                    }
236                }
237            }
238            for col in &having_cols {
239                if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
240                    let a = format!("__f_{alias_counter}");
241                    alias_counter += 1;
242                    agg_alias_map.insert(col.clone(), a.clone());
243                    augmented_selects.push(SelectExpr::Column {
244                        column: col.clone(),
245                        alias: Some(a),
246                    });
247                }
248            }
249        }
250
251        for sel in &augmented_selects {
252            if let SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, .. } = sel {
253                let func = match agg_type {
254                    DimAggType::ArgMax => "argMax",
255                    DimAggType::ArgMin => "argMin",
256                };
257                let qv = quote_col(value_column);
258                let qc = quote_col(compare_column);
259                let expr = format!("{func}({qv}, {qc})");
260                agg_alias_map.insert(expr, alias.clone());
261            }
262        }
263
264        sql.push_str("SELECT ");
265        let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
266            SelectExpr::Column { column, alias } => {
267                let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
268                match alias {
269                    Some(a) => format!("{col} AS `{a}`"),
270                    None => col,
271                }
272            },
273            SelectExpr::Aggregate { function, column, alias, condition } => {
274                let func = function.to_uppercase();
275                let qcol = quote_col(column);
276                match (func.as_str(), column.as_str(), condition) {
277                    ("COUNT", "*", None) => format!("count() AS `{alias}`"),
278                    ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
279                    ("COUNT", _, None) => format!("uniqExact({qcol}) AS `{alias}`"),
280                    ("COUNT", _, Some(cond)) => format!("uniqExactIf({qcol}, {cond}) AS `{alias}`"),
281                    ("UNIQ", _, None) => format!("uniq({qcol}) AS `{alias}`"),
282                    ("UNIQ", _, Some(cond)) => format!("uniqIf({qcol}, {cond}) AS `{alias}`"),
283                    (_, _, None) => format!("{f}({qcol}) AS `{alias}`", f = func.to_lowercase()),
284                    (_, _, Some(cond)) => format!("{f}If({qcol}, {cond}) AS `{alias}`", f = func.to_lowercase()),
285                }
286            }
287            SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
288                let func = match agg_type {
289                    DimAggType::ArgMax => "argMax",
290                    DimAggType::ArgMin => "argMin",
291                };
292                let qv = quote_col(value_column);
293                let qc = quote_col(compare_column);
294                match condition {
295                    None => format!("{func}({qv}, {qc}) AS `{alias}`"),
296                    Some(cond) => format!("{func}If({qv}, {qc}, {cond}) AS `{alias}`"),
297                }
298            }
299        }).collect();
300        sql.push_str(&select_parts.join(", "));
301
302        if let Some(ref subquery) = ir.from_subquery {
303            sql.push_str(&format!(" FROM ({}) AS _t", subquery));
304        } else {
305            sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
306            if ir.use_final {
307                sql.push_str(" FINAL");
308            }
309        }
310
311        let where_clause = compile_filter(&ir.filters, bindings);
312        if !where_clause.is_empty() {
313            sql.push_str(" WHERE ");
314            sql.push_str(&where_clause);
315        }
316
317        let effective_group_by = if !ir.group_by.is_empty() {
318            ir.group_by.clone()
319        } else {
320            let has_agg_cols = augmented_selects.iter().any(|s| match s {
321                SelectExpr::Column { column, .. } => column.contains("Merge("),
322                SelectExpr::Aggregate { .. } | SelectExpr::DimAggregate { .. } => true,
323            });
324            if has_agg_cols {
325                augmented_selects.iter().filter_map(|s| match s {
326                    SelectExpr::Column { column, alias } if !contains_aggregate_expr(column) && alias.is_none() => {
327                        Some(column.clone())
328                    }
329                    _ => None,
330                }).collect()
331            } else {
332                vec![]
333            }
334        };
335
336        if !effective_group_by.is_empty() {
337            sql.push_str(" GROUP BY ");
338            let cols: Vec<String> = effective_group_by.iter().map(|c| quote_col(c)).collect();
339            sql.push_str(&cols.join(", "));
340        }
341
342        if has_having_agg {
343            let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
344            if !having_clause.is_empty() {
345                sql.push_str(" HAVING ");
346                sql.push_str(&having_clause);
347            }
348        } else {
349            let having_clause = compile_filter(&ir.having, bindings);
350            if !having_clause.is_empty() {
351                sql.push_str(" HAVING ");
352                sql.push_str(&having_clause);
353            }
354        }
355
356        if !ir.order_by.is_empty() {
357            sql.push_str(" ORDER BY ");
358            let parts: Vec<String> = ir.order_by.iter().map(|o| {
359                let col = if o.column.contains('(') {
360                    agg_alias_map.get(&o.column)
361                        .map(|a| format!("`{a}`"))
362                        .unwrap_or_else(|| o.column.clone())
363                } else {
364                    format!("`{}`", o.column)
365                };
366                let dir = if o.descending { "DESC" } else { "ASC" };
367                format!("{col} {dir}")
368            }).collect();
369            sql.push_str(&parts.join(", "));
370        }
371
372        if let Some(ref lb) = ir.limit_by {
373            let by_cols: Vec<String> = lb.columns.iter().map(|c| {
374                if c.contains('(') {
375                    c.clone()
376                } else {
377                    format!("`{c}`")
378                }
379            }).collect();
380            sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
381            if lb.offset > 0 {
382                sql.push_str(&format!(" OFFSET {}", lb.offset));
383            }
384        }
385
386        sql.push_str(&format!(" LIMIT {}", ir.limit));
387        if ir.offset > 0 {
388            sql.push_str(&format!(" OFFSET {}", ir.offset));
389        }
390
391        sql
392    }
393}
394
395/// Collect all column names referenced in a filter tree.
396fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
397    match node {
398        FilterNode::Empty => vec![],
399        FilterNode::Condition { column, .. } => vec![column.clone()],
400        FilterNode::And(children) | FilterNode::Or(children) => {
401            children.iter().flat_map(collect_filter_columns).collect()
402        }
403        FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
404    }
405}
406
407/// Like `compile_filter` but replaces aggregate expression columns with their
408/// SELECT aliases so ClickHouse can resolve them in HAVING scope.
409fn compile_filter_with_aliases(
410    node: &FilterNode,
411    bindings: &mut Vec<SqlValue>,
412    aliases: &HashMap<String, String>,
413) -> String {
414    match node {
415        FilterNode::Empty => String::new(),
416        FilterNode::Condition { column, op, value } => {
417            let effective_col = aliases.get(column)
418                .map(|a| a.as_str())
419                .unwrap_or(column.as_str());
420            compile_condition(effective_col, op, value, bindings)
421        }
422        FilterNode::And(children) => {
423            let parts: Vec<String> = children.iter()
424                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
425                .filter(|s| !s.is_empty())
426                .collect();
427            match parts.len() {
428                0 => String::new(),
429                1 => parts.into_iter().next().unwrap(),
430                _ => format!("({})", parts.join(" AND ")),
431            }
432        }
433        FilterNode::Or(children) => {
434            let parts: Vec<String> = children.iter()
435                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
436                .filter(|s| !s.is_empty())
437                .collect();
438            match parts.len() {
439                0 => String::new(),
440                1 => parts.into_iter().next().unwrap(),
441                _ => format!("({})", parts.join(" OR ")),
442            }
443        }
444        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
445            compile_array_includes(array_columns, element_conditions, bindings)
446        }
447    }
448}
449
450fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
451    match node {
452        FilterNode::Empty => String::new(),
453        FilterNode::Condition { column, op, value } => {
454            compile_condition(column, op, value, bindings)
455        }
456        FilterNode::And(children) => {
457            let parts: Vec<String> = children.iter()
458                .map(|c| compile_filter(c, bindings))
459                .filter(|s| !s.is_empty())
460                .collect();
461            match parts.len() {
462                0 => String::new(),
463                1 => parts.into_iter().next().unwrap(),
464                _ => format!("({})", parts.join(" AND ")),
465            }
466        }
467        FilterNode::Or(children) => {
468            let parts: Vec<String> = children.iter()
469                .map(|c| compile_filter(c, bindings))
470                .filter(|s| !s.is_empty())
471                .collect();
472            match parts.len() {
473                0 => String::new(),
474                1 => parts.into_iter().next().unwrap(),
475                _ => format!("({})", parts.join(" OR ")),
476            }
477        }
478        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
479            compile_array_includes(array_columns, element_conditions, bindings)
480        }
481    }
482}
483
484/// Compile ArrayIncludes into one or more `arrayExists(lambda, arrays)` expressions.
485fn compile_array_includes(
486    array_columns: &[String],
487    element_conditions: &[Vec<FilterNode>],
488    bindings: &mut Vec<SqlValue>,
489) -> String {
490    let params: Vec<String> = (0..array_columns.len())
491        .map(|i| format!("_p{i}"))
492        .collect();
493    let arrays_sql: Vec<String> = array_columns.iter()
494        .map(|c| quote_col(c))
495        .collect();
496    let arrays_ref = arrays_sql.join(", ");
497    let params_ref = params.join(", ");
498
499    let col_to_param: std::collections::HashMap<&str, &str> = array_columns.iter()
500        .zip(params.iter())
501        .map(|(c, p)| (c.as_str(), p.as_str()))
502        .collect();
503
504    let exists_parts: Vec<String> = element_conditions.iter().map(|conds| {
505        let cond_parts: Vec<String> = conds.iter()
506            .map(|c| compile_filter_with_param_remap(c, bindings, &col_to_param))
507            .filter(|s| !s.is_empty())
508            .collect();
509        let cond_sql = match cond_parts.len() {
510            0 => "1".to_string(),
511            1 => cond_parts.into_iter().next().unwrap(),
512            _ => format!("({})", cond_parts.join(" AND ")),
513        };
514        format!("arrayExists(({params_ref}) -> {cond_sql}, {arrays_ref})")
515    }).collect();
516
517    match exists_parts.len() {
518        0 => String::new(),
519        1 => exists_parts.into_iter().next().unwrap(),
520        _ => format!("({})", exists_parts.join(" AND ")),
521    }
522}
523
524/// Compile a filter node but remap column names to lambda parameter names.
525/// Lambda parameters must NOT be backtick-quoted in ClickHouse.
526fn compile_filter_with_param_remap(
527    node: &FilterNode,
528    bindings: &mut Vec<SqlValue>,
529    col_to_param: &std::collections::HashMap<&str, &str>,
530) -> String {
531    match node {
532        FilterNode::Empty => String::new(),
533        FilterNode::Condition { column, op, value } => {
534            if let Some(&param) = col_to_param.get(column.as_str()) {
535                compile_condition_raw(param, op, value, bindings)
536            } else {
537                compile_condition(column, op, value, bindings)
538            }
539        }
540        FilterNode::And(children) => {
541            let parts: Vec<String> = children.iter()
542                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
543                .filter(|s| !s.is_empty())
544                .collect();
545            match parts.len() {
546                0 => String::new(),
547                1 => parts.into_iter().next().unwrap(),
548                _ => format!("({})", parts.join(" AND ")),
549            }
550        }
551        FilterNode::Or(children) => {
552            let parts: Vec<String> = children.iter()
553                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
554                .filter(|s| !s.is_empty())
555                .collect();
556            match parts.len() {
557                0 => String::new(),
558                1 => parts.into_iter().next().unwrap(),
559                _ => format!("({})", parts.join(" OR ")),
560            }
561        }
562        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
563            compile_array_includes(array_columns, element_conditions, bindings)
564        }
565    }
566}
567
568fn quote_col(column: &str) -> String {
569    if column.contains('(') {
570        column.to_string()
571    } else {
572        format!("`{column}`")
573    }
574}
575
576/// Like compile_condition but uses the column name as-is (no backtick quoting).
577/// Used for lambda parameters in arrayExists.
578fn compile_condition_raw(
579    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
580) -> String {
581    compile_condition_inner(col, op, value, bindings)
582}
583
584fn compile_condition(
585    column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
586) -> String {
587    let col = quote_col(column);
588    compile_condition_inner(&col, op, value, bindings)
589}
590
591fn compile_condition_inner(
592    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
593) -> String {
594    match op {
595        CompareOp::In | CompareOp::NotIn => {
596            if let SqlValue::String(csv) = value {
597                let items: Vec<&str> = csv.split(',').collect();
598                let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
599                for item in &items {
600                    bindings.push(SqlValue::String(item.trim().to_string()));
601                }
602                format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
603            } else {
604                bindings.push(value.clone());
605                format!("{col} {} (?)", op.sql_op())
606            }
607        }
608        CompareOp::Includes | CompareOp::NotIncludes => {
609            if let SqlValue::String(s) = value {
610                bindings.push(SqlValue::String(format!("%{s}%")));
611            } else {
612                bindings.push(value.clone());
613            }
614            format!("{col} {} ?", op.sql_op())
615        }
616        CompareOp::IlikeIncludes => {
617            if let SqlValue::String(s) = value {
618                bindings.push(SqlValue::String(format!("%{s}%")));
619            } else {
620                bindings.push(value.clone());
621            }
622            format!("{col} ilike ?")
623        }
624        CompareOp::NotIlikeIncludes => {
625            if let SqlValue::String(s) = value {
626                bindings.push(SqlValue::String(format!("%{s}%")));
627            } else {
628                bindings.push(value.clone());
629            }
630            format!("{col} NOT ilike ?")
631        }
632        CompareOp::StartsWith | CompareOp::IlikeStartsWith => {
633            if let SqlValue::String(s) = value {
634                bindings.push(SqlValue::String(format!("{s}%")));
635            } else {
636                bindings.push(value.clone());
637            }
638            format!("{col} {} ?", op.sql_op())
639        }
640        CompareOp::EndsWith => {
641            if let SqlValue::String(s) = value {
642                bindings.push(SqlValue::String(format!("%{s}")));
643            } else {
644                bindings.push(value.clone());
645            }
646            format!("{col} LIKE ?")
647        }
648        CompareOp::IsNull | CompareOp::IsNotNull => {
649            format!("{col} {}", op.sql_op())
650        }
651        _ => {
652            if let SqlValue::Expression(expr) = value {
653                format!("{col} {} {expr}", op.sql_op())
654            } else {
655                bindings.push(value.clone());
656                format!("{col} {} ?", op.sql_op())
657            }
658        }
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use super::*;
665
666    fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }
667
668    #[test]
669    fn test_simple_select() {
670        let ir = QueryIR {
671            cube: "DEXTrades".into(), schema: "default".into(),
672            table: "dwd_dex_trades".into(),
673            selects: vec![
674                SelectExpr::Column { column: "tx_hash".into(), alias: None },
675                SelectExpr::Column { column: "token_a_amount".into(), alias: None },
676            ],
677            filters: FilterNode::Empty, having: FilterNode::Empty,
678            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
679            limit_by: None,
680            use_final: false,
681            joins: vec![],
682            custom_query_builder: None,
683            from_subquery: None,
684        };
685        let r = ch().compile(&ir);
686        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
687        assert!(r.bindings.is_empty());
688    }
689
690    #[test]
691    fn test_final_keyword() {
692        let ir = QueryIR {
693            cube: "T".into(), schema: "db".into(), table: "tokens".into(),
694            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
695            filters: FilterNode::Empty, having: FilterNode::Empty,
696            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
697            limit_by: None,
698            use_final: true,
699            joins: vec![],
700            custom_query_builder: None,
701            from_subquery: None,
702        };
703        let r = ch().compile(&ir);
704        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
705    }
706
707    #[test]
708    fn test_uniq_uses_native_function() {
709        let ir = QueryIR {
710            cube: "T".into(), schema: "db".into(), table: "t".into(),
711            selects: vec![
712                SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
713            ],
714            filters: FilterNode::Empty, having: FilterNode::Empty,
715            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
716            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
717        };
718        let r = ch().compile(&ir);
719        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
720    }
721
722    #[test]
723    fn test_count_star() {
724        let ir = QueryIR {
725            cube: "T".into(), schema: "db".into(), table: "t".into(),
726            selects: vec![
727                SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
728            ],
729            filters: FilterNode::Empty, having: FilterNode::Empty,
730            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
731            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
732        };
733        let r = ch().compile(&ir);
734        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
735    }
736
737    #[test]
738    fn test_aggregate_lowercase() {
739        let ir = QueryIR {
740            cube: "T".into(), schema: "db".into(), table: "t".into(),
741            selects: vec![
742                SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
743                SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
744            ],
745            filters: FilterNode::Empty, having: FilterNode::Empty,
746            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
747            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
748        };
749        let r = ch().compile(&ir);
750        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
751        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
752    }
753
754    #[test]
755    fn test_where_and_order() {
756        let ir = QueryIR {
757            cube: "T".into(), schema: "db".into(), table: "t".into(),
758            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
759            filters: FilterNode::And(vec![
760                FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
761                FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
762            ]),
763            having: FilterNode::Empty, group_by: vec![],
764            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
765            limit: 25, offset: 0,
766            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
767        };
768        let r = ch().compile(&ir);
769        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
770        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
771        assert_eq!(r.bindings.len(), 2);
772    }
773
774    #[test]
775    fn test_having_with_aggregate_expr() {
776        let ir = QueryIR {
777            cube: "T".into(), schema: "db".into(), table: "t".into(),
778            selects: vec![
779                SelectExpr::Column { column: "token_address".into(), alias: None },
780                SelectExpr::Aggregate { function: "SUM".into(), column: "amount_usd".into(), alias: "__sum".into(), condition: None },
781            ],
782            filters: FilterNode::Empty,
783            having: FilterNode::Condition {
784                column: "sum(`amount_usd`)".into(), op: CompareOp::Gt, value: SqlValue::Float(1000000.0),
785            },
786            group_by: vec!["token_address".into()], order_by: vec![], limit: 25, offset: 0,
787            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
788        };
789        let r = ch().compile(&ir);
790        assert!(r.sql.contains("GROUP BY `token_address`"));
791        assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
792        assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
793        assert_eq!(r.bindings.len(), 1);
794    }
795
796    #[test]
797    fn test_having_appends_missing_agg_column() {
798        let ir = QueryIR {
799            cube: "T".into(), schema: "db".into(), table: "t".into(),
800            selects: vec![
801                SelectExpr::Column { column: "pool_address".into(), alias: None },
802                SelectExpr::Column { column: "argMaxMerge(latest_liquidity_usd_state)".into(), alias: None },
803            ],
804            filters: FilterNode::Empty,
805            having: FilterNode::And(vec![
806                FilterNode::Condition {
807                    column: "argMaxMerge(latest_liquidity_usd_state)".into(),
808                    op: CompareOp::Gt, value: SqlValue::Float(2.0),
809                },
810                FilterNode::Condition {
811                    column: "argMaxMerge(latest_token_a_amount_state)".into(),
812                    op: CompareOp::Gt, value: SqlValue::Float(3.0),
813                },
814            ]),
815            group_by: vec!["pool_address".into()], order_by: vec![], limit: 25, offset: 0,
816            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
817        };
818        let r = ch().compile(&ir);
819        assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
820            "existing HAVING col should be aliased, got: {}", r.sql);
821        assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
822            "missing agg col should be appended, got: {}", r.sql);
823        assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
824            "HAVING should use aliases, got: {}", r.sql);
825        assert_eq!(r.bindings.len(), 2);
826        assert_eq!(r.alias_remap.len(), 1);
827        assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
828    }
829
830    #[test]
831    fn test_limit_by() {
832        let ir = QueryIR {
833            cube: "T".into(), schema: "db".into(), table: "t".into(),
834            selects: vec![
835                SelectExpr::Column { column: "owner".into(), alias: None },
836                SelectExpr::Column { column: "amount".into(), alias: None },
837            ],
838            filters: FilterNode::Empty, having: FilterNode::Empty,
839            group_by: vec![], 
840            order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
841            limit: 100, offset: 0,
842            limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
843            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
844        };
845        let r = ch().compile(&ir);
846        let sql = &r.sql;
847        assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
848        assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
849        assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
850        let order_by_pos = sql.find("ORDER BY").unwrap();
851        let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
852        let limit_pos = sql.rfind("LIMIT 100").unwrap();
853        assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
854        assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
855    }
856
857    #[test]
858    fn test_limit_by_with_offset() {
859        let ir = QueryIR {
860            cube: "T".into(), schema: "db".into(), table: "t".into(),
861            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
862            filters: FilterNode::Empty, having: FilterNode::Empty,
863            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
864            limit_by: Some(LimitByExpr { count: 5, offset: 2, columns: vec!["token".into(), "wallet".into()] }),
865            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
866        };
867        let r = ch().compile(&ir);
868        assert!(r.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"), "multi-column LIMIT BY with offset, got: {}", r.sql);
869    }
870
871    #[test]
872    fn test_join_direct() {
873        let ir = QueryIR {
874            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
875            table: "sol_dex_trades".into(),
876            selects: vec![
877                SelectExpr::Column { column: "tx_hash".into(), alias: None },
878                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
879            ],
880            filters: FilterNode::Empty, having: FilterNode::Empty,
881            group_by: vec![], order_by: vec![], limit: 25, offset: 0,
882            limit_by: None, use_final: false,
883            joins: vec![JoinExpr {
884                schema: "dexes_dim".into(), table: "sol_tokens".into(),
885                alias: "_j0".into(),
886                conditions: vec![("buy_token_address".into(), "token_address".into())],
887                selects: vec![
888                    SelectExpr::Column { column: "name".into(), alias: None },
889                    SelectExpr::Column { column: "symbol".into(), alias: None },
890                ],
891                group_by: vec![], use_final: true, is_aggregate: false,
892                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
893                join_type: JoinType::Left,
894            }],
895            custom_query_builder: None,
896            from_subquery: None,
897        };
898        let r = ch().compile(&ir);
899        assert!(r.sql.contains("FROM (SELECT"), "main query should be wrapped, got: {}", r.sql);
900        assert!(r.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` AS _j0 FINAL"),
901            "direct JOIN with FINAL after alias, got: {}", r.sql);
902        assert!(r.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
903            "ON condition, got: {}", r.sql);
904        assert!(r.sql.contains("_j0.`name` AS `_j0.name`"), "joined col alias, got: {}", r.sql);
905    }
906
907    #[test]
908    fn test_join_aggregate_subquery() {
909        let ir = QueryIR {
910            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
911            table: "sol_dex_trades".into(),
912            selects: vec![
913                SelectExpr::Column { column: "tx_hash".into(), alias: None },
914                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
915            ],
916            filters: FilterNode::Empty, having: FilterNode::Empty,
917            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
918            limit_by: None, use_final: false,
919            joins: vec![JoinExpr {
920                schema: "dexes_dws".into(), table: "sol_token_market_cap".into(),
921                alias: "_j0".into(),
922                conditions: vec![("buy_token_address".into(), "token_address".into())],
923                selects: vec![
924                    SelectExpr::Column { column: "argMaxMerge(latest_market_cap_usd_state)".into(), alias: None },
925                ],
926                group_by: vec!["token_address".into()],
927                use_final: false, is_aggregate: true,
928                target_cube: "TokenMarketCap".into(), join_field: "joinBuyTokenMarketCap".into(),
929                join_type: JoinType::Left,
930            }],
931            custom_query_builder: None,
932            from_subquery: None,
933        };
934        let r = ch().compile(&ir);
935        assert!(r.sql.contains("LEFT JOIN (SELECT"), "aggregate should use subquery, got: {}", r.sql);
936        assert!(r.sql.contains("GROUP BY `token_address`"), "subquery GROUP BY, got: {}", r.sql);
937        assert!(r.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"), "subquery FROM, got: {}", r.sql);
938        assert!(r.sql.contains("argMaxMerge(latest_market_cap_usd_state) AS `__jc_0`"),
939            "join func expr should be aliased in subquery, got: {}", r.sql);
940        assert!(r.sql.contains("_j0.`__jc_0` AS `_j0.__jc_0`"),
941            "outer SELECT should use alias for join func col, got: {}", r.sql);
942    }
943
944    #[test]
945    fn test_join_main_query_func_expression_columns() {
946        let ir = QueryIR {
947            cube: "TokenHolders".into(), schema: "dws".into(),
948            table: "sol_token_holders".into(),
949            selects: vec![
950                SelectExpr::Column { column: "token".into(), alias: None },
951                SelectExpr::Column { column: "holder".into(), alias: None },
952                SelectExpr::Column { column: "argMaxMerge(latest_balance)".into(), alias: None },
953                SelectExpr::Column { column: "argMaxMerge(latest_balance_usd)".into(), alias: None },
954                SelectExpr::Column { column: "minMerge(first_seen)".into(), alias: None },
955                SelectExpr::Column { column: "maxMerge(last_seen)".into(), alias: None },
956            ],
957            filters: FilterNode::Empty, having: FilterNode::Empty,
958            group_by: vec![], order_by: vec![
959                OrderExpr { column: "argMaxMerge(latest_balance_usd)".into(), descending: true },
960            ],
961            limit: 100, offset: 0,
962            limit_by: None, use_final: false,
963            joins: vec![JoinExpr {
964                schema: "dim".into(), table: "sol_tokens".into(),
965                alias: "_j0".into(),
966                conditions: vec![("token".into(), "token_address".into())],
967                selects: vec![
968                    SelectExpr::Column { column: "name".into(), alias: None },
969                    SelectExpr::Column { column: "symbol".into(), alias: None },
970                ],
971                group_by: vec![], use_final: true, is_aggregate: false,
972                target_cube: "TokenSearch".into(), join_field: "joinToken".into(),
973                join_type: JoinType::Left,
974            }],
975            custom_query_builder: None,
976            from_subquery: None,
977        };
978        let r = ch().compile(&ir);
979        let sql = &r.sql;
980
981        assert!(sql.contains("_main.`__mc_0` AS `__mc_0`"),
982            "func expr should use alias __mc_0 in outer SELECT, got: {sql}");
983        assert!(sql.contains("_main.`__mc_1` AS `__mc_1`"),
984            "func expr should use alias __mc_1, got: {sql}");
985        assert!(sql.contains("_main.`token` AS `token`"),
986            "simple col should be backtick-quoted, got: {sql}");
987
988        assert!(!sql.contains("_main.argMaxMerge("),
989            "outer SELECT must NOT have bare _main.argMaxMerge(...), got: {sql}");
990
991        assert!(sql.contains("argMaxMerge(latest_balance) AS `__mc_0`"),
992            "inner query should alias func expr, got: {sql}");
993
994        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_0" && o == "argMaxMerge(latest_balance)"),
995            "alias_remap should map __mc_0 → original, got: {:?}", r.alias_remap);
996        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_1" && o == "argMaxMerge(latest_balance_usd)"),
997            "alias_remap should map __mc_1, got: {:?}", r.alias_remap);
998    }
999
1000    #[test]
1001    fn test_join_inner_type() {
1002        let ir = QueryIR {
1003            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
1004            table: "sol_dex_trades".into(),
1005            selects: vec![
1006                SelectExpr::Column { column: "tx_hash".into(), alias: None },
1007            ],
1008            filters: FilterNode::Empty, having: FilterNode::Empty,
1009            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1010            limit_by: None, use_final: false,
1011            joins: vec![JoinExpr {
1012                schema: "dexes_dim".into(), table: "sol_tokens".into(),
1013                alias: "_j0".into(),
1014                conditions: vec![("buy_token_address".into(), "token_address".into())],
1015                selects: vec![
1016                    SelectExpr::Column { column: "name".into(), alias: None },
1017                ],
1018                group_by: vec![], use_final: false, is_aggregate: false,
1019                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
1020                join_type: JoinType::Inner,
1021            }],
1022            custom_query_builder: None,
1023            from_subquery: None,
1024        };
1025        let r = ch().compile(&ir);
1026        assert!(r.sql.contains("INNER JOIN `dexes_dim`.`sol_tokens` AS _j0"),
1027            "should use INNER JOIN, got: {}", r.sql);
1028    }
1029
1030    #[test]
1031    fn test_join_full_outer_type() {
1032        let ir = QueryIR {
1033            cube: "T".into(), schema: "db".into(), table: "t".into(),
1034            selects: vec![
1035                SelectExpr::Column { column: "id".into(), alias: None },
1036            ],
1037            filters: FilterNode::Empty, having: FilterNode::Empty,
1038            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1039            limit_by: None, use_final: false,
1040            joins: vec![JoinExpr {
1041                schema: "db2".into(), table: "t2".into(),
1042                alias: "_j0".into(),
1043                conditions: vec![("id".into(), "ref_id".into())],
1044                selects: vec![
1045                    SelectExpr::Column { column: "val".into(), alias: None },
1046                ],
1047                group_by: vec![], use_final: false, is_aggregate: false,
1048                target_cube: "Other".into(), join_field: "joinOther".into(),
1049                join_type: JoinType::Full,
1050            }],
1051            custom_query_builder: None,
1052            from_subquery: None,
1053        };
1054        let r = ch().compile(&ir);
1055        assert!(r.sql.contains("FULL OUTER JOIN `db2`.`t2` AS _j0"),
1056            "should use FULL OUTER JOIN, got: {}", r.sql);
1057    }
1058
1059    #[test]
1060    fn test_custom_query_builder() {
1061        let ir = QueryIR {
1062            cube: "Custom".into(), schema: "db".into(), table: "t".into(),
1063            selects: vec![
1064                SelectExpr::Column { column: "id".into(), alias: None },
1065            ],
1066            filters: FilterNode::Empty, having: FilterNode::Empty,
1067            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1068            limit_by: None, use_final: false, joins: vec![],
1069            custom_query_builder: Some(QueryBuilderFn(std::sync::Arc::new(|_ir| {
1070                CompileResult {
1071                    sql: "SELECT 1 FROM custom_view".into(),
1072                    bindings: vec![],
1073                    alias_remap: vec![],
1074                }
1075            }))),
1076            from_subquery: None,
1077        };
1078        let r = ch().compile(&ir);
1079        assert_eq!(r.sql, "SELECT 1 FROM custom_view",
1080            "custom builder should bypass standard compilation, got: {}", r.sql);
1081    }
1082
1083    #[test]
1084    fn test_from_subquery() {
1085        let ir = QueryIR {
1086            cube: "DEXTradeByTokens".into(), schema: "dwd".into(),
1087            table: "sol_trades".into(),
1088            selects: vec![
1089                SelectExpr::Column { column: "amount".into(), alias: None },
1090                SelectExpr::Column { column: "side_type".into(), alias: None },
1091            ],
1092            filters: FilterNode::Condition {
1093                column: "token".into(), op: CompareOp::Eq,
1094                value: SqlValue::String("SOL".into()),
1095            },
1096            having: FilterNode::Empty,
1097            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1098            limit_by: None, use_final: false, joins: vec![],
1099            custom_query_builder: None,
1100            from_subquery: Some(
1101                "SELECT amount, 'buy' AS side_type, token FROM dwd.sol_a UNION ALL SELECT amount, 'sell' AS side_type, token FROM dwd.sol_b".into()
1102            ),
1103        };
1104        let r = ch().compile(&ir);
1105        assert!(r.sql.starts_with("SELECT `amount`, `side_type` FROM (SELECT"),
1106            "should use subquery in FROM, got: {}", r.sql);
1107        assert!(r.sql.contains("UNION ALL"),
1108            "subquery should contain UNION ALL, got: {}", r.sql);
1109        assert!(r.sql.contains(") AS _t"),
1110            "subquery should be aliased as _t, got: {}", r.sql);
1111        assert!(r.sql.contains("WHERE `token` = ?"),
1112            "WHERE clause should be applied to subquery result, got: {}", r.sql);
1113        assert!(!r.sql.contains("FROM `dwd`.`sol_trades`"),
1114            "should NOT use schema.table when from_subquery is set, got: {}", r.sql);
1115    }
1116
1117    #[test]
1118    fn test_array_includes_single_condition() {
1119        let ir = QueryIR {
1120            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1121            table: "sol_instructions".into(),
1122            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1123            filters: FilterNode::ArrayIncludes {
1124                array_columns: vec![
1125                    "instruction_arg_names".into(),
1126                    "instruction_arg_types".into(),
1127                    "instruction_arg_values".into(),
1128                ],
1129                element_conditions: vec![vec![
1130                    FilterNode::Condition {
1131                        column: "instruction_arg_names".into(),
1132                        op: CompareOp::Eq,
1133                        value: SqlValue::String("amount_in".into()),
1134                    },
1135                ]],
1136            },
1137            having: FilterNode::Empty,
1138            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1139            limit_by: None, use_final: false, joins: vec![],
1140            custom_query_builder: None, from_subquery: None,
1141        };
1142        let r = ch().compile(&ir);
1143        assert!(r.sql.contains("arrayExists((_p0, _p1, _p2) -> _p0 = ?"),
1144            "should generate arrayExists with lambda params, got: {}", r.sql);
1145        assert!(r.sql.contains("`instruction_arg_names`, `instruction_arg_types`, `instruction_arg_values`"),
1146            "should reference all parallel array columns, got: {}", r.sql);
1147        assert_eq!(r.bindings.len(), 1);
1148    }
1149
1150    #[test]
1151    fn test_array_includes_multiple_conditions() {
1152        let ir = QueryIR {
1153            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1154            table: "sol_instructions".into(),
1155            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1156            filters: FilterNode::ArrayIncludes {
1157                array_columns: vec![
1158                    "instruction_arg_names".into(),
1159                    "instruction_arg_values".into(),
1160                ],
1161                element_conditions: vec![
1162                    vec![
1163                        FilterNode::Condition {
1164                            column: "instruction_arg_names".into(),
1165                            op: CompareOp::Eq,
1166                            value: SqlValue::String("amount_in".into()),
1167                        },
1168                        FilterNode::Condition {
1169                            column: "instruction_arg_values".into(),
1170                            op: CompareOp::Gt,
1171                            value: SqlValue::String("10000".into()),
1172                        },
1173                    ],
1174                    vec![
1175                        FilterNode::Condition {
1176                            column: "instruction_arg_names".into(),
1177                            op: CompareOp::Eq,
1178                            value: SqlValue::String("owner".into()),
1179                        },
1180                    ],
1181                ],
1182            },
1183            having: FilterNode::Empty,
1184            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1185            limit_by: None, use_final: false, joins: vec![],
1186            custom_query_builder: None, from_subquery: None,
1187        };
1188        let r = ch().compile(&ir);
1189        let sql = &r.sql;
1190        let count = sql.matches("arrayExists").count();
1191        assert_eq!(count, 2, "should have two arrayExists calls (AND-ed), got: {sql}");
1192        assert!(sql.contains(" AND arrayExists("),
1193            "two arrayExists should be AND-ed, got: {sql}");
1194        assert_eq!(r.bindings.len(), 3);
1195    }
1196
1197    #[test]
1198    fn test_array_includes_with_in_operator() {
1199        let ir = QueryIR {
1200            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1201            table: "sol_instructions".into(),
1202            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1203            filters: FilterNode::ArrayIncludes {
1204                array_columns: vec![
1205                    "instruction_arg_names".into(),
1206                    "instruction_arg_values".into(),
1207                ],
1208                element_conditions: vec![vec![
1209                    FilterNode::Condition {
1210                        column: "instruction_arg_names".into(),
1211                        op: CompareOp::Eq,
1212                        value: SqlValue::String("authorityType".into()),
1213                    },
1214                    FilterNode::Condition {
1215                        column: "instruction_arg_values".into(),
1216                        op: CompareOp::In,
1217                        value: SqlValue::String("0,1".into()),
1218                    },
1219                ]],
1220            },
1221            having: FilterNode::Empty,
1222            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1223            limit_by: None, use_final: false, joins: vec![],
1224            custom_query_builder: None, from_subquery: None,
1225        };
1226        let r = ch().compile(&ir);
1227        assert!(r.sql.contains("arrayExists((_p0, _p1) -> (_p0 = ? AND _p1 IN (?, ?))"),
1228            "should generate arrayExists with AND-ed conditions, got: {}", r.sql);
1229        assert_eq!(r.bindings.len(), 3);
1230    }
1231
1232    #[test]
1233    fn test_array_includes_combined_with_regular_filter() {
1234        let ir = QueryIR {
1235            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1236            table: "sol_instructions".into(),
1237            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1238            filters: FilterNode::And(vec![
1239                FilterNode::Condition {
1240                    column: "instruction_program_address".into(),
1241                    op: CompareOp::Eq,
1242                    value: SqlValue::String("pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA".into()),
1243                },
1244                FilterNode::ArrayIncludes {
1245                    array_columns: vec!["instruction_arg_names".into(), "instruction_arg_values".into()],
1246                    element_conditions: vec![vec![
1247                        FilterNode::Condition {
1248                            column: "instruction_arg_names".into(),
1249                            op: CompareOp::Eq,
1250                            value: SqlValue::String("amount".into()),
1251                        },
1252                    ]],
1253                },
1254            ]),
1255            having: FilterNode::Empty,
1256            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1257            limit_by: None, use_final: false, joins: vec![],
1258            custom_query_builder: None, from_subquery: None,
1259        };
1260        let r = ch().compile(&ir);
1261        let sql = &r.sql;
1262        assert!(sql.contains("`instruction_program_address` = ?"),
1263            "should have regular condition, got: {sql}");
1264        assert!(sql.contains("arrayExists("),
1265            "should have arrayExists, got: {sql}");
1266        assert!(sql.contains(" AND "),
1267            "regular + array conditions should be AND-ed, got: {sql}");
1268    }
1269}