Skip to main content

activecube_rs/sql/
clickhouse.rs

1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::{CompileResult, JoinType};
5use crate::sql::dialect::SqlDialect;
6
/// SQL dialect that renders a `QueryIR` as ClickHouse SQL.
///
/// The type carries no state, so it is cheap to copy and can be created with
/// `ClickHouseDialect::new()` or `ClickHouseDialect::default()`.
#[derive(Debug, Clone, Copy)]
pub struct ClickHouseDialect;
8
9impl ClickHouseDialect {
10    pub fn new() -> Self {
11        Self
12    }
13}
14
15impl Default for ClickHouseDialect {
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl SqlDialect for ClickHouseDialect {
22    fn compile(&self, ir: &QueryIR) -> CompileResult {
23        if let Some(ref builder) = ir.custom_query_builder {
24            return (builder.0)(ir);
25        }
26
27        let mut bindings = Vec::new();
28        let mut alias_remap: Vec<(String, String)> = Vec::new();
29
30        if ir.joins.is_empty() {
31            let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
32            return CompileResult { sql: inner_sql, bindings, alias_remap };
33        }
34
35        // When JOINs are present, force aliases on function expression columns
36        // (e.g. `argMaxMerge(latest_balance)`) so the outer SELECT can reference
37        // them by simple identifier names. Without aliases, `_main.argMaxMerge(x)`
38        // is parsed as a function call — a ClickHouse syntax error.
39        let mut ir_mod = ir.clone();
40        let mut mc_counter = 0u32;
41        for sel in &mut ir_mod.selects {
42            if let SelectExpr::Column { column, alias } = sel {
43                if column.contains('(') && alias.is_none() {
44                    let a = format!("__mc_{mc_counter}");
45                    mc_counter += 1;
46                    alias_remap.push((a.clone(), column.clone()));
47                    *alias = Some(a);
48                }
49            }
50        }
51
52        let mut jc_counter = 0u32;
53        for join in &mut ir_mod.joins {
54            for sel in &mut join.selects {
55                if let SelectExpr::Column { column, alias } = sel {
56                    if column.contains('(') && alias.is_none() {
57                        let a = format!("__jc_{jc_counter}");
58                        jc_counter += 1;
59                        *alias = Some(a);
60                    }
61                }
62            }
63        }
64
65        let inner_sql = self.compile_inner(&ir_mod, &mut bindings, &mut alias_remap);
66
67        // Build outer SELECT with explicit column listing.
68        // Always backtick-quote to prevent ClickHouse auto-prefixing ambiguous
69        // names when multiple JOINed tables share column names.
70        let main_cols: Vec<String> = ir_mod.selects.iter().map(|s| {
71            let col = match s {
72                SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
73                SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
74            };
75            format!("_main.`{}` AS `{}`", col, col)
76        }).collect();
77
78        let mut sql = String::from("SELECT ");
79        sql.push_str(&main_cols.join(", "));
80
81        // Collect joined column aliases for the outer SELECT
82        for join in &ir_mod.joins {
83            for sel in &join.selects {
84                let col_name = match sel {
85                    SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
86                    SelectExpr::Aggregate { alias, .. } | SelectExpr::DimAggregate { alias, .. } => alias.clone(),
87                };
88                let outer_alias = format!("{}.{}", join.alias, col_name);
89                if let SelectExpr::Column { column, alias: Some(_) } = sel {
90                    if column.contains('(') {
91                        let outer_original = format!("{}.{}", join.alias, column);
92                        alias_remap.push((outer_alias.clone(), outer_original));
93                    }
94                }
95                sql.push_str(&format!(", {}.`{}` AS `{}`",
96                    join.alias, col_name, outer_alias));
97            }
98        }
99
100        sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
101
102        // Build a lookup for main query aliases to resolve ON condition columns
103        // that might be function expressions (aliased above).
104        let main_alias_map: HashMap<String, String> = ir_mod.selects.iter()
105            .filter_map(|s| {
106                if let SelectExpr::Column { column, alias: Some(a) } = s {
107                    if column.contains('(') { Some((column.clone(), a.clone())) } else { None }
108                } else { None }
109            })
110            .collect();
111
112        for join in &ir_mod.joins {
113            let join_kw = join.join_type.sql_keyword();
114
115            if join.is_aggregate {
116                // Mode B: subquery JOIN for AggregatingMergeTree targets
117                sql.push_str(&format!(" {} (SELECT ", join_kw));
118                let mut sub_parts: Vec<String> = Vec::new();
119                for gb_col in &join.group_by {
120                    sub_parts.push(quote_col(gb_col));
121                }
122                for sel in &join.selects {
123                    match sel {
124                        SelectExpr::Column { column, alias } => {
125                            let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
126                            if let Some(a) = alias {
127                                sub_parts.push(format!("{col} AS `{a}`"));
128                            } else if column.contains('(') || !join.group_by.contains(column) {
129                                sub_parts.push(col);
130                            }
131                        }
132                        SelectExpr::Aggregate { function, column, alias, condition } => {
133                            let func = function.to_lowercase();
134                            let qcol = quote_col(column);
135                            let expr = match (func.as_str(), column.as_str(), condition) {
136                                ("count", "*", None) => format!("count() AS `{alias}`"),
137                                ("count", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
138                                ("count", _, None) => format!("uniqExact({qcol}) AS `{alias}`"),
139                                ("count", _, Some(cond)) => format!("uniqExactIf({qcol}, {cond}) AS `{alias}`"),
140                                ("uniq", _, None) => format!("uniq({qcol}) AS `{alias}`"),
141                                ("uniq", _, Some(cond)) => format!("uniqIf({qcol}, {cond}) AS `{alias}`"),
142                                (f, _, None) => format!("{f}({qcol}) AS `{alias}`"),
143                                (f, _, Some(cond)) => format!("{f}If({qcol}, {cond}) AS `{alias}`"),
144                            };
145                            sub_parts.push(expr);
146                        }
147                        SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
148                            let func = match agg_type {
149                                DimAggType::ArgMax => "argMax",
150                                DimAggType::ArgMin => "argMin",
151                            };
152                            let qv = quote_col(value_column);
153                            let qc = quote_col(compare_column);
154                            let expr = match condition {
155                                None => format!("{func}({qv}, {qc}) AS `{alias}`"),
156                                Some(cond) => format!("{func}If({qv}, {qc}, {cond}) AS `{alias}`"),
157                            };
158                            sub_parts.push(expr);
159                        }
160                    }
161                }
162                sql.push_str(&sub_parts.join(", "));
163                sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
164                if !join.group_by.is_empty() {
165                    sql.push_str(" GROUP BY ");
166                    let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
167                    sql.push_str(&gb.join(", "));
168                }
169                sql.push_str(&format!(") AS {}", join.alias));
170            } else {
171                // Mode A: direct JOIN for MergeTree / ReplacingMergeTree targets
172                sql.push_str(&format!(" {} `{}`.`{}` AS {}",
173                    join_kw, join.schema, join.table, join.alias));
174                if join.use_final {
175                    sql.push_str(" FINAL");
176                }
177            }
178
179            if join.join_type == JoinType::Cross {
180                // CROSS JOIN has no ON clause
181                continue;
182            }
183
184            // ON conditions — use alias if the local column was a func expression
185            let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
186                let local_ref = main_alias_map.get(local).unwrap_or(local);
187                format!("_main.`{}` = {}.`{}`", local_ref, join.alias, remote)
188            }).collect();
189            sql.push_str(" ON ");
190            sql.push_str(&on_parts.join(" AND "));
191        }
192
193        CompileResult { sql, bindings, alias_remap }
194    }
195
196    fn quote_identifier(&self, name: &str) -> String {
197        format!("`{name}`")
198    }
199
200    fn name(&self) -> &str {
201        "ClickHouse"
202    }
203}
204
205impl ClickHouseDialect {
206    fn compile_inner(
207        &self,
208        ir: &QueryIR,
209        bindings: &mut Vec<SqlValue>,
210        alias_remap: &mut Vec<(String, String)>,
211    ) -> String {
212        let mut sql = String::new();
213
214        let mut augmented_selects = ir.selects.clone();
215        let mut agg_alias_map: HashMap<String, String> = HashMap::new();
216        let mut alias_counter = 0u32;
217
218        let having_cols: std::collections::HashSet<String> =
219            collect_filter_columns(&ir.having).into_iter().collect();
220        let has_having_agg = having_cols.iter().any(|c| c.contains('('));
221
222        if has_having_agg {
223            for sel in &mut augmented_selects {
224                if let SelectExpr::Column { column, alias } = sel {
225                    if column.contains('(') && having_cols.contains(column.as_str()) {
226                        if alias.is_none() {
227                            let a = format!("__f_{alias_counter}");
228                            alias_counter += 1;
229                            alias_remap.push((a.clone(), column.clone()));
230                            agg_alias_map.insert(column.clone(), a.clone());
231                            *alias = Some(a);
232                        } else if let Some(existing) = alias {
233                            agg_alias_map.insert(column.clone(), existing.clone());
234                        }
235                    }
236                }
237            }
238            for col in &having_cols {
239                if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
240                    let a = format!("__f_{alias_counter}");
241                    alias_counter += 1;
242                    agg_alias_map.insert(col.clone(), a.clone());
243                    augmented_selects.push(SelectExpr::Column {
244                        column: col.clone(),
245                        alias: Some(a),
246                    });
247                }
248            }
249        }
250
251        for sel in &augmented_selects {
252            if let SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, .. } = sel {
253                let func = match agg_type {
254                    DimAggType::ArgMax => "argMax",
255                    DimAggType::ArgMin => "argMin",
256                };
257                let qv = quote_col(value_column);
258                let qc = quote_col(compare_column);
259                let expr = format!("{func}({qv}, {qc})");
260                agg_alias_map.insert(expr, alias.clone());
261            }
262        }
263
264        sql.push_str("SELECT ");
265        let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
266            SelectExpr::Column { column, alias } => {
267                let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
268                match alias {
269                    Some(a) => format!("{col} AS `{a}`"),
270                    None => col,
271                }
272            },
273            SelectExpr::Aggregate { function, column, alias, condition } => {
274                let func = function.to_uppercase();
275                let qcol = quote_col(column);
276                match (func.as_str(), column.as_str(), condition) {
277                    ("COUNT", "*", None) => format!("count() AS `{alias}`"),
278                    ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
279                    ("COUNT", _, None) => format!("uniqExact({qcol}) AS `{alias}`"),
280                    ("COUNT", _, Some(cond)) => format!("uniqExactIf({qcol}, {cond}) AS `{alias}`"),
281                    ("UNIQ", _, None) => format!("uniq({qcol}) AS `{alias}`"),
282                    ("UNIQ", _, Some(cond)) => format!("uniqIf({qcol}, {cond}) AS `{alias}`"),
283                    (_, _, None) => format!("{f}({qcol}) AS `{alias}`", f = func.to_lowercase()),
284                    (_, _, Some(cond)) => format!("{f}If({qcol}, {cond}) AS `{alias}`", f = func.to_lowercase()),
285                }
286            }
287            SelectExpr::DimAggregate { agg_type, value_column, compare_column, alias, condition } => {
288                let func = match agg_type {
289                    DimAggType::ArgMax => "argMax",
290                    DimAggType::ArgMin => "argMin",
291                };
292                let qv = quote_col(value_column);
293                let qc = quote_col(compare_column);
294                match condition {
295                    None => format!("{func}({qv}, {qc}) AS `{alias}`"),
296                    Some(cond) => format!("{func}If({qv}, {qc}, {cond}) AS `{alias}`"),
297                }
298            }
299        }).collect();
300        sql.push_str(&select_parts.join(", "));
301
302        if let Some(ref subquery) = ir.from_subquery {
303            sql.push_str(&format!(" FROM ({}) AS _t", subquery));
304        } else {
305            sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
306            if ir.use_final {
307                sql.push_str(" FINAL");
308            }
309        }
310
311        let where_clause = compile_filter(&ir.filters, bindings);
312        if !where_clause.is_empty() {
313            sql.push_str(" WHERE ");
314            sql.push_str(&where_clause);
315        }
316
317        let effective_group_by = if !ir.group_by.is_empty() {
318            ir.group_by.clone()
319        } else {
320            let has_agg_cols = augmented_selects.iter().any(|s| match s {
321                SelectExpr::Column { column, .. } => column.contains("Merge("),
322                SelectExpr::Aggregate { .. } | SelectExpr::DimAggregate { .. } => true,
323            });
324            if has_agg_cols {
325                augmented_selects.iter().filter_map(|s| match s {
326                    SelectExpr::Column { column, alias } if !contains_aggregate_expr(column) && alias.is_none() => {
327                        Some(column.clone())
328                    }
329                    _ => None,
330                }).collect()
331            } else {
332                vec![]
333            }
334        };
335
336        if !effective_group_by.is_empty() {
337            sql.push_str(" GROUP BY ");
338            let cols: Vec<String> = effective_group_by.iter().map(|c| quote_col(c)).collect();
339            sql.push_str(&cols.join(", "));
340        }
341
342        if has_having_agg {
343            let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
344            if !having_clause.is_empty() {
345                sql.push_str(" HAVING ");
346                sql.push_str(&having_clause);
347            }
348        } else {
349            let having_clause = compile_filter(&ir.having, bindings);
350            if !having_clause.is_empty() {
351                sql.push_str(" HAVING ");
352                sql.push_str(&having_clause);
353            }
354        }
355
356        if !ir.order_by.is_empty() {
357            sql.push_str(" ORDER BY ");
358            let parts: Vec<String> = ir.order_by.iter().map(|o| {
359                let col = if o.column.contains('(') {
360                    agg_alias_map.get(&o.column)
361                        .map(|a| format!("`{a}`"))
362                        .unwrap_or_else(|| o.column.clone())
363                } else {
364                    format!("`{}`", o.column)
365                };
366                let dir = if o.descending { "DESC" } else { "ASC" };
367                format!("{col} {dir}")
368            }).collect();
369            sql.push_str(&parts.join(", "));
370        }
371
372        if let Some(ref lb) = ir.limit_by {
373            let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
374            sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
375            if lb.offset > 0 {
376                sql.push_str(&format!(" OFFSET {}", lb.offset));
377            }
378        }
379
380        sql.push_str(&format!(" LIMIT {}", ir.limit));
381        if ir.offset > 0 {
382            sql.push_str(&format!(" OFFSET {}", ir.offset));
383        }
384
385        sql
386    }
387}
388
389/// Collect all column names referenced in a filter tree.
390fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
391    match node {
392        FilterNode::Empty => vec![],
393        FilterNode::Condition { column, .. } => vec![column.clone()],
394        FilterNode::And(children) | FilterNode::Or(children) => {
395            children.iter().flat_map(collect_filter_columns).collect()
396        }
397        FilterNode::ArrayIncludes { array_columns, .. } => array_columns.clone(),
398    }
399}
400
401/// Like `compile_filter` but replaces aggregate expression columns with their
402/// SELECT aliases so ClickHouse can resolve them in HAVING scope.
403fn compile_filter_with_aliases(
404    node: &FilterNode,
405    bindings: &mut Vec<SqlValue>,
406    aliases: &HashMap<String, String>,
407) -> String {
408    match node {
409        FilterNode::Empty => String::new(),
410        FilterNode::Condition { column, op, value } => {
411            let effective_col = aliases.get(column)
412                .map(|a| a.as_str())
413                .unwrap_or(column.as_str());
414            compile_condition(effective_col, op, value, bindings)
415        }
416        FilterNode::And(children) => {
417            let parts: Vec<String> = children.iter()
418                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
419                .filter(|s| !s.is_empty())
420                .collect();
421            match parts.len() {
422                0 => String::new(),
423                1 => parts.into_iter().next().unwrap(),
424                _ => format!("({})", parts.join(" AND ")),
425            }
426        }
427        FilterNode::Or(children) => {
428            let parts: Vec<String> = children.iter()
429                .map(|c| compile_filter_with_aliases(c, bindings, aliases))
430                .filter(|s| !s.is_empty())
431                .collect();
432            match parts.len() {
433                0 => String::new(),
434                1 => parts.into_iter().next().unwrap(),
435                _ => format!("({})", parts.join(" OR ")),
436            }
437        }
438        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
439            compile_array_includes(array_columns, element_conditions, bindings)
440        }
441    }
442}
443
444fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
445    match node {
446        FilterNode::Empty => String::new(),
447        FilterNode::Condition { column, op, value } => {
448            compile_condition(column, op, value, bindings)
449        }
450        FilterNode::And(children) => {
451            let parts: Vec<String> = children.iter()
452                .map(|c| compile_filter(c, bindings))
453                .filter(|s| !s.is_empty())
454                .collect();
455            match parts.len() {
456                0 => String::new(),
457                1 => parts.into_iter().next().unwrap(),
458                _ => format!("({})", parts.join(" AND ")),
459            }
460        }
461        FilterNode::Or(children) => {
462            let parts: Vec<String> = children.iter()
463                .map(|c| compile_filter(c, bindings))
464                .filter(|s| !s.is_empty())
465                .collect();
466            match parts.len() {
467                0 => String::new(),
468                1 => parts.into_iter().next().unwrap(),
469                _ => format!("({})", parts.join(" OR ")),
470            }
471        }
472        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
473            compile_array_includes(array_columns, element_conditions, bindings)
474        }
475    }
476}
477
478/// Compile ArrayIncludes into one or more `arrayExists(lambda, arrays)` expressions.
479fn compile_array_includes(
480    array_columns: &[String],
481    element_conditions: &[Vec<FilterNode>],
482    bindings: &mut Vec<SqlValue>,
483) -> String {
484    let params: Vec<String> = (0..array_columns.len())
485        .map(|i| format!("_p{i}"))
486        .collect();
487    let arrays_sql: Vec<String> = array_columns.iter()
488        .map(|c| quote_col(c))
489        .collect();
490    let arrays_ref = arrays_sql.join(", ");
491    let params_ref = params.join(", ");
492
493    let col_to_param: std::collections::HashMap<&str, &str> = array_columns.iter()
494        .zip(params.iter())
495        .map(|(c, p)| (c.as_str(), p.as_str()))
496        .collect();
497
498    let exists_parts: Vec<String> = element_conditions.iter().map(|conds| {
499        let cond_parts: Vec<String> = conds.iter()
500            .map(|c| compile_filter_with_param_remap(c, bindings, &col_to_param))
501            .filter(|s| !s.is_empty())
502            .collect();
503        let cond_sql = match cond_parts.len() {
504            0 => "1".to_string(),
505            1 => cond_parts.into_iter().next().unwrap(),
506            _ => format!("({})", cond_parts.join(" AND ")),
507        };
508        format!("arrayExists(({params_ref}) -> {cond_sql}, {arrays_ref})")
509    }).collect();
510
511    match exists_parts.len() {
512        0 => String::new(),
513        1 => exists_parts.into_iter().next().unwrap(),
514        _ => format!("({})", exists_parts.join(" AND ")),
515    }
516}
517
518/// Compile a filter node but remap column names to lambda parameter names.
519/// Lambda parameters must NOT be backtick-quoted in ClickHouse.
520fn compile_filter_with_param_remap(
521    node: &FilterNode,
522    bindings: &mut Vec<SqlValue>,
523    col_to_param: &std::collections::HashMap<&str, &str>,
524) -> String {
525    match node {
526        FilterNode::Empty => String::new(),
527        FilterNode::Condition { column, op, value } => {
528            if let Some(&param) = col_to_param.get(column.as_str()) {
529                compile_condition_raw(param, op, value, bindings)
530            } else {
531                compile_condition(column, op, value, bindings)
532            }
533        }
534        FilterNode::And(children) => {
535            let parts: Vec<String> = children.iter()
536                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
537                .filter(|s| !s.is_empty())
538                .collect();
539            match parts.len() {
540                0 => String::new(),
541                1 => parts.into_iter().next().unwrap(),
542                _ => format!("({})", parts.join(" AND ")),
543            }
544        }
545        FilterNode::Or(children) => {
546            let parts: Vec<String> = children.iter()
547                .map(|c| compile_filter_with_param_remap(c, bindings, col_to_param))
548                .filter(|s| !s.is_empty())
549                .collect();
550            match parts.len() {
551                0 => String::new(),
552                1 => parts.into_iter().next().unwrap(),
553                _ => format!("({})", parts.join(" OR ")),
554            }
555        }
556        FilterNode::ArrayIncludes { array_columns, element_conditions } => {
557            compile_array_includes(array_columns, element_conditions, bindings)
558        }
559    }
560}
561
/// Backtick-quotes a plain column name. Text containing `(` is treated as a
/// function expression and returned unchanged. Embedded backticks are not escaped.
fn quote_col(column: &str) -> String {
    match column.find('(') {
        Some(_) => column.to_owned(),
        None => format!("`{}`", column),
    }
}
569
570/// Like compile_condition but uses the column name as-is (no backtick quoting).
571/// Used for lambda parameters in arrayExists.
572fn compile_condition_raw(
573    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
574) -> String {
575    compile_condition_inner(col, op, value, bindings)
576}
577
578fn compile_condition(
579    column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
580) -> String {
581    let col = quote_col(column);
582    compile_condition_inner(&col, op, value, bindings)
583}
584
585fn compile_condition_inner(
586    col: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
587) -> String {
588    match op {
589        CompareOp::In | CompareOp::NotIn => {
590            if let SqlValue::String(csv) = value {
591                let items: Vec<&str> = csv.split(',').collect();
592                let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
593                for item in &items {
594                    bindings.push(SqlValue::String(item.trim().to_string()));
595                }
596                format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
597            } else {
598                bindings.push(value.clone());
599                format!("{col} {} (?)", op.sql_op())
600            }
601        }
602        CompareOp::Includes | CompareOp::NotIncludes => {
603            if let SqlValue::String(s) = value {
604                bindings.push(SqlValue::String(format!("%{s}%")));
605            } else {
606                bindings.push(value.clone());
607            }
608            format!("{col} {} ?", op.sql_op())
609        }
610        CompareOp::IlikeIncludes => {
611            if let SqlValue::String(s) = value {
612                bindings.push(SqlValue::String(format!("%{s}%")));
613            } else {
614                bindings.push(value.clone());
615            }
616            format!("{col} ilike ?")
617        }
618        CompareOp::NotIlikeIncludes => {
619            if let SqlValue::String(s) = value {
620                bindings.push(SqlValue::String(format!("%{s}%")));
621            } else {
622                bindings.push(value.clone());
623            }
624            format!("{col} NOT ilike ?")
625        }
626        CompareOp::StartsWith | CompareOp::IlikeStartsWith => {
627            if let SqlValue::String(s) = value {
628                bindings.push(SqlValue::String(format!("{s}%")));
629            } else {
630                bindings.push(value.clone());
631            }
632            format!("{col} {} ?", op.sql_op())
633        }
634        CompareOp::EndsWith => {
635            if let SqlValue::String(s) = value {
636                bindings.push(SqlValue::String(format!("%{s}")));
637            } else {
638                bindings.push(value.clone());
639            }
640            format!("{col} LIKE ?")
641        }
642        CompareOp::IsNull | CompareOp::IsNotNull => {
643            format!("{col} {}", op.sql_op())
644        }
645        _ => {
646            if let SqlValue::Expression(expr) = value {
647                format!("{col} {} {expr}", op.sql_op())
648            } else {
649                bindings.push(value.clone());
650                format!("{col} {} ?", op.sql_op())
651            }
652        }
653    }
654}
655
656#[cfg(test)]
657mod tests {
658    use super::*;
659
660    fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }
661
662    #[test]
663    fn test_simple_select() {
664        let ir = QueryIR {
665            cube: "DEXTrades".into(), schema: "default".into(),
666            table: "dwd_dex_trades".into(),
667            selects: vec![
668                SelectExpr::Column { column: "tx_hash".into(), alias: None },
669                SelectExpr::Column { column: "token_a_amount".into(), alias: None },
670            ],
671            filters: FilterNode::Empty, having: FilterNode::Empty,
672            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
673            limit_by: None,
674            use_final: false,
675            joins: vec![],
676            custom_query_builder: None,
677            from_subquery: None,
678        };
679        let r = ch().compile(&ir);
680        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
681        assert!(r.bindings.is_empty());
682    }
683
684    #[test]
685    fn test_final_keyword() {
686        let ir = QueryIR {
687            cube: "T".into(), schema: "db".into(), table: "tokens".into(),
688            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
689            filters: FilterNode::Empty, having: FilterNode::Empty,
690            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
691            limit_by: None,
692            use_final: true,
693            joins: vec![],
694            custom_query_builder: None,
695            from_subquery: None,
696        };
697        let r = ch().compile(&ir);
698        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
699    }
700
701    #[test]
702    fn test_uniq_uses_native_function() {
703        let ir = QueryIR {
704            cube: "T".into(), schema: "db".into(), table: "t".into(),
705            selects: vec![
706                SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
707            ],
708            filters: FilterNode::Empty, having: FilterNode::Empty,
709            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
710            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
711        };
712        let r = ch().compile(&ir);
713        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
714    }
715
716    #[test]
717    fn test_count_star() {
718        let ir = QueryIR {
719            cube: "T".into(), schema: "db".into(), table: "t".into(),
720            selects: vec![
721                SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
722            ],
723            filters: FilterNode::Empty, having: FilterNode::Empty,
724            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
725            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
726        };
727        let r = ch().compile(&ir);
728        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
729    }
730
731    #[test]
732    fn test_aggregate_lowercase() {
733        let ir = QueryIR {
734            cube: "T".into(), schema: "db".into(), table: "t".into(),
735            selects: vec![
736                SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
737                SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
738            ],
739            filters: FilterNode::Empty, having: FilterNode::Empty,
740            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
741            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
742        };
743        let r = ch().compile(&ir);
744        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
745        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
746    }
747
748    #[test]
749    fn test_where_and_order() {
750        let ir = QueryIR {
751            cube: "T".into(), schema: "db".into(), table: "t".into(),
752            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
753            filters: FilterNode::And(vec![
754                FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
755                FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
756            ]),
757            having: FilterNode::Empty, group_by: vec![],
758            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
759            limit: 25, offset: 0,
760            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
761        };
762        let r = ch().compile(&ir);
763        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
764        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
765        assert_eq!(r.bindings.len(), 2);
766    }
767
768    #[test]
769    fn test_having_with_aggregate_expr() {
770        let ir = QueryIR {
771            cube: "T".into(), schema: "db".into(), table: "t".into(),
772            selects: vec![
773                SelectExpr::Column { column: "token_address".into(), alias: None },
774                SelectExpr::Aggregate { function: "SUM".into(), column: "amount_usd".into(), alias: "__sum".into(), condition: None },
775            ],
776            filters: FilterNode::Empty,
777            having: FilterNode::Condition {
778                column: "sum(`amount_usd`)".into(), op: CompareOp::Gt, value: SqlValue::Float(1000000.0),
779            },
780            group_by: vec!["token_address".into()], order_by: vec![], limit: 25, offset: 0,
781            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
782        };
783        let r = ch().compile(&ir);
784        assert!(r.sql.contains("GROUP BY `token_address`"));
785        assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
786        assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
787        assert_eq!(r.bindings.len(), 1);
788    }
789
790    #[test]
791    fn test_having_appends_missing_agg_column() {
792        let ir = QueryIR {
793            cube: "T".into(), schema: "db".into(), table: "t".into(),
794            selects: vec![
795                SelectExpr::Column { column: "pool_address".into(), alias: None },
796                SelectExpr::Column { column: "argMaxMerge(latest_liquidity_usd_state)".into(), alias: None },
797            ],
798            filters: FilterNode::Empty,
799            having: FilterNode::And(vec![
800                FilterNode::Condition {
801                    column: "argMaxMerge(latest_liquidity_usd_state)".into(),
802                    op: CompareOp::Gt, value: SqlValue::Float(2.0),
803                },
804                FilterNode::Condition {
805                    column: "argMaxMerge(latest_token_a_amount_state)".into(),
806                    op: CompareOp::Gt, value: SqlValue::Float(3.0),
807                },
808            ]),
809            group_by: vec!["pool_address".into()], order_by: vec![], limit: 25, offset: 0,
810            limit_by: None, use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
811        };
812        let r = ch().compile(&ir);
813        assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
814            "existing HAVING col should be aliased, got: {}", r.sql);
815        assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
816            "missing agg col should be appended, got: {}", r.sql);
817        assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
818            "HAVING should use aliases, got: {}", r.sql);
819        assert_eq!(r.bindings.len(), 2);
820        assert_eq!(r.alias_remap.len(), 1);
821        assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
822    }
823
824    #[test]
825    fn test_limit_by() {
826        let ir = QueryIR {
827            cube: "T".into(), schema: "db".into(), table: "t".into(),
828            selects: vec![
829                SelectExpr::Column { column: "owner".into(), alias: None },
830                SelectExpr::Column { column: "amount".into(), alias: None },
831            ],
832            filters: FilterNode::Empty, having: FilterNode::Empty,
833            group_by: vec![], 
834            order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
835            limit: 100, offset: 0,
836            limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
837            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
838        };
839        let r = ch().compile(&ir);
840        let sql = &r.sql;
841        assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
842        assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
843        assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
844        let order_by_pos = sql.find("ORDER BY").unwrap();
845        let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
846        let limit_pos = sql.rfind("LIMIT 100").unwrap();
847        assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
848        assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
849    }
850
851    #[test]
852    fn test_limit_by_with_offset() {
853        let ir = QueryIR {
854            cube: "T".into(), schema: "db".into(), table: "t".into(),
855            selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
856            filters: FilterNode::Empty, having: FilterNode::Empty,
857            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
858            limit_by: Some(LimitByExpr { count: 5, offset: 2, columns: vec!["token".into(), "wallet".into()] }),
859            use_final: false, joins: vec![], custom_query_builder: None, from_subquery: None,
860        };
861        let r = ch().compile(&ir);
862        assert!(r.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"), "multi-column LIMIT BY with offset, got: {}", r.sql);
863    }
864
865    #[test]
866    fn test_join_direct() {
867        let ir = QueryIR {
868            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
869            table: "sol_dex_trades".into(),
870            selects: vec![
871                SelectExpr::Column { column: "tx_hash".into(), alias: None },
872                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
873            ],
874            filters: FilterNode::Empty, having: FilterNode::Empty,
875            group_by: vec![], order_by: vec![], limit: 25, offset: 0,
876            limit_by: None, use_final: false,
877            joins: vec![JoinExpr {
878                schema: "dexes_dim".into(), table: "sol_tokens".into(),
879                alias: "_j0".into(),
880                conditions: vec![("buy_token_address".into(), "token_address".into())],
881                selects: vec![
882                    SelectExpr::Column { column: "name".into(), alias: None },
883                    SelectExpr::Column { column: "symbol".into(), alias: None },
884                ],
885                group_by: vec![], use_final: true, is_aggregate: false,
886                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
887                join_type: JoinType::Left,
888            }],
889            custom_query_builder: None,
890            from_subquery: None,
891        };
892        let r = ch().compile(&ir);
893        assert!(r.sql.contains("FROM (SELECT"), "main query should be wrapped, got: {}", r.sql);
894        assert!(r.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` AS _j0 FINAL"),
895            "direct JOIN with FINAL after alias, got: {}", r.sql);
896        assert!(r.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
897            "ON condition, got: {}", r.sql);
898        assert!(r.sql.contains("_j0.`name` AS `_j0.name`"), "joined col alias, got: {}", r.sql);
899    }
900
901    #[test]
902    fn test_join_aggregate_subquery() {
903        let ir = QueryIR {
904            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
905            table: "sol_dex_trades".into(),
906            selects: vec![
907                SelectExpr::Column { column: "tx_hash".into(), alias: None },
908                SelectExpr::Column { column: "buy_token_address".into(), alias: None },
909            ],
910            filters: FilterNode::Empty, having: FilterNode::Empty,
911            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
912            limit_by: None, use_final: false,
913            joins: vec![JoinExpr {
914                schema: "dexes_dws".into(), table: "sol_token_market_cap".into(),
915                alias: "_j0".into(),
916                conditions: vec![("buy_token_address".into(), "token_address".into())],
917                selects: vec![
918                    SelectExpr::Column { column: "argMaxMerge(latest_market_cap_usd_state)".into(), alias: None },
919                ],
920                group_by: vec!["token_address".into()],
921                use_final: false, is_aggregate: true,
922                target_cube: "TokenMarketCap".into(), join_field: "joinBuyTokenMarketCap".into(),
923                join_type: JoinType::Left,
924            }],
925            custom_query_builder: None,
926            from_subquery: None,
927        };
928        let r = ch().compile(&ir);
929        assert!(r.sql.contains("LEFT JOIN (SELECT"), "aggregate should use subquery, got: {}", r.sql);
930        assert!(r.sql.contains("GROUP BY `token_address`"), "subquery GROUP BY, got: {}", r.sql);
931        assert!(r.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"), "subquery FROM, got: {}", r.sql);
932        assert!(r.sql.contains("argMaxMerge(latest_market_cap_usd_state) AS `__jc_0`"),
933            "join func expr should be aliased in subquery, got: {}", r.sql);
934        assert!(r.sql.contains("_j0.`__jc_0` AS `_j0.__jc_0`"),
935            "outer SELECT should use alias for join func col, got: {}", r.sql);
936    }
937
938    #[test]
939    fn test_join_main_query_func_expression_columns() {
940        let ir = QueryIR {
941            cube: "TokenHolders".into(), schema: "dws".into(),
942            table: "sol_token_holders".into(),
943            selects: vec![
944                SelectExpr::Column { column: "token".into(), alias: None },
945                SelectExpr::Column { column: "holder".into(), alias: None },
946                SelectExpr::Column { column: "argMaxMerge(latest_balance)".into(), alias: None },
947                SelectExpr::Column { column: "argMaxMerge(latest_balance_usd)".into(), alias: None },
948                SelectExpr::Column { column: "minMerge(first_seen)".into(), alias: None },
949                SelectExpr::Column { column: "maxMerge(last_seen)".into(), alias: None },
950            ],
951            filters: FilterNode::Empty, having: FilterNode::Empty,
952            group_by: vec![], order_by: vec![
953                OrderExpr { column: "argMaxMerge(latest_balance_usd)".into(), descending: true },
954            ],
955            limit: 100, offset: 0,
956            limit_by: None, use_final: false,
957            joins: vec![JoinExpr {
958                schema: "dim".into(), table: "sol_tokens".into(),
959                alias: "_j0".into(),
960                conditions: vec![("token".into(), "token_address".into())],
961                selects: vec![
962                    SelectExpr::Column { column: "name".into(), alias: None },
963                    SelectExpr::Column { column: "symbol".into(), alias: None },
964                ],
965                group_by: vec![], use_final: true, is_aggregate: false,
966                target_cube: "TokenSearch".into(), join_field: "joinToken".into(),
967                join_type: JoinType::Left,
968            }],
969            custom_query_builder: None,
970            from_subquery: None,
971        };
972        let r = ch().compile(&ir);
973        let sql = &r.sql;
974
975        assert!(sql.contains("_main.`__mc_0` AS `__mc_0`"),
976            "func expr should use alias __mc_0 in outer SELECT, got: {sql}");
977        assert!(sql.contains("_main.`__mc_1` AS `__mc_1`"),
978            "func expr should use alias __mc_1, got: {sql}");
979        assert!(sql.contains("_main.`token` AS `token`"),
980            "simple col should be backtick-quoted, got: {sql}");
981
982        assert!(!sql.contains("_main.argMaxMerge("),
983            "outer SELECT must NOT have bare _main.argMaxMerge(...), got: {sql}");
984
985        assert!(sql.contains("argMaxMerge(latest_balance) AS `__mc_0`"),
986            "inner query should alias func expr, got: {sql}");
987
988        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_0" && o == "argMaxMerge(latest_balance)"),
989            "alias_remap should map __mc_0 → original, got: {:?}", r.alias_remap);
990        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_1" && o == "argMaxMerge(latest_balance_usd)"),
991            "alias_remap should map __mc_1, got: {:?}", r.alias_remap);
992    }
993
994    #[test]
995    fn test_join_inner_type() {
996        let ir = QueryIR {
997            cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
998            table: "sol_dex_trades".into(),
999            selects: vec![
1000                SelectExpr::Column { column: "tx_hash".into(), alias: None },
1001            ],
1002            filters: FilterNode::Empty, having: FilterNode::Empty,
1003            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1004            limit_by: None, use_final: false,
1005            joins: vec![JoinExpr {
1006                schema: "dexes_dim".into(), table: "sol_tokens".into(),
1007                alias: "_j0".into(),
1008                conditions: vec![("buy_token_address".into(), "token_address".into())],
1009                selects: vec![
1010                    SelectExpr::Column { column: "name".into(), alias: None },
1011                ],
1012                group_by: vec![], use_final: false, is_aggregate: false,
1013                target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
1014                join_type: JoinType::Inner,
1015            }],
1016            custom_query_builder: None,
1017            from_subquery: None,
1018        };
1019        let r = ch().compile(&ir);
1020        assert!(r.sql.contains("INNER JOIN `dexes_dim`.`sol_tokens` AS _j0"),
1021            "should use INNER JOIN, got: {}", r.sql);
1022    }
1023
1024    #[test]
1025    fn test_join_full_outer_type() {
1026        let ir = QueryIR {
1027            cube: "T".into(), schema: "db".into(), table: "t".into(),
1028            selects: vec![
1029                SelectExpr::Column { column: "id".into(), alias: None },
1030            ],
1031            filters: FilterNode::Empty, having: FilterNode::Empty,
1032            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1033            limit_by: None, use_final: false,
1034            joins: vec![JoinExpr {
1035                schema: "db2".into(), table: "t2".into(),
1036                alias: "_j0".into(),
1037                conditions: vec![("id".into(), "ref_id".into())],
1038                selects: vec![
1039                    SelectExpr::Column { column: "val".into(), alias: None },
1040                ],
1041                group_by: vec![], use_final: false, is_aggregate: false,
1042                target_cube: "Other".into(), join_field: "joinOther".into(),
1043                join_type: JoinType::Full,
1044            }],
1045            custom_query_builder: None,
1046            from_subquery: None,
1047        };
1048        let r = ch().compile(&ir);
1049        assert!(r.sql.contains("FULL OUTER JOIN `db2`.`t2` AS _j0"),
1050            "should use FULL OUTER JOIN, got: {}", r.sql);
1051    }
1052
1053    #[test]
1054    fn test_custom_query_builder() {
1055        let ir = QueryIR {
1056            cube: "Custom".into(), schema: "db".into(), table: "t".into(),
1057            selects: vec![
1058                SelectExpr::Column { column: "id".into(), alias: None },
1059            ],
1060            filters: FilterNode::Empty, having: FilterNode::Empty,
1061            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1062            limit_by: None, use_final: false, joins: vec![],
1063            custom_query_builder: Some(QueryBuilderFn(std::sync::Arc::new(|_ir| {
1064                CompileResult {
1065                    sql: "SELECT 1 FROM custom_view".into(),
1066                    bindings: vec![],
1067                    alias_remap: vec![],
1068                }
1069            }))),
1070            from_subquery: None,
1071        };
1072        let r = ch().compile(&ir);
1073        assert_eq!(r.sql, "SELECT 1 FROM custom_view",
1074            "custom builder should bypass standard compilation, got: {}", r.sql);
1075    }
1076
1077    #[test]
1078    fn test_from_subquery() {
1079        let ir = QueryIR {
1080            cube: "DEXTradeByTokens".into(), schema: "dwd".into(),
1081            table: "sol_trades".into(),
1082            selects: vec![
1083                SelectExpr::Column { column: "amount".into(), alias: None },
1084                SelectExpr::Column { column: "side_type".into(), alias: None },
1085            ],
1086            filters: FilterNode::Condition {
1087                column: "token".into(), op: CompareOp::Eq,
1088                value: SqlValue::String("SOL".into()),
1089            },
1090            having: FilterNode::Empty,
1091            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1092            limit_by: None, use_final: false, joins: vec![],
1093            custom_query_builder: None,
1094            from_subquery: Some(
1095                "SELECT amount, 'buy' AS side_type, token FROM dwd.sol_a UNION ALL SELECT amount, 'sell' AS side_type, token FROM dwd.sol_b".into()
1096            ),
1097        };
1098        let r = ch().compile(&ir);
1099        assert!(r.sql.starts_with("SELECT `amount`, `side_type` FROM (SELECT"),
1100            "should use subquery in FROM, got: {}", r.sql);
1101        assert!(r.sql.contains("UNION ALL"),
1102            "subquery should contain UNION ALL, got: {}", r.sql);
1103        assert!(r.sql.contains(") AS _t"),
1104            "subquery should be aliased as _t, got: {}", r.sql);
1105        assert!(r.sql.contains("WHERE `token` = ?"),
1106            "WHERE clause should be applied to subquery result, got: {}", r.sql);
1107        assert!(!r.sql.contains("FROM `dwd`.`sol_trades`"),
1108            "should NOT use schema.table when from_subquery is set, got: {}", r.sql);
1109    }
1110
1111    #[test]
1112    fn test_array_includes_single_condition() {
1113        let ir = QueryIR {
1114            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1115            table: "sol_instructions".into(),
1116            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1117            filters: FilterNode::ArrayIncludes {
1118                array_columns: vec![
1119                    "instruction_arg_names".into(),
1120                    "instruction_arg_types".into(),
1121                    "instruction_arg_values".into(),
1122                ],
1123                element_conditions: vec![vec![
1124                    FilterNode::Condition {
1125                        column: "instruction_arg_names".into(),
1126                        op: CompareOp::Eq,
1127                        value: SqlValue::String("amount_in".into()),
1128                    },
1129                ]],
1130            },
1131            having: FilterNode::Empty,
1132            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1133            limit_by: None, use_final: false, joins: vec![],
1134            custom_query_builder: None, from_subquery: None,
1135        };
1136        let r = ch().compile(&ir);
1137        assert!(r.sql.contains("arrayExists((_p0, _p1, _p2) -> _p0 = ?"),
1138            "should generate arrayExists with lambda params, got: {}", r.sql);
1139        assert!(r.sql.contains("`instruction_arg_names`, `instruction_arg_types`, `instruction_arg_values`"),
1140            "should reference all parallel array columns, got: {}", r.sql);
1141        assert_eq!(r.bindings.len(), 1);
1142    }
1143
1144    #[test]
1145    fn test_array_includes_multiple_conditions() {
1146        let ir = QueryIR {
1147            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1148            table: "sol_instructions".into(),
1149            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1150            filters: FilterNode::ArrayIncludes {
1151                array_columns: vec![
1152                    "instruction_arg_names".into(),
1153                    "instruction_arg_values".into(),
1154                ],
1155                element_conditions: vec![
1156                    vec![
1157                        FilterNode::Condition {
1158                            column: "instruction_arg_names".into(),
1159                            op: CompareOp::Eq,
1160                            value: SqlValue::String("amount_in".into()),
1161                        },
1162                        FilterNode::Condition {
1163                            column: "instruction_arg_values".into(),
1164                            op: CompareOp::Gt,
1165                            value: SqlValue::String("10000".into()),
1166                        },
1167                    ],
1168                    vec![
1169                        FilterNode::Condition {
1170                            column: "instruction_arg_names".into(),
1171                            op: CompareOp::Eq,
1172                            value: SqlValue::String("owner".into()),
1173                        },
1174                    ],
1175                ],
1176            },
1177            having: FilterNode::Empty,
1178            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1179            limit_by: None, use_final: false, joins: vec![],
1180            custom_query_builder: None, from_subquery: None,
1181        };
1182        let r = ch().compile(&ir);
1183        let sql = &r.sql;
1184        let count = sql.matches("arrayExists").count();
1185        assert_eq!(count, 2, "should have two arrayExists calls (AND-ed), got: {sql}");
1186        assert!(sql.contains(" AND arrayExists("),
1187            "two arrayExists should be AND-ed, got: {sql}");
1188        assert_eq!(r.bindings.len(), 3);
1189    }
1190
1191    #[test]
1192    fn test_array_includes_with_in_operator() {
1193        let ir = QueryIR {
1194            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1195            table: "sol_instructions".into(),
1196            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1197            filters: FilterNode::ArrayIncludes {
1198                array_columns: vec![
1199                    "instruction_arg_names".into(),
1200                    "instruction_arg_values".into(),
1201                ],
1202                element_conditions: vec![vec![
1203                    FilterNode::Condition {
1204                        column: "instruction_arg_names".into(),
1205                        op: CompareOp::Eq,
1206                        value: SqlValue::String("authorityType".into()),
1207                    },
1208                    FilterNode::Condition {
1209                        column: "instruction_arg_values".into(),
1210                        op: CompareOp::In,
1211                        value: SqlValue::String("0,1".into()),
1212                    },
1213                ]],
1214            },
1215            having: FilterNode::Empty,
1216            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1217            limit_by: None, use_final: false, joins: vec![],
1218            custom_query_builder: None, from_subquery: None,
1219        };
1220        let r = ch().compile(&ir);
1221        assert!(r.sql.contains("arrayExists((_p0, _p1) -> (_p0 = ? AND _p1 IN (?, ?))"),
1222            "should generate arrayExists with AND-ed conditions, got: {}", r.sql);
1223        assert_eq!(r.bindings.len(), 3);
1224    }
1225
1226    #[test]
1227    fn test_array_includes_combined_with_regular_filter() {
1228        let ir = QueryIR {
1229            cube: "Instructions".into(), schema: "dexes_dwd2".into(),
1230            table: "sol_instructions".into(),
1231            selects: vec![SelectExpr::Column { column: "tx_hash".into(), alias: None }],
1232            filters: FilterNode::And(vec![
1233                FilterNode::Condition {
1234                    column: "instruction_program_address".into(),
1235                    op: CompareOp::Eq,
1236                    value: SqlValue::String("pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA".into()),
1237                },
1238                FilterNode::ArrayIncludes {
1239                    array_columns: vec!["instruction_arg_names".into(), "instruction_arg_values".into()],
1240                    element_conditions: vec![vec![
1241                        FilterNode::Condition {
1242                            column: "instruction_arg_names".into(),
1243                            op: CompareOp::Eq,
1244                            value: SqlValue::String("amount".into()),
1245                        },
1246                    ]],
1247                },
1248            ]),
1249            having: FilterNode::Empty,
1250            group_by: vec![], order_by: vec![], limit: 10, offset: 0,
1251            limit_by: None, use_final: false, joins: vec![],
1252            custom_query_builder: None, from_subquery: None,
1253        };
1254        let r = ch().compile(&ir);
1255        let sql = &r.sql;
1256        assert!(sql.contains("`instruction_program_address` = ?"),
1257            "should have regular condition, got: {sql}");
1258        assert!(sql.contains("arrayExists("),
1259            "should have arrayExists, got: {sql}");
1260        assert!(sql.contains(" AND "),
1261            "regular + array conditions should be AND-ed, got: {sql}");
1262    }
1263}