vortex-scan
A high-performance scanning and (non-shuffling) query execution engine for the Vortex columnar format, featuring work-stealing parallelism and exhaustively tested concurrent execution.
Overview
The vortex-scan crate provides efficient scanning operations over Vortex arrays with support for:
- Projection pushdown - Only read the columns you need
- Filter predicates - Push filters down to the storage layer
- Row selection - Efficiently skip unwanted rows
- Multi-threaded execution - Work-stealing parallelism for CPU-bound operations
- Async I/O - Tokio-based async execution for I/O operations
- Arrow integration - Seamless conversion to Apache Arrow format
Features
Core Capabilities
- ScanBuilder API: Fluent interface for constructing scan operations
- Flexible Execution: Single-threaded, multi-threaded, and async execution modes
- Row Filtering: Support for complex boolean expressions and dynamic filters
- Selection Modes: Include/exclude by index or using Roaring bitmaps
- Split Strategies: Split scans by row count or file size for parallel processing
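To make the row-count split strategy concrete, here is a minimal sketch of cutting a row range into fixed-size splits that can be handed to separate workers. The helper `split_by_row_count` is hypothetical and written only for illustration; it is not the crate's API, and the real split logic may also take layout boundaries into account.

```rust
use std::ops::Range;

/// Split `rows` into consecutive ranges of at most `rows_per_split` rows.
/// Hypothetical helper for illustration; not part of the vortex-scan API.
fn split_by_row_count(rows: Range<u64>, rows_per_split: u64) -> Vec<Range<u64>> {
    assert!(rows_per_split > 0, "rows_per_split must be non-zero");
    let mut splits = Vec::new();
    let mut start = rows.start;
    while start < rows.end {
        // saturating_add guards against overflow near u64::MAX.
        let end = start.saturating_add(rows_per_split).min(rows.end);
        splits.push(start..end);
        start = end;
    }
    splits
}
```

Each returned range is independent of the others, which is what allows splits to be processed in parallel.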
Performance Features
- Work Stealing: Efficient work distribution across threads
- Zero-Copy Operations: Minimize memory allocations and copies
- Pruning Evaluation: Skip reading data that won't match filters
- Concurrent Iteration: Multiple threads can process results simultaneously
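As an illustration of pruning, the sketch below decides which chunks need to be read for a range filter by consulting per-chunk statistics. It assumes min/max statistics are available per chunk; the `ChunkStats` type and `chunks_to_read` function are hypothetical names, and the crate's actual pruning mechanism may differ.

```rust
/// Per-chunk statistics. The min/max layout is an assumption for illustration.
struct ChunkStats {
    min: i64,
    max: i64,
}

/// Return the indices of chunks that may contain a value in `lo..=hi`.
/// Chunks whose [min, max] interval does not overlap the filter range are skipped.
fn chunks_to_read(stats: &[ChunkStats], lo: i64, hi: i64) -> Vec<usize> {
    stats
        .iter()
        .enumerate()
        .filter(|(_, s)| s.max >= lo && s.min <= hi)
        .map(|(i, _)| i)
        .collect()
}
```

Pruning of this kind is conservative: a chunk that is kept may still contain no matching rows, but a chunk that is skipped cannot contain any.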
Usage
Basic Scan
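use vortex_scan::ScanBuilder;
use vortex_expr::lit;
// Create a scan that reads specific columns with a filter.
// `layout_reader`, `projection`, and `filter` are placeholders for values
// built elsewhere (for example, a filter expression built with `lit`).
let scan = ScanBuilder::new(layout_reader)
    .with_projection(projection)
    .with_filter(filter)
    .build()?;
// Execute the scan
for batch in scan.into_array_iter()? {
    // Process each batch
}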
Multi-threaded Execution
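// Execute the scan across multiple threads.
// `layout_reader`, `projection`, and `filter` are placeholders for values built elsewhere.
let scan = ScanBuilder::new(layout_reader)
    .with_projection(projection)
    .with_filter(filter)
    .into_array_iter_multithread()?;
for batch in scan {
    // Process each batch
}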
Arrow Integration
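use arrow_array::RecordBatch;
// Convert scan results to Arrow RecordBatches.
// `layout_reader` and `filter` are placeholders; `schema` is an assumed
// argument, so check the crate docs for the exact signature.
let reader = ScanBuilder::new(layout_reader)
    .with_filter(filter)
    .into_record_batch_reader(schema)?;
for batch in reader {
    // Process each RecordBatch
}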
Row Selection
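use vortex_scan::Selection;
// Select specific rows by index.
// `layout_reader`, `selection` (a `Selection` value), and `row_range` are placeholders.
let scan = ScanBuilder::new(layout_reader)
    .with_selection(selection)
    .build()?;
// Or use row ranges
let scan = ScanBuilder::new(layout_reader)
    .with_row_range(row_range)
    .build()?;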
Architecture
Work-Stealing Queue
The crate implements a work-stealing queue that lets multiple worker threads share work:
- Dynamic Task Addition: Tasks can be added while processing is ongoing
- Fair Work Distribution: Threads steal work from each other to balance load
- Lock-Free Operations: Uses crossbeam's deque for efficient concurrent access
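The stealing pattern described above can be sketched with the standard library alone. This is not the crate's implementation: it swaps crossbeam's lock-free deque for a `Mutex<VecDeque>` per worker to stay dependency-free, and it does not model dynamic task addition. The function name `run_work_stealing` and the "square a number" task are illustrative assumptions.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Run `tasks` on `num_workers` threads and return the sum of their results.
/// Each worker owns a queue; an idle worker steals from the back of another queue.
fn run_work_stealing(tasks: Vec<u64>, num_workers: usize) -> u64 {
    assert!(num_workers > 0, "need at least one worker");
    // Distribute tasks round-robin across per-worker queues.
    let mut queues: Vec<Mutex<VecDeque<u64>>> =
        (0..num_workers).map(|_| Mutex::new(VecDeque::new())).collect();
    for (i, task) in tasks.into_iter().enumerate() {
        queues[i % num_workers].get_mut().unwrap().push_back(task);
    }
    let queues = Arc::new(queues);

    let handles: Vec<_> = (0..num_workers)
        .map(|id| {
            let queues = Arc::clone(&queues);
            thread::spawn(move || {
                let mut local_sum = 0u64;
                loop {
                    // Prefer local work, taken from the front of our own queue.
                    let mut next = queues[id].lock().unwrap().pop_front();
                    if next.is_none() {
                        // Otherwise steal from the back of another worker's queue.
                        next = (0..queues.len())
                            .filter(|&other| other != id)
                            .find_map(|other| queues[other].lock().unwrap().pop_back());
                    }
                    match next {
                        // The "task" in this sketch is just squaring a number.
                        Some(n) => local_sum += n * n,
                        // All queues are empty and no tasks are added later, so stop.
                        None => break,
                    }
                }
                local_sum
            })
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Owners take from the front and thieves take from the back so that they contend on opposite ends of a queue, which is the same idea a lock-free deque exploits.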
Filter Optimization
Filters are automatically optimized using:
- Conjunct Reordering: Most selective filters are evaluated first
- Dynamic Statistics: Filter selectivity is tracked and used for optimization
- Pruning Pushdown: Filters are pushed to the storage layer when possible
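Conjunct reordering driven by observed selectivity can be sketched as follows. The `Conjunct` type and `filter_and_reorder` function are hypothetical and exist only for illustration; the crate's own expression types and statistics tracking are not shown here.

```rust
/// One conjunct of an AND filter plus running selectivity statistics.
struct Conjunct {
    name: &'static str,
    predicate: fn(i64) -> bool,
    seen: u64,
    passed: u64,
}

impl Conjunct {
    fn new(name: &'static str, predicate: fn(i64) -> bool) -> Self {
        Self { name, predicate, seen: 0, passed: 0 }
    }

    /// Fraction of observed rows that passed; 1.0 until there is data.
    fn selectivity(&self) -> f64 {
        if self.seen == 0 {
            1.0
        } else {
            self.passed as f64 / self.seen as f64
        }
    }
}

/// Evaluate all conjuncts over `rows`, short-circuiting per row, then reorder
/// so the conjunct that passes the fewest rows is evaluated first next time.
fn filter_and_reorder(conjuncts: &mut Vec<Conjunct>, rows: &[i64]) -> Vec<i64> {
    let mut kept = Vec::new();
    for &row in rows {
        let mut ok = true;
        for c in conjuncts.iter_mut() {
            c.seen += 1;
            if (c.predicate)(row) {
                c.passed += 1;
            } else {
                ok = false;
                break; // later conjuncts are skipped for this row
            }
        }
        if ok {
            kept.push(row);
        }
    }
    // Lower pass rate means more selective, so it sorts earlier.
    conjuncts.sort_by(|a, b| a.selectivity().total_cmp(&b.selectivity()));
    kept
}
```

Running the most selective conjunct first means most rows are rejected after a single predicate evaluation, so the remaining conjuncts run on far fewer rows.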
Memory Safety
All concurrent code has been verified using:
- Loom Testing: Exhaustive verification of all possible thread interleavings
- Address Sanitizer: Memory safety verification in CI
- Debug Assertions: Runtime checks for invariants in debug builds
Testing
Unit Tests
Run the standard test suite:
cargo test -p vortex-scan
Loom Concurrency Tests
The crate includes comprehensive Loom tests that exhaustively verify concurrent behavior. These tests run by default but can be disabled if need be:
# Skip Loom tests when using incompatible tools like address sanitizer
RUSTFLAGS="--cfg disable_loom" cargo test -p vortex-scan
Loom tests verify:
- Memory ordering correctness in the work-stealing queue
- Absence of data races in filter expression evaluation
- Proper synchronization in concurrent task factories
- Thread termination conditions and cleanup
Performance Considerations
Concurrency Level
The default concurrency level is 2, meaning each worker thread can have 2 tasks in flight. This can be adjusted:
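// `layout_reader` and `concurrency` are placeholders; the default concurrency is 2.
let scan = ScanBuilder::new(layout_reader)
    .with_concurrency(concurrency) // Increase for more I/O parallelism
    .build()?;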
Buffer Sizes
The multi-threaded executor uses buffering based on the formula:
buffer_size = num_workers * concurrency
This controls how many splits are processed concurrently.
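As a worked instance of the formula, the small function below computes the buffer size for a given worker count and concurrency level. The function name `buffer_size` is an illustrative assumption, not an item exported by the crate.

```rust
/// buffer_size = num_workers * concurrency, following the formula above.
/// Hypothetical helper for illustration; saturates instead of overflowing.
fn buffer_size(num_workers: usize, concurrency: usize) -> usize {
    num_workers.saturating_mul(concurrency)
}
```

For example, 8 workers at the default concurrency of 2 give a buffer of 16 splits, and raising concurrency to 4 doubles that to 32.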
Memory Usage
- Streaming Processing: Results are streamed rather than materialized
- Bounded Buffers: Memory usage is bounded by the concurrency level
- Lazy Evaluation: Computation is deferred until results are consumed
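The interaction between streaming and bounded buffers can be sketched with a bounded channel from the standard library. This is an analogy under stated assumptions rather than the crate's executor: `stream_batches` is a hypothetical function, the batches are dummy vectors, and the producer thread starts eagerly, so only the throttling aspect of lazy evaluation is modeled.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;

/// Stream `num_batches` batches through a channel holding at most `capacity`
/// batches. The producer blocks once the buffer is full, so memory is bounded
/// by `capacity` rather than by the total result size.
fn stream_batches(num_batches: u32, capacity: usize) -> Receiver<Vec<u32>> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        for i in 0..num_batches {
            // Stand-in for decoding one batch of results.
            let batch = vec![i; 4];
            // `send` blocks while the buffer is full and fails if the consumer hung up.
            if tx.send(batch).is_err() {
                break;
            }
        }
    });
    rx
}
```

A consumer that iterates the receiver slowly also slows the producer down, which is how a bounded buffer keeps memory usage tied to the buffer size instead of the dataset size.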
Dependencies
Core dependencies:
- vortex-array: Core array types and operations
- vortex-layout: Layout reader abstraction
- vortex-expr: Expression evaluation framework
- futures: Async runtime abstractions
- tokio (optional): Multi-threaded async runtime
- arrow-array (optional): Arrow integration
Feature Flags
- default: Standard features for most use cases
- tokio: Enable multi-threaded execution with Tokio runtime
- roaring: Support for Roaring bitmap selections