pub struct OxiSqlStreamProvider { /* private fields */ }
Expand description
A DataFusion TableProvider that executes a live SQL query at scan time,
pushing filters and projections down to the OxiSQL backend.
§Filter pushdown
Simple predicates — comparisons (=, <>, <, <=, >, >=), null checks
(IS NULL, IS NOT NULL), and combinations of these using AND, OR, and
NOT — are converted to SQL WHERE clauses.
Complex expressions (functions, subqueries, arithmetic) are left to
DataFusion’s post-scan filtering.
§Projection pushdown
When DataFusion requests specific columns, the provider generates
SELECT col1, col2, ... instead of SELECT *.
§Limit pushdown
When DataFusion specifies a limit, the provider appends LIMIT N to the
query, reducing the number of rows transferred.
§Sort pushdown
An optional ORDER BY clause can be injected into the generated SQL via
Self::with_sort. Each entry is a (column_name, SortOrder) pair.
§Automatic partitioning
Self::with_auto_partition splits the scan into multiple parallel
partitions using LIMIT / OFFSET pagination, each fetching at most
target_batch_size rows. At most n_parallel partitions are created.
Implementations§
Source§impl OxiSqlStreamProvider
impl OxiSqlStreamProvider
Sourcepub fn new(
conn: Arc<dyn Connection>,
table_name: impl Into<String>,
schema: SchemaRef,
) -> Self
pub fn new( conn: Arc<dyn Connection>, table_name: impl Into<String>, schema: SchemaRef, ) -> Self
Construct from a connection, table name, and Arrow schema.
The schema must match the column layout returned by the backend for
SELECT * FROM table_name. Column names in the schema are used
verbatim as SQL column references.
Sourcepub fn with_sort(self, order: Vec<(String, SortOrder)>) -> Self
pub fn with_sort(self, order: Vec<(String, SortOrder)>) -> Self
Attach an ORDER BY clause to the SQL generated at scan time.
Each entry is a (column_name, SortOrder) pair. The resulting SQL
fragment is inserted between the WHERE clause and any LIMIT clause.
This is a “sort-into-SQL” approach: the backend database performs the ordering, reducing the work DataFusion must do on the result set.
§Example
use oxisql_datafusion::stream::{OxiSqlStreamProvider, SortOrder};
let provider = OxiSqlStreamProvider::new(conn, "users", schema)
.with_sort(vec![
("score".into(), SortOrder::Desc),
("id".into(), SortOrder::Asc),
]);
Sourcepub fn sort_order(&self) -> Option<&[(String, SortOrder)]>
pub fn sort_order(&self) -> Option<&[(String, SortOrder)]>
Return the configured sort order, if any.
Returns None if no sort has been attached via Self::with_sort.
Sourcepub fn with_auto_partition(
self,
n_parallel: usize,
target_batch_size: usize,
) -> Self
pub fn with_auto_partition( self, n_parallel: usize, target_batch_size: usize, ) -> Self
Configure automatic partitioning for parallel scan execution.
When set, scan() creates up to n_parallel partition slots, each
issuing LIMIT target_batch_size OFFSET i * target_batch_size to the
backend. This allows DataFusion to execute multiple partitions in
parallel without knowing the total row count ahead of time.
Partitions beyond the actual data silently return empty batches, so
setting n_parallel generously is safe.
n_parallel: maximum number of parallel partitions (e.g. CPU thread count).
target_batch_size: rows fetched per partition page (e.g. 8192).
§Example
let provider = OxiSqlStreamProvider::new(conn, "users", schema)
.with_auto_partition(4, 8192);
Sourcepub fn auto_partition_config(&self) -> Option<(usize, usize)>
pub fn auto_partition_config(&self) -> Option<(usize, usize)>
Return the auto-partition configuration (n_parallel, target_batch_size), if any.
Trait Implementations§
Source§impl Debug for OxiSqlStreamProvider
impl Debug for OxiSqlStreamProvider
Source§impl TableProvider for OxiSqlStreamProvider
impl TableProvider for OxiSqlStreamProvider
Source§fn table_type(&self) -> TableType
fn table_type(&self) -> TableType
Source§fn scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
projection: Option<&'life2 Vec<usize>>,
filters: &'life3 [Expr],
limit: Option<usize>,
) -> Pin<Box<dyn Future<Output = DFResult<Arc<dyn ExecutionPlan>>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
projection: Option<&'life2 Vec<usize>>,
filters: &'life3 [Expr],
limit: Option<usize>,
) -> Pin<Box<dyn Future<Output = DFResult<Arc<dyn ExecutionPlan>>> + Send + 'async_trait>>
where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Create an ExecutionPlan for scanning the table with optional
projection, filter, and limit, described below. Read more
Source§fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> DFResult<Vec<TableProviderFilterPushDown>>
fn supports_filters_pushdown( &self, filters: &[&Expr], ) -> DFResult<Vec<TableProviderFilterPushDown>>
Source§fn constraints(&self) -> Option<&Constraints>
fn constraints(&self) -> Option<&Constraints>
Source§fn get_table_definition(&self) -> Option<&str>
fn get_table_definition(&self) -> Option<&str>
Source§fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>>
fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>>
Get the LogicalPlan of this table, if available.
Source§fn get_column_default(&self, _column: &str) -> Option<&Expr>
fn get_column_default(&self, _column: &str) -> Option<&Expr>
Source§fn scan_with_args<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
state: &'life1 dyn Session,
args: ScanArgs<'a>,
) -> Pin<Box<dyn Future<Output = Result<ScanResult, DataFusionError>> + Send + 'async_trait>>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn scan_with_args<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
state: &'life1 dyn Session,
args: ScanArgs<'a>,
) -> Pin<Box<dyn Future<Output = Result<ScanResult, DataFusionError>> + Send + 'async_trait>>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Create an ExecutionPlan for scanning the table using structured arguments. Read more
Source§fn statistics(&self) -> Option<Statistics>
fn statistics(&self) -> Option<Statistics>
Source§fn insert_into<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
_input: Arc<dyn ExecutionPlan>,
_insert_op: InsertOp,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn insert_into<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
_input: Arc<dyn ExecutionPlan>,
_insert_op: InsertOp,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Return an ExecutionPlan to insert data into this table, if
supported. Read more
Source§fn delete_from<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
_filters: Vec<Expr>,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn delete_from<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
_filters: Vec<Expr>,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Source§fn update<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
_assignments: Vec<(String, Expr)>,
_filters: Vec<Expr>,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn update<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
_assignments: Vec<(String, Expr)>,
_filters: Vec<Expr>,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Source§fn truncate<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
fn truncate<'life0, 'life1, 'async_trait>(
&'life0 self,
_state: &'life1 dyn Session,
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>, DataFusionError>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
Auto Trait Implementations§
impl !RefUnwindSafe for OxiSqlStreamProvider
impl !UnwindSafe for OxiSqlStreamProvider
impl Freeze for OxiSqlStreamProvider
impl Send for OxiSqlStreamProvider
impl Sync for OxiSqlStreamProvider
impl Unpin for OxiSqlStreamProvider
impl UnsafeUnpin for OxiSqlStreamProvider
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more