1use crate::errors::journal::ControllerError;
2use anyhow::{Error as AnyError, anyhow};
3use arrow::array::Array;
4use datafusion::common::ScalarValue;
5use datafusion::common::arrow::array::{AsArray, RecordBatch};
6use datafusion::execution::SessionStateBuilder;
7use datafusion::execution::memory_pool::FairSpillPool;
8use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
9use datafusion::logical_expr::sqlparser::parser::ParserError;
10use datafusion::prelude::{SQLOptions, SessionConfig, SessionContext};
11use datafusion::sql::sqlparser::ast::{Expr, visit_expressions};
12use datafusion::sql::sqlparser::dialect::GenericDialect;
13use datafusion::sql::sqlparser::parser::Parser;
14use datafusion::sql::sqlparser::tokenizer::Token;
15use feldera_types::config::PipelineConfig;
16use feldera_types::constants::DATAFUSION_TEMP_DIR;
17use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
18use std::collections::BTreeSet;
19use std::ffi::OsStr;
20use std::fs::{create_dir_all, read_dir, remove_dir_all, remove_file};
21use std::io::Error as IoError;
22use std::ops::ControlFlow;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use tracing::warn;
26
/// Value handed to `SessionConfig::with_sort_in_place_threshold_bytes` when a
/// session is built: 64 MiB (2^26 bytes).
const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 64 * 1024 * 1024;

