# oxisql-datafusion
> Apache DataFusion 53.x bridge exposing OxiSQL-backed tables to OLAP SQL, with live streaming and filter/projection/limit/sort pushdown.

[crates.io](https://crates.io/crates/oxisql-datafusion) · [docs.rs](https://docs.rs/oxisql-datafusion) · [License](LICENSE) · [Rust](https://www.rust-lang.org)
## What it is
`oxisql-datafusion` bridges the [OxiSQL](https://github.com/cool-japan/oxisql) workspace to [Apache DataFusion](https://datafusion.apache.org/) (53.x), so OLAP SQL — joins, aggregations, window functions — can be planned and executed by DataFusion against OxiSQL data.
It offers two `TableProvider` flavours: a **snapshot** provider that materialises a fixed set of rows as an Arrow `RecordBatch`, and a **live-streaming** provider that drives a real `oxisql_core::Connection` at scan time and yields batches incrementally. The providers push supported filters, projections and limits down into the backing SQL query (plus sorts, on the live provider via `with_sort`), and multiple OxiSQL tables can be registered in one catalog for cross-table joins. Everything is Pure Rust.
- **Status:** Alpha.
- **Edition:** 2021 · **MSRV:** 1.89 · **License:** Apache-2.0.
## Installation
```toml
[dependencies]
oxisql-datafusion = "0.1.2"
# Optional features:
# oxisql-datafusion = { version = "0.1.2", features = ["parse"] } # SQL → DataFusion plan bridge
# oxisql-datafusion = { version = "0.1.2", features = ["columnar"] } # Parquet table provider
```
## Quick start
Register a live embedded connection and run OLAP SQL through DataFusion:
```rust,ignore
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use oxisql_core::Connection;
use oxisql_datafusion::OxiSqlContext;
use oxisql_embedded::EmbeddedConnection;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Populate an OxiSQL-backed table.
    let conn = Arc::new(EmbeddedConnection::open_memory()?);
    conn.execute("CREATE TABLE sales (region TEXT, amount INTEGER)", &[]).await?;
    conn.execute("INSERT INTO sales VALUES ('EU', 100), ('EU', 50), ('US', 75)", &[]).await?;

    let schema = Arc::new(Schema::new(vec![
        Field::new("region", DataType::Utf8, false),
        Field::new("amount", DataType::Int64, false),
    ]));

    // 2. Register it as a live-streaming DataFusion table.
    let ctx = OxiSqlContext::new();
    ctx.register_table("sales", conn, schema)?;

    // 3. Let DataFusion plan and execute the OLAP query.
    let batches = ctx
        .execute_sql("SELECT region, SUM(amount) AS total FROM sales GROUP BY region")
        .await?;
    println!("{} record batch(es)", batches.len());
    Ok(())
}
```
Or build a static snapshot provider directly from rows:
```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use oxisql_core::{Row, Value};
use oxisql_datafusion::OxiSqlTableProvider;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
Field::new("score", DataType::Float64, false),
]));
let rows = vec![Row::new(
vec!["id".into(), "name".into(), "score".into()],
vec![Value::I64(1), Value::Text("Alice".into()), Value::F64(95.5)],
)];
// Optionally split into 4 contiguous partitions for parallel scans.
let provider = OxiSqlTableProvider::from_rows(rows, schema)
.with_range_partition("id", 4);
```
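
`with_range_partition(key_col, n)` is documented as sorting the rows by `key_col` and splitting them into `n` contiguous partitions. The std-only sketch below illustrates that idea on plain tuples; it is not the crate's code, and the way it distributes leftover rows (one extra row to each of the first `len % n` partitions) is an assumption.

```rust
/// Sorts `rows` by `key` and splits them into `n` contiguous partitions whose
/// sizes differ by at most one row. Illustrative sketch only.
fn range_partition<T, K: Ord, F: Fn(&T) -> K>(mut rows: Vec<T>, key: F, n: usize) -> Vec<Vec<T>> {
    assert!(n > 0, "partition count must be positive");
    // Stable sort, so rows with equal keys keep their input order.
    rows.sort_by_key(|r| key(r));

    let base = rows.len() / n;
    let extra = rows.len() % n;
    let mut iter = rows.into_iter();
    let mut parts = Vec::with_capacity(n);
    for i in 0..n {
        // Assumption: the first `extra` partitions each take one leftover row.
        let size = base + usize::from(i < extra);
        parts.push(iter.by_ref().take(size).collect());
    }
    parts
}

fn main() {
    // (id, name) pairs standing in for OxiSQL rows.
    let rows = vec![(7_i64, "g"), (2, "b"), (5, "e"), (1, "a"), (3, "c"), (6, "f"), (4, "d")];
    let parts = range_partition(rows, |r| r.0, 3);
    for (i, p) in parts.iter().enumerate() {
        println!("partition {i}: {:?}", p.iter().map(|r| r.0).collect::<Vec<_>>());
    }
    // partition 0: [1, 2, 3]
    // partition 1: [4, 5]
    // partition 2: [6, 7]
}
```

Because each partition covers a disjoint, ordered key range, the partitions can be scanned independently, which is what makes parallel scans possible.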
### Via the `oxisql` facade
```rust,ignore
// `datafusion://` (or its alias `memory://`) yields an empty OxiSqlContext.
let ctx = oxisql::connect_datafusion("datafusion://").await?;
```
## Key API

| API | Description |
| --- | --- |
| `OxiSqlTableProvider::from_rows(rows, schema)` | Snapshot provider from a pre-collected row set + Arrow schema. |
| `OxiSqlTableProvider::from_connection(conn, table, schema)` *(async)* | Snapshot built by running `SELECT * FROM {table}`. |
| `provider.refresh(conn, table)` *(async)* | Re-query the connection to replace the snapshot. |
| `provider.with_range_partition(key_col, n)` | Sort by `key_col` and split into `n` contiguous partitions. |
| `OxiSqlStreamProvider::new(conn, table, schema)` | Live-streaming provider driving a real `Connection` at scan time. |
| `provider.with_sort(vec![(col, SortOrder::Asc \| SortOrder::Desc)])` | Push an `ORDER BY` into the backing query. |
| `OxiSqlContext::new()` / `from_session_context(ctx)` | DataFusion `SessionContext` wrapper, fresh or wrapping an existing context. |
| `ctx.register_table(name, conn, schema)` | Register a live connection (uses `OxiSqlStreamProvider`). |
| `ctx.register_snapshot(name, rows, schema)` | Register a static snapshot (uses `OxiSqlTableProvider`). |
| `ctx.execute_sql(sql)` *(async)* | Run SQL, return `Vec<RecordBatch>`. |
| `ctx.sql(sql)` *(async)* | Run SQL, return a DataFusion `DataFrame`. |
| `ctx.explain_plan(sql)` *(async)* | Logical + physical plan as a formatted string. |
| `ctx.register_scalar_function(...)` / `ctx.register_aggregate_function(...)` | Register a scalar UDF / aggregate UDAF. |
| `ctx.session_context()` / `into_session_context()` | Borrow / take the underlying `SessionContext`. |
| `register_oxisql_table(ctx, name, conn, schema)` | Free fn — one-line registration of any `Connection`-backed table. |
| `register_embedded_table(ctx, name, conn, schema)` *(async)* | Free fn — convenience for `EmbeddedConnection`. |
| `OxiSqlFusionError` | Error type: `OxiSql(String)`, `Arrow(ArrowError)`, `DataFusion(DataFusionError)`, `SchemaMismatch`, `UnsupportedType`. |
## Feature flags

| Feature | Enables |
| --- | --- |
| *(default)* | `OxiSqlTableProvider`, `OxiSqlStreamProvider`, `OxiSqlContext`, pushdown, catalog registration, type mapping. |
| `parse` | `plan_bridge` module: `sql_to_datafusion_plan` / `to_datafusion_plan` convert an `oxisql_parse::LogicalPlan` into a DataFusion `LogicalPlan` (Scan/Limit/Empty structurally, the rest via SQL round-trip). |
| `columnar` | `ParquetTableProvider` — scan Parquet files as DataFusion tables via `oxistore-columnar`. |
| `mysql` / `postgres` / `sqlite` | Wire up the corresponding OxiSQL backend (primarily for cross-backend testing). |
## Pushdown summary
The providers translate the following predicates and shaping operators into the backing SQL query; everything else is evaluated by DataFusion:

| Operator | Pushed down as |
| --- | --- |
| `=`, `<>`, `<`, `<=`, `>`, `>=` | SQL `WHERE` comparisons |
| `IS NULL` / `IS NOT NULL` | SQL `WHERE` null checks |
| Projection | `SELECT` of only the requested columns (instead of `SELECT *`) |
| Limit | `LIMIT N` |
| Sort | `ORDER BY` (live provider, via `with_sort`) |
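
To make the table concrete, here is a std-only sketch of how a provider could assemble its backing query from a pushed-down projection, filters, sort keys and limit. `Filter`, `Lit`, `SortOrder` and `build_query` are hypothetical local types and functions; the crate's real translation (including how it quotes identifiers) may differ.

```rust
/// Local stand-in for the crate's `SortOrder`.
enum SortOrder {
    Asc,
    Desc,
}

/// Hypothetical literal type: only integers and text, for brevity.
enum Lit {
    Int(i64),
    Text(String),
}

/// Hypothetical filter model covering the pushed-down operators.
enum Filter {
    /// `op` is one of `=`, `<>`, `<`, `<=`, `>`, `>=`.
    Cmp { col: String, op: &'static str, lit: Lit },
    /// `negated == true` renders `IS NOT NULL`.
    Null { col: String, negated: bool },
}

fn render_lit(lit: &Lit) -> String {
    match lit {
        Lit::Int(v) => v.to_string(),
        // Escape embedded single quotes by doubling them (standard SQL).
        Lit::Text(s) => format!("'{}'", s.replace('\'', "''")),
    }
}

/// Builds `SELECT .. FROM .. [WHERE ..] [ORDER BY ..] [LIMIT ..]`.
/// Identifiers are emitted unquoted here; real code should quote them.
fn build_query(
    table: &str,
    projection: Option<&[&str]>,
    filters: &[Filter],
    sort: &[(&str, SortOrder)],
    limit: Option<usize>,
) -> String {
    // Projection: list only the requested columns, else fall back to `*`.
    let cols = match projection {
        Some(cols) if !cols.is_empty() => cols.join(", "),
        _ => "*".to_string(),
    };
    let mut sql = format!("SELECT {cols} FROM {table}");

    // Filters: AND together every supported predicate.
    let preds: Vec<String> = filters
        .iter()
        .map(|f| match f {
            Filter::Cmp { col, op, lit } => format!("{col} {op} {}", render_lit(lit)),
            Filter::Null { col, negated: false } => format!("{col} IS NULL"),
            Filter::Null { col, negated: true } => format!("{col} IS NOT NULL"),
        })
        .collect();
    if !preds.is_empty() {
        sql.push_str(&format!(" WHERE {}", preds.join(" AND ")));
    }

    // Sort keys, in the order given.
    if !sort.is_empty() {
        let keys: Vec<String> = sort
            .iter()
            .map(|(col, order)| match order {
                SortOrder::Asc => format!("{col} ASC"),
                SortOrder::Desc => format!("{col} DESC"),
            })
            .collect();
        sql.push_str(&format!(" ORDER BY {}", keys.join(", ")));
    }

    if let Some(n) = limit {
        sql.push_str(&format!(" LIMIT {n}"));
    }
    sql
}

fn main() {
    let filters = vec![
        Filter::Cmp { col: "region".into(), op: "=", lit: Lit::Text("EU".into()) },
        Filter::Cmp { col: "amount".into(), op: ">", lit: Lit::Int(50) },
        Filter::Null { col: "amount".into(), negated: true },
    ];
    let sql = build_query(
        "sales",
        Some(&["region", "amount"][..]),
        &filters,
        &[("region", SortOrder::Asc), ("amount", SortOrder::Desc)],
        Some(10),
    );
    // Prints (on one line): SELECT region, amount FROM sales WHERE region = 'EU'
    // AND amount > 50 AND amount IS NOT NULL ORDER BY region ASC, amount DESC LIMIT 10
    println!("{sql}");
}
```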

Filters on the snapshot provider are reported to DataFusion as `Inexact`, so DataFusion still re-applies each predicate as a post-filter and the result stays correct even if the provider filtered only partially. The `Value` ↔ Arrow type mapping covers all 13 `Value` variants (Null, Bool, I64, F64, Text, Blob, Timestamp, Date, Time, Uuid, Json, Decimal, Array), and range-based partitioning (`with_range_partition`) lets DataFusion scan partitions in parallel.
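
`Inexact` is one of the three answers a DataFusion `TableProvider` can give for a filter (`TableProviderFilterPushDown::{Unsupported, Inexact, Exact}`). The std-only sketch below mimics that contract with a local enum: the provider may return a superset of the matching rows (here it drops whole partitions that cannot match but keeps every row of the survivors), and because it answered `Inexact` the engine re-applies the predicate. The pruning strategy is an illustrative assumption, not how `OxiSqlTableProvider` actually filters.

```rust
/// Local mirror of DataFusion's `TableProviderFilterPushDown` answers
/// (`Unsupported` omitted: the engine would then filter everything itself).
#[allow(dead_code)] // `Exact` is listed for completeness only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterSupport {
    Inexact,
    Exact,
}

/// Hypothetical provider scan for the predicate `value > min`.
/// It drops partitions that cannot contain a match but returns every row of
/// the surviving ones, so the output is a superset of the matching rows.
fn provider_scan(partitions: &[Vec<i64>], min: i64) -> (Vec<i64>, FilterSupport) {
    let rows: Vec<i64> = partitions
        .iter()
        // A real provider would consult per-partition min/max statistics.
        .filter(|p| p.iter().any(|&v| v > min))
        .flatten()
        .copied()
        .collect();
    (rows, FilterSupport::Inexact)
}

/// Engine side: skip the post-filter only if the provider answered `Exact`.
fn engine_filter(partitions: &[Vec<i64>], min: i64) -> Vec<i64> {
    let (rows, support) = provider_scan(partitions, min);
    match support {
        FilterSupport::Exact => rows,
        FilterSupport::Inexact => rows.into_iter().filter(|&v| v > min).collect(),
    }
}

fn main() {
    let partitions = vec![vec![1, 2, 3], vec![4, 5], vec![6, 7]];
    let (raw, _) = provider_scan(&partitions, 4);
    let result = engine_filter(&partitions, 4);
    // Prints: provider returned [4, 5, 6, 7], after post-filter [5, 6, 7]
    println!("provider returned {raw:?}, after post-filter {result:?}");
}
```

Re-filtering is safe because applying the same predicate twice yields the same rows as applying it once; the provider's work only reduces how many rows reach DataFusion.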
## Test coverage
**57 tests pass** (4 ignored: 2 live-server backend tests for the optional `mysql`/`postgres` backends, plus 2 rustdoc-ignored examples). This crate is part of a workspace where **1,720 tests pass** in total.
## Part of the OxiSQL workspace
`oxisql-datafusion` is one of 17 Pure-Rust crates in the OxiSQL project. See the [workspace README](../../README.md) for the facade, the embedded engine that backs most providers here, connection pooling, and the wire-protocol backends.
## License
Apache-2.0 © 2024–2026 COOLJAPAN OU (Team Kitasan).