oxisql-datafusion — Apache DataFusion integration for OxiSQL
oxisql-datafusion exposes OxiSQL-backed tables to Apache DataFusion so that OLAP SQL queries can be planned and executed against OxiSQL data using the full DataFusion query engine.
Installation
[dependencies]
oxisql-datafusion = "0.1.1"
# Optional features:
# oxisql-datafusion = { version = "0.1.1", features = ["columnar"] } # Parquet support
# oxisql-datafusion = { version = "0.1.1", features = ["parse"] } # Plan bridge
Quick Start
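use std::sync::Arc;
use …;
use …;
use …::OxiSqlTableProvider;
// Arrow schema describing the snapshot's columns
let schema = Arc::new(…);
// Pre-collected oxisql_core rows
let rows = vec![…];
let provider = OxiSqlTableProvider::from_rows(rows, schema);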
API Overview
OxiSqlTableProvider
A DataFusion TableProvider that serves a fixed snapshot of oxisql_core::Rows, by default as a single Arrow RecordBatch partition.
| Method | Description |
|---|---|
| OxiSqlTableProvider::from_rows(rows, schema) | Construct from a pre-collected row snapshot and an Arrow schema |
| OxiSqlTableProvider::from_connection(conn, table_name, schema) | Populate the snapshot by executing SELECT * FROM {table_name} on conn |
| provider.refresh(conn, table_name) | Re-query conn and replace the current snapshot |
| provider.with_range_partition(key_col, n) | Sort by key_col and split into n contiguous partitions for parallel scans |
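The partitioning step that with_range_partition describes can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the crate's code: rows are modelled as hypothetical (key, payload) tuples instead of oxisql_core rows, and range_partition is a made-up name. Rows are sorted by key, then the first len % n partitions take one extra row, so partition sizes differ by at most one and each partition holds a contiguous run of the sorted rows.

```rust
/// Hypothetical stand-in row: (key column value, payload).
type Row = (i64, String);

/// Sort rows by key and split them into `n` contiguous, near-equal partitions.
fn range_partition(mut rows: Vec<Row>, n: usize) -> Vec<Vec<Row>> {
    assert!(n > 0, "partition count must be positive");
    rows.sort_by_key(|(k, _)| *k);

    let len = rows.len();
    let base = len / n; // every partition gets at least this many rows
    let extra = len % n; // the first `extra` partitions get one more row

    let mut parts = Vec::with_capacity(n);
    let mut iter = rows.into_iter();
    for i in 0..n {
        let size = base + usize::from(i < extra);
        // by_ref() lets each partition consume the next `size` rows in order.
        parts.push(iter.by_ref().take(size).collect());
    }
    parts
}
```

Splitting by row count rather than by key value keeps partitions balanced even when keys are skewed; rows with equal keys may still land in adjacent partitions.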
Filter pushdown is supported for binary comparisons (=, <>, <, <=, >, >=) and for IS NULL / IS NOT NULL. Filters are applied in-process, and the provider reports Inexact so that DataFusion still re-applies each filter after the scan.
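The in-process filter check can be sketched as follows. Value, CmpOp, Filter, compare, and passes are hypothetical simplifications (only integer and text cells), not the crate's types. A comparison involving NULL rejects the row, matching SQL semantics. When the sketch cannot compare two values it keeps the row: an Inexact pushdown may return extra rows for DataFusion's post-filter to remove, but it must never drop a row that would pass.

```rust
use std::cmp::Ordering;

/// Hypothetical cell value (the real crate uses oxisql_core's Value).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    I64(i64),
    Text(String),
}

/// The binary comparison operators listed above.
#[derive(Debug, Clone, Copy)]
enum CmpOp {
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,
}

/// A pushed-down predicate on one column: `col <op> literal`, IS NULL, or IS NOT NULL.
enum Filter {
    Cmp(CmpOp, Value),
    IsNull,
    IsNotNull,
}

/// Order two non-null values of the same kind; None if the kinds differ.
fn compare(a: &Value, b: &Value) -> Option<Ordering> {
    match (a, b) {
        (Value::I64(x), Value::I64(y)) => Some(x.cmp(y)),
        (Value::Text(x), Value::Text(y)) => Some(x.cmp(y)),
        _ => None,
    }
}

/// Decide whether a cell passes the filter.
fn passes(cell: &Value, filter: &Filter) -> bool {
    match filter {
        Filter::IsNull => *cell == Value::Null,
        Filter::IsNotNull => *cell != Value::Null,
        Filter::Cmp(op, lit) => {
            // SQL: a comparison involving NULL is never true, so reject the row.
            if *cell == Value::Null || *lit == Value::Null {
                return false;
            }
            match compare(cell, lit) {
                // Kinds this sketch cannot compare: keep the row and let
                // DataFusion's post-filter decide (safe under Inexact).
                None => true,
                Some(ord) => match op {
                    CmpOp::Eq => ord == Ordering::Equal,
                    CmpOp::NotEq => ord != Ordering::Equal,
                    CmpOp::Lt => ord == Ordering::Less,
                    CmpOp::LtEq => ord != Ordering::Greater,
                    CmpOp::Gt => ord == Ordering::Greater,
                    CmpOp::GtEq => ord != Ordering::Less,
                },
            }
        }
    }
}
```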
The provider is cheaply cloneable (Arc internally).
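A hypothetical SnapshotProvider illustrates why cloning stays cheap when the snapshot sits behind an Arc: clone copies a pointer and increments a reference count instead of copying rows. The struct, its field, and row_count are illustrative only and do not describe the crate's internal layout.

```rust
use std::sync::Arc;

/// Hypothetical provider shape: the row snapshot is shared behind an Arc.
#[derive(Clone)]
struct SnapshotProvider {
    rows: Arc<Vec<Vec<i64>>>,
}

impl SnapshotProvider {
    fn new(rows: Vec<Vec<i64>>) -> Self {
        Self { rows: Arc::new(rows) }
    }

    /// Every clone reads the same underlying rows.
    fn row_count(&self) -> usize {
        self.rows.len()
    }
}
```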
OxiSqlStreamProvider
A live-streaming TableProvider that queries a real oxisql_core::Connection at scan time and yields batches incrementally.
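Incremental batching can be sketched as an iterator adapter that pulls at most batch_size rows at a time from a lazy source. Batches and batches are hypothetical names; the source iterator stands in for a live connection cursor, and the real provider emits Arrow RecordBatches rather than Vec values.

```rust
/// Hypothetical adapter: yields the source's items in batches of at most
/// `batch_size`, pulling only as many items as each batch needs.
struct Batches<I: Iterator> {
    source: I,
    batch_size: usize,
}

impl<I: Iterator> Iterator for Batches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        // take() stops after batch_size items without touching the rest.
        let batch: Vec<I::Item> = self.source.by_ref().take(self.batch_size).collect();
        if batch.is_empty() {
            None // source exhausted
        } else {
            Some(batch)
        }
    }
}

/// Wrap any row source (here standing in for a live connection cursor).
fn batches<I: IntoIterator>(source: I, batch_size: usize) -> Batches<I::IntoIter> {
    assert!(batch_size > 0, "batch_size must be positive");
    Batches { source: source.into_iter(), batch_size }
}
```

Because each batch is pulled on demand, a consumer that stops early never reads the remaining rows.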
| Method | Description |
|---|---|
| OxiSqlStreamProvider::new(conn, table_name, schema) | Wrap a live connection |
| provider.with_sort_order(order) | Specify the sort ordering of the stream |
SortOrder variants: Ascending(col), Descending(col).
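The two SortOrder variants can be read as row comparators. In this sketch col is assumed to be a column index and rows are plain Vec<i64>; the real SortOrder may identify columns differently. is_ordered checks whether a sequence already satisfies an ordering, which is the property a declared stream ordering would let a planner rely on.

```rust
use std::cmp::Ordering;

/// Mirrors the README's variants; `col` is assumed to be a column index.
#[derive(Debug, Clone, Copy)]
enum SortOrder {
    Ascending(usize),
    Descending(usize),
}

impl SortOrder {
    /// Compare two rows under this ordering.
    fn compare(self, a: &[i64], b: &[i64]) -> Ordering {
        match self {
            SortOrder::Ascending(col) => a[col].cmp(&b[col]),
            SortOrder::Descending(col) => b[col].cmp(&a[col]), // operands reversed
        }
    }
}

/// True if no adjacent pair of rows violates `order` (ties are allowed).
fn is_ordered(rows: &[Vec<i64>], order: SortOrder) -> bool {
    rows.windows(2)
        .all(|w| order.compare(&w[0], &w[1]) != Ordering::Greater)
}
```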
OxiSqlContext
A DataFusion SessionContext wrapper with convenience methods for registering OxiSQL-backed tables.
| Method | Description |
|---|---|
| OxiSqlContext::new() | Create with default DataFusion settings |
| OxiSqlContext::from_session_context(ctx) | Wrap an existing SessionContext |
| ctx.register_table(name, conn, schema) | Register a live connection as a DataFusion table (uses OxiSqlStreamProvider) |
| ctx.register_snapshot(name, rows, schema) | Register a static row snapshot (uses OxiSqlTableProvider) |
| ctx.execute_sql(sql) | Execute SQL and return Vec<RecordBatch> |
| ctx.to_dataframe(sql) | Execute SQL and return a DataFusion DataFrame |
| ctx.register_udf(name, func, arg_types, return_type) | Register a scalar UDF |
| ctx.register_udaf(name, factory, arg_types, return_type) | Register an aggregate UDF |
| ctx.explain(sql) | Return the physical plan explanation string |
| ctx.inner() | Access the underlying SessionContext |
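The role of the parameters in register_udf(name, func, arg_types, return_type) can be illustrated with a small std-only registry. DataType, Scalar, ScalarUdf, UdfRegistry, and add_one are hypothetical stand-ins, not the DataFusion or OxiSqlContext types; the sketch only shows how a declared signature lets a call be type-checked before the function body runs.

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for Arrow data types and scalar values.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Int64,
    Float64,
}

#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Int64(i64),
    Float64(f64),
}

impl Scalar {
    fn data_type(&self) -> DataType {
        match self {
            Scalar::Int64(_) => DataType::Int64,
            Scalar::Float64(_) => DataType::Float64,
        }
    }
}

/// A registered scalar function plus its declared signature.
struct ScalarUdf {
    func: fn(&[Scalar]) -> Scalar,
    arg_types: Vec<DataType>,
    return_type: DataType,
}

#[derive(Default)]
struct UdfRegistry {
    udfs: HashMap<String, ScalarUdf>,
}

impl UdfRegistry {
    /// Same parameter order as register_udf(name, func, arg_types, return_type).
    fn register_udf(
        &mut self,
        name: &str,
        func: fn(&[Scalar]) -> Scalar,
        arg_types: Vec<DataType>,
        return_type: DataType,
    ) {
        self.udfs.insert(name.to_string(), ScalarUdf { func, arg_types, return_type });
    }

    /// Look the function up, check argument types against its signature, then call it.
    fn call(&self, name: &str, args: &[Scalar]) -> Result<Scalar, String> {
        let udf = self
            .udfs
            .get(name)
            .ok_or_else(|| format!("unknown function: {}", name))?;
        let got: Vec<DataType> = args.iter().map(Scalar::data_type).collect();
        if got != udf.arg_types {
            return Err(format!("{}: expected {:?}, got {:?}", name, udf.arg_types, got));
        }
        let out = (udf.func)(args);
        debug_assert_eq!(out.data_type(), udf.return_type, "UDF broke its declared return type");
        Ok(out)
    }
}

/// Example UDF body (hypothetical): add one to the first argument.
fn add_one(args: &[Scalar]) -> Scalar {
    match args[0] {
        Scalar::Int64(v) => Scalar::Int64(v + 1),
        Scalar::Float64(v) => Scalar::Float64(v + 1.0),
    }
}
```

Usage: register add_one with arg_types vec![DataType::Int64] and return_type DataType::Int64; calling it with a Float64 argument is then rejected before add_one runs.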
The following free functions are also available:
use …::{register_embedded_table, register_oxisql_table};
// Register any Connection-backed table
register_oxisql_table(…)?;
// Convenience for EmbeddedConnection
register_embedded_table(…)?;
OxiSqlFusionError
Error type covering DataFusion, Arrow, and OxiSQL errors:
| Variant | Description |
|---|---|
| DataFusion(DataFusionError) | DataFusion engine error |
| OxiSql(String) | OxiSQL backend error (string form) |
| Arrow(ArrowError) | Arrow conversion error |
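A minimal sketch of an error enum with these three variants follows. DataFusionError and ArrowError here are local stand-ins so the code compiles with the standard library alone, and the From impls and Display messages are assumptions about how such a type is commonly wired, not confirmed details of OxiSqlFusionError. The From impls are what let the ? operator convert a source error into the combined type.

```rust
use std::fmt;

// Hypothetical stand-ins for datafusion's DataFusionError and arrow's ArrowError.
#[derive(Debug)]
struct DataFusionError(String);
#[derive(Debug)]
struct ArrowError(String);

#[derive(Debug)]
enum OxiSqlFusionError {
    DataFusion(DataFusionError),
    OxiSql(String),
    Arrow(ArrowError),
}

impl fmt::Display for OxiSqlFusionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DataFusion(e) => write!(f, "DataFusion error: {}", e.0),
            Self::OxiSql(msg) => write!(f, "OxiSQL error: {}", msg),
            Self::Arrow(e) => write!(f, "Arrow error: {}", e.0),
        }
    }
}

impl std::error::Error for OxiSqlFusionError {}

// Assumed conversions: each lets `?` lift a source error into the combined type.
impl From<DataFusionError> for OxiSqlFusionError {
    fn from(e: DataFusionError) -> Self {
        Self::DataFusion(e)
    }
}

impl From<ArrowError> for OxiSqlFusionError {
    fn from(e: ArrowError) -> Self {
        Self::Arrow(e)
    }
}

/// A hypothetical planning step that fails with an engine error.
fn plan_step() -> Result<(), DataFusionError> {
    Err(DataFusionError("table not found".into()))
}

fn run() -> Result<(), OxiSqlFusionError> {
    plan_step()?; // DataFusionError -> OxiSqlFusionError::DataFusion via From
    Ok(())
}
```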
Feature Flags
| Feature | Description |
|---|---|
| (default) | OxiSqlTableProvider, OxiSqlStreamProvider, OxiSqlContext |
| columnar | ParquetTableProvider: scan Parquet files as DataFusion tables |
| parse | plan_bridge module: convert oxisql_parse::LogicalPlan to a DataFusion LogicalPlan |
plan_bridge (feature = parse)
use oxisql_datafusion::plan_bridge::{sql_to_datafusion_plan, to_datafusion_plan};
use …::LogicalPlan;
// Convert an oxisql_parse LogicalPlan to a DataFusion LogicalPlan
let df_plan = to_datafusion_plan(…)?;
// Parse SQL and produce a DataFusion plan in one step
let df_plan = sql_to_datafusion_plan(…)?;
ParquetTableProvider (feature = columnar)
use …::ParquetTableProvider;
let provider = ParquetTableProvider::open(…).await?;
session_ctx.register_table(…)?;
Type Mapping
OxiSQL Value variants are converted to Arrow arrays via the types module:
| Value variant | Arrow type |
|---|---|
| Null | null slot in the column array |
| Bool | Boolean |
| I64 | Int64 |
| F64 | Float64 |
| Text | Utf8 |
| Blob | Binary |
| Timestamp | Timestamp(Microsecond, UTC) |
| Date | Date32 |
| Time | Time64(Microsecond) |
| Uuid | FixedSizeBinary(16) |
| Json | Utf8 |
| Decimal | Utf8 |
| Array | LargeList |
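Part of the mapping above can be sketched for a subset of variants. The Value enum below is a hypothetical reduction of OxiSQL's, and the validity mask is simplified to Vec<bool> where Arrow uses a packed bitmap. date32_from_ymd shows what a Date32 slot stores (days since 1970-01-01) using Howard Hinnant's days_from_civil algorithm; how OxiSQL represents Date internally is not stated in this README, so the (year, month, day) input is an assumption.

```rust
/// Hypothetical subset of OxiSQL's Value enum.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Bool(bool),
    I64(i64),
    F64(f64),
    Text(String),
    Blob(Vec<u8>),
}

/// Arrow type name for a value, following the table; Null has no type of its own.
fn arrow_type_name(v: &Value) -> Option<&'static str> {
    match v {
        Value::Null => None, // becomes a null slot in whatever column it sits in
        Value::Bool(_) => Some("Boolean"),
        Value::I64(_) => Some("Int64"),
        Value::F64(_) => Some("Float64"),
        Value::Text(_) => Some("Utf8"),
        Value::Blob(_) => Some("Binary"),
    }
}

/// Build an Int64 column as (values, validity): Null cells become invalid slots.
fn int64_column(cells: &[Value]) -> Result<(Vec<i64>, Vec<bool>), String> {
    let mut values = Vec::with_capacity(cells.len());
    let mut validity = Vec::with_capacity(cells.len());
    for cell in cells {
        match cell {
            Value::I64(v) => {
                values.push(*v);
                validity.push(true);
            }
            Value::Null => {
                values.push(0); // placeholder; readers must consult validity
                validity.push(false);
            }
            other => return Err(format!("expected I64 or Null, got {:?}", other)),
        }
    }
    Ok((values, validity))
}

/// Days since 1970-01-01 for a proleptic Gregorian date (days_from_civil).
fn date32_from_ymd(y: i32, m: u32, d: u32) -> i32 {
    let y = if m <= 2 { y - 1 } else { y }; // treat Jan/Feb as months 10/11 of the prior year
    let era = (if y >= 0 { y } else { y - 399 }) / 400;
    let yoe = (y - era * 400) as u32; // year of era, [0, 399]
    let mp = (m + 9) % 12; // March = 0
    let doy = (153 * mp + 2) / 5 + d - 1; // day of year, [0, 365]
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day of era, [0, 146096]
    era * 146097 + doe as i32 - 719468
}
```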
Test Status
As of 2026-05-30: 67 tests passing, 4 skipped.
License
Apache-2.0 — COOLJAPAN OU (Team Kitasan)