/// Value handed to `SessionConfig::with_sort_spill_reservation_bytes` when a
/// session is built: 64 MiB (2^26 bytes). The minimum-pool estimate for ad-hoc
/// sorts multiplies this figure by the worker count.
const SORT_SPILL_RESERVATION_BYTES: usize = 64 * 1024 * 1024;
49
50pub fn create_runtime_env(
56 pipeline_config: &PipelineConfig,
57) -> Result<Arc<RuntimeEnv>, ControllerError> {
58 let mut builder = RuntimeEnvBuilder::new();
59 if let Some(datafusion_memory_mb) = pipeline_config.global.resolved_datafusion_memory_mb() {
60 let memory_bytes_max = datafusion_memory_mb * 1_000_000;
61 builder = builder.with_memory_pool(Arc::new(FairSpillPool::new(memory_bytes_max as usize)));
62 warn_if_pool_too_small_for_adhoc_sort(pipeline_config, datafusion_memory_mb);
63 }
64 if let Some(storage) = &pipeline_config.storage_config {
65 let path = PathBuf::from(storage.path.clone()).join(DATAFUSION_TEMP_DIR);
66 create_dir_all(&path).map_err(|error| {
67 ControllerError::io_error(
68 format!(
69 "unable to create datafusion scratch space directory '{}'",
70 path.display()
71 ),
72 error,
73 )
74 })?;
75 clean_stale_scratch_entries(&path);
76 builder = builder.with_temp_file_path(path);
77 }
78 builder.build_arc().map_err(|error| {
79 ControllerError::io_error(
80 "unable to build datafusion runtime environment",
81 IoError::other(error.to_string()),
82 )
83 })
84}
85
86fn min_pool_mb_for_adhoc_sort(workers: u64) -> u64 {
98 let needed_bytes = (SORT_SPILL_RESERVATION_BYTES as u64).saturating_mul(workers);
99 needed_bytes.div_ceil(1_000_000)
100}
101
102fn warn_if_pool_too_small_for_adhoc_sort(pipeline_config: &PipelineConfig, pool_mb: u64) {
110 let workers = pipeline_config.global.workers as u64;
111 if workers == 0 {
114 return;
115 }
116 let min_pool_mb = min_pool_mb_for_adhoc_sort(workers);
117 if pool_mb <= min_pool_mb {
123 let per_worker_mb = min_pool_mb_for_adhoc_sort(1);
124 warn!(
125 "DataFusion memory pool is {pool_mb} MB; sort-heavy ad-hoc \
126 queries (ORDER BY, EXCEPT, hash joins) need at least \
127 {min_pool_mb} MB ({workers} workers x {per_worker_mb} MB \
128 reservation per worker). Such queries may fail at first \
129 allocation with 'Resources exhausted'. Increase \
130 'datafusion_memory_mb' or reduce 'workers'."
131 );
132 }
133}
134
135fn clean_stale_scratch_entries(scratch_dir: &Path) {
143 if scratch_dir.file_name() != Some(OsStr::new(DATAFUSION_TEMP_DIR)) {
147 warn!(
148 "refusing to clean unexpected scratch directory '{}'; expected final component '{DATAFUSION_TEMP_DIR}'",
149 scratch_dir.display(),
150 );
151 return;
152 }
153 let entries = match read_dir(scratch_dir) {
154 Ok(entries) => entries,
155 Err(error) => {
156 warn!(
157 "unable to read datafusion scratch directory '{}' for startup cleanup: {error}",
158 scratch_dir.display(),
159 );
160 return;
161 }
162 };
163 for entry in entries.flatten() {
164 let path = entry.path();
165 let file_type = match entry.file_type() {
166 Ok(ft) => ft,
167 Err(error) => {
168 warn!(
169 "unable to stat stale datafusion scratch entry '{}': {error}",
170 path.display(),
171 );
172 continue;
173 }
174 };
175 let result = if file_type.is_dir() {
176 remove_dir_all(&path)
177 } else {
178 remove_file(&path)
179 };
180 if let Err(error) = result {
181 warn!(
182 "unable to remove stale datafusion scratch entry '{}': {error}",
183 path.display(),
184 );
185 }
186 }
187}
188
189pub fn create_session_context(
192 pipeline_config: &PipelineConfig,
193 runtime_env: Arc<RuntimeEnv>,
194) -> SessionContext {
195 create_session_context_with(pipeline_config, runtime_env, |cfg| cfg)
196}
197
198pub fn create_session_context_with<F>(
201 pipeline_config: &PipelineConfig,
202 runtime_env: Arc<RuntimeEnv>,
203 customize_config: F,
204) -> SessionContext
205where
206 F: FnOnce(SessionConfig) -> SessionConfig,
207{
208 let workers = pipeline_config
209 .global
210 .io_workers
211 .unwrap_or(pipeline_config.global.workers as u64);
212 let session_config = SessionConfig::new()
213 .with_target_partitions(workers as usize)
214 .with_sort_in_place_threshold_bytes(SORT_IN_PLACE_THRESHOLD_BYTES)
215 .with_sort_spill_reservation_bytes(SORT_SPILL_RESERVATION_BYTES)
216 .set(
217 "datafusion.execution.planning_concurrency",
218 &ScalarValue::UInt64(Some(workers)),
219 );
220 let session_config = customize_config(session_config);
221
222 let state = SessionStateBuilder::new()
223 .with_config(session_config)
224 .with_runtime_env(runtime_env)
225 .with_default_features()
226 .build();
227 SessionContext::from(state)
228}
229
230pub async fn execute_query_collect(
232 datafusion: &SessionContext,
233 query: &str,
234) -> Result<Vec<RecordBatch>, AnyError> {
235 let options = SQLOptions::new()
236 .with_allow_ddl(false)
237 .with_allow_dml(false);
238
239 let df = datafusion
240 .sql_with_options(query, options)
241 .await
242 .map_err(|e| anyhow!("error compiling query '{query}': {e}"))?;
243
244 df.collect()
245 .await
246 .map_err(|e| anyhow!("error executing query '{query}': {e}"))
247}
248
249pub async fn execute_singleton_query(
251 datafusion: &SessionContext,
252 query: &str,
253) -> Result<String, AnyError> {
254 let result = execute_query_collect(datafusion, query).await?;
255 if result.len() != 1 {
256 return Err(anyhow!(
257 "internal error: query '{query}' returned {} batches; expected: 1",
258 result.len()
259 ));
260 }
261
262 if result[0].num_rows() != 1 {
263 return Err(anyhow!(
264 "internal error: query '{query}' returned {} rows; expected: 1",
265 result[0].num_rows()
266 ));
267 }
268
269 if result[0].num_columns() != 1 {
270 return Err(anyhow!(
271 "internal error: query '{query}' returned {} columns; expected: 1",
272 result[0].num_columns()
273 ));
274 }
275
276 let column0 = result[0].column(0);
277
278 array_to_string(column0).ok_or_else(|| {
279 anyhow!("internal error: cannot retrieve the output of query '{query}' as a string")
280 })
281}
282
283pub fn array_to_string(array: &dyn Array) -> Option<String> {
284 if let Some(string_view_array) = array.as_string_view_opt() {
285 Some(string_view_array.value(0).to_string())
286 } else {
287 array
288 .as_string_opt::<i32>()
289 .map(|array| array.value(0).to_string())
290 }
291}
292
293pub fn validate_sql_expression(expr: &str) -> Result<(), ParserError> {
295 let mut parser = Parser::new(&GenericDialect).try_with_sql(expr)?;
296 parser.parse_expr()?;
297
298 Ok(())
299}
300
301pub fn validate_sql_order_by(order_by: &str) -> Result<(), ParserError> {
308 let mut parser = Parser::new(&GenericDialect).try_with_sql(order_by)?;
309 parser.parse_comma_separated(Parser::parse_order_by_expr)?;
310 parser.expect_token(&Token::EOF)?;
311
312 Ok(())
313}
314
315fn collect_referenced_columns(expr: &Expr, columns: &mut BTreeSet<String>) {
324 let _: ControlFlow<()> = visit_expressions(expr, |e| {
325 match e {
326 Expr::Identifier(ident) => {
327 columns.insert(ident.value.clone());
328 }
329 Expr::CompoundIdentifier(parts) => {
330 if let Some(column) = parts.last() {
331 columns.insert(column.value.clone());
332 }
333 }
334 _ => {}
335 }
336 ControlFlow::Continue(())
337 });
338}
339
340pub fn columns_referenced_by_expression(expr: &str) -> Result<BTreeSet<String>, ParserError> {
347 let mut parser = Parser::new(&GenericDialect).try_with_sql(expr)?;
348 let parsed = parser.parse_expr()?;
349 let mut columns = BTreeSet::new();
350 collect_referenced_columns(&parsed, &mut columns);
351 Ok(columns)
352}
353
354pub fn columns_referenced_by_order_by(order_by: &str) -> Result<BTreeSet<String>, ParserError> {
357 let mut parser = Parser::new(&GenericDialect).try_with_sql(order_by)?;
358 let keys = parser.parse_comma_separated(Parser::parse_order_by_expr)?;
359 parser.expect_token(&Token::EOF)?;
360 let mut columns = BTreeSet::new();
361 for key in &keys {
362 collect_referenced_columns(&key.expr, &mut columns);
363 }
364 Ok(columns)
365}
366
367pub fn timestamp_to_sql_expression(column_type: &ColumnType, expr: &str) -> String {
370 match column_type.typ {
371 SqlType::Timestamp => format!("timestamp '{expr}'"),
372 SqlType::Date => format!("date '{expr}'"),
373 _ => expr.to_string(),
374 }
375}
376
377pub fn validate_timestamp_type(
379 endpoint_name: &str,
380 timestamp: &Field,
381 docs: &str,
382) -> Result<(), ControllerError> {
383 if !timestamp.columntype.is_integral_type()
384 && !matches!(
385 ×tamp.columntype.typ,
386 SqlType::Date | SqlType::Timestamp
387 )
388 {
389 return Err(ControllerError::invalid_transport_configuration(
390 endpoint_name,
391 &format!(
392 "timestamp column '{}' has unsupported type {}; supported types for 'timestamp_column' are integer types, DATE, and TIMESTAMP; {docs}",
393 timestamp.name,
394 serde_json::to_string(×tamp.columntype).unwrap()
395 ),
396 ));
397 }
398
399 Ok(())
400}
401
402pub async fn validate_timestamp_column(
404 endpoint_name: &str,
405 timestamp_column: &str,
406 datafusion: &SessionContext,
407 schema: &Relation,
408 docs: &str,
409) -> Result<(), ControllerError> {
410 let Some(field) = schema.field(timestamp_column) else {
412 return Err(ControllerError::invalid_transport_configuration(
413 endpoint_name,
414 &format!("timestamp column '{timestamp_column}' not found in table schema"),
415 ));
416 };
417
418 validate_timestamp_type(endpoint_name, field, docs)?;
420
421 let Some(lateness) = &field.lateness else {
423 return Err(ControllerError::invalid_transport_configuration(
424 endpoint_name,
425 &format!(
426 "timestamp column '{timestamp_column}' does not have a LATENESS attribute; {docs}"
427 ),
428 ));
429 };
430
431 validate_sql_expression(lateness).map_err(|e|
433 ControllerError::invalid_transport_configuration(
434 endpoint_name,
435 &format!("error parsing LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': {e}; {docs}"),
436 ),
437 )?;
438
439 let is_zero = execute_singleton_query(
443 datafusion,
444 &format!("select cast((({lateness} + {lateness}) = {lateness}) as string)"),
445 )
446 .await
447 .map_err(|e| ControllerError::invalid_transport_configuration(endpoint_name, &e.to_string()))?;
448
449 if &is_zero == "true" {
450 return Err(ControllerError::invalid_transport_configuration(
451 endpoint_name,
452 &format!(
453 "invalid LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': LATENESS must be greater than zero; {docs}"
454 ),
455 ));
456 }
457
458 Ok(())
459}
460
#[cfg(test)]
mod tests {
    use super::{
        columns_referenced_by_expression, columns_referenced_by_order_by, create_runtime_env,
        create_session_context,
    };
    use datafusion::execution::memory_pool::MemoryLimit;
    use feldera_types::config::{PipelineConfig, ResourceConfig, RuntimeConfig, StorageConfig};
    use feldera_types::constants::DATAFUSION_TEMP_DIR;
    use std::collections::BTreeSet;
    use std::fs;
    use std::path::{Path, PathBuf};

