1use std::collections::HashMap;
2
3use crate::compiler::ir::*;
4use crate::compiler::ir::CompileResult;
5use crate::sql::dialect::SqlDialect;
6
7pub struct ClickHouseDialect;
8
9impl ClickHouseDialect {
10 pub fn new() -> Self {
11 Self
12 }
13}
14
15impl Default for ClickHouseDialect {
16 fn default() -> Self {
17 Self::new()
18 }
19}
20
21impl SqlDialect for ClickHouseDialect {
22 fn compile(&self, ir: &QueryIR) -> CompileResult {
23 let mut bindings = Vec::new();
24 let mut alias_remap: Vec<(String, String)> = Vec::new();
25
26 let inner_sql = self.compile_inner(ir, &mut bindings, &mut alias_remap);
27
28 if ir.joins.is_empty() {
29 return CompileResult { sql: inner_sql, bindings, alias_remap };
30 }
31
32 let main_cols: Vec<String> = ir.selects.iter().map(|s| {
37 let col = match s {
38 SelectExpr::Column { column, alias } => alias.as_ref().unwrap_or(column).clone(),
39 SelectExpr::Aggregate { alias, .. } => alias.clone(),
40 };
41 format!("_main.{} AS {}", quote_col(&col), quote_col(&col))
42 }).collect();
43
44 let mut sql = String::from("SELECT ");
45 sql.push_str(&main_cols.join(", "));
46
47 for join in &ir.joins {
49 for sel in &join.selects {
50 let col_name = match sel {
51 SelectExpr::Column { column, .. } => column.clone(),
52 SelectExpr::Aggregate { alias, .. } => alias.clone(),
53 };
54 let outer_alias = format!("{}.{}", join.alias, col_name);
55 sql.push_str(&format!(", {}.{} AS `{}`",
56 join.alias, quote_col(&col_name), outer_alias));
57 }
58 }
59
60 sql.push_str(&format!(" FROM ({}) AS _main", inner_sql));
61
62 for join in &ir.joins {
63 if join.is_aggregate {
64 sql.push_str(" LEFT JOIN (SELECT ");
66 let mut sub_parts: Vec<String> = Vec::new();
67 for gb_col in &join.group_by {
68 sub_parts.push(quote_col(gb_col));
69 }
70 for sel in &join.selects {
71 match sel {
72 SelectExpr::Column { column, alias } => {
73 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
74 if let Some(a) = alias {
75 sub_parts.push(format!("{col} AS `{a}`"));
76 } else if column.contains('(') || !join.group_by.contains(column) {
77 sub_parts.push(col);
78 }
79 }
80 SelectExpr::Aggregate { function, column, alias, condition } => {
81 let func = function.to_lowercase();
82 let expr = match (func.as_str(), column.as_str(), condition) {
83 ("count", "*", None) => format!("count() AS `{alias}`"),
84 ("uniq", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
85 (f, col, None) => format!("{f}(`{col}`) AS `{alias}`"),
86 (f, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`"),
87 };
88 sub_parts.push(expr);
89 }
90 }
91 }
92 sql.push_str(&sub_parts.join(", "));
93 sql.push_str(&format!(" FROM `{}`.`{}`", join.schema, join.table));
94 if !join.group_by.is_empty() {
95 sql.push_str(" GROUP BY ");
96 let gb: Vec<String> = join.group_by.iter().map(|c| quote_col(c)).collect();
97 sql.push_str(&gb.join(", "));
98 }
99 sql.push_str(&format!(") AS {}", join.alias));
100 } else {
101 sql.push_str(&format!(" LEFT JOIN `{}`.`{}`", join.schema, join.table));
103 if join.use_final {
104 sql.push_str(" FINAL");
105 }
106 sql.push_str(&format!(" AS {}", join.alias));
107 }
108
109 let on_parts: Vec<String> = join.conditions.iter().map(|(local, remote)| {
111 format!("_main.{} = {}.{}", quote_col(local), join.alias, quote_col(remote))
112 }).collect();
113 sql.push_str(" ON ");
114 sql.push_str(&on_parts.join(" AND "));
115 }
116
117 CompileResult { sql, bindings, alias_remap }
118 }
119
120 fn quote_identifier(&self, name: &str) -> String {
121 format!("`{name}`")
122 }
123
124 fn name(&self) -> &str {
125 "ClickHouse"
126 }
127}
128
129impl ClickHouseDialect {
130 fn compile_inner(
131 &self,
132 ir: &QueryIR,
133 bindings: &mut Vec<SqlValue>,
134 alias_remap: &mut Vec<(String, String)>,
135 ) -> String {
136 let mut sql = String::new();
137
138 let mut augmented_selects = ir.selects.clone();
139 let mut agg_alias_map: HashMap<String, String> = HashMap::new();
140 let mut alias_counter = 0u32;
141
142 let having_cols: std::collections::HashSet<String> =
143 collect_filter_columns(&ir.having).into_iter().collect();
144 let has_having_agg = having_cols.iter().any(|c| c.contains('('));
145
146 if has_having_agg {
147 for sel in &mut augmented_selects {
148 if let SelectExpr::Column { column, alias } = sel {
149 if column.contains('(') && having_cols.contains(column.as_str()) {
150 if alias.is_none() {
151 let a = format!("__f_{alias_counter}");
152 alias_counter += 1;
153 alias_remap.push((a.clone(), column.clone()));
154 agg_alias_map.insert(column.clone(), a.clone());
155 *alias = Some(a);
156 } else if let Some(existing) = alias {
157 agg_alias_map.insert(column.clone(), existing.clone());
158 }
159 }
160 }
161 }
162 for col in &having_cols {
163 if col.contains('(') && !agg_alias_map.contains_key(col.as_str()) {
164 let a = format!("__f_{alias_counter}");
165 alias_counter += 1;
166 agg_alias_map.insert(col.clone(), a.clone());
167 augmented_selects.push(SelectExpr::Column {
168 column: col.clone(),
169 alias: Some(a),
170 });
171 }
172 }
173 }
174
175 sql.push_str("SELECT ");
176 let select_parts: Vec<String> = augmented_selects.iter().map(|s| match s {
177 SelectExpr::Column { column, alias } => {
178 let col = if column.contains('(') { column.clone() } else { format!("`{column}`") };
179 match alias {
180 Some(a) => format!("{col} AS `{a}`"),
181 None => col,
182 }
183 },
184 SelectExpr::Aggregate { function, column, alias, condition } => {
185 let func = function.to_uppercase();
186 match (func.as_str(), column.as_str(), condition) {
187 ("COUNT", "*", None) => format!("count() AS `{alias}`"),
188 ("COUNT", "*", Some(cond)) => format!("countIf({cond}) AS `{alias}`"),
189 ("UNIQ", col, None) => format!("uniq(`{col}`) AS `{alias}`"),
190 ("UNIQ", col, Some(cond)) => format!("uniqIf(`{col}`, {cond}) AS `{alias}`"),
191 (_, col, None) => format!("{f}(`{col}`) AS `{alias}`", f = func.to_lowercase()),
192 (_, col, Some(cond)) => format!("{f}If(`{col}`, {cond}) AS `{alias}`", f = func.to_lowercase()),
193 }
194 }
195 }).collect();
196 sql.push_str(&select_parts.join(", "));
197
198 sql.push_str(&format!(" FROM `{}`.`{}`", ir.schema, ir.table));
199 if ir.use_final {
200 sql.push_str(" FINAL");
201 }
202
203 let where_clause = compile_filter(&ir.filters, bindings);
204 if !where_clause.is_empty() {
205 sql.push_str(" WHERE ");
206 sql.push_str(&where_clause);
207 }
208
209 let effective_group_by = if !ir.group_by.is_empty() {
210 ir.group_by.clone()
211 } else {
212 let has_merge_cols = augmented_selects.iter().any(|s| match s {
213 SelectExpr::Column { column, .. } => column.contains("Merge("),
214 SelectExpr::Aggregate { .. } => true,
215 });
216 if has_merge_cols {
217 augmented_selects.iter().filter_map(|s| match s {
218 SelectExpr::Column { column, .. } if !column.contains("Merge(") && !column.contains('(') => {
219 Some(column.clone())
220 }
221 _ => None,
222 }).collect()
223 } else {
224 vec![]
225 }
226 };
227
228 if !effective_group_by.is_empty() {
229 sql.push_str(" GROUP BY ");
230 let cols: Vec<String> = effective_group_by.iter().map(|c| format!("`{c}`")).collect();
231 sql.push_str(&cols.join(", "));
232 }
233
234 if has_having_agg {
235 let having_clause = compile_filter_with_aliases(&ir.having, bindings, &agg_alias_map);
236 if !having_clause.is_empty() {
237 sql.push_str(" HAVING ");
238 sql.push_str(&having_clause);
239 }
240 } else {
241 let having_clause = compile_filter(&ir.having, bindings);
242 if !having_clause.is_empty() {
243 sql.push_str(" HAVING ");
244 sql.push_str(&having_clause);
245 }
246 }
247
248 if !ir.order_by.is_empty() {
249 sql.push_str(" ORDER BY ");
250 let parts: Vec<String> = ir.order_by.iter().map(|o| {
251 let col = if o.column.contains('(') {
252 agg_alias_map.get(&o.column)
253 .map(|a| format!("`{a}`"))
254 .unwrap_or_else(|| o.column.clone())
255 } else {
256 format!("`{}`", o.column)
257 };
258 let dir = if o.descending { "DESC" } else { "ASC" };
259 format!("{col} {dir}")
260 }).collect();
261 sql.push_str(&parts.join(", "));
262 }
263
264 if let Some(ref lb) = ir.limit_by {
265 let by_cols: Vec<String> = lb.columns.iter().map(|c| format!("`{c}`")).collect();
266 sql.push_str(&format!(" LIMIT {} BY {}", lb.count, by_cols.join(", ")));
267 if lb.offset > 0 {
268 sql.push_str(&format!(" OFFSET {}", lb.offset));
269 }
270 }
271
272 sql.push_str(&format!(" LIMIT {}", ir.limit));
273 if ir.offset > 0 {
274 sql.push_str(&format!(" OFFSET {}", ir.offset));
275 }
276
277 sql
278 }
279}
280
281fn collect_filter_columns(node: &FilterNode) -> Vec<String> {
283 match node {
284 FilterNode::Empty => vec![],
285 FilterNode::Condition { column, .. } => vec![column.clone()],
286 FilterNode::And(children) | FilterNode::Or(children) => {
287 children.iter().flat_map(collect_filter_columns).collect()
288 }
289 }
290}
291
292fn compile_filter_with_aliases(
295 node: &FilterNode,
296 bindings: &mut Vec<SqlValue>,
297 aliases: &HashMap<String, String>,
298) -> String {
299 match node {
300 FilterNode::Empty => String::new(),
301 FilterNode::Condition { column, op, value } => {
302 let effective_col = aliases.get(column)
303 .map(|a| a.as_str())
304 .unwrap_or(column.as_str());
305 compile_condition(effective_col, op, value, bindings)
306 }
307 FilterNode::And(children) => {
308 let parts: Vec<String> = children.iter()
309 .map(|c| compile_filter_with_aliases(c, bindings, aliases))
310 .filter(|s| !s.is_empty())
311 .collect();
312 match parts.len() {
313 0 => String::new(),
314 1 => parts.into_iter().next().unwrap(),
315 _ => format!("({})", parts.join(" AND ")),
316 }
317 }
318 FilterNode::Or(children) => {
319 let parts: Vec<String> = children.iter()
320 .map(|c| compile_filter_with_aliases(c, bindings, aliases))
321 .filter(|s| !s.is_empty())
322 .collect();
323 match parts.len() {
324 0 => String::new(),
325 1 => parts.into_iter().next().unwrap(),
326 _ => format!("({})", parts.join(" OR ")),
327 }
328 }
329 }
330}
331
332fn compile_filter(node: &FilterNode, bindings: &mut Vec<SqlValue>) -> String {
333 match node {
334 FilterNode::Empty => String::new(),
335 FilterNode::Condition { column, op, value } => {
336 compile_condition(column, op, value, bindings)
337 }
338 FilterNode::And(children) => {
339 let parts: Vec<String> = children.iter()
340 .map(|c| compile_filter(c, bindings))
341 .filter(|s| !s.is_empty())
342 .collect();
343 match parts.len() {
344 0 => String::new(),
345 1 => parts.into_iter().next().unwrap(),
346 _ => format!("({})", parts.join(" AND ")),
347 }
348 }
349 FilterNode::Or(children) => {
350 let parts: Vec<String> = children.iter()
351 .map(|c| compile_filter(c, bindings))
352 .filter(|s| !s.is_empty())
353 .collect();
354 match parts.len() {
355 0 => String::new(),
356 1 => parts.into_iter().next().unwrap(),
357 _ => format!("({})", parts.join(" OR ")),
358 }
359 }
360 }
361}
362
363fn quote_col(column: &str) -> String {
364 if column.contains('(') {
365 column.to_string()
366 } else {
367 format!("`{column}`")
368 }
369}
370
371fn compile_condition(
372 column: &str, op: &CompareOp, value: &SqlValue, bindings: &mut Vec<SqlValue>,
373) -> String {
374 let col = quote_col(column);
375 match op {
376 CompareOp::In | CompareOp::NotIn => {
377 if let SqlValue::String(csv) = value {
378 let items: Vec<&str> = csv.split(',').collect();
379 let placeholders: Vec<&str> = items.iter().map(|_| "?").collect();
380 for item in &items {
381 bindings.push(SqlValue::String(item.trim().to_string()));
382 }
383 format!("{col} {} ({})", op.sql_op(), placeholders.join(", "))
384 } else {
385 bindings.push(value.clone());
386 format!("{col} {} (?)", op.sql_op())
387 }
388 }
389 CompareOp::Includes => {
390 if let SqlValue::String(s) = value {
391 bindings.push(SqlValue::String(format!("%{s}%")));
392 } else {
393 bindings.push(value.clone());
394 }
395 format!("{col} LIKE ?")
396 }
397 CompareOp::IsNull | CompareOp::IsNotNull => {
398 format!("{col} {}", op.sql_op())
399 }
400 _ => {
401 bindings.push(value.clone());
402 format!("{col} {} ?", op.sql_op())
403 }
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410
411 fn ch() -> ClickHouseDialect { ClickHouseDialect::new() }
412
413 #[test]
414 fn test_simple_select() {
415 let ir = QueryIR {
416 cube: "DEXTrades".into(), schema: "default".into(),
417 table: "dwd_dex_trades".into(),
418 selects: vec![
419 SelectExpr::Column { column: "tx_hash".into(), alias: None },
420 SelectExpr::Column { column: "token_a_amount".into(), alias: None },
421 ],
422 filters: FilterNode::Empty, having: FilterNode::Empty,
423 group_by: vec![], order_by: vec![], limit: 10, offset: 0,
424 limit_by: None,
425 use_final: false,
426 joins: vec![],
427 };
428 let r = ch().compile(&ir);
429 assert_eq!(r.sql, "SELECT `tx_hash`, `token_a_amount` FROM `default`.`dwd_dex_trades` LIMIT 10");
430 assert!(r.bindings.is_empty());
431 }
432
433 #[test]
434 fn test_final_keyword() {
435 let ir = QueryIR {
436 cube: "T".into(), schema: "db".into(), table: "tokens".into(),
437 selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
438 filters: FilterNode::Empty, having: FilterNode::Empty,
439 group_by: vec![], order_by: vec![], limit: 10, offset: 0,
440 limit_by: None,
441 use_final: true,
442 joins: vec![],
443 };
444 let r = ch().compile(&ir);
445 assert!(r.sql.contains("FROM `db`.`tokens` FINAL"), "FINAL should be appended, got: {}", r.sql);
446 }
447
448 #[test]
449 fn test_uniq_uses_native_function() {
450 let ir = QueryIR {
451 cube: "T".into(), schema: "db".into(), table: "t".into(),
452 selects: vec![
453 SelectExpr::Aggregate { function: "UNIQ".into(), column: "wallet".into(), alias: "__uniq".into(), condition: None },
454 ],
455 filters: FilterNode::Empty, having: FilterNode::Empty,
456 group_by: vec![], order_by: vec![], limit: 10, offset: 0,
457 limit_by: None, use_final: false, joins: vec![],
458 };
459 let r = ch().compile(&ir);
460 assert!(r.sql.contains("uniq(`wallet`) AS `__uniq`"), "ClickHouse should use native uniq(), got: {}", r.sql);
461 }
462
463 #[test]
464 fn test_count_star() {
465 let ir = QueryIR {
466 cube: "T".into(), schema: "db".into(), table: "t".into(),
467 selects: vec![
468 SelectExpr::Aggregate { function: "COUNT".into(), column: "*".into(), alias: "__count".into(), condition: None },
469 ],
470 filters: FilterNode::Empty, having: FilterNode::Empty,
471 group_by: vec![], order_by: vec![], limit: 10, offset: 0,
472 limit_by: None, use_final: false, joins: vec![],
473 };
474 let r = ch().compile(&ir);
475 assert!(r.sql.contains("count() AS `__count`"), "ClickHouse should use count() not COUNT(*), got: {}", r.sql);
476 }
477
478 #[test]
479 fn test_aggregate_lowercase() {
480 let ir = QueryIR {
481 cube: "T".into(), schema: "db".into(), table: "t".into(),
482 selects: vec![
483 SelectExpr::Aggregate { function: "SUM".into(), column: "amount".into(), alias: "__sum".into(), condition: None },
484 SelectExpr::Aggregate { function: "AVG".into(), column: "price".into(), alias: "__avg".into(), condition: None },
485 ],
486 filters: FilterNode::Empty, having: FilterNode::Empty,
487 group_by: vec![], order_by: vec![], limit: 10, offset: 0,
488 limit_by: None, use_final: false, joins: vec![],
489 };
490 let r = ch().compile(&ir);
491 assert!(r.sql.contains("sum(`amount`) AS `__sum`"), "ClickHouse functions should be lowercase, got: {}", r.sql);
492 assert!(r.sql.contains("avg(`price`) AS `__avg`"), "got: {}", r.sql);
493 }
494
495 #[test]
496 fn test_where_and_order() {
497 let ir = QueryIR {
498 cube: "T".into(), schema: "db".into(), table: "t".into(),
499 selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
500 filters: FilterNode::And(vec![
501 FilterNode::Condition { column: "chain_id".into(), op: CompareOp::Eq, value: SqlValue::Int(1) },
502 FilterNode::Condition { column: "amount_usd".into(), op: CompareOp::Gt, value: SqlValue::Float(1000.0) },
503 ]),
504 having: FilterNode::Empty, group_by: vec![],
505 order_by: vec![OrderExpr { column: "block_timestamp".into(), descending: true }],
506 limit: 25, offset: 0,
507 limit_by: None, use_final: false, joins: vec![],
508 };
509 let r = ch().compile(&ir);
510 assert!(r.sql.contains("WHERE (`chain_id` = ? AND `amount_usd` > ?)"));
511 assert!(r.sql.contains("ORDER BY `block_timestamp` DESC"));
512 assert_eq!(r.bindings.len(), 2);
513 }
514
515 #[test]
516 fn test_having_with_aggregate_expr() {
517 let ir = QueryIR {
518 cube: "T".into(), schema: "db".into(), table: "t".into(),
519 selects: vec![
520 SelectExpr::Column { column: "token_address".into(), alias: None },
521 SelectExpr::Aggregate { function: "SUM".into(), column: "amount_usd".into(), alias: "__sum".into(), condition: None },
522 ],
523 filters: FilterNode::Empty,
524 having: FilterNode::Condition {
525 column: "sum(`amount_usd`)".into(), op: CompareOp::Gt, value: SqlValue::Float(1000000.0),
526 },
527 group_by: vec!["token_address".into()], order_by: vec![], limit: 25, offset: 0,
528 limit_by: None, use_final: false, joins: vec![],
529 };
530 let r = ch().compile(&ir);
531 assert!(r.sql.contains("GROUP BY `token_address`"));
532 assert!(r.sql.contains("HAVING `__f_0` > ?"), "expected alias in HAVING, got: {}", r.sql);
533 assert!(r.sql.contains("sum(`amount_usd`) AS `__f_0`"), "expected alias in SELECT, got: {}", r.sql);
534 assert_eq!(r.bindings.len(), 1);
535 }
536
537 #[test]
538 fn test_having_appends_missing_agg_column() {
539 let ir = QueryIR {
540 cube: "T".into(), schema: "db".into(), table: "t".into(),
541 selects: vec![
542 SelectExpr::Column { column: "pool_address".into(), alias: None },
543 SelectExpr::Column { column: "argMaxMerge(latest_liquidity_usd_state)".into(), alias: None },
544 ],
545 filters: FilterNode::Empty,
546 having: FilterNode::And(vec![
547 FilterNode::Condition {
548 column: "argMaxMerge(latest_liquidity_usd_state)".into(),
549 op: CompareOp::Gt, value: SqlValue::Float(2.0),
550 },
551 FilterNode::Condition {
552 column: "argMaxMerge(latest_token_a_amount_state)".into(),
553 op: CompareOp::Gt, value: SqlValue::Float(3.0),
554 },
555 ]),
556 group_by: vec!["pool_address".into()], order_by: vec![], limit: 25, offset: 0,
557 limit_by: None, use_final: false, joins: vec![],
558 };
559 let r = ch().compile(&ir);
560 assert!(r.sql.contains("argMaxMerge(latest_liquidity_usd_state) AS `__f_0`"),
561 "existing HAVING col should be aliased, got: {}", r.sql);
562 assert!(r.sql.contains("argMaxMerge(latest_token_a_amount_state) AS `__f_1`"),
563 "missing agg col should be appended, got: {}", r.sql);
564 assert!(r.sql.contains("HAVING (`__f_0` > ? AND `__f_1` > ?)"),
565 "HAVING should use aliases, got: {}", r.sql);
566 assert_eq!(r.bindings.len(), 2);
567 assert_eq!(r.alias_remap.len(), 1);
568 assert_eq!(r.alias_remap[0], ("__f_0".to_string(), "argMaxMerge(latest_liquidity_usd_state)".to_string()));
569 }
570
571 #[test]
572 fn test_limit_by() {
573 let ir = QueryIR {
574 cube: "T".into(), schema: "db".into(), table: "t".into(),
575 selects: vec![
576 SelectExpr::Column { column: "owner".into(), alias: None },
577 SelectExpr::Column { column: "amount".into(), alias: None },
578 ],
579 filters: FilterNode::Empty, having: FilterNode::Empty,
580 group_by: vec![],
581 order_by: vec![OrderExpr { column: "amount".into(), descending: true }],
582 limit: 100, offset: 0,
583 limit_by: Some(LimitByExpr { count: 3, offset: 0, columns: vec!["owner".into()] }),
584 use_final: false, joins: vec![],
585 };
586 let r = ch().compile(&ir);
587 let sql = &r.sql;
588 assert!(sql.contains("LIMIT 3 BY `owner`"), "LIMIT BY should be present, got: {sql}");
589 assert!(sql.contains("ORDER BY `amount` DESC"), "ORDER BY should be present, got: {sql}");
590 assert!(sql.contains("LIMIT 100"), "outer LIMIT should be present, got: {sql}");
591 let order_by_pos = sql.find("ORDER BY").unwrap();
592 let limit_by_pos = sql.find("LIMIT 3 BY").unwrap();
593 let limit_pos = sql.rfind("LIMIT 100").unwrap();
594 assert!(order_by_pos < limit_by_pos, "ORDER BY should come before LIMIT BY in ClickHouse");
595 assert!(limit_by_pos < limit_pos, "LIMIT BY should come before outer LIMIT");
596 }
597
598 #[test]
599 fn test_limit_by_with_offset() {
600 let ir = QueryIR {
601 cube: "T".into(), schema: "db".into(), table: "t".into(),
602 selects: vec![SelectExpr::Column { column: "id".into(), alias: None }],
603 filters: FilterNode::Empty, having: FilterNode::Empty,
604 group_by: vec![], order_by: vec![], limit: 10, offset: 0,
605 limit_by: Some(LimitByExpr { count: 5, offset: 2, columns: vec!["token".into(), "wallet".into()] }),
606 use_final: false, joins: vec![],
607 };
608 let r = ch().compile(&ir);
609 assert!(r.sql.contains("LIMIT 5 BY `token`, `wallet` OFFSET 2"), "multi-column LIMIT BY with offset, got: {}", r.sql);
610 }
611
612 #[test]
613 fn test_join_direct() {
614 let ir = QueryIR {
615 cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
616 table: "sol_dex_trades".into(),
617 selects: vec![
618 SelectExpr::Column { column: "tx_hash".into(), alias: None },
619 SelectExpr::Column { column: "buy_token_address".into(), alias: None },
620 ],
621 filters: FilterNode::Empty, having: FilterNode::Empty,
622 group_by: vec![], order_by: vec![], limit: 25, offset: 0,
623 limit_by: None, use_final: false,
624 joins: vec![JoinExpr {
625 schema: "dexes_dim".into(), table: "sol_tokens".into(),
626 alias: "_j0".into(),
627 conditions: vec![("buy_token_address".into(), "token_address".into())],
628 selects: vec![
629 SelectExpr::Column { column: "name".into(), alias: None },
630 SelectExpr::Column { column: "symbol".into(), alias: None },
631 ],
632 group_by: vec![], use_final: true, is_aggregate: false,
633 target_cube: "TokenSearch".into(), join_field: "joinBuyToken".into(),
634 }],
635 };
636 let r = ch().compile(&ir);
637 assert!(r.sql.contains("FROM (SELECT"), "main query should be wrapped, got: {}", r.sql);
638 assert!(r.sql.contains("LEFT JOIN `dexes_dim`.`sol_tokens` FINAL AS _j0"),
639 "direct JOIN with FINAL, got: {}", r.sql);
640 assert!(r.sql.contains("_main.`buy_token_address` = _j0.`token_address`"),
641 "ON condition, got: {}", r.sql);
642 assert!(r.sql.contains("_j0.`name` AS `_j0.name`"), "joined col alias, got: {}", r.sql);
643 }
644
645 #[test]
646 fn test_join_aggregate_subquery() {
647 let ir = QueryIR {
648 cube: "DEXTrades".into(), schema: "dexes_dwd".into(),
649 table: "sol_dex_trades".into(),
650 selects: vec![
651 SelectExpr::Column { column: "tx_hash".into(), alias: None },
652 SelectExpr::Column { column: "buy_token_address".into(), alias: None },
653 ],
654 filters: FilterNode::Empty, having: FilterNode::Empty,
655 group_by: vec![], order_by: vec![], limit: 10, offset: 0,
656 limit_by: None, use_final: false,
657 joins: vec![JoinExpr {
658 schema: "dexes_dws".into(), table: "sol_token_market_cap".into(),
659 alias: "_j0".into(),
660 conditions: vec![("buy_token_address".into(), "token_address".into())],
661 selects: vec![
662 SelectExpr::Column { column: "argMaxMerge(latest_market_cap_usd_state)".into(), alias: None },
663 ],
664 group_by: vec!["token_address".into()],
665 use_final: false, is_aggregate: true,
666 target_cube: "TokenMarketCap".into(), join_field: "joinBuyTokenMarketCap".into(),
667 }],
668 };
669 let r = ch().compile(&ir);
670 assert!(r.sql.contains("LEFT JOIN (SELECT"), "aggregate should use subquery, got: {}", r.sql);
671 assert!(r.sql.contains("GROUP BY `token_address`"), "subquery GROUP BY, got: {}", r.sql);
672 assert!(r.sql.contains("FROM `dexes_dws`.`sol_token_market_cap`"), "subquery FROM, got: {}", r.sql);
673 }
674}