
Struct OxiSqlStreamProvider 

pub struct OxiSqlStreamProvider { /* private fields */ }

A DataFusion TableProvider that executes a live SQL query at scan time, pushing filters and projections down to the OxiSQL backend.

§Filter pushdown

Simple predicates are converted to SQL WHERE clauses: comparisons (=, <>, <, <=, >, >=), null checks (IS NULL, IS NOT NULL), and the logical connectives AND, OR, and NOT that combine them. Complex expressions (functions, subqueries, arithmetic) are not pushed down and are left to DataFusion’s post-scan filtering.
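The translation rule above can be sketched with a small recursive function. This is a minimal illustration only: the Filter enum and to_sql function are hypothetical stand-ins invented for this sketch, not the crate's types or DataFusion's Expr, and literals are assumed to be already rendered as SQL text (no quoting or escaping is handled).

```rust
/// Hypothetical, simplified stand-in for a filter expression.
/// DataFusion's real `Expr` type is far richer; this enum exists only for illustration.
#[derive(Debug, Clone)]
enum Filter {
    /// Column compared with a literal, e.g. `age >= 18`.
    /// `op` is assumed to be one of =, <>, <, <=, >, >=.
    Compare { column: String, op: &'static str, literal: String },
    IsNull(String),
    IsNotNull(String),
    And(Box<Filter>, Box<Filter>),
    Or(Box<Filter>, Box<Filter>),
    Not(Box<Filter>),
    /// Anything else (function call, arithmetic, subquery): not pushed down.
    Unsupported,
}

/// Returns the SQL text for `f`, or `None` if any part of it cannot be pushed down.
fn to_sql(f: &Filter) -> Option<String> {
    match f {
        Filter::Compare { column, op, literal } => Some(format!("{column} {op} {literal}")),
        Filter::IsNull(c) => Some(format!("{c} IS NULL")),
        Filter::IsNotNull(c) => Some(format!("{c} IS NOT NULL")),
        // A compound predicate is translatable only if both sides are.
        Filter::And(l, r) => Some(format!("({} AND {})", to_sql(l)?, to_sql(r)?)),
        Filter::Or(l, r) => Some(format!("({} OR {})", to_sql(l)?, to_sql(r)?)),
        Filter::Not(inner) => Some(format!("NOT ({})", to_sql(inner)?)),
        Filter::Unsupported => None,
    }
}
```

In this sketch a single unsupported leaf makes the whole predicate untranslatable, which corresponds to leaving that filter for post-scan evaluation.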

§Projection pushdown

When DataFusion requests specific columns, the provider generates SELECT col1, col2, ... instead of SELECT *.
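As a rough sketch of how a projection might map to a column list: the select_list helper below is hypothetical and not part of the crate. It assumes the projection arrives as indices into the schema's columns, in the same shape as the projection argument of scan.

```rust
/// Builds the column list of a SELECT statement from an optional projection.
/// `columns` holds the schema's column names in order; `projection` holds
/// indices into it. Hypothetical helper for illustration only.
fn select_list(columns: &[&str], projection: Option<&Vec<usize>>) -> String {
    match projection {
        // No projection requested: fetch every column.
        None => "*".to_string(),
        // Emit the requested columns in the requested order.
        // An empty projection would yield an empty string and needs
        // separate handling in real code; out-of-range indices panic here.
        Some(indices) => indices
            .iter()
            .map(|&i| columns[i])
            .collect::<Vec<_>>()
            .join(", "),
    }
}
```

For a schema with columns id, name, score and a projection of [2, 0], this sketch produces the list score, id.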

§Limit pushdown

When DataFusion specifies a limit, the provider appends LIMIT N to the query, reducing the number of rows transferred.

§Sort pushdown

An optional ORDER BY clause can be injected into the generated SQL via Self::with_sort. Each entry is a (column_name, SortOrder) pair.

§Automatic partitioning

Self::with_auto_partition splits the scan into multiple parallel partitions using LIMIT / OFFSET pagination, each fetching at most target_batch_size rows. At most n_parallel partitions are created.

Implementations§

impl OxiSqlStreamProvider

pub fn new( conn: Arc<dyn Connection>, table_name: impl Into<String>, schema: SchemaRef, ) -> Self

Construct from a connection, table name, and Arrow schema.

The schema must match the column layout returned by the backend for SELECT * FROM table_name. Column names in the schema are used verbatim as SQL column references.

pub fn with_sort(self, order: Vec<(String, SortOrder)>) -> Self

Attach an ORDER BY clause to the SQL generated at scan time.

Each entry is a (column_name, SortOrder) pair. The resulting ORDER BY fragment is placed after the WHERE clause and before any LIMIT clause.

This is a “sort-into-SQL” approach: the backend database performs the ordering, reducing the work DataFusion must do on the result set.

§Example
use oxisql_datafusion::stream::{OxiSqlStreamProvider, SortOrder};

let provider = OxiSqlStreamProvider::new(conn, "users", schema)
    .with_sort(vec![
        ("score".into(), SortOrder::Desc),
        ("id".into(), SortOrder::Asc),
    ]);
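The clause ordering described for with_sort can be illustrated with a small string builder. This is a sketch under assumptions, not the provider's actual SQL generator: build_query is a hypothetical function, and the SortOrder enum here is a local stand-in that models only the Asc and Desc variants used in the example.

```rust
/// Local stand-in for the crate's `SortOrder`; only the two variants
/// that appear in the documentation example are modeled.
#[derive(Debug, Clone, Copy)]
enum SortOrder {
    Asc,
    Desc,
}

/// Assembles a query in the order
/// SELECT ... FROM ... [WHERE ...] [ORDER BY ...] [LIMIT ...].
/// Hypothetical helper for illustration only.
fn build_query(
    table: &str,
    where_clause: Option<&str>,
    sort: &[(String, SortOrder)],
    limit: Option<usize>,
) -> String {
    let mut sql = format!("SELECT * FROM {table}");
    if let Some(w) = where_clause {
        sql.push_str(&format!(" WHERE {w}"));
    }
    if !sort.is_empty() {
        // One "column DIRECTION" key per entry, in the order given.
        let keys: Vec<String> = sort
            .iter()
            .map(|(col, dir)| match dir {
                SortOrder::Asc => format!("{col} ASC"),
                SortOrder::Desc => format!("{col} DESC"),
            })
            .collect();
        sql.push_str(&format!(" ORDER BY {}", keys.join(", ")));
    }
    if let Some(n) = limit {
        sql.push_str(&format!(" LIMIT {n}"));
    }
    sql
}
```

With the sort order from the example, a filter of age >= 18, and a limit of 10, the sketch yields SELECT * FROM users WHERE age >= 18 ORDER BY score DESC, id ASC LIMIT 10.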

pub fn sort_order(&self) -> Option<&[(String, SortOrder)]>

Return the configured sort order, if any.

Returns None if no sort has been attached via Self::with_sort.

pub fn with_auto_partition( self, n_parallel: usize, target_batch_size: usize, ) -> Self

Configure automatic partitioning for parallel scan execution.

When set, scan() creates up to n_parallel partition slots, each issuing LIMIT target_batch_size OFFSET i * target_batch_size to the backend. This allows DataFusion to execute multiple partitions in parallel without knowing the total row count ahead of time.

Partitions beyond the actual data silently return empty batches, so setting n_parallel generously is safe.

  • n_parallel: maximum number of parallel partitions (e.g. CPU thread count).
  • target_batch_size: rows fetched per partition page (e.g. 8192).
§Example
use oxisql_datafusion::stream::OxiSqlStreamProvider;

let provider = OxiSqlStreamProvider::new(conn, "users", schema)
    .with_auto_partition(4, 8192);
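To make the pagination arithmetic concrete, the following sketch computes the LIMIT and OFFSET values each partition slot would issue under the scheme described above. The partition_pages function is hypothetical and only mirrors the documented formula; it says nothing about how the provider schedules or executes the slots.

```rust
/// Computes the (limit, offset) pair for each partition slot, following the
/// documented `LIMIT target_batch_size OFFSET i * target_batch_size` scheme.
/// Hypothetical helper for illustration only.
fn partition_pages(n_parallel: usize, target_batch_size: usize) -> Vec<(usize, usize)> {
    (0..n_parallel)
        .map(|i| (target_batch_size, i * target_batch_size))
        .collect()
}
```

For with_auto_partition(4, 8192) this gives offsets 0, 8192, 16384, and 24576, so the four slots together address row positions 0 through 32767. Because SQL does not guarantee a row order without ORDER BY, combining auto-partitioning with a sort order may be worth considering when stable pages matter.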

pub fn auto_partition_config(&self) -> Option<(usize, usize)>

Return the auto-partition configuration (n_parallel, target_batch_size), if any.

Trait Implementations§

impl Debug for OxiSqlStreamProvider

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl TableProvider for OxiSqlStreamProvider

fn schema(&self) -> SchemaRef

Get a reference to the schema for this table.

fn table_type(&self) -> TableType

Get the type of this table for metadata/catalog purposes.

fn scan<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _state: &'life1 dyn Session, projection: Option<&'life2 Vec<usize>>, filters: &'life3 [Expr], limit: Option<usize>, ) -> Pin<Box<dyn Future<Output = DFResult<Arc<dyn ExecutionPlan>>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Create an ExecutionPlan for scanning the table with an optional projection, filters, and limit.

fn supports_filters_pushdown( &self, filters: &[&Expr], ) -> DFResult<Vec<TableProviderFilterPushDown>>

Specify whether DataFusion should provide filter expressions to the TableProvider to apply during the scan.

fn constraints(&self) -> Option<&Constraints>

Get a reference to the constraints of the table.

fn get_table_definition(&self) -> Option<&str>

Get the create statement used to create this table, if available.

fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>>

Get the LogicalPlan of this table, if available.

fn get_column_default(&self, _column: &str) -> Option<&Expr>

Get the default value for a column, if available.

fn scan_with_args<'a, 'life0, 'life1, 'async_trait>( &'life0 self, state: &'life1 dyn Session, args: ScanArgs<'a>, ) -> Pin<Box<dyn Future<Output = Result<ScanResult, DataFusionError>> + Send + 'async_trait>>
where 'a: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Create an ExecutionPlan for scanning the table using structured arguments.

fn statistics(&self) -> Option<Statistics>

Get statistics for this table, if available. Although not presently used in mainline DataFusion, this allows implementation-specific behavior for downstream repositories, in conjunction with specialized optimizer rules, to perform operations such as re-ordering of joins.

fn insert_into<'life0, 'life1, 'async_trait>( &'life0 self, _state: &'life1 dyn Session, _input: Arc<dyn ExecutionPlan>, _insert_op: InsertOp, ) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Return an ExecutionPlan to insert data into this table, if supported.

fn delete_from<'life0, 'life1, 'async_trait>( &'life0 self, _state: &'life1 dyn Session, _filters: Vec<Expr>, ) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Delete rows matching the filter predicates.

fn update<'life0, 'life1, 'async_trait>( &'life0 self, _state: &'life1 dyn Session, _assignments: Vec<(String, Expr)>, _filters: Vec<Expr>, ) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Update rows matching the filter predicates.

fn truncate<'life0, 'life1, 'async_trait>( &'life0 self, _state: &'life1 dyn Session, ) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where 'life0: 'async_trait, 'life1: 'async_trait, Self: 'async_trait,

Remove all rows from the table.
