Skip to main content

feldera_adapterlib/utils/
datafusion.rs

1use crate::errors::journal::ControllerError;
2use anyhow::{Error as AnyError, anyhow};
3use arrow::array::Array;
4use datafusion::common::ScalarValue;
5use datafusion::common::arrow::array::{AsArray, RecordBatch};
6use datafusion::execution::SessionStateBuilder;
7use datafusion::execution::memory_pool::FairSpillPool;
8use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
9use datafusion::logical_expr::sqlparser::parser::ParserError;
10use datafusion::prelude::{SQLOptions, SessionConfig, SessionContext};
11use datafusion::sql::sqlparser::ast::{Expr, visit_expressions};
12use datafusion::sql::sqlparser::dialect::GenericDialect;
13use datafusion::sql::sqlparser::parser::Parser;
14use datafusion::sql::sqlparser::tokenizer::Token;
15use feldera_types::config::PipelineConfig;
16use feldera_types::constants::DATAFUSION_TEMP_DIR;
17use feldera_types::program_schema::{ColumnType, Field, Relation, SqlType};
18use std::collections::BTreeSet;
19use std::ffi::OsStr;
20use std::fs::{create_dir_all, read_dir, remove_dir_all, remove_file};
21use std::io::Error as IoError;
22use std::ops::ControlFlow;
23use std::path::{Path, PathBuf};
24use std::sync::Arc;
25use tracing::warn;
26
/// In-memory sort threshold, in bytes (`1 << 26` = 64 MiB); above this,
/// sorts spill to disk.
///
/// Applied to every session via
/// `SessionConfig::with_sort_in_place_threshold_bytes` in
/// [`create_session_context_with`].
///
/// NOTE(review): DataFusion's own documentation describes this option as the
/// size below which buffered data is concatenated and sorted as a single
/// batch rather than sorted per batch and merged -- confirm that the "spill
/// to disk" wording above matches the intended behavior.
///
/// Powers of two align with page sizes (4 KiB / 2 MiB) the allocator
/// hands back, so a `1 << 26` budget matches what the OS actually
/// reserves rather than a round decimal value the OS rounds up anyway.
const SORT_IN_PLACE_THRESHOLD_BYTES: usize = 1 << 26;
33
/// Memory withheld from the sort phase for the merge phase to use, in bytes
/// (`1 << 26` = 64 MiB).
///
/// Reserved per partition: a sort with N partitions pre-allocates
/// `N * SORT_SPILL_RESERVATION_BYTES` from the pool.
/// If the pool can't satisfy that, the query fails immediately
/// with `Resources exhausted`. `create_runtime_env` emits a startup warning
/// (see `warn_if_pool_too_small_for_adhoc_sort`) when the configured pool
/// does not exceed the ad-hoc engine's total reservation.
///
/// Note: DataFusion 52.x emits noisy `WARN datafusion_physical_plan::spill:
/// Record batch memory usage ... exceeds the expected limit ... by more
/// than the allowed tolerance` lines during spilled sorts. The overage is
/// typically a handful of bytes over a 4 KB tolerance -- upstream
/// accounting drift, tracked at
/// <https://github.com/apache/datafusion/issues/17340>. It is not a query
/// failure.
const SORT_SPILL_RESERVATION_BYTES: usize = 1 << 26;
49
50/// Build the shared datafusion [`RuntimeEnv`] for a pipeline.
51///
52/// Build once per pipeline and share the `Arc` across every
53/// `SessionContext`. A separate `RuntimeEnv` per context would give each its
54/// own pool, multiplying the effective memory budget by `(1 + #connectors)`.
55pub fn create_runtime_env(
56    pipeline_config: &PipelineConfig,
57) -> Result<Arc<RuntimeEnv>, ControllerError> {
58    let mut builder = RuntimeEnvBuilder::new();
59    if let Some(datafusion_memory_mb) = pipeline_config.global.resolved_datafusion_memory_mb() {
60        let memory_bytes_max = datafusion_memory_mb * 1_000_000;
61        builder = builder.with_memory_pool(Arc::new(FairSpillPool::new(memory_bytes_max as usize)));
62        warn_if_pool_too_small_for_adhoc_sort(pipeline_config, datafusion_memory_mb);
63    }
64    if let Some(storage) = &pipeline_config.storage_config {
65        let path = PathBuf::from(storage.path.clone()).join(DATAFUSION_TEMP_DIR);
66        create_dir_all(&path).map_err(|error| {
67            ControllerError::io_error(
68                format!(
69                    "unable to create datafusion scratch space directory '{}'",
70                    path.display()
71                ),
72                error,
73            )
74        })?;
75        clean_stale_scratch_entries(&path);
76        builder = builder.with_temp_file_path(path);
77    }
78    builder.build_arc().map_err(|error| {
79        ControllerError::io_error(
80            "unable to build datafusion runtime environment",
81            IoError::other(error.to_string()),
82        )
83    })
84}
85
86/// Minimum DataFusion pool size, in MB, that can satisfy the ad-hoc
87/// engine's per-partition sort reservation given `workers`.
88///
89/// Ad-hoc sessions set `target_partitions = workers`
90/// (see [`create_session_context`]), so an `ORDER BY` (or any other
91/// sort-based operator) reserves `workers * SORT_SPILL_RESERVATION_BYTES`
92/// from the pool *before* sorting any rows. The reservation is in
93/// binary MiB (`1 << 26`); the pool is sized from the user-facing
94/// `datafusion_memory_mb` (decimal MB). Compare in bytes, then
95/// ceil-divide to MB so the warning's threshold is never lower than
96/// the actual byte requirement.
97fn min_pool_mb_for_adhoc_sort(workers: u64) -> u64 {
98    let needed_bytes = (SORT_SPILL_RESERVATION_BYTES as u64).saturating_mul(workers);
99    needed_bytes.div_ceil(1_000_000)
100}
101
102/// Warn at startup when the DataFusion pool is too small to satisfy the
103/// per-partition sort reservation for the ad-hoc query engine.
104///
105/// If the pool can't satisfy that, the query fails on the first reservation
106/// attempt with `Resources exhausted`. Surface this as a single startup
107/// warning so the failure mode isn't silent. Connector sessions can override
108/// `target_partitions`; their reservation budget is not checked here.
109fn warn_if_pool_too_small_for_adhoc_sort(pipeline_config: &PipelineConfig, pool_mb: u64) {
110    let workers = pipeline_config.global.workers as u64;
111    // Degenerate configs (tests / synthetic) report `workers == 0`; nothing
112    // useful to say in that case and the message would print "0 MB".
113    if workers == 0 {
114        return;
115    }
116    let min_pool_mb = min_pool_mb_for_adhoc_sort(workers);
117    // `<=` not `<`: at exact equality every partition's reservation sums to
118    // the full pool with zero headroom. FairSpillPool's internal accounting
119    // takes a few bytes of overhead, so the last partition's reservation
120    // fails by a fraction of a MB. Empirically: pool=256 / workers=4
121    // fails; 257 succeeds.
122    if pool_mb <= min_pool_mb {
123        let per_worker_mb = min_pool_mb_for_adhoc_sort(1);
124        warn!(
125            "DataFusion memory pool is {pool_mb} MB; sort-heavy ad-hoc \
126             queries (ORDER BY, EXCEPT, hash joins) need at least \
127             {min_pool_mb} MB ({workers} workers x {per_worker_mb} MB \
128             reservation per worker). Such queries may fail at first \
129             allocation with 'Resources exhausted'. Increase \
130             'datafusion_memory_mb' or reduce 'workers'."
131        );
132    }
133}
134
135/// Remove leftovers from a previous process inside the scratch directory.
136///
137/// DataFusion's `DiskManager` leaks its `datafusion-XXXXXX/` subdir if the
138/// process is killed before `tempfile::TempDir::drop` runs. The previous
139/// process is gone by the time we get here, so anything still in the dir is
140/// orphaned. Spill files are per-query and never need to survive a restart.
141/// Errors only logged: a stuck file should not block startup.
142fn clean_stale_scratch_entries(scratch_dir: &Path) {
143    // Tripwire: refuse to recursively delete anything whose final
144    // component isn't the well-known scratch dir name. Defends against a
145    // future caller accidentally passing `/`, `~`, or the storage root.
146    if scratch_dir.file_name() != Some(OsStr::new(DATAFUSION_TEMP_DIR)) {
147        warn!(
148            "refusing to clean unexpected scratch directory '{}'; expected final component '{DATAFUSION_TEMP_DIR}'",
149            scratch_dir.display(),
150        );
151        return;
152    }
153    let entries = match read_dir(scratch_dir) {
154        Ok(entries) => entries,
155        Err(error) => {
156            warn!(
157                "unable to read datafusion scratch directory '{}' for startup cleanup: {error}",
158                scratch_dir.display(),
159            );
160            return;
161        }
162    };
163    for entry in entries.flatten() {
164        let path = entry.path();
165        let file_type = match entry.file_type() {
166            Ok(ft) => ft,
167            Err(error) => {
168                warn!(
169                    "unable to stat stale datafusion scratch entry '{}': {error}",
170                    path.display(),
171                );
172                continue;
173            }
174        };
175        let result = if file_type.is_dir() {
176            remove_dir_all(&path)
177        } else {
178            remove_file(&path)
179        };
180        if let Err(error) = result {
181            warn!(
182                "unable to remove stale datafusion scratch entry '{}': {error}",
183                path.display(),
184            );
185        }
186    }
187}
188
189/// `SessionContext` bound to the shared [`RuntimeEnv`], configured with the
190/// pipeline's worker count and feldera's sort-spill thresholds.
191pub fn create_session_context(
192    pipeline_config: &PipelineConfig,
193    runtime_env: Arc<RuntimeEnv>,
194) -> SessionContext {
195    create_session_context_with(pipeline_config, runtime_env, |cfg| cfg)
196}
197
198/// Like [`create_session_context`], with a hook to override individual
199/// datafusion settings (e.g. parquet decoding) before the context is built.
200pub fn create_session_context_with<F>(
201    pipeline_config: &PipelineConfig,
202    runtime_env: Arc<RuntimeEnv>,
203    customize_config: F,
204) -> SessionContext
205where
206    F: FnOnce(SessionConfig) -> SessionConfig,
207{
208    let workers = pipeline_config
209        .global
210        .io_workers
211        .unwrap_or(pipeline_config.global.workers as u64);
212    let session_config = SessionConfig::new()
213        .with_target_partitions(workers as usize)
214        .with_sort_in_place_threshold_bytes(SORT_IN_PLACE_THRESHOLD_BYTES)
215        .with_sort_spill_reservation_bytes(SORT_SPILL_RESERVATION_BYTES)
216        .set(
217            "datafusion.execution.planning_concurrency",
218            &ScalarValue::UInt64(Some(workers)),
219        );
220    let session_config = customize_config(session_config);
221
222    let state = SessionStateBuilder::new()
223        .with_config(session_config)
224        .with_runtime_env(runtime_env)
225        .with_default_features()
226        .build();
227    SessionContext::from(state)
228}
229
230/// Execute a SQL query and collect all results in a vector of `RecordBatch`'s.
231pub async fn execute_query_collect(
232    datafusion: &SessionContext,
233    query: &str,
234) -> Result<Vec<RecordBatch>, AnyError> {
235    let options = SQLOptions::new()
236        .with_allow_ddl(false)
237        .with_allow_dml(false);
238
239    let df = datafusion
240        .sql_with_options(query, options)
241        .await
242        .map_err(|e| anyhow!("error compiling query '{query}': {e}"))?;
243
244    df.collect()
245        .await
246        .map_err(|e| anyhow!("error executing query '{query}': {e}"))
247}
248
249/// Execute a SQL query that returns a result with exactly one row and column of type `string`.
250pub async fn execute_singleton_query(
251    datafusion: &SessionContext,
252    query: &str,
253) -> Result<String, AnyError> {
254    let result = execute_query_collect(datafusion, query).await?;
255    if result.len() != 1 {
256        return Err(anyhow!(
257            "internal error: query '{query}' returned {} batches; expected: 1",
258            result.len()
259        ));
260    }
261
262    if result[0].num_rows() != 1 {
263        return Err(anyhow!(
264            "internal error: query '{query}' returned {} rows; expected: 1",
265            result[0].num_rows()
266        ));
267    }
268
269    if result[0].num_columns() != 1 {
270        return Err(anyhow!(
271            "internal error: query '{query}' returned {} columns; expected: 1",
272            result[0].num_columns()
273        ));
274    }
275
276    let column0 = result[0].column(0);
277
278    array_to_string(column0).ok_or_else(|| {
279        anyhow!("internal error: cannot retrieve the output of query '{query}' as a string")
280    })
281}
282
283pub fn array_to_string(array: &dyn Array) -> Option<String> {
284    if let Some(string_view_array) = array.as_string_view_opt() {
285        Some(string_view_array.value(0).to_string())
286    } else {
287        array
288            .as_string_opt::<i32>()
289            .map(|array| array.value(0).to_string())
290    }
291}
292
293/// Parse expression only to validate it.
294pub fn validate_sql_expression(expr: &str) -> Result<(), ParserError> {
295    let mut parser = Parser::new(&GenericDialect).try_with_sql(expr)?;
296    parser.parse_expr()?;
297
298    Ok(())
299}
300
301/// Validate the body of an ORDER BY clause (e.g. "ts asc, lsn desc").
302///
303/// Unlike [`validate_sql_expression`], this accepts the comma-separated,
304/// ASC/DESC/NULLS annotated key list a real ORDER BY allows, and
305/// requires the whole string to parse so a malformed clause fails here rather
306/// than silently dropping every key after the first.
307pub fn validate_sql_order_by(order_by: &str) -> Result<(), ParserError> {
308    let mut parser = Parser::new(&GenericDialect).try_with_sql(order_by)?;
309    parser.parse_comma_separated(Parser::parse_order_by_expr)?;
310    parser.expect_token(&Token::EOF)?;
311
312    Ok(())
313}
314
315/// Collect into `columns` every column name an expression's AST references,
316/// walking nested sub-expressions. For a compound reference such as `t.c` only
317/// the trailing part (`c`) is taken, since that is the column; leading parts are
318/// table qualifiers.
319///
320/// Over-collecting is harmless for the callers here -- it merely keeps a column
321/// from being pruned -- but under-collecting would drop a column the connector
322/// needs, so an identifier is kept whenever it could name a column.
323fn collect_referenced_columns(expr: &Expr, columns: &mut BTreeSet<String>) {
324    let _: ControlFlow<()> = visit_expressions(expr, |e| {
325        match e {
326            Expr::Identifier(ident) => {
327                columns.insert(ident.value.clone());
328            }
329            Expr::CompoundIdentifier(parts) => {
330                if let Some(column) = parts.last() {
331                    columns.insert(column.value.clone());
332                }
333            }
334            _ => {}
335        }
336        ControlFlow::Continue(())
337    });
338}
339
340/// Column names referenced by a scalar SQL expression, e.g. a connector's
341/// `filter` or `cdc_delete_filter`. Names are returned verbatim; the caller
342/// case-folds when matching them against a schema.
343///
344/// Returns an error if the expression does not parse. Used when pruning
345/// "unused" columns, to keep the columns a connector expression depends on.
346pub fn columns_referenced_by_expression(expr: &str) -> Result<BTreeSet<String>, ParserError> {
347    let mut parser = Parser::new(&GenericDialect).try_with_sql(expr)?;
348    let parsed = parser.parse_expr()?;
349    let mut columns = BTreeSet::new();
350    collect_referenced_columns(&parsed, &mut columns);
351    Ok(columns)
352}
353
354/// Like [`columns_referenced_by_expression`], but for the comma-separated key
355/// list of an ORDER BY clause, e.g. a connector's `cdc_order_by`.
356pub fn columns_referenced_by_order_by(order_by: &str) -> Result<BTreeSet<String>, ParserError> {
357    let mut parser = Parser::new(&GenericDialect).try_with_sql(order_by)?;
358    let keys = parser.parse_comma_separated(Parser::parse_order_by_expr)?;
359    parser.expect_token(&Token::EOF)?;
360    let mut columns = BTreeSet::new();
361    for key in &keys {
362        collect_referenced_columns(&key.expr, &mut columns);
363    }
364    Ok(columns)
365}
366
367/// Convert a value of the timestamp column returned by a SQL query into a valid
368/// SQL expression.
369pub fn timestamp_to_sql_expression(column_type: &ColumnType, expr: &str) -> String {
370    match column_type.typ {
371        SqlType::Timestamp => format!("timestamp '{expr}'"),
372        SqlType::Date => format!("date '{expr}'"),
373        _ => expr.to_string(),
374    }
375}
376
377/// Check that the `timestamp` field has one of supported types.
378pub fn validate_timestamp_type(
379    endpoint_name: &str,
380    timestamp: &Field,
381    docs: &str,
382) -> Result<(), ControllerError> {
383    if !timestamp.columntype.is_integral_type()
384        && !matches!(
385            &timestamp.columntype.typ,
386            SqlType::Date | SqlType::Timestamp
387        )
388    {
389        return Err(ControllerError::invalid_transport_configuration(
390            endpoint_name,
391            &format!(
392                "timestamp column '{}' has unsupported type {}; supported types for 'timestamp_column' are integer types, DATE, and TIMESTAMP; {docs}",
393                timestamp.name,
394                serde_json::to_string(&timestamp.columntype).unwrap()
395            ),
396        ));
397    }
398
399    Ok(())
400}
401
402/// Validate 'timestamp_column'.
403pub async fn validate_timestamp_column(
404    endpoint_name: &str,
405    timestamp_column: &str,
406    datafusion: &SessionContext,
407    schema: &Relation,
408    docs: &str,
409) -> Result<(), ControllerError> {
410    // Lookup column in the schema.
411    let Some(field) = schema.field(timestamp_column) else {
412        return Err(ControllerError::invalid_transport_configuration(
413            endpoint_name,
414            &format!("timestamp column '{timestamp_column}' not found in table schema"),
415        ));
416    };
417
418    // Field must have a supported type.
419    validate_timestamp_type(endpoint_name, field, docs)?;
420
421    // Column must have lateness.
422    let Some(lateness) = &field.lateness else {
423        return Err(ControllerError::invalid_transport_configuration(
424            endpoint_name,
425            &format!(
426                "timestamp column '{timestamp_column}' does not have a LATENESS attribute; {docs}"
427            ),
428        ));
429    };
430
431    // Validate lateness expression.
432    validate_sql_expression(lateness).map_err(|e|
433                ControllerError::invalid_transport_configuration(
434                    endpoint_name,
435                    &format!("error parsing LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': {e}; {docs}"),
436                ),
437            )?;
438
439    // Lateness has to be >0. Zero would mean that we need to ingest data strictly in order. If we need to support this case in the future,
440    // we could revert to our old (and very costly) strategy of issuing a single `select *` query with the 'ORDER BY timestamp_column' clause,
441    // which requires storing and sorting the entire collection locally.
442    let is_zero = execute_singleton_query(
443        datafusion,
444        &format!("select cast((({lateness} + {lateness}) = {lateness}) as string)"),
445    )
446    .await
447    .map_err(|e| ControllerError::invalid_transport_configuration(endpoint_name, &e.to_string()))?;
448
449    if &is_zero == "true" {
450        return Err(ControllerError::invalid_transport_configuration(
451            endpoint_name,
452            &format!(
453                "invalid LATENESS attribute '{lateness}' of the timestamp column '{timestamp_column}': LATENESS must be greater than zero; {docs}"
454            ),
455        ));
456    }
457
458    Ok(())
459}
460
461#[cfg(test)]
462mod tests {
463    use super::{
464        columns_referenced_by_expression, columns_referenced_by_order_by, create_runtime_env,
465        create_session_context,
466    };
467    use datafusion::execution::memory_pool::MemoryLimit;
468    use feldera_types::config::{PipelineConfig, ResourceConfig, RuntimeConfig, StorageConfig};
469    use feldera_types::constants::DATAFUSION_TEMP_DIR;
470    use std::collections::BTreeSet;
471    use std::fs;
472    use std::path::{Path, PathBuf};
473
474    /// Drop guard so a failing test does not leak temp directories.
475    struct TempStorage {
476        path: PathBuf,
477    }
478
479    impl TempStorage {
480        fn new(name: &str) -> Self {
481            let path = std::env::temp_dir().join(name);
482            let _ = fs::remove_dir_all(&path);
483            fs::create_dir_all(&path).unwrap();
484            Self { path }
485        }
486
487        fn path(&self) -> &Path {
488            &self.path
489        }
490    }
491
492    impl Drop for TempStorage {
493        fn drop(&mut self) {
494            let _ = fs::remove_dir_all(&self.path);
495        }
496    }
497
498    fn pipeline_config(global: RuntimeConfig, storage: Option<&Path>) -> PipelineConfig {
499        PipelineConfig {
500            global,
501            multihost: None,
502            name: None,
503            given_name: None,
504            storage_config: storage.map(|p| StorageConfig {
505                path: p.to_string_lossy().into(),
506                cache: Default::default(),
507            }),
508            secrets_dir: None,
509            inputs: Default::default(),
510            outputs: Default::default(),
511            program_ir: None,
512        }
513    }
514
515    #[test]
516    fn create_runtime_env_creates_tmp_dir_under_storage() {
517        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-tmp-dir-test");
518        let cfg = pipeline_config(
519            RuntimeConfig {
520                workers: 1,
521                ..Default::default()
522            },
523            Some(storage.path()),
524        );
525
526        create_runtime_env(&cfg).unwrap();
527
528        let expected = storage.path().join(DATAFUSION_TEMP_DIR);
529        assert!(
530            expected.is_dir(),
531            "expected scratch directory at {}",
532            expected.display(),
533        );
534    }
535
536    /// Must match the value `checkpointer::gc_startup` allowlists, or the
537    /// scratch dir is wiped on every restart.
538    #[test]
539    fn scratch_dir_name_matches_gc_allowlist_constant() {
540        assert_eq!(DATAFUSION_TEMP_DIR, "datafusion-tmp");
541    }
542
543    #[test]
544    fn create_runtime_env_without_storage_succeeds() {
545        let cfg = pipeline_config(
546            RuntimeConfig {
547                workers: 1,
548                ..Default::default()
549            },
550            None,
551        );
552        create_runtime_env(&cfg).unwrap();
553    }
554
555    #[test]
556    fn create_runtime_env_applies_memory_pool_when_budget_set() {
557        // 5% of 16 GB = 800 MB; below the 2 GB ceiling.
558        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-pool-test");
559        let cfg = pipeline_config(
560            RuntimeConfig {
561                workers: 1,
562                max_rss_mb: Some(16_000),
563                ..Default::default()
564            },
565            Some(storage.path()),
566        );
567
568        let env = create_runtime_env(&cfg).unwrap();
569        match env.memory_pool.memory_limit() {
570            MemoryLimit::Finite(bytes) => assert_eq!(bytes, 800 * 1_000_000),
571            MemoryLimit::Infinite => panic!("expected a bounded memory pool, got Infinite"),
572            MemoryLimit::Unknown => panic!("expected a bounded memory pool, got Unknown"),
573        }
574    }
575
576    #[test]
577    fn create_runtime_env_no_memory_limit_when_budget_unset() {
578        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-unbounded-test");
579        let cfg = pipeline_config(
580            RuntimeConfig {
581                workers: 1,
582                ..Default::default()
583            },
584            Some(storage.path()),
585        );
586
587        let env = create_runtime_env(&cfg).unwrap();
588        // Anything other than `Finite(_)` proves no FairSpillPool was wired in.
589        match env.memory_pool.memory_limit() {
590            MemoryLimit::Finite(bytes) => {
591                panic!("expected an unbounded pool, got finite limit of {bytes} bytes");
592            }
593            _ => {}
594        }
595    }
596
597    #[test]
598    fn create_runtime_env_uses_resources_memory_mb_max_fallback() {
599        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-resources-test");
600        let cfg = pipeline_config(
601            RuntimeConfig {
602                workers: 1,
603                max_rss_mb: None,
604                resources: ResourceConfig {
605                    memory_mb_max: Some(16_000),
606                    ..Default::default()
607                },
608                ..Default::default()
609            },
610            Some(storage.path()),
611        );
612
613        let env = create_runtime_env(&cfg).unwrap();
614        match env.memory_pool.memory_limit() {
615            MemoryLimit::Finite(bytes) => assert_eq!(bytes, 800 * 1_000_000),
616            MemoryLimit::Infinite => panic!("expected a bounded memory pool, got Infinite"),
617            MemoryLimit::Unknown => panic!("expected a bounded memory pool, got Unknown"),
618        }
619    }
620
621    #[test]
622    fn create_runtime_env_wipes_stale_scratch_entries() {
623        let storage = TempStorage::new("feldera-datafusion-create-runtime-env-wipe-test");
624        let scratch = storage.path().join(DATAFUSION_TEMP_DIR);
625        fs::create_dir_all(&scratch).unwrap();
626
627        // Simulate leftovers from a prior crashed process.
628        let stale_subdir = scratch.join("datafusion-stale1");
629        fs::create_dir_all(&stale_subdir).unwrap();
630        fs::write(stale_subdir.join("orphan.arrow"), b"stale").unwrap();
631        let stale_file = scratch.join("loose.tmp");
632        fs::write(&stale_file, b"stale").unwrap();
633
634        let cfg = pipeline_config(
635            RuntimeConfig {
636                workers: 1,
637                ..Default::default()
638            },
639            Some(storage.path()),
640        );
641        create_runtime_env(&cfg).unwrap();
642
643        assert!(
644            scratch.is_dir(),
645            "scratch root must survive cleanup; gc_startup keeps it on the allowlist",
646        );
647        assert!(
648            !stale_subdir.exists(),
649            "stale per-DiskManager subdir should be removed on startup",
650        );
651        assert!(
652            !stale_file.exists(),
653            "stale loose file should be removed on startup",
654        );
655    }
656
657    #[test]
658    fn create_session_context_target_partitions_match_workers() {
659        let storage = TempStorage::new("feldera-datafusion-create-session-context-workers-test");
660        let cfg = pipeline_config(
661            RuntimeConfig {
662                workers: 7,
663                ..Default::default()
664            },
665            Some(storage.path()),
666        );
667        let env = create_runtime_env(&cfg).unwrap();
668        let ctx = create_session_context(&cfg, env);
669        assert_eq!(ctx.copied_config().target_partitions(), 7);
670    }
671
672    #[test]
673    fn create_session_context_target_partitions_prefer_io_workers() {
674        let storage = TempStorage::new("feldera-datafusion-create-session-context-io-workers-test");
675        let cfg = pipeline_config(
676            RuntimeConfig {
677                workers: 4,
678                io_workers: Some(12),
679                ..Default::default()
680            },
681            Some(storage.path()),
682        );
683        let env = create_runtime_env(&cfg).unwrap();
684        let ctx = create_session_context(&cfg, env);
685        assert_eq!(ctx.copied_config().target_partitions(), 12);
686    }
687
688    #[test]
689    fn create_session_context_with_customise_overrides_defaults() {
690        use super::create_session_context_with;
691        let storage = TempStorage::new("feldera-datafusion-create-session-context-override-test");
692        let cfg = pipeline_config(
693            RuntimeConfig {
694                workers: 4,
695                ..Default::default()
696            },
697            Some(storage.path()),
698        );
699        let env = create_runtime_env(&cfg).unwrap();
700        // Customise hook must win over the worker-derived defaults.
701        let ctx = create_session_context_with(&cfg, env, |c| {
702            c.set_usize("datafusion.execution.target_partitions", 99)
703        });
704        assert_eq!(ctx.copied_config().target_partitions(), 99);
705    }
706
707    /// Tripwire: `clean_stale_scratch_entries` refuses to walk a directory
708    /// whose final component isn't `DATAFUSION_TEMP_DIR`, so a misuse can't
709    /// recursively wipe an arbitrary path.
710    #[test]
711    fn clean_stale_scratch_entries_refuses_unexpected_paths() {
712        use super::clean_stale_scratch_entries;
713        let storage = TempStorage::new("feldera-datafusion-clean-scratch-guard-test");
714        let bogus = storage.path().join("not-the-scratch-dir");
715        fs::create_dir_all(&bogus).unwrap();
716        let canary = bogus.join("canary.txt");
717        fs::write(&canary, b"do not delete").unwrap();
718
719        clean_stale_scratch_entries(&bogus);
720
721        assert!(
722            canary.exists(),
723            "guard must not delete contents of a directory whose name != DATAFUSION_TEMP_DIR",
724        );
725    }
726
727    /// Pins the boundary that drives the `warn_if_pool_too_small_for_adhoc_sort`
728    /// log line. If `SORT_SPILL_RESERVATION_BYTES` changes, the warning
729    /// threshold changes with it
730    #[test]
731    fn min_pool_mb_for_adhoc_sort_matches_reservation_times_workers() {
732        use super::min_pool_mb_for_adhoc_sort;
733        // SORT_SPILL_RESERVATION_BYTES is 64 MiB = 67_108_864 B; the
734        // resolved pool size is reported in decimal MB, so each worker's
735        // requirement ceil-divides to 68 MB.
736        assert_eq!(min_pool_mb_for_adhoc_sort(0), 0);
737        assert_eq!(min_pool_mb_for_adhoc_sort(1), 68);
738        assert_eq!(min_pool_mb_for_adhoc_sort(2), 135);
739        assert_eq!(min_pool_mb_for_adhoc_sort(8), 537);
740    }
741
742    /// Make sure random shapes for `filter`, `cdc_delete_filter`, and `cdc_order_by`
743    /// are parsed correctly by our connector.
744    #[test]
745    fn cdc_connector_expr_shapes_validate() {
746        use super::{validate_sql_expression, validate_sql_order_by};
747
748        // Set as `filter` or `cdc_delete_filter`; validated as a scalar predicate.
749        const FILTER_SHAPES: &[&str] = &[
750            "0=0",
751            "0=0 AND (a = 's0' AND b = 's1')",
752            "0=0 AND (a = 's0')",
753            "0=0 AND (a IN ('s0'))",
754            "0=0 AND (a IN ('s0','s1'))",
755            "0=0 AND (a IN (1,2) OR a IS NULL)",
756            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b = false)",
757            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IN ('s0'))",
758            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IN ('s0','s1'))",
759            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NOT NULL)",
760            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NULL AND c IS NULL)",
761            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b IS NULL)",
762            "0=0 AND (a IN (1,2) OR a IS NULL) AND (b NOT IN ('s0','s1') AND c IS NOT NULL)",
763            "0=0 AND (a IN('s0','s1'))",
764            "0=0 AND (a IS NOT NULL AND b IS NOT NULL)",
765            "0=0 AND a = false",
766            "0=0 AND a = false AND (b = 's0' AND c = 's1')",
767            "0=0 AND a = false AND (b = 's0')",
768            "0=0 AND a = false AND (b IN ('s0'))",
769            "0=0 AND a = false AND (b IN ('s0','s1'))",
770            "0=0 AND a = false AND (b IN (1,2) OR b IS NULL)",
771            "0=0 AND a = false AND (b IN (1,2) OR b IS NULL) AND (c = false)",
772            "0=0 AND a = false AND (b IN (1,2) OR b IS NULL) AND (c IS NOT NULL)",
773            "0=0 AND a = false AND (b IS NOT NULL AND c IS NOT NULL)",
774            "0=0 AND a = false AND b is null",
775            "0=0 AND a = false AND b is null AND (c = 's0')",
776            "0=0 AND a = false AND b is null AND (c IN ('s0','s1'))",
777            "0=0 AND a = false AND b is null AND (c IN (1,2) OR c IS NULL)",
778            "0=0 AND a = false AND b is null AND (c IN (1,2) OR c IS NULL) AND (d NOT IN ('s0','s1') AND e IS NOT NULL)",
779            "a > 0",
780            "a >= 0 AND a <= 9",
781            "a <> 's0'",
782            "a != 's0'",
783            "a BETWEEN 0 AND 9",
784            "a LIKE 's0'",
785            "a IS NULL OR b IS NOT NULL",
786            "NOT (a = false)",
787            "lower(a) = 's0'",
788            "cast(a AS bigint) = 0",
789            "a + b > 0",
790            "coalesce(a, b) = 's0'",
791            "a > timestamp '2020-01-02 03:04:05'",
792            "a = 's0''s1'",
793        ];
794        const CDC_DELETE_FILTER_SHAPES: &[&str] = &[
795            "a = true",
796            "a = true OR b is not null",
797            "a = true AND b = false",
798            "a IN ('s0','s1')",
799            "a IS NOT NULL",
800            "NOT a",
801        ];
802        const CDC_ORDER_BY_SHAPES: &[&str] = &[
803            "a",
804            "a, b",
805            "a asc, b asc",
806            "a ASC",
807            "a desc",
808            "a ASC, b DESC",
809            "a NULLS FIRST",
810            "a ASC NULLS LAST",
811            "a DESC NULLS FIRST",
812            "a asc nulls last, b desc nulls first",
813            "a asc, b desc, c asc nulls last",
814            "a + b asc",
815            "a % 2 asc, b desc",
816            "lower(a) asc",
817            "abs(a) desc, b asc",
818            "cast(a AS bigint) asc",
819            "coalesce(a, b) asc, c desc",
820            "case when a then 0 else 1 end desc",
821            // Quoted identifier containing a space.
822            "\"a b\" asc",
823        ];
824
825        let mut failures = Vec::new();
826        for expr in FILTER_SHAPES.iter().chain(CDC_DELETE_FILTER_SHAPES) {
827            if let Err(e) = validate_sql_expression(expr) {
828                failures.push(format!("predicate '{expr}' failed: {e}"));
829            }
830        }
831        for order_by in CDC_ORDER_BY_SHAPES {
832            if let Err(e) = validate_sql_order_by(order_by) {
833                failures.push(format!("cdc_order_by '{order_by}' failed: {e}"));
834            }
835        }
836
837        assert!(
838            failures.is_empty(),
839            "validation failures:\n{}",
840            failures.join("\n")
841        );
842    }
843
844    fn columns(names: &[&str]) -> BTreeSet<String> {
845        names.iter().map(|s| s.to_string()).collect()
846    }
847
848    #[test]
849    fn expression_columns_are_collected() {
850        for (expr, expected) in [
851            ("__is_deleted = true", columns(&["__is_deleted"])),
852            ("deleted_at is not null", columns(&["deleted_at"])),
853            (
854                "__is_deleted = true OR deleted_at is not null",
855                columns(&["__is_deleted", "deleted_at"]),
856            ),
857            // Function arguments are walked; the function name is not a column.
858            ("lower(status) = 'gone'", columns(&["status"])),
859            // A compound reference resolves to its trailing (column) part.
860            ("t.deleted = true", columns(&["deleted"])),
861            // A predicate over no columns yields the empty set.
862            ("1 = 1", columns(&[])),
863        ] {
864            assert_eq!(
865                columns_referenced_by_expression(expr).unwrap(),
866                expected,
867                "columns of '{expr}'"
868            );
869        }
870    }
871
872    #[test]
873    fn order_by_columns_are_collected() {
874        assert_eq!(
875            columns_referenced_by_order_by("ts asc, lsn desc").unwrap(),
876            columns(&["ts", "lsn"]),
877        );
878        assert_eq!(
879            columns_referenced_by_order_by("coalesce(ts, created_at) asc").unwrap(),
880            columns(&["ts", "created_at"]),
881        );
882        // ASC/DESC and NULLS FIRST/LAST modifiers parse but are not columns.
883        assert_eq!(
884            columns_referenced_by_order_by("ts desc nulls last, lsn asc").unwrap(),
885            columns(&["ts", "lsn"]),
886        );
887    }
888
889    #[test]
890    fn malformed_expressions_error() {
891        assert!(columns_referenced_by_expression("a =").is_err());
892        // A trailing key past the first must still fail rather than be dropped.
893        assert!(columns_referenced_by_order_by("ts asc,").is_err());
894    }
895}