oxisql-datafusion 0.3.0

Apache DataFusion TableProvider over oxisql Connection — enables OLAP SQL queries against oxisql-backed tables

oxisql-datafusion

Apache DataFusion 53.x bridge exposing OxiSQL-backed tables to OLAP SQL, with live streaming and filter/projection/limit/sort pushdown.


What it is

oxisql-datafusion bridges the OxiSQL workspace to Apache DataFusion (53.x), so OLAP SQL — joins, aggregations, window functions — can be planned and executed by DataFusion against OxiSQL data.

It offers two TableProvider flavours: a snapshot provider that materialises a fixed set of rows as an Arrow RecordBatch, and a live-streaming provider that drives a real oxisql_core::Connection at scan time and yields batches incrementally. Both providers accept pushdown of supported filters, projections and limits; sort pushdown is available on the live provider via with_sort. Multiple OxiSQL tables can be registered in one catalog for cross-table joins. The crate is Pure Rust.
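The practical difference between the two flavours is when rows are pulled from the backend. The following is a std-only illustration, not the crate's code: Row and BatchStream are hypothetical names, and the real providers emit Arrow RecordBatches through DataFusion's async streams rather than a synchronous Iterator.

```rust
/// Hypothetical stand-in for one backend row.
type Row = (i64, String);

/// Snapshot style: every row is materialised before the consumer sees any data.
fn snapshot(rows: impl Iterator<Item = Row>) -> Vec<Row> {
    rows.collect()
}

/// Streaming style: rows are pulled from the source only when the next batch is requested.
struct BatchStream<I: Iterator<Item = Row>> {
    source: I,
    batch_size: usize,
}

impl<I: Iterator<Item = Row>> Iterator for BatchStream<I> {
    type Item = Vec<Row>;

    fn next(&mut self) -> Option<Vec<Row>> {
        // Take up to `batch_size` rows; an empty batch means the source is exhausted.
        let batch: Vec<Row> = self.source.by_ref().take(self.batch_size).collect();
        if batch.is_empty() { None } else { Some(batch) }
    }
}

fn main() {
    let source = || (1i64..=5).map(|i| (i, format!("row{i}")));

    let all = snapshot(source());
    println!("snapshot: one batch of {} rows", all.len()); // 5 rows

    let stream = BatchStream { source: source(), batch_size: 2 };
    let sizes: Vec<usize> = stream.map(|batch| batch.len()).collect();
    println!("streaming batch sizes: {sizes:?}"); // [2, 2, 1]
}
```

The streaming form lets a consumer start on the first batch, or stop early under a LIMIT, before the backend has produced every row.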

  • Status: Alpha.
  • Edition: 2021 · MSRV: 1.89 · License: Apache-2.0.

Installation

[dependencies]
oxisql-datafusion = "0.3.0"

# Optional features:
# oxisql-datafusion = { version = "0.3.0", features = ["parse"] }     # SQL → DataFusion plan bridge
# oxisql-datafusion = { version = "0.3.0", features = ["columnar"] }  # Parquet table provider

Quick start

Register a live embedded connection and run OLAP SQL through DataFusion:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use oxisql_core::Connection;
use oxisql_datafusion::OxiSqlContext;
use oxisql_embedded::EmbeddedConnection;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Populate an OxiSQL-backed table.
    let conn = Arc::new(EmbeddedConnection::open_memory()?);
    conn.execute("CREATE TABLE sales (region TEXT, amount INTEGER)", &[]).await?;
    conn.execute("INSERT INTO sales VALUES ('EU', 100), ('EU', 50), ('US', 75)", &[]).await?;

    let schema = Arc::new(Schema::new(vec![
        Field::new("region", DataType::Utf8,  false),
        Field::new("amount", DataType::Int64, false),
    ]));

    // 2. Register it as a live-streaming DataFusion table.
    let ctx = OxiSqlContext::new();
    ctx.register_table("sales", conn, schema)?;

    // 3. Let DataFusion plan and execute the OLAP query.
    let batches = ctx
        .execute_sql("SELECT region, SUM(amount) AS total FROM sales GROUP BY region")
        .await?;

    println!("{} record batch(es)", batches.len());
    Ok(())
}

Or build a static snapshot provider directly from rows:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use oxisql_core::{Row, Value};
use oxisql_datafusion::OxiSqlTableProvider;

let schema = Arc::new(Schema::new(vec![
    Field::new("id",    DataType::Int64,   false),
    Field::new("name",  DataType::Utf8,    false),
    Field::new("score", DataType::Float64, false),
]));

let rows = vec![Row::new(
    vec!["id".into(), "name".into(), "score".into()],
    vec![Value::I64(1), Value::Text("Alice".into()), Value::F64(95.5)],
)];

// Optionally split into 4 contiguous partitions for parallel scans.
let provider = OxiSqlTableProvider::from_rows(rows, schema)
    .with_range_partition("id", 4);
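with_range_partition is documented as sorting by the key column and splitting the rows into n contiguous partitions. A minimal std-only sketch of that idea, assuming rows reduced to hypothetical (key, payload) pairs and boundaries chosen by row count (the crate may choose boundaries differently, for example by key value); range_partition is an illustrative name, not the crate's API:

```rust
/// Sort by key, then split into exactly `n` contiguous runs whose sizes differ by at most one.
fn range_partition<T>(mut rows: Vec<(i64, T)>, n: usize) -> Vec<Vec<(i64, T)>> {
    assert!(n > 0, "partition count must be positive");
    // 1. Order rows by the partition key so each partition covers a contiguous key range.
    rows.sort_by_key(|(key, _)| *key);
    // 2. The first `extra` partitions take one additional row each.
    let (base, extra) = (rows.len() / n, rows.len() % n);
    let mut iter = rows.into_iter();
    (0..n)
        .map(|i| {
            let size = base + usize::from(i < extra);
            iter.by_ref().take(size).collect()
        })
        .collect()
}

fn main() {
    let rows = vec![(7, "g"), (2, "b"), (9, "i"), (1, "a"), (5, "e")];
    for (i, part) in range_partition(rows, 2).iter().enumerate() {
        let keys: Vec<i64> = part.iter().map(|(k, _)| *k).collect();
        println!("partition {i}: keys {keys:?}");
    }
    // partition 0: keys [1, 2, 5]
    // partition 1: keys [7, 9]
}
```

With fewer rows than partitions, some partitions in this sketch are simply empty, and a run of equal keys can straddle a partition boundary.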

Via the oxisql facade

// `datafusion://` (or its alias `memory://`) yields an empty OxiSqlContext.
let ctx = oxisql::connect_datafusion("datafusion://").await?;

Key API

| Item | Description |
|---|---|
| OxiSqlTableProvider::from_rows(rows, schema) | Snapshot provider from a pre-collected row set plus an Arrow schema. |
| OxiSqlTableProvider::from_connection(conn, table, schema) | (async) Snapshot built by running SELECT * FROM {table}. |
| provider.refresh(conn, table) | (async) Re-query the connection to replace the snapshot. |
| provider.with_range_partition(key_col, n) | Sort by key_col and split into n contiguous partitions. |
| OxiSqlStreamProvider::new(conn, table, schema) | Live-streaming provider driving a real Connection at scan time. |
| provider.with_sort(vec![(col, order)]) | Push an ORDER BY into the backing query; order is SortOrder::Asc or SortOrder::Desc. |
| OxiSqlContext::new() / from_session_context(ctx) | DataFusion SessionContext wrapper, either fresh or wrapping an existing context. |
| ctx.register_table(name, conn, schema) | Register a live connection (uses OxiSqlStreamProvider). |
| ctx.register_snapshot(name, rows, schema) | Register a static snapshot (uses OxiSqlTableProvider). |
| ctx.execute_sql(sql) | (async) Run SQL and return Vec<RecordBatch>. |
| ctx.sql(sql) | (async) Run SQL and return a DataFusion DataFrame. |
| ctx.explain_plan(sql) | (async) Logical and physical plan as a formatted string. |
| ctx.register_scalar_function(...) / ctx.register_aggregate_function(...) | Register a scalar UDF or an aggregate UDAF. |
| ctx.session_context() / into_session_context() | Borrow or take the underlying SessionContext. |
| register_oxisql_table(ctx, name, conn, schema) | Free fn: one-line registration of any Connection-backed table. |
| register_embedded_table(ctx, name, conn, schema) | (async) Free fn: convenience wrapper for EmbeddedConnection. |
| OxiSqlFusionError | Error type: OxiSql(String), Arrow(ArrowError), DataFusion(DataFusionError), SchemaMismatch, UnsupportedType. |

Feature flags

| Feature | Effect |
|---|---|
| (default) | OxiSqlTableProvider, OxiSqlStreamProvider, OxiSqlContext, pushdown, catalog registration, type mapping. |
| parse | plan_bridge module: sql_to_datafusion_plan / to_datafusion_plan convert an oxisql_parse::LogicalPlan into a DataFusion LogicalPlan (Scan, Limit and Empty structurally; everything else via an SQL round-trip). |
| columnar | ParquetTableProvider: scan Parquet files as DataFusion tables via oxistore-columnar. |
| mysql / postgres / sqlite | Wire up the corresponding OxiSQL backend (primarily for cross-backend testing). |
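The parse feature's split between structural conversion and SQL round-trip can be pictured as a recursive match. This is a hypothetical sketch: SourcePlan, TargetPlan and convert are illustrative stand-ins, not oxisql_parse::LogicalPlan or DataFusion's LogicalPlan, and how the real bridge handles a structural node sitting above a round-tripped subtree is an assumption.

```rust
/// Hypothetical stand-in for the source (oxisql_parse-style) plan.
enum SourcePlan {
    Scan { table: String },
    Limit { n: usize, input: Box<SourcePlan> },
    Empty,
    /// Any other node, carried as the SQL text it can be rendered back to.
    Other { sql: String },
}

/// Hypothetical stand-in for the target (DataFusion-style) plan.
#[derive(Debug, PartialEq)]
enum TargetPlan {
    TableScan(String),
    Limit(usize, Box<TargetPlan>),
    EmptyRelation,
    /// Produced by handing SQL text to the target engine's own planner.
    PlannedFromSql(String),
}

fn convert(plan: SourcePlan) -> TargetPlan {
    match plan {
        // Structural mapping: node-for-node translation.
        SourcePlan::Scan { table } => TargetPlan::TableScan(table),
        SourcePlan::Limit { n, input } => TargetPlan::Limit(n, Box::new(convert(*input))),
        SourcePlan::Empty => TargetPlan::EmptyRelation,
        // Fallback: round-trip through SQL text and let the target re-plan it.
        SourcePlan::Other { sql } => TargetPlan::PlannedFromSql(sql),
    }
}

fn main() {
    let plan = SourcePlan::Limit {
        n: 10,
        input: Box::new(SourcePlan::Other {
            sql: "SELECT region, SUM(amount) FROM sales GROUP BY region".into(),
        }),
    };
    println!("{:?}", convert(plan));
}
```

The design trade-off is that structural mapping preserves plan shape exactly for the simple nodes, while the SQL fallback covers everything else without a node-by-node translator.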

Pushdown summary

Predicates and shaping operators the providers translate into the backing SQL query (the rest is handled by DataFusion):

| Operator | Pushed down as |
|---|---|
| =, <>, <, <=, >, >= | SQL WHERE comparisons |
| IS NULL / IS NOT NULL | SQL WHERE null checks |
| Projection | SELECT of only the requested columns (instead of SELECT *) |
| Limit | LIMIT N |
| Sort | ORDER BY (live provider, via with_sort) |
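To make the table concrete, here is a std-only sketch of how the pushed-down pieces could be rendered into one backing query. Pushdown, Filter, CmpOp and build_query are hypothetical names; identifiers are emitted unquoted and literals are assumed to be pre-rendered SQL, whereas a real implementation would need to quote identifiers and bind or escape values.

```rust
/// Hypothetical comparison operators matching the pushdown table.
enum CmpOp { Eq, NotEq, Lt, LtEq, Gt, GtEq }

enum Filter {
    Cmp { column: String, op: CmpOp, literal: String }, // literal is already SQL-rendered
    IsNull(String),
    IsNotNull(String),
}

struct Pushdown {
    projection: Option<Vec<String>>,
    filters: Vec<Filter>,          // conjuncts, combined with AND
    order_by: Vec<(String, bool)>, // (column, ascending)
    limit: Option<usize>,
}

fn render_filter(f: &Filter) -> String {
    match f {
        Filter::Cmp { column, op, literal } => {
            let op = match op {
                CmpOp::Eq => "=", CmpOp::NotEq => "<>", CmpOp::Lt => "<",
                CmpOp::LtEq => "<=", CmpOp::Gt => ">", CmpOp::GtEq => ">=",
            };
            format!("{column} {op} {literal}")
        }
        Filter::IsNull(c) => format!("{c} IS NULL"),
        Filter::IsNotNull(c) => format!("{c} IS NOT NULL"),
    }
}

fn build_query(table: &str, p: &Pushdown) -> String {
    // An empty column list is not valid SQL, so fall back to SELECT *.
    let cols = match &p.projection {
        Some(cols) if !cols.is_empty() => cols.join(", "),
        _ => "*".to_string(),
    };
    let mut sql = format!("SELECT {cols} FROM {table}");
    if !p.filters.is_empty() {
        let preds: Vec<String> = p.filters.iter().map(render_filter).collect();
        sql.push_str(&format!(" WHERE {}", preds.join(" AND ")));
    }
    if !p.order_by.is_empty() {
        let keys: Vec<String> = p.order_by.iter()
            .map(|(c, asc)| format!("{c} {}", if *asc { "ASC" } else { "DESC" }))
            .collect();
        sql.push_str(&format!(" ORDER BY {}", keys.join(", ")));
    }
    if let Some(n) = p.limit {
        sql.push_str(&format!(" LIMIT {n}"));
    }
    sql
}

fn main() {
    let p = Pushdown {
        projection: Some(vec!["region".into(), "amount".into()]),
        filters: vec![
            Filter::Cmp { column: "amount".into(), op: CmpOp::GtEq, literal: "50".into() },
            Filter::IsNotNull("region".into()),
        ],
        order_by: vec![("amount".into(), false)],
        limit: Some(10),
    };
    println!("{}", build_query("sales", &p));
    // SELECT region, amount FROM sales WHERE amount >= 50 AND region IS NOT NULL ORDER BY amount DESC LIMIT 10
}
```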

Filters on the snapshot provider are reported as Inexact, so DataFusion still applies its own post-filter for correctness. The full Value ↔ Arrow type mapping covers all 13 Value variants (Null, Bool, I64, F64, Text, Blob, Timestamp, Date, Time, Uuid, Json, Decimal, Array). Range-based partitioning enables parallel scans.
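Reporting a filter as Inexact is what keeps coarse pushdown safe: the provider may return extra rows, and DataFusion re-applies the predicate afterwards. The sketch below mirrors DataFusion's TableProviderFilterPushDown levels (Unsupported, Inexact, Exact) in a local enum so it has no dependencies; coarse_scan and plan_scan are hypothetical helpers, and block pruning is an illustrative strategy, not how OxiSQL evaluates filters.

```rust
/// Local mirror of DataFusion's three filter-pushdown levels.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PushdownLevel { Unsupported, Inexact, Exact }

/// Provider side: skip whole blocks whose maximum cannot satisfy `v > threshold`,
/// but keep every row of the surviving blocks (so non-matching rows can slip through).
fn coarse_scan(blocks: &[Vec<i64>], threshold: i64) -> Vec<i64> {
    blocks
        .iter()
        .filter(|b| b.iter().copied().max().map_or(false, |m| m > threshold))
        .flat_map(|b| b.iter().copied())
        .collect()
}

/// Engine side: trust the rows only for Exact; otherwise apply the predicate again.
/// (In DataFusion, Unsupported filters are never handed to the provider at all.)
fn plan_scan<F: Fn(i64) -> bool>(rows: Vec<i64>, level: PushdownLevel, predicate: F) -> Vec<i64> {
    match level {
        PushdownLevel::Exact => rows,
        PushdownLevel::Inexact | PushdownLevel::Unsupported => {
            rows.into_iter().filter(|&v| predicate(v)).collect()
        }
    }
}

fn main() {
    let blocks = vec![vec![1, 5], vec![8, 12], vec![20, 30]];
    let threshold = 10;
    let scanned = coarse_scan(&blocks, threshold); // [8, 12, 20, 30]

    let inexact = plan_scan(scanned.clone(), PushdownLevel::Inexact, |v| v > threshold);
    let exact = plan_scan(scanned, PushdownLevel::Exact, |v| v > threshold);
    println!("Inexact: {inexact:?}"); // [12, 20, 30]
    println!("Exact:   {exact:?}");   // [8, 12, 20, 30]: the stray row 8 survives
}
```

The Exact case shows why a provider should claim Exact only when it filters precisely; Inexact costs one extra filter pass but is always correct.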

Test coverage

57 tests pass (4 ignored: 2 live-server backend tests for the optional mysql/postgres backends, plus 2 rustdoc-ignored examples). This crate is part of a workspace where 1,720 tests pass in total.

Part of the OxiSQL workspace

oxisql-datafusion is one of 17 Pure-Rust crates in the OxiSQL project. See the workspace README for the facade, the embedded engine that backs most providers here, connection pooling, and the wire-protocol backends.

License

Apache-2.0 © 2024–2026 COOLJAPAN OU (Team Kitasan).