1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::{CompileResult, JoinType};
5use crate::sql::dialect::SqlDialect;
6
/// ClickHouse implementation of the crate's `SqlDialect`.
///
/// The dialect carries no state; `new()` and `default()` produce the same value.
#[derive(Default)]
pub struct ClickHouseDialect;

impl ClickHouseDialect {
    /// Creates the (stateless) ClickHouse dialect.
    pub fn new() -> Self {
        ClickHouseDialect
    }
}
20
21impl SqlDialect for ClickHouseDialect {
22 fn compile(&self, ir: &QueryIR) -> CompileResult {
23 if let Some(ref builder) = ir.custom_query_builder {
24 return (builder.0)(ir);
25 }
26
27 let mut bindings = Vec::new();
28 let mut alias_remap: Vec<(String, String)> = Vec::new();
29
30 if ir.joins.is_empty() {
31 let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
32 return CompileResult { sql: inner_sql, bindings, alias_remap };
33 }
34
35 let mut ir_mod = ir.clone();
40 let mut mc_counter = 0u32;
41 for sel in &mut ir_mod.selects {
42 if let SelectExpr::Column { column, alias } = sel {
43 if column.contains('(') && alias.is_none() {
44 let a = format!("__mc_{mc_counter}");
45 mc_counter += 1;
46 alias_remap.push((a.clone(), column.clone()));
47 *alias = Some(a);
48 }
49 }
50 }
51
52 let mut jc_counter = 0u32;
53 for join in &mut ir_mod.joins {
54 for sel in &mut join.selects {
55 if let SelectExpr::Column { column, alias } = sel {
56 if column.contains('(') && alias.is_none() {
57 let a = format!("__jc_{jc_counter}");
58 jc_counter += 1;
59 *alias = Some(a);
60 }
61 }
62 }
63 }
64
65 let inner_sql = self.compile_inner(&ir_mod, &mut bindings, &mut alias_remap);
66
67 let main_cols: Vec<String> = ir_mod.selects.iter().map(|s| {
71 let col = match s {
72 SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
73 SelectExpr::Aggregate { alias, .. } => alias.clone(),
74 };
75 format!("_main.`{}` AS `{}`", col, col)
76 }).collect();
77
78 let mut sql = String::from("SELECT ");
79 sql.push_str(&main_cols.join(", "));
80
81 for join in &ir_mod.joins {
83 for sel in &join.selects {
84 let col_name = match sel {
85 SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
86 SelectExpr::Aggregate { alias, .. } => alias.clone(),
87 };
88 let outer_alias = format!("{}.{}", join.alias, col_name);
89 if let SelectExpr::Column { column, alias: Some(_) } = sel {
90 if column.contains('(') {
91 let outer_original = format!("{}.{}", join.alias, column);
92 alias_remap.push((outer_alias.clone(), outer_original));
93 }
94 }
95 sql.push_str(&format!(", {}.`{}` AS `{}`",
96 join.alias, col_name, outer_alias));
97 }
98 }
99
100 sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
101
102 let main_alias_map: HashMap<String, String> = ir_mod.selects.iter()
105 .filter_map(|s| {
106 if let SelectExpr::Column { column, alias: Some(a) } = s {
107 if column.contains('(') { Some((column.clone(), a.clone())) } else { None }
108 } else { None }
109 })
110 .collect();
111
112 for join in &ir_mod.joins {
113 let join_kw = join.join_type.sql_keyword();
114
115 if join.is_aggregate {
116 sql.push_str(&format!(" {} (SELECT ", join_kw));
118 let mut sub_parts: Vec<String> = Vec::new();
119 for gb_col in &join.group_by {
120 sub_parts.push(quote_col(gb_col));
121 }
122 for sel in &join.selects {
123 match sel {
124 SelectExpr::Column { column, alias } => {
125 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
126 if let Some(a) = alias {
127 sub_parts.push(format!("{col} AS `{a}`"));
128 } else if column.contains('(') || !join.group_by.contains(column) {
129 sub_parts.push(col);
130 }
131 }
132 SelectExpr::Aggregate { function, column, alias, condition } => {
133 let func = function.to_lowercase();
134 let expr = match (func.as_str(), column.as_str(), condition) {
135 ("count", "*", None) => format!("count() AS `{alias}`"),
136 ("uniq", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
137 (f, col, None) => format!("{f}(`{col}`) AS `{alias}`"),
138 (f, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`"),
139 };
140 sub_parts.push(expr);
141 }
142 }
143 }
144 sql.push_str(&sub_parts.join(", "));
145 sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
146 if !join.group_by.is_empty() {
147 sql.push_str(" GROUP BY ");
148 let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
149 sql.push_str(&gb.join(", "));
150 }
151 sql.push_str(&format!(") AS {}", join.alias));
152 } else {
153 sql.push_str(&format!(" {} `{}`.`{}` AS {}",
155 join_kw, join.schema, join.table, join.alias));
156 if join.use_final {
157 sql.push_str(" FINAL");
158 }
159 }
160
161 if join.join_type == JoinType::Cross {
162 continue;
164 }
165
166 let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
168 let local_ref = main_alias_map.get(local).unwrap_or(local);
169 format!("_main.`{}` = {}.`{}`", local_ref, join.alias, remote)
170 }).collect();
171 sql.push_str(" ON ");
172 sql.push_str(&on_parts.join(" AND "));
173 }
174
175 CompileResult { sql, bindings, alias_remap }
176 }
177
178 fn quote_identifier(&self, name: &str) -> String {
179 format!("`{name}`")
180 }
181
182 fn name(&self) -> &str {
183 "ClickHouse"
184 }
185}
186
187impl ClickHouseDialect {
188 fn compile_inner(
189 &self,
190 ir: &QueryIR,
191 bindings: &mut Vec<SqlValue>,
192 alias_remap: &mut Vec<(String, String)>,
193 ) -> String {
194 let mut sql = String::new();
195
196 let mut augmented_selects = ir.selects.clone();
197 let mut agg_alias_map: HashMap<String, String> = HashMap::new();
198 let mut alias_counter = 0u32;
199
200 let having_cols: std::collections::HashSet<String> =
201 collect_filter_columns(&ir.having).into_iter().collect();
202 let has_having_agg = having_cols.iter().any(|c| c.contains('('));
203
204 if has_having_agg {
205 for sel in &mut augmented_selects {
206 if let SelectExpr::Column { column, alias } = sel {
207 if column.contains('(') && having_cols.contains(column.as_str()) {
208 if alias.is_none() {
209 let a = format!("__f_{alias_counter}");
210 alias_counter += 1;
211 alias_remap.push((a.clone(), column.clone()));
212 agg_alias_map.insert(column.clone(), a.clone());
213 *alias = Some(a);
214 } else if let Some(existing) = alias {
215 agg_alias_map.insert(column.clone(), existing.clone());
216 }
217 }
218 }
219 }
220 for col in &having_cols {
221 if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
222 let a = format!("__f_{alias_counter}");
223 alias_counter += 1;
224 agg_alias_map.insert(col.clone(), a.clone());
225 augmented_selects.push(SelectExpr::Column {
226 column: col.clone(),
227 alias: Some(a),
228 });
229 }
230 }
231 }
232
233 sql.push_str("SELECT ");
234 let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
235 SelectExpr::Column { column, alias } => {
236 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
237 match alias {
238 Some(a) => format!("{col} AS `{a}`"),
239 None => col,
240 }
241 },
242 SelectExpr::Aggregate { function, column, alias, condition } => {
243 let func = function.to_uppercase();
244 match (func.as_str(), column.as_str(), condition) {
245 ("COUNT", "*", None) => format!("count() AS `{alias}`"),
246 ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
247 ("UNIQ", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
248 ("UNIQ", col, Some(cond)) => format!("uniqIf(`{col}`, {cond}) AS `{alias}`"),
249 (_, col, None) => format!("{f}(`{col}`) AS `{alias}`", f = func.to_lowercase()),
250 (_, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`", f = func.to_lowercase()),
251 }
252 }
253 }).collect();
254 sql.push_str(&select_parts.join(", "));
255
256 if let Some(ref subquery) = ir.from_subquery {
257 sql.push_str(&format!(" FROM ({}) AS _t", subquery));
258 } else {
259 sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
260 if ir.use_final {
261 sql.push_str(" FINAL");
262 }
263 }
264
265 let where_clause = compile_filter(&ir.filters, bindings);
266 if !where_clause.is_empty() {
267 sql.push_str(" WHERE ");
268 sql.push_str(&where_clause);
269 }
270
271 let effective_group_by = if !ir.group_by.is_empty() {
272 ir.group_by.clone()
273 } else {
274 let has_merge_cols = augmented_selects.iter().any(|s| match s {
275 SelectExpr::Column { column, .. } => column.contains("Merge("),
276 SelectExpr::Aggregate { .. } => true,
277 });
278 if has_merge_cols {
279 augmented_selects.iter().filter_map(|s| match s {
280 SelectExpr::Column { column, .. } if !column.contains("Merge(") && !column.contains('(') => {
281 Some(column.clone())
282 }
283 _ => None,
284 }).collect()
285 } else {
286 vec![]
287 }
288 };
289
290 if !effective_group_by.is_empty() {
291 sql.push_str(" GROUP BY ");
292 let cols: Vec<String> = effective_group_by.iter().map(|c| format!("`{c}`")).collect();
293 sql.push_str(&cols.join(", "));
294 }
295
296 if has_having_agg {
297 let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
298 if !having_clause.is_empty() {
299 sql.push_str(" HAVING ");
300 sql.push_str(&having_clause);
301 }
302 } else {
303 let having_clause = compile_filter(&ir.having, bindings);
304 if !having_clause.is_empty() {
305 sql.push_str(" HAVING ");
306 sql.push_str(&having_clause);
307 }
308 }
309
310 if !ir.order_by.is_empty() {
311 sql.push_str(" ORDER BY ");
312 let parts: Vec<String> = ir.order_by.iter().map(|o| {
313 let col = if o.column.contains('(') {
314 agg_alias_map.get(&o.column)
315 .map(|a| format!("`{a}`"))
316 .unwrap_or_else(|| o.column.clone())
317 } else {
318 format!("`{}`", o.column)
319 };
320 let dir = if o.descending { "DESC" } else { "ASC" };
321 format!("{col} {dir}")
322 }).collect();
323 sql.push_str(&parts.join(", "));
324 }
325
326 if let Some(ref lb) = ir.limit_by {
327 let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
328 sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
329 if lb.offset > 0 {
330 sql.push_str(&format!(" OFFSET {}", lb.offset));
331 }
332 }
333
334 sql.push_str(&format!(" LIMIT {}", ir.limit));
335 if ir.offset > 0 {
336 sql.push_str(&format!(" OFFSET {}", ir.offset));
337 }
338
339 sql
340 }
341}
342
343fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
345 match node {
346 FilterNode::Empty => vec![],
347 FilterNode::Condition { column, .. } => vec![column.clone()],
348 FilterNode::And(children) | FilterNode::Or(children) => {
349 children.iter().flat_map(collect_filter_columns).collect()
350 }
351 }
352}
353
354fn compile_filter_with_aliases(
357 node: &FilterNode,
358 bindings: &mut Vec<SqlValue>,
359 aliases: &HashMap<String, String>,
360) -> String {
361 match node {
362 FilterNode::Empty => String::new(),
363 FilterNode::Condition { column, op, value } => {
364 let effective_col = aliases.get(column)
365 .map(|a| a.as_str())
366 .unwrap_or(column.as_str());
367 compile_condition(effective_col, op, value, bindings)
368 }
369 FilterNode::And(children) => {
370 let parts: Vec<String> = children.iter()
371 .map(|c| compile_filter_with_aliases(c, bindings, aliases))
372 .filter(|s| !s.is_empty())
373 .collect();
374 match parts.len() {
375 0 => String::new(),
376 1 => parts.into_iter().next().unwrap(),
377 _ => format!("({})", parts.join(" AND ")),
378 }
379 }
380 FilterNode::Or(children) => {
381 let parts: Vec<String> = children.iter()
382 .map(|c| compile_filter_with_aliases(c, bindings, aliases))
383 .filter(|s| !s.is_empty())
384 .collect();
385 match parts.len() {
386 0 => String::new(),
387 1 => parts.into_iter().next().unwrap(),
388 _ => format!("({})", parts.join(" OR ")),
389 }
390 }
391 }
392}
393
394fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
395 match node {
396 FilterNode::Empty => String::new(),
397 FilterNode::Condition { column, op, value } => {
398 compile_condition(column, op, value, bindings)
399 }
400 FilterNode::And(children) => {
401 let parts: Vec<String> = children.iter()
402 .map(|c| compile_filter(c, bindings))
403 .filter(|s| !s.is_empty())
404 .collect();
405 match parts.len() {
406 0 => String::new(),
407 1 => parts.into_iter().next().unwrap(),
408 _ => format!("({})", parts.join(" AND ")),
409 }
410 }
411 FilterNode::Or(children) => {
412 let parts: Vec<String> = children.iter()
413 .map(|c| compile_filter(c, bindings))
414 .filter(|s| !s.is_empty())
415 .collect();
416 match parts.len() {
417 0 => String::new(),
418 1 => parts.into_iter().next().unwrap(),
419 _ => format!("({})", parts.join(" OR ")),
420 }
421 }
422 }
423}
424
/// Backtick-quotes a plain column name. Anything containing `(` is treated as an
/// expression (e.g. `sum(x)`) and returned verbatim so it stays valid SQL.
fn quote_col(column: &str) -> String {
    if !column.contains('(') {
        return format!("`{}`", column);
    }
    String::from(column)
}
432
433fn compile_condition(
434 column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
435) -> String {
436 let col = quote_col(column);
437 match op {
438 CompareOp::In | CompareOp::NotIn => {
439 if let SqlValue::String(csv) = value {
440 let items: Vec<&str> = csv.split(',').collect();
441 let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
442 for item in &items {
443 bindings.push(SqlValue::String(item.trim().to_string()));
444 }
445 format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
446 } else {
447 bindings.push(value.clone());
448 format!("{col} {} (?)", op.sql_op())
449 }
450 }
451 CompareOp::Includes => {
452 if let SqlValue::String(s) = value {
453 bindings.push(SqlValue::String(format!("%{s}%")));
454 } else {
455 bindings.push(value.clone());
456 }
457 format!("{col} LIKE ?")
458 }
459 CompareOp::IsNull | CompareOp::IsNotNull => {
460 format!("{col} {}", op.sql_op())
461 }
462 _ => {
463 bindings.push(value.clone());
464 format!("{col} {} ?", op.sql_op())
465 }
466 }
467}
468
#[cfg(test)]
mod tests {
    use super::*;

    fn ch() -> ClickHouseDialect {
        ClickHouseDialect::new()
    }

    /// Unaliased column select.
    fn col(name: &str) -> SelectExpr {
        SelectExpr::Column { column: name.into(), alias: None }
    }

    /// Unconditional aggregate select.
    fn agg(function: &str, column: &str, alias: &str) -> SelectExpr {
        SelectExpr::Aggregate {
            function: function.into(),
            column: column.into(),
            alias: alias.into(),
            condition: None,
        }
    }

    /// Single comparison filter node.
    fn cond(column: &str, op: CompareOp, value: SqlValue) -> FilterNode {
        FilterNode::Condition { column: column.into(), op, value }
    }

    /// IR over `schema.table` with the given selects; everything else neutral
    /// (no filters, grouping, ordering, joins, FINAL or subquery; LIMIT 10 OFFSET 0).
    fn base_ir(cube: &str, schema: &str, table: &str, selects: Vec<SelectExpr>) -> QueryIR {
        QueryIR {
            cube: cube.into(),
            schema: schema.into(),
            table: table.into(),
            selects,
            filters: FilterNode::Empty,
            having: FilterNode::Empty,
            group_by: vec![],
            order_by: vec![],
            limit: 10,
            offset: 0,
            limit_by: None,
            use_final: false,
            joins: vec![],
            custom_query_builder: None,
            from_subquery: None,
        }
    }

    /// Direct (non-aggregate, no FINAL, no GROUP BY) LEFT join aliased `_j0` on one key pair.
    fn base_join(
        schema: &str,
        table: &str,
        local: &str,
        remote: &str,
        selects: Vec<SelectExpr>,
        target_cube: &str,
        join_field: &str,
    ) -> JoinExpr {
        JoinExpr {
            schema: schema.into(),
            table: table.into(),
            alias: "_j0".into(),
            conditions: vec![(local.into(), remote.into())],
            selects,
            group_by: vec![],
            use_final: false,
            is_aggregate: false,
            target_cube: target_cube.into(),
            join_field: join_field.into(),
            join_type: JoinType::Left,
        }
    }

    #[test]
    fn test_simple_select() {
        let ir = base_ir(
            "DEXTrades",
            "default",
            "dwd_dex_trades",
            vec![col("tx_hash"), col("token_a_amount")],
        );
        let r = ch().compile(&ir);
        assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
        assert!(r.bindings.is_empty());
    }

    #[test]
    fn test_final_keyword() {
        let ir = QueryIR { use_final: true, ..base_ir("T", "db", "tokens", vec![col("id")]) };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
    }

    #[test]
    fn test_uniq_uses_native_function() {
        let ir = base_ir("T", "db", "t", vec![agg("UNIQ", "wallet", "__uniq")]);
        let r = ch().compile(&ir);
        assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
    }

    #[test]
    fn test_count_star() {
        let ir = base_ir("T", "db", "t", vec![agg("COUNT", "*", "__count")]);
        let r = ch().compile(&ir);
        assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
    }

    #[test]
    fn test_aggregate_lowercase() {
        let ir = base_ir(
            "T",
            "db",
            "t",
            vec![agg("SUM", "amount", "__sum"), agg("AVG", "price", "__avg")],
        );
        let r = ch().compile(&ir);
        assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
        assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
    }

    #[test]
    fn test_where_and_order() {
        let ir = QueryIR {
            filters: FilterNode::And(vec![
                cond("chain_id", CompareOp::Eq, SqlValue::Int(1)),
                cond("amount_usd", CompareOp::Gt, SqlValue::Float(1000.0)),
            ]),
            order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
            limit: 25,
            ..base_ir("T", "db", "t", vec![col("id")])
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
        assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
        assert_eq!(r.bindings.len(), 2);
    }

    #[test]
    fn test_having_with_aggregate_expr() {
        let ir = QueryIR {
            having: cond("sum(`amount_usd`)", CompareOp::Gt, SqlValue::Float(1000000.0)),
            group_by: vec!["token_address".into()],
            limit: 25,
            ..base_ir(
                "T",
                "db",
                "t",
                vec![col("token_address"), agg("SUM", "amount_usd", "__sum")],
            )
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("GROUP BY `token_address`"));
        assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
        assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
        assert_eq!(r.bindings.len(), 1);
    }

    #[test]
    fn test_having_appends_missing_agg_column() {
        let ir = QueryIR {
            having: FilterNode::And(vec![
                cond(
                    "argMaxMerge(latest_liquidity_usd_state)",
                    CompareOp::Gt,
                    SqlValue::Float(2.0),
                ),
                cond(
                    "argMaxMerge(latest_token_a_amount_state)",
                    CompareOp::Gt,
                    SqlValue::Float(3.0),
                ),
            ]),
            group_by: vec!["pool_address".into()],
            limit: 25,
            ..base_ir(
                "T",
                "db",
                "t",
                vec![col("pool_address"), col("argMaxMerge(latest_liquidity_usd_state)")],
            )
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
            "existing HAVING col should be aliased, got: {}", r.sql);
        assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
            "missing agg col should be appended, got: {}", r.sql);
        assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
            "HAVING should use aliases, got: {}", r.sql);
        assert_eq!(r.bindings.len(), 2);
        assert_eq!(r.alias_remap.len(), 1);
        assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
    }

    #[test]
    fn test_limit_by() {
        let ir = QueryIR {
            order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
            limit: 100,
            limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
            ..base_ir("T", "db", "t", vec![col("owner"), col("amount")])
        };
        let r = ch().compile(&ir);
        let sql = &r.sql;
        assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
        assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
        assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
        let order_by_pos = sql.find("ORDER BY").unwrap();
        let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
        let limit_pos = sql.rfind("LIMIT 100").unwrap();
        assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
        assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
    }

    #[test]
    fn test_limit_by_with_offset() {
        let ir = QueryIR {
            limit_by: Some(LimitByExpr {
                count: 5,
                offset: 2,
                columns: vec!["token".into(), "wallet".into()],
            }),
            ..base_ir("T", "db", "t", vec![col("id")])
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"), "multi-column LIMIT BY with offset, got: {}", r.sql);
    }

    #[test]
    fn test_join_direct() {
        let join = JoinExpr {
            use_final: true,
            ..base_join(
                "dexes_dim",
                "sol_tokens",
                "buy_token_address",
                "token_address",
                vec![col("name"), col("symbol")],
                "TokenSearch",
                "joinBuyToken",
            )
        };
        let ir = QueryIR {
            limit: 25,
            joins: vec![join],
            ..base_ir(
                "DEXTrades",
                "dexes_dwd",
                "sol_dex_trades",
                vec![col("tx_hash"), col("buy_token_address")],
            )
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("FROM (SELECT"), "main query should be wrapped, got: {}", r.sql);
        assert!(r.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` AS _j0 FINAL"),
            "direct JOIN with FINAL after alias, got: {}", r.sql);
        assert!(r.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
            "ON condition, got: {}", r.sql);
        assert!(r.sql.contains("_j0.`name` AS `_j0.name`"), "joined col alias, got: {}", r.sql);
    }

    #[test]
    fn test_join_aggregate_subquery() {
        let join = JoinExpr {
            group_by: vec!["token_address".into()],
            is_aggregate: true,
            ..base_join(
                "dexes_dws",
                "sol_token_market_cap",
                "buy_token_address",
                "token_address",
                vec![col("argMaxMerge(latest_market_cap_usd_state)")],
                "TokenMarketCap",
                "joinBuyTokenMarketCap",
            )
        };
        let ir = QueryIR {
            joins: vec![join],
            ..base_ir(
                "DEXTrades",
                "dexes_dwd",
                "sol_dex_trades",
                vec![col("tx_hash"), col("buy_token_address")],
            )
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("LEFT JOIN (SELECT"), "aggregate should use subquery, got: {}", r.sql);
        assert!(r.sql.contains("GROUP BY `token_address`"), "subquery GROUP BY, got: {}", r.sql);
        assert!(r.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"), "subquery FROM, got: {}", r.sql);
        assert!(r.sql.contains("argMaxMerge(latest_market_cap_usd_state) AS `__jc_0`"),
            "join func expr should be aliased in subquery, got: {}", r.sql);
        assert!(r.sql.contains("_j0.`__jc_0` AS `_j0.__jc_0`"),
            "outer SELECT should use alias for join func col, got: {}", r.sql);
    }

    #[test]
    fn test_join_main_query_func_expression_columns() {
        let join = JoinExpr {
            use_final: true,
            ..base_join(
                "dim",
                "sol_tokens",
                "token",
                "token_address",
                vec![col("name"), col("symbol")],
                "TokenSearch",
                "joinToken",
            )
        };
        let ir = QueryIR {
            order_by: vec![OrderExpr {
                column: "argMaxMerge(latest_balance_usd)".into(),
                descending: true,
            }],
            limit: 100,
            joins: vec![join],
            ..base_ir(
                "TokenHolders",
                "dws",
                "sol_token_holders",
                vec![
                    col("token"),
                    col("holder"),
                    col("argMaxMerge(latest_balance)"),
                    col("argMaxMerge(latest_balance_usd)"),
                    col("minMerge(first_seen)"),
                    col("maxMerge(last_seen)"),
                ],
            )
        };
        let r = ch().compile(&ir);
        let sql = &r.sql;

        assert!(sql.contains("_main.`__mc_0` AS `__mc_0`"),
            "func expr should use alias __mc_0 in outer SELECT, got: {sql}");
        assert!(sql.contains("_main.`__mc_1` AS `__mc_1`"),
            "func expr should use alias __mc_1, got: {sql}");
        assert!(sql.contains("_main.`token` AS `token`"),
            "simple col should be backtick-quoted, got: {sql}");

        assert!(!sql.contains("_main.argMaxMerge("),
            "outer SELECT must NOT have bare _main.argMaxMerge(...), got: {sql}");

        assert!(sql.contains("argMaxMerge(latest_balance) AS `__mc_0`"),
            "inner query should alias func expr, got: {sql}");

        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_0" && o == "argMaxMerge(latest_balance)"),
            "alias_remap should map __mc_0 → original, got: {:?}", r.alias_remap);
        assert!(r.alias_remap.iter().any(|(a, o)| a == "__mc_1" && o == "argMaxMerge(latest_balance_usd)"),
            "alias_remap should map __mc_1, got: {:?}", r.alias_remap);
    }

    #[test]
    fn test_join_inner_type() {
        let join = JoinExpr {
            join_type: JoinType::Inner,
            ..base_join(
                "dexes_dim",
                "sol_tokens",
                "buy_token_address",
                "token_address",
                vec![col("name")],
                "TokenSearch",
                "joinBuyToken",
            )
        };
        let ir = QueryIR {
            joins: vec![join],
            ..base_ir("DEXTrades", "dexes_dwd", "sol_dex_trades", vec![col("tx_hash")])
        };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("INNER JOIN `dexes_dim`.`sol_tokens` AS _j0"),
            "should use INNER JOIN, got: {}", r.sql);
    }

    #[test]
    fn test_join_full_outer_type() {
        let join = JoinExpr {
            join_type: JoinType::Full,
            ..base_join("db2", "t2", "id", "ref_id", vec![col("val")], "Other", "joinOther")
        };
        let ir = QueryIR { joins: vec![join], ..base_ir("T", "db", "t", vec![col("id")]) };
        let r = ch().compile(&ir);
        assert!(r.sql.contains("FULL OUTER JOIN `db2`.`t2` AS _j0"),
            "should use FULL OUTER JOIN, got: {}", r.sql);
    }

    #[test]
    fn test_custom_query_builder() {
        let ir = QueryIR {
            custom_query_builder: Some(QueryBuilderFn(std::sync::Arc::new(|_ir| {
                CompileResult {
                    sql: "SELECT 1 FROM custom_view".into(),
                    bindings: vec![],
                    alias_remap: vec![],
                }
            }))),
            ..base_ir("Custom", "db", "t", vec![col("id")])
        };
        let r = ch().compile(&ir);
        assert_eq!(r.sql, "SELECT 1 FROM custom_view",
            "custom builder should bypass standard compilation, got: {}", r.sql);
    }

    #[test]
    fn test_from_subquery() {
        let ir = QueryIR {
            filters: cond("token", CompareOp::Eq, SqlValue::String("SOL".into())),
            from_subquery: Some(
                "SELECT amount, 'buy' AS side_type, token FROM dwd.sol_a UNION ALL SELECT amount, 'sell' AS side_type, token FROM dwd.sol_b".into()
            ),
            ..base_ir(
                "DEXTradeByTokens",
                "dwd",
                "sol_trades",
                vec![col("amount"), col("side_type")],
            )
        };
        let r = ch().compile(&ir);
        assert!(r.sql.starts_with("SELECT `amount`, `side_type` FROM (SELECT"),
            "should use subquery in FROM, got: {}", r.sql);
        assert!(r.sql.contains("UNION ALL"),
            "subquery should contain UNION ALL, got: {}", r.sql);
        assert!(r.sql.contains(") AS _t"),
            "subquery should be aliased as _t, got: {}", r.sql);
        assert!(r.sql.contains("WHERE `token` = ?"),
            "WHERE clause should be applied to subquery result, got: {}", r.sql);
        assert!(!r.sql.contains("FROM `dwd`.`sol_trades`"),
            "should NOT use schema.table when from_subquery is set, got: {}", r.sql);
    }
}