    /// A directory under the system temp dir that is deleted again on drop.
    struct TempStorage {
        path: PathBuf,
    }

    impl TempStorage {
        /// Creates a fresh, empty directory called `name`, discarding whatever
        /// an earlier run may have left behind under the same name.
        fn new(name: &str) -> Self {
            let path = std::env::temp_dir().join(name);
            // The directory may not exist yet; either outcome is fine.
            let _ = fs::remove_dir_all(&path);
            fs::create_dir_all(&path).unwrap();
            Self { path }
        }

        /// Location of the directory.
        fn path(&self) -> &Path {
            self.path.as_path()
        }
    }

    impl Drop for TempStorage {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not fail the test.
            let _ = fs::remove_dir_all(&self.path);
        }
    }

    /// Builds a pipeline configuration from runtime settings and an optional
    /// storage directory; everything else is left empty or at its default.
    fn pipeline_config(global: RuntimeConfig, storage: Option<&Path>) -> PipelineConfig {
        let storage_config = storage.map(|dir| StorageConfig {
            path: dir.to_string_lossy().into(),
            cache: Default::default(),
        });
        PipelineConfig {
            global,
            multihost: None,
            name: None,
            given_name: None,
            storage_config,
            secrets_dir: None,
            inputs: Default::default(),
            outputs: Default::default(),
            program_ir: None,
        }
    }

    /// Pipeline configuration with one worker and otherwise default runtime
    /// settings.
    fn single_worker_config(storage: Option<&Path>) -> PipelineConfig {
        pipeline_config(
            RuntimeConfig {
                workers: 1,
                ..Default::default()
            },
            storage,
        )
    }

    /// Unwraps a finite memory limit, panicking for any other kind of limit.
    fn finite_limit_bytes(limit: MemoryLimit) -> usize {
        match limit {
            MemoryLimit::Finite(bytes) => bytes,
            MemoryLimit::Infinite => panic!("expected a bounded memory pool, got Infinite"),
            MemoryLimit::Unknown => panic!("expected a bounded memory pool, got Unknown"),
        }
    }

    #[test]
    fn create_runtime_env_creates_tmp_dir_under_storage() {
        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-tmp-dir-test");
        let cfg = single_worker_config(Some(storage.path()));

        create_runtime_env(&cfg).unwrap();

        let expected = storage.path().join(DATAFUSION_TEMP_DIR);
        assert!(
            expected.is_dir(),
            "expected scratch directory at {}",
            expected.display(),
        );
    }

    /// Pins the literal value of the scratch directory name.
    #[test]
    fn scratch_dir_name_matches_gc_allowlist_constant() {
        assert_eq!(DATAFUSION_TEMP_DIR, "datafusion-tmp");
    }

    #[test]
    fn create_runtime_env_without_storage_succeeds() {
        let cfg = single_worker_config(None);
        create_runtime_env(&cfg).unwrap();
    }

    #[test]
    fn create_runtime_env_applies_memory_pool_when_budget_set() {
        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-pool-test");
        let runtime = RuntimeConfig {
            workers: 1,
            max_rss_mb: Some(16_000),
            ..Default::default()
        };
        let cfg = pipeline_config(runtime, Some(storage.path()));

        let env = create_runtime_env(&cfg).unwrap();
        assert_eq!(
            finite_limit_bytes(env.memory_pool.memory_limit()),
            800 * 1_000_000
        );
    }

    #[test]
    fn create_runtime_env_no_memory_limit_when_budget_unset() {
        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-unbounded-test");
        let cfg = single_worker_config(Some(storage.path()));

        let env = create_runtime_env(&cfg).unwrap();
        // Anything other than a finite limit counts as unbounded here.
        if let MemoryLimit::Finite(bytes) = env.memory_pool.memory_limit() {
            panic!("expected an unbounded pool, got finite limit of {bytes} bytes");
        }
    }

    #[test]
    fn create_runtime_env_uses_resources_memory_mb_max_fallback() {
        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-resources-test");
        let runtime = RuntimeConfig {
            workers: 1,
            max_rss_mb: None,
            resources: ResourceConfig {
                memory_mb_max: Some(16_000),
                ..Default::default()
            },
            ..Default::default()
        };
        let cfg = pipeline_config(runtime, Some(storage.path()));

        let env = create_runtime_env(&cfg).unwrap();
        assert_eq!(
            finite_limit_bytes(env.memory_pool.memory_limit()),
            800 * 1_000_000
        );
    }

    #[test]
    fn create_runtime_env_wipes_stale_scratch_entries() {
        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-wipe-test");
        let scratch = storage.path().join(DATAFUSION_TEMP_DIR);
        fs::create_dir_all(&scratch).unwrap();

        // Seed the scratch directory with a populated subdirectory and a loose
        // file before the runtime environment is created.
        let stale_subdir = scratch.join("datafusion-stale1");
        fs::create_dir_all(&stale_subdir).unwrap();
        fs::write(stale_subdir.join("orphan.arrow"), b"stale").unwrap();
        let stale_file = scratch.join("loose.tmp");
        fs::write(&stale_file, b"stale").unwrap();

        let cfg = single_worker_config(Some(storage.path()));
        create_runtime_env(&cfg).unwrap();

        assert!(
            scratch.is_dir(),
            "scratch root must survive cleanup; gc_startup keeps it on the allowlist",
        );
        assert!(
            !stale_subdir.exists(),
            "stale per-DiskManager subdir should be removed on startup",
        );
        assert!(
            !stale_file.exists(),
            "stale loose file should be removed on startup",
        );
    }

    #[test]
    fn create_session_context_target_partitions_match_workers() {
        let storage = TempStorage::new("feldera-datafusion-create-session-context-workers-test");
        let runtime = RuntimeConfig {
            workers: 7,
            ..Default::default()
        };
        let cfg = pipeline_config(runtime, Some(storage.path()));

        let env = create_runtime_env(&cfg).unwrap();
        let ctx = create_session_context(&cfg, env);
        assert_eq!(ctx.copied_config().target_partitions(), 7);
    }

    #[test]
    fn create_session_context_target_partitions_prefer_io_workers() {
        let storage = TempStorage::new("feldera-datafusion-create-session-context-io-workers-test");
        let runtime = RuntimeConfig {
            workers: 4,
            io_workers: Some(12),
            ..Default::default()
        };
        let cfg = pipeline_config(runtime, Some(storage.path()));

        let env = create_runtime_env(&cfg).unwrap();
        let ctx = create_session_context(&cfg, env);
        assert_eq!(ctx.copied_config().target_partitions(), 12);
    }

    #[test]
    fn create_session_context_with_customise_overrides_defaults() {
        use super::create_session_context_with;
        let storage = TempStorage::new("feldera-datafusion-create-session-context-override-test");
        let runtime = RuntimeConfig {
            workers: 4,
            ..Default::default()
        };
        let cfg = pipeline_config(runtime, Some(storage.path()));

        let env = create_runtime_env(&cfg).unwrap();
        // The customization runs after the defaults, so its value must win.
        let ctx = create_session_context_with(&cfg, env, |c| {
            c.set_usize("datafusion.execution.target_partitions", 99)
        });
        assert_eq!(ctx.copied_config().target_partitions(), 99);
    }

    #[test]
    fn clean_stale_scratch_entries_refuses_unexpected_paths() {
        use super::clean_stale_scratch_entries;
        let storage = TempStorage::new("feldera-datafusion-clean-scratch-guard-test");
        let bogus = storage.path().join("not-the-scratch-dir");
        fs::create_dir_all(&bogus).unwrap();
        let canary = bogus.join("canary.txt");
        fs::write(&canary, b"do not delete").unwrap();

        clean_stale_scratch_entries(&bogus);

        assert!(
            canary.exists(),
            "guard must not delete contents of a directory whose name != DATAFUSION_TEMP_DIR",
        );
    }

    #[test]
    fn min_pool_mb_for_adhoc_sort_matches_reservation_times_workers() {
        use super::min_pool_mb_for_adhoc_sort;
        // (workers, expected MB): 2^26 bytes per worker, rounded up to whole
        // megabytes of 1,000,000 bytes.
        for (workers, expected_mb) in [(0, 0), (1, 68), (2, 135), (8, 537)] {
            assert_eq!(min_pool_mb_for_adhoc_sort(workers), expected_mb);
        }
    }

    /// Every predicate and order-by shape listed here must pass validation.
    #[test]
    fn cdc_connector_expr_shapes_validate() {
        use super::{validate_sql_expression, validate_sql_order_by};

        const FILTER_SHAPES: &[&str] = &[
            "0=0",
            "0=0 AND (a = 's0' AND b = 's1')",
            "0=0 AND (a = 's0')",
            "0=0 AND (a IN ('s0'))",
            "0=0 AND (a IN ('s0','s1'))",
            "0=0 AND (a IN (1,2) OR a IS NULL)",
            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b = false)",
            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IN ('s0'))",
            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IN ('s0','s1'))",
            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NOT NULL)",
            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NULL AND c IS NULL)",
            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NULL)",
            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b NOT IN ('s0','s1') AND c IS NOT NULL)",
            "0=0 AND (a IN('s0','s1'))",
            "0=0 AND (a IS NOT NULL AND b IS NOT NULL)",
            "0=0 AND a = false",
            "0=0 AND a = false AND (b = 's0' AND c = 's1')",
            "0=0 AND a = false AND (b = 's0')",
            "0=0 AND a = false AND (b IN ('s0'))",
            "0=0 AND a = false AND (b IN ('s0','s1'))",
            "0=0 AND a = false AND (b IN (1,2) OR b IS NULL)",
            "0=0 AND a = false AND (b IN (1,2) OR b IS NULL) AND (c = false)",
            "0=0 AND a = false AND (b IN (1,2) OR b IS NULL) AND (c IS NOT NULL)",
            "0=0 AND a = false AND (b IS NOT NULL AND c IS NOT NULL)",
            "0=0 AND a = false AND b is null",
            "0=0 AND a = false AND b is null AND (c = 's0')",
            "0=0 AND a = false AND b is null AND (c IN ('s0','s1'))",
            "0=0 AND a = false AND b is null AND (c IN (1,2) OR c IS NULL)",
            "0=0 AND a = false AND b is null AND (c IN (1,2) OR c IS NULL) AND (d NOT IN ('s0','s1') AND e IS NOT NULL)",
            "a > 0",
            "a >= 0 AND a <= 9",
            "a <> 's0'",
            "a != 's0'",
            "a BETWEEN 0 AND 9",
            "a LIKE 's0'",
            "a IS NULL OR b IS NOT NULL",
            "NOT (a = false)",
            "lower(a) = 's0'",
            "cast(a AS bigint) = 0",
            "a + b > 0",
            "coalesce(a, b) = 's0'",
            "a > timestamp '2020-01-02 03:04:05'",
            "a = 's0''s1'",
        ];
        const CDC_DELETE_FILTER_SHAPES: &[&str] = &[
            "a = true",
            "a = true OR b is not null",
            "a = true AND b = false",
            "a IN ('s0','s1')",
            "a IS NOT NULL",
            "NOT a",
        ];
        const CDC_ORDER_BY_SHAPES: &[&str] = &[
            "a",
            "a, b",
            "a asc, b asc",
            "a ASC",
            "a desc",
            "a ASC, b DESC",
            "a NULLS FIRST",
            "a ASC NULLS LAST",
            "a DESC NULLS FIRST",
            "a asc nulls last, b desc nulls first",
            "a asc, b desc, c asc nulls last",
            "a + b asc",
            "a % 2 asc, b desc",
            "lower(a) asc",
            "abs(a) desc, b asc",
            "cast(a AS bigint) asc",
            "coalesce(a, b) asc, c desc",
            "case when a then 0 else 1 end desc",
            "\"a b\" asc",
        ];

        // Gather every failure first so a single run reports all bad shapes.
        let predicate_failures = FILTER_SHAPES
            .iter()
            .chain(CDC_DELETE_FILTER_SHAPES)
            .filter_map(|expr| {
                validate_sql_expression(expr)
                    .err()
                    .map(|e| format!("predicate '{expr}' failed: {e}"))
            });
        let order_by_failures = CDC_ORDER_BY_SHAPES.iter().filter_map(|order_by| {
            validate_sql_order_by(order_by)
                .err()
                .map(|e| format!("cdc_order_by '{order_by}' failed: {e}"))
        });
        let failures: Vec<String> = predicate_failures.chain(order_by_failures).collect();

        assert!(
            failures.is_empty(),
            "validation failures:\n{}",
            failures.join("\n")
        );
    }

    /// Builds the expected column set from a list of names.
    fn columns(names: &[&str]) -> BTreeSet<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    #[test]
    fn expression_columns_are_collected() {
        let cases = [
            ("__is_deleted = true", columns(&["__is_deleted"])),
            ("deleted_at is not null", columns(&["deleted_at"])),
            (
                "__is_deleted = true OR deleted_at is not null",
                columns(&["__is_deleted", "deleted_at"]),
            ),
            // The function name itself is not reported as a column.
            ("lower(status) = 'gone'", columns(&["status"])),
            // Only the last part of a qualified name is reported.
            ("t.deleted = true", columns(&["deleted"])),
            // Literals reference no columns.
            ("1 = 1", columns(&[])),
        ];
        for (expr, expected) in cases {
            assert_eq!(
                columns_referenced_by_expression(expr).unwrap(),
                expected,
                "columns of '{expr}'"
            );
        }
    }

    #[test]
    fn order_by_columns_are_collected() {
        assert_eq!(
            columns_referenced_by_order_by("ts asc, lsn desc").unwrap(),
            columns(&["ts", "lsn"]),
        );
        assert_eq!(
            columns_referenced_by_order_by("coalesce(ts, created_at) asc").unwrap(),
            columns(&["ts", "created_at"]),
        );
        // Sort modifiers do not show up as columns.
        assert_eq!(
            columns_referenced_by_order_by("ts desc nulls last, lsn asc").unwrap(),
            columns(&["ts", "lsn"]),
        );
    }

    #[test]
    fn malformed_expressions_error() {
        assert!(columns_referenced_by_expression("a =").is_err());
        assert!(columns_referenced_by_order_by("ts asc,").is_err());
    }
}