Skip to main content

oxisql_datafusion/
stream.rs

1//! Live-streaming DataFusion table provider backed by a real OxiSQL connection.
2//!
3//! Unlike [`OxiSqlTableProvider`], which materialises all rows at construction
4//! time, [`OxiSqlStreamProvider`] queries the database at plan-execution time,
5//! translating DataFusion filter and projection hints into SQL WHERE / SELECT
6//! clauses so that the backend does the heavy lifting.
7//!
8//! [`OxiSqlTableProvider`]: crate::OxiSqlTableProvider
9
10use std::fmt;
11use std::sync::Arc;
12
13use arrow::datatypes::SchemaRef;
14use async_trait::async_trait;
15use datafusion::catalog::Session;
16use datafusion::datasource::{TableProvider, TableType};
17use datafusion::error::Result as DFResult;
18use datafusion::execution::TaskContext;
19use datafusion::logical_expr::{
20    Between, BinaryExpr, Expr, Like, Operator, TableProviderFilterPushDown,
21};
22use datafusion::physical_expr::EquivalenceProperties;
23use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
25use datafusion::physical_plan::{
26    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
27    SendableRecordBatchStream,
28};
29use datafusion::scalar::ScalarValue;
30use oxisql_core::Connection;
31
32// ── Sort order ────────────────────────────────────────────────────────────────
33
/// Direction of a single `ORDER BY` key that is pushed down to the SQL backend.
///
/// Rendered through [`fmt::Display`] as the SQL keyword `ASC` or `DESC`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Ascending order (what SQL uses when no direction is written).
    Asc,
    /// Descending order.
    Desc,
}

impl fmt::Display for SortOrder {
    /// Writes the bare SQL keyword for this direction.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Self::Asc => "ASC",
            Self::Desc => "DESC",
        };
        f.write_str(keyword)
    }
}
51
52/// A DataFusion [`TableProvider`] that executes a live SQL query at scan time,
53/// pushing filters and projections down to the OxiSQL backend.
54///
55/// # Filter pushdown
56///
57/// Simple comparison predicates (`=`, `<>`, `<`, `<=`, `>`, `>=`, `AND`, `OR`,
58/// `IS NULL`, `IS NOT NULL`, `NOT`) are converted to SQL WHERE clauses.
59/// Complex expressions (functions, subqueries, arithmetic) are left to
60/// DataFusion's post-scan filtering.
61///
62/// # Projection pushdown
63///
64/// When DataFusion requests specific columns, the provider generates
65/// `SELECT col1, col2, ...` instead of `SELECT *`.
66///
67/// # Limit pushdown
68///
69/// When DataFusion specifies a limit, the provider appends `LIMIT N` to the
70/// query, reducing the number of rows transferred.
71///
72/// # Sort pushdown
73///
74/// An optional `ORDER BY` clause can be injected into the generated SQL via
75/// [`Self::with_sort`].  Each entry is a `(column_name, SortOrder)` pair.
76///
77/// # Automatic partitioning
78///
79/// [`Self::with_auto_partition`] splits the scan into multiple parallel
80/// partitions using `LIMIT` / `OFFSET` pagination, each fetching at most
81/// `target_batch_size` rows.  At most `n_parallel` partitions are created.
82pub struct OxiSqlStreamProvider {
83    schema: SchemaRef,
84    table_name: String,
85    conn: Arc<dyn Connection>,
86    /// Optional sort order to push to the backend as an `ORDER BY` clause.
87    sort_order: Option<Vec<(String, SortOrder)>>,
88    /// Optional auto-partition configuration `(n_parallel, target_batch_size)`.
89    ///
90    /// When set, `scan()` spawns up to `n_parallel` partition slots, each
91    /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
92    /// backend for parallel execution.
93    auto_partition_config: Option<(usize, usize)>,
94}
95
96impl OxiSqlStreamProvider {
97    /// Construct from a connection, table name, and Arrow schema.
98    ///
99    /// The `schema` must match the column layout returned by the backend for
100    /// `SELECT * FROM table_name`.  Column names in the schema are used
101    /// verbatim as SQL column references.
102    pub fn new(
103        conn: Arc<dyn Connection>,
104        table_name: impl Into<String>,
105        schema: SchemaRef,
106    ) -> Self {
107        Self {
108            schema,
109            table_name: table_name.into(),
110            conn,
111            sort_order: None,
112            auto_partition_config: None,
113        }
114    }
115
116    /// Attach an `ORDER BY` clause to the SQL generated at scan time.
117    ///
118    /// Each entry is a `(column_name, SortOrder)` pair.  The resulting SQL
119    /// fragment is appended between the WHERE clause and any LIMIT clause.
120    ///
121    /// This is a "sort-into-SQL" approach: the backend database performs the
122    /// ordering, reducing the work DataFusion must do on the result set.
123    ///
124    /// # Example
125    ///
126    /// ```rust,ignore
127    /// use oxisql_datafusion::stream::{OxiSqlStreamProvider, SortOrder};
128    ///
129    /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
130    ///     .with_sort(vec![
131    ///         ("score".into(), SortOrder::Desc),
132    ///         ("id".into(), SortOrder::Asc),
133    ///     ]);
134    /// ```
135    #[must_use]
136    pub fn with_sort(mut self, order: Vec<(String, SortOrder)>) -> Self {
137        self.sort_order = Some(order);
138        self
139    }
140
141    /// Return the configured sort order, if any.
142    ///
143    /// Returns `None` if no sort has been attached via [`Self::with_sort`].
144    pub fn sort_order(&self) -> Option<&[(String, SortOrder)]> {
145        self.sort_order.as_deref()
146    }
147
148    /// Configure automatic partitioning for parallel scan execution.
149    ///
150    /// When set, `scan()` creates up to `n_parallel` partition slots, each
151    /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
152    /// backend.  This allows DataFusion to execute multiple partitions in
153    /// parallel without knowing the total row count ahead of time.
154    ///
155    /// Partitions beyond the actual data silently return empty batches, so
156    /// setting `n_parallel` generously is safe.
157    ///
158    /// - `n_parallel`: maximum number of parallel partitions (e.g. CPU thread count).
159    /// - `target_batch_size`: rows fetched per partition page (e.g. `8192`).
160    ///
161    /// # Example
162    ///
163    /// ```rust,ignore
164    /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
165    ///     .with_auto_partition(4, 8192);
166    /// ```
167    #[must_use]
168    pub fn with_auto_partition(mut self, n_parallel: usize, target_batch_size: usize) -> Self {
169        self.auto_partition_config = Some((n_parallel, target_batch_size));
170        self
171    }
172
173    /// Return the auto-partition configuration `(n_parallel, target_batch_size)`, if any.
174    pub fn auto_partition_config(&self) -> Option<(usize, usize)> {
175        self.auto_partition_config
176    }
177
178    /// Construct from a live [`oxisql_mysql::MyConnection`] for streaming MySQL
179    /// query results through DataFusion.
180    ///
181    /// The connection is wrapped in an `Arc` and used as the underlying
182    /// [`Connection`] trait object.  All filter, projection, limit, and sort
183    /// pushdown features of [`OxiSqlStreamProvider`] are available on the
184    /// resulting provider.
185    ///
186    /// # Feature flag
187    ///
188    /// Only available when the `mysql` Cargo feature of `oxisql-datafusion` is
189    /// enabled.
190    ///
191    /// # Example
192    ///
193    /// ```rust,no_run
194    /// # #[tokio::main]
195    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
196    /// use std::sync::Arc;
197    /// use arrow::datatypes::{DataType, Field, Schema};
198    /// use oxisql_mysql::{MyConnection, TlsMode};
199    /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
200    ///
201    /// let conn = MyConnection::connect(
202    ///     "mysql://root:secret@localhost:3306/mydb",
203    ///     TlsMode::Disabled,
204    /// ).await?;
205    ///
206    /// let schema = Arc::new(Schema::new(vec![
207    ///     Field::new("id",   DataType::Int64, true),
208    ///     Field::new("name", DataType::Utf8,  true),
209    /// ]));
210    ///
211    /// let provider = OxiSqlStreamProvider::from_mysql(conn, "users", schema);
212    /// # Ok(())
213    /// # }
214    /// ```
215    #[cfg(feature = "mysql")]
216    pub fn from_mysql(
217        conn: oxisql_mysql::MyConnection,
218        table_name: impl Into<String>,
219        schema: SchemaRef,
220    ) -> Self {
221        Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
222    }
223
224    /// Construct from a live [`oxisql_postgres::PgConnection`] for streaming
225    /// Postgres query results through DataFusion.
226    ///
227    /// The connection is wrapped in an `Arc` and used as the underlying
228    /// [`Connection`] trait object.  All filter, projection, limit, and sort
229    /// pushdown features of [`OxiSqlStreamProvider`] are available on the
230    /// resulting provider.
231    ///
232    /// # Feature flag
233    ///
234    /// Only available when the `postgres` Cargo feature of `oxisql-datafusion`
235    /// is enabled.
236    ///
237    /// # Example
238    ///
239    /// ```rust,no_run
240    /// # #[tokio::main]
241    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
242    /// use std::sync::Arc;
243    /// use arrow::datatypes::{DataType, Field, Schema};
244    /// use oxisql_postgres::{PgConnection, TlsMode};
245    /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
246    ///
247    /// let conn = PgConnection::connect(
248    ///     "host=localhost user=postgres password=secret dbname=mydb",
249    ///     TlsMode::Disabled,
250    /// ).await?;
251    ///
252    /// let schema = Arc::new(Schema::new(vec![
253    ///     Field::new("id",   DataType::Int64, true),
254    ///     Field::new("name", DataType::Utf8,  true),
255    /// ]));
256    ///
257    /// let provider = OxiSqlStreamProvider::from_postgres(conn, "users", schema);
258    /// # Ok(())
259    /// # }
260    /// ```
261    #[cfg(feature = "postgres")]
262    pub fn from_postgres(
263        conn: oxisql_postgres::PgConnection,
264        table_name: impl Into<String>,
265        schema: SchemaRef,
266    ) -> Self {
267        Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
268    }
269
270    /// Construct from a live [`oxisql_sqlite_compat::SqliteConnection`] for
271    /// streaming SQLite query results through DataFusion.
272    ///
273    /// The connection is wrapped in an `Arc` and used as the underlying
274    /// [`Connection`] trait object.  All filter, projection, limit, and sort
275    /// pushdown features of [`OxiSqlStreamProvider`] are available on the
276    /// resulting provider.
277    ///
278    /// Both in-memory databases (created with
279    /// [`SqliteConnection::open_memory`]) and file-backed databases are
280    /// supported.
281    ///
282    /// # Feature flag
283    ///
284    /// Only available when the `sqlite` Cargo feature of `oxisql-datafusion`
285    /// is enabled.
286    ///
287    /// # Example
288    ///
289    /// ```rust,no_run
290    /// # #[tokio::main]
291    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
292    /// use std::sync::Arc;
293    /// use arrow::datatypes::{DataType, Field, Schema};
294    /// use oxisql_sqlite_compat::SqliteConnection;
295    /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
296    ///
297    /// let conn = SqliteConnection::open_memory().await?;
298    ///
299    /// let schema = Arc::new(Schema::new(vec![
300    ///     Field::new("id",   DataType::Int64, true),
301    ///     Field::new("name", DataType::Utf8,  true),
302    /// ]));
303    ///
304    /// let provider = OxiSqlStreamProvider::from_sqlite(conn, "users", schema);
305    /// # Ok(())
306    /// # }
307    /// ```
308    ///
309    /// [`SqliteConnection::open_memory`]: oxisql_sqlite_compat::SqliteConnection::open_memory
310    #[cfg(feature = "sqlite")]
311    pub fn from_sqlite(
312        conn: oxisql_sqlite_compat::SqliteConnection,
313        table_name: impl Into<String>,
314        schema: SchemaRef,
315    ) -> Self {
316        Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
317    }
318
319    /// Return the SQL SELECT column list for the given projection.
320    fn project_clause(&self, projection: Option<&Vec<usize>>) -> String {
321        match projection {
322            None => "*".to_string(),
323            Some(indices) => indices
324                .iter()
325                .map(|&i| self.schema.field(i).name().as_str())
326                .collect::<Vec<_>>()
327                .join(", "),
328        }
329    }
330
331    /// Return the projected Arrow schema for the given column indices.
332    fn projected_schema(&self, projection: Option<&Vec<usize>>) -> SchemaRef {
333        match projection {
334            None => Arc::clone(&self.schema),
335            Some(indices) => {
336                let fields: Vec<_> = indices
337                    .iter()
338                    .map(|&i| self.schema.field(i).clone())
339                    .collect();
340                Arc::new(arrow::datatypes::Schema::new(fields))
341            }
342        }
343    }
344}
345
346impl fmt::Debug for OxiSqlStreamProvider {
347    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348        f.debug_struct("OxiSqlStreamProvider")
349            .field("table_name", &self.table_name)
350            .field("schema", &self.schema)
351            .finish()
352    }
353}
354
355#[async_trait]
356impl TableProvider for OxiSqlStreamProvider {
357    fn schema(&self) -> SchemaRef {
358        Arc::clone(&self.schema)
359    }
360
361    fn table_type(&self) -> TableType {
362        TableType::Base
363    }
364
365    async fn scan(
366        &self,
367        _state: &dyn Session,
368        projection: Option<&Vec<usize>>,
369        filters: &[Expr],
370        limit: Option<usize>,
371    ) -> DFResult<Arc<dyn ExecutionPlan>> {
372        let col_clause = self.project_clause(projection);
373        let output_schema = self.projected_schema(projection);
374
375        // Build the base SQL (LIMIT/OFFSET appended per-partition below).
376        let mut base_sql = format!("SELECT {} FROM {}", col_clause, self.table_name);
377
378        // Build WHERE clause from pushed-down filters.
379        let where_parts: Vec<String> = filters.iter().filter_map(expr_to_sql).collect();
380        if !where_parts.is_empty() {
381            base_sql.push_str(" WHERE ");
382            base_sql.push_str(&where_parts.join(" AND "));
383        }
384
385        // Append ORDER BY clause if sort pushdown has been configured.
386        if let Some(ref order) = self.sort_order {
387            if !order.is_empty() {
388                base_sql.push_str(" ORDER BY ");
389                let order_clause = order
390                    .iter()
391                    .map(|(col, dir)| format!("{col} {dir}"))
392                    .collect::<Vec<_>>()
393                    .join(", ");
394                base_sql.push_str(&order_clause);
395            }
396        }
397
398        // When DataFusion pushes a LIMIT down, use a single partition regardless
399        // of auto_partition_config — a LIMIT scan is already cheap and splitting
400        // it would interfere with the fetch count semantics.
401        if let Some(n) = limit {
402            let mut sql = base_sql;
403            sql.push_str(&format!(" LIMIT {n}"));
404            let exec = OxiSqlExecPlan::new(Arc::clone(&self.conn), sql, Arc::clone(&output_schema));
405            return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
406        }
407
408        // Auto-partition: split the scan into N page-based partition slots using
409        // LIMIT/OFFSET pagination so DataFusion can execute them in parallel.
410        if let Some((n_parallel, target_batch_size)) = self.auto_partition_config {
411            if n_parallel > 1 && target_batch_size > 0 {
412                let sqls: Vec<String> = (0..n_parallel)
413                    .map(|i| {
414                        format!(
415                            "{} LIMIT {} OFFSET {}",
416                            base_sql,
417                            target_batch_size,
418                            i * target_batch_size
419                        )
420                    })
421                    .collect();
422                let exec = OxiSqlMultiPartExecPlan::new(
423                    Arc::clone(&self.conn),
424                    sqls,
425                    Arc::clone(&output_schema),
426                );
427                return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
428            }
429        }
430
431        // Default single-partition path.
432        let exec =
433            OxiSqlExecPlan::new(Arc::clone(&self.conn), base_sql, Arc::clone(&output_schema));
434        Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>)
435    }
436
437    fn supports_filters_pushdown(
438        &self,
439        filters: &[&Expr],
440    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
441        Ok(filters
442            .iter()
443            .map(|f| {
444                if can_push_filter(f) {
445                    TableProviderFilterPushDown::Exact
446                } else {
447                    TableProviderFilterPushDown::Unsupported
448                }
449            })
450            .collect())
451    }
452}
453
454// ── Expression translation ────────────────────────────────────────────────────
455
456/// Check whether `expr` can be fully translated to SQL and pushed to the backend.
457///
458/// Returns `true` only for expressions that [`expr_to_sql`] can translate
459/// completely.  The two functions must stay in sync: every pattern that returns
460/// `false` here must also return `None` from [`expr_to_sql`].
461pub fn can_push_filter(expr: &Expr) -> bool {
462    match expr {
463        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
464            matches!(
465                op,
466                Operator::Eq
467                    | Operator::NotEq
468                    | Operator::Lt
469                    | Operator::LtEq
470                    | Operator::Gt
471                    | Operator::GtEq
472                    | Operator::And
473                    | Operator::Or
474            ) && can_push_atom(left)
475                && can_push_atom(right)
476        }
477        Expr::IsNull(inner) | Expr::IsNotNull(inner) => can_push_atom(inner),
478        Expr::Not(inner) => can_push_filter(inner),
479        // `x IN (a, b, c)` / `x NOT IN (...)`
480        Expr::InList(inlist) => {
481            can_push_atom(&inlist.expr) && inlist.list.iter().all(can_push_atom)
482        }
483        // `x BETWEEN low AND high` / `x NOT BETWEEN …`
484        Expr::Between(Between {
485            expr, low, high, ..
486        }) => can_push_atom(expr) && can_push_atom(low) && can_push_atom(high),
487        // `x LIKE 'pat'` / `x ILIKE 'pat'` / negated variants
488        Expr::Like(Like { expr, pattern, .. }) => can_push_atom(expr) && can_push_atom(pattern),
489        _ => false,
490    }
491}
492
493/// Check whether a leaf expression (column or literal) can appear inside a
494/// pushed-down filter.
495fn can_push_atom(expr: &Expr) -> bool {
496    match expr {
497        Expr::Column(_) => true,
498        Expr::Literal(_, _) => true,
499        other => can_push_filter(other),
500    }
501}
502
503/// Translate a DataFusion expression to a SQL WHERE-clause fragment.
504///
505/// Returns `None` for unsupported expressions.
506/// [`can_push_filter`] can be used to check translatability without executing
507/// the translation.
508pub fn expr_to_sql(expr: &Expr) -> Option<String> {
509    match expr {
510        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
511            let l = atom_to_sql(left)?;
512            let r = atom_to_sql(right)?;
513            let op_str = match op {
514                Operator::Eq => "=",
515                Operator::NotEq => "<>",
516                Operator::Lt => "<",
517                Operator::LtEq => "<=",
518                Operator::Gt => ">",
519                Operator::GtEq => ">=",
520                Operator::And => "AND",
521                Operator::Or => "OR",
522                _ => return None,
523            };
524            Some(format!("({l} {op_str} {r})"))
525        }
526        Expr::IsNull(inner) => Some(format!("({} IS NULL)", atom_to_sql(inner)?)),
527        Expr::IsNotNull(inner) => Some(format!("({} IS NOT NULL)", atom_to_sql(inner)?)),
528        Expr::Not(inner) => Some(format!("(NOT {})", expr_to_sql(inner)?)),
529        // `x IN (a, b, c)` / `x NOT IN (...)`
530        Expr::InList(inlist) => {
531            let e = atom_to_sql(&inlist.expr)?;
532            let items: Option<Vec<String>> = inlist.list.iter().map(atom_to_sql).collect();
533            let items = items?;
534            let not_kw = if inlist.negated { "NOT " } else { "" };
535            Some(format!("({e} {not_kw}IN ({}))", items.join(", ")))
536        }
537        // `x BETWEEN low AND high` / `x NOT BETWEEN …`
538        Expr::Between(Between {
539            expr,
540            low,
541            high,
542            negated,
543        }) => {
544            let e = atom_to_sql(expr)?;
545            let lo = atom_to_sql(low)?;
546            let hi = atom_to_sql(high)?;
547            let not_kw = if *negated { "NOT " } else { "" };
548            Some(format!("({e} {not_kw}BETWEEN {lo} AND {hi})"))
549        }
550        // `x LIKE 'pat'` / `x ILIKE 'pat'` and their negations
551        Expr::Like(Like {
552            expr,
553            pattern,
554            negated,
555            case_insensitive,
556            escape_char,
557        }) => {
558            let e = atom_to_sql(expr)?;
559            let p = atom_to_sql(pattern)?;
560            let not_kw = if *negated { "NOT " } else { "" };
561            let like_kw = if *case_insensitive { "ILIKE" } else { "LIKE" };
562            let escape_clause = match escape_char {
563                Some(c) => format!(" ESCAPE '{c}'"),
564                None => String::new(),
565            };
566            Some(format!("({e} {not_kw}{like_kw} {p}{escape_clause})"))
567        }
568        _ => None,
569    }
570}
571
572/// Translate a leaf expression (column or literal) or a nested filter
573/// expression to SQL.
574fn atom_to_sql(expr: &Expr) -> Option<String> {
575    match expr {
576        Expr::Column(col) => Some(col.name.clone()),
577        Expr::Literal(scalar, _metadata) => scalar_to_sql(scalar),
578        other => expr_to_sql(other),
579    }
580}
581
582/// Render a [`ScalarValue`] as a SQL literal fragment.
583///
584/// Returns `None` for types that have no safe, round-trippable SQL literal
585/// representation (e.g. `Decimal128`, `Date64`, `Time32`, complex types).
586fn scalar_to_sql(scalar: &ScalarValue) -> Option<String> {
587    match scalar {
588        ScalarValue::Int8(Some(v)) => Some(v.to_string()),
589        ScalarValue::Int16(Some(v)) => Some(v.to_string()),
590        ScalarValue::Int32(Some(v)) => Some(v.to_string()),
591        ScalarValue::Int64(Some(v)) => Some(v.to_string()),
592        ScalarValue::Float32(Some(v)) => Some(v.to_string()),
593        ScalarValue::Float64(Some(v)) => Some(v.to_string()),
594        ScalarValue::Boolean(Some(v)) => Some(if *v { "TRUE" } else { "FALSE" }.to_string()),
595        ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
596            // SQL-escape single quotes to prevent injection (connection is trusted).
597            Some(format!("'{}'", s.replace('\'', "''")))
598        }
599        ScalarValue::Null => Some("NULL".to_string()),
600        // Typed NULLs (e.g. Int8(None)) are also SQL NULL.
601        ScalarValue::Int8(None)
602        | ScalarValue::Int16(None)
603        | ScalarValue::Int32(None)
604        | ScalarValue::Int64(None)
605        | ScalarValue::Float32(None)
606        | ScalarValue::Float64(None)
607        | ScalarValue::Boolean(None)
608        | ScalarValue::Utf8(None)
609        | ScalarValue::LargeUtf8(None) => Some("NULL".to_string()),
610        _ => None,
611    }
612}
613
614// ── OxiSqlExecPlan ────────────────────────────────────────────────────────────
615
616/// A DataFusion [`ExecutionPlan`] that executes a SQL query lazily against an
617/// OxiSQL connection at physical-plan execution time.
618///
619/// Unlike `MemorySourceConfig`, this defers the SQL query until DataFusion
620/// calls `execute()`, keeping `scan()` cheap and allocation-free at plan time.
621struct OxiSqlExecPlan {
622    schema: SchemaRef,
623    sql: String,
624    conn: Arc<dyn Connection>,
625    cache: Arc<PlanProperties>,
626}
627
628impl fmt::Debug for OxiSqlExecPlan {
629    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
630        f.debug_struct("OxiSqlExecPlan")
631            .field("sql", &self.sql)
632            .field("schema", &self.schema)
633            .finish()
634    }
635}
636
637impl OxiSqlExecPlan {
638    fn new(conn: Arc<dyn Connection>, sql: String, schema: SchemaRef) -> Self {
639        let eq = EquivalenceProperties::new(Arc::clone(&schema));
640        let properties = PlanProperties::new(
641            eq,
642            Partitioning::UnknownPartitioning(1),
643            EmissionType::Incremental,
644            Boundedness::Bounded,
645        );
646        Self {
647            schema,
648            sql,
649            conn,
650            cache: Arc::new(properties),
651        }
652    }
653}
654
655impl DisplayAs for OxiSqlExecPlan {
656    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
657        write!(f, "OxiSqlExecPlan sql={:?}", self.sql)
658    }
659}
660
661impl ExecutionPlan for OxiSqlExecPlan {
662    fn name(&self) -> &'static str {
663        "OxiSqlExecPlan"
664    }
665
666    fn properties(&self) -> &Arc<PlanProperties> {
667        &self.cache
668    }
669
670    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
671        vec![]
672    }
673
674    fn with_new_children(
675        self: Arc<Self>,
676        children: Vec<Arc<dyn ExecutionPlan>>,
677    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
678        if children.is_empty() {
679            Ok(self)
680        } else {
681            Err(datafusion::error::DataFusionError::Internal(
682                "OxiSqlExecPlan has no children".into(),
683            ))
684        }
685    }
686
687    fn execute(
688        &self,
689        _partition: usize,
690        _context: Arc<TaskContext>,
691    ) -> datafusion::error::Result<SendableRecordBatchStream> {
692        let sql = self.sql.clone();
693        let conn = Arc::clone(&self.conn);
694        let schema = Arc::clone(&self.schema);
695
696        let stream = futures::stream::once(async move {
697            let rows = conn
698                .query(&sql, &[])
699                .await
700                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
701            crate::types::rows_to_record_batch(rows, schema)
702                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
703        });
704
705        Ok(Box::pin(RecordBatchStreamAdapter::new(
706            Arc::clone(&self.schema),
707            stream,
708        )))
709    }
710}
711
712// ── OxiSqlMultiPartExecPlan ───────────────────────────────────────────────────
713
714/// A DataFusion [`ExecutionPlan`] that exposes multiple parallel partitions,
715/// each executing a distinct pre-built SQL query (typically with `LIMIT`/`OFFSET`
716/// pagination) against the same OxiSQL connection.
717///
718/// Used by [`OxiSqlStreamProvider::with_auto_partition`] to split a full-table
719/// scan into `N` independently-executable page queries so DataFusion's thread
720/// pool can run them concurrently.
721struct OxiSqlMultiPartExecPlan {
722    schema: SchemaRef,
723    /// One SQL string per partition.
724    sqls: Vec<String>,
725    conn: Arc<dyn Connection>,
726    cache: Arc<PlanProperties>,
727}
728
729impl fmt::Debug for OxiSqlMultiPartExecPlan {
730    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
731        f.debug_struct("OxiSqlMultiPartExecPlan")
732            .field("partitions", &self.sqls.len())
733            .field("schema", &self.schema)
734            .finish()
735    }
736}
737
738impl OxiSqlMultiPartExecPlan {
739    fn new(conn: Arc<dyn Connection>, sqls: Vec<String>, schema: SchemaRef) -> Self {
740        let n = sqls.len().max(1);
741        let eq = EquivalenceProperties::new(Arc::clone(&schema));
742        let properties = PlanProperties::new(
743            eq,
744            Partitioning::UnknownPartitioning(n),
745            EmissionType::Incremental,
746            Boundedness::Bounded,
747        );
748        Self {
749            schema,
750            sqls,
751            conn,
752            cache: Arc::new(properties),
753        }
754    }
755}
756
757impl DisplayAs for OxiSqlMultiPartExecPlan {
758    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
759        write!(f, "OxiSqlMultiPartExecPlan partitions={}", self.sqls.len())
760    }
761}
762
763impl ExecutionPlan for OxiSqlMultiPartExecPlan {
764    fn name(&self) -> &'static str {
765        "OxiSqlMultiPartExecPlan"
766    }
767
768    fn properties(&self) -> &Arc<PlanProperties> {
769        &self.cache
770    }
771
772    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
773        vec![]
774    }
775
776    fn with_new_children(
777        self: Arc<Self>,
778        children: Vec<Arc<dyn ExecutionPlan>>,
779    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
780        if children.is_empty() {
781            Ok(self)
782        } else {
783            Err(datafusion::error::DataFusionError::Internal(
784                "OxiSqlMultiPartExecPlan has no children".into(),
785            ))
786        }
787    }
788
789    fn execute(
790        &self,
791        partition: usize,
792        _context: Arc<TaskContext>,
793    ) -> datafusion::error::Result<SendableRecordBatchStream> {
794        let sql = self.sqls.get(partition).cloned().ok_or_else(|| {
795            datafusion::error::DataFusionError::Internal(format!(
796                "OxiSqlMultiPartExecPlan: partition index {partition} out of range ({})",
797                self.sqls.len()
798            ))
799        })?;
800        let conn = Arc::clone(&self.conn);
801        let schema = Arc::clone(&self.schema);
802
803        let stream = futures::stream::once(async move {
804            let rows = conn
805                .query(&sql, &[])
806                .await
807                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
808            crate::types::rows_to_record_batch(rows, schema)
809                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
810        });
811
812        Ok(Box::pin(RecordBatchStreamAdapter::new(
813            Arc::clone(&self.schema),
814            stream,
815        )))
816    }
817}