Skip to main content

oxisql_datafusion/
stream.rs

1//! Live-streaming DataFusion table provider backed by a real OxiSQL connection.
2//!
3//! Unlike [`OxiSqlTableProvider`], which materialises all rows at construction
4//! time, [`OxiSqlStreamProvider`] queries the database at plan-execution time,
5//! translating DataFusion filter and projection hints into SQL WHERE / SELECT
6//! clauses so that the backend does the heavy lifting.
7//!
8//! [`OxiSqlTableProvider`]: crate::OxiSqlTableProvider
9
10use std::any::Any;
11use std::fmt;
12use std::sync::Arc;
13
14use arrow::datatypes::SchemaRef;
15use async_trait::async_trait;
16use datafusion::catalog::Session;
17use datafusion::datasource::{TableProvider, TableType};
18use datafusion::error::Result as DFResult;
19use datafusion::execution::TaskContext;
20use datafusion::logical_expr::{
21    Between, BinaryExpr, Expr, Like, Operator, TableProviderFilterPushDown,
22};
23use datafusion::physical_expr::EquivalenceProperties;
24use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::physical_plan::{
27    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28    SendableRecordBatchStream,
29};
30use datafusion::scalar::ScalarValue;
31use oxisql_core::Connection;
32
33// ── Sort order ────────────────────────────────────────────────────────────────
34
/// Direction of a single `ORDER BY` key pushed down to the SQL backend.
///
/// Formats (via [`fmt::Display`]) as the bare SQL keyword `ASC` or `DESC`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Ascending order — what SQL uses when no direction is written.
    Asc,
    /// Descending order.
    Desc,
}

impl fmt::Display for SortOrder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Self::Asc => "ASC",
            Self::Desc => "DESC",
        };
        f.write_str(keyword)
    }
}
52
53/// A DataFusion [`TableProvider`] that executes a live SQL query at scan time,
54/// pushing filters and projections down to the OxiSQL backend.
55///
56/// # Filter pushdown
57///
58/// Simple comparison predicates (`=`, `<>`, `<`, `<=`, `>`, `>=`, `AND`, `OR`,
59/// `IS NULL`, `IS NOT NULL`, `NOT`) are converted to SQL WHERE clauses.
60/// Complex expressions (functions, subqueries, arithmetic) are left to
61/// DataFusion's post-scan filtering.
62///
63/// # Projection pushdown
64///
65/// When DataFusion requests specific columns, the provider generates
66/// `SELECT col1, col2, ...` instead of `SELECT *`.
67///
68/// # Limit pushdown
69///
70/// When DataFusion specifies a limit, the provider appends `LIMIT N` to the
71/// query, reducing the number of rows transferred.
72///
73/// # Sort pushdown
74///
75/// An optional `ORDER BY` clause can be injected into the generated SQL via
76/// [`Self::with_sort`].  Each entry is a `(column_name, SortOrder)` pair.
77///
78/// # Automatic partitioning
79///
80/// [`Self::with_auto_partition`] splits the scan into multiple parallel
81/// partitions using `LIMIT` / `OFFSET` pagination, each fetching at most
82/// `target_batch_size` rows.  At most `n_parallel` partitions are created.
83pub struct OxiSqlStreamProvider {
84    schema: SchemaRef,
85    table_name: String,
86    conn: Arc<dyn Connection>,
87    /// Optional sort order to push to the backend as an `ORDER BY` clause.
88    sort_order: Option<Vec<(String, SortOrder)>>,
89    /// Optional auto-partition configuration `(n_parallel, target_batch_size)`.
90    ///
91    /// When set, `scan()` spawns up to `n_parallel` partition slots, each
92    /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
93    /// backend for parallel execution.
94    auto_partition_config: Option<(usize, usize)>,
95}
96
97impl OxiSqlStreamProvider {
98    /// Construct from a connection, table name, and Arrow schema.
99    ///
100    /// The `schema` must match the column layout returned by the backend for
101    /// `SELECT * FROM table_name`.  Column names in the schema are used
102    /// verbatim as SQL column references.
103    pub fn new(
104        conn: Arc<dyn Connection>,
105        table_name: impl Into<String>,
106        schema: SchemaRef,
107    ) -> Self {
108        Self {
109            schema,
110            table_name: table_name.into(),
111            conn,
112            sort_order: None,
113            auto_partition_config: None,
114        }
115    }
116
117    /// Attach an `ORDER BY` clause to the SQL generated at scan time.
118    ///
119    /// Each entry is a `(column_name, SortOrder)` pair.  The resulting SQL
120    /// fragment is appended between the WHERE clause and any LIMIT clause.
121    ///
122    /// This is a "sort-into-SQL" approach: the backend database performs the
123    /// ordering, reducing the work DataFusion must do on the result set.
124    ///
125    /// # Example
126    ///
127    /// ```rust,ignore
128    /// use oxisql_datafusion::stream::{OxiSqlStreamProvider, SortOrder};
129    ///
130    /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
131    ///     .with_sort(vec![
132    ///         ("score".into(), SortOrder::Desc),
133    ///         ("id".into(), SortOrder::Asc),
134    ///     ]);
135    /// ```
136    #[must_use]
137    pub fn with_sort(mut self, order: Vec<(String, SortOrder)>) -> Self {
138        self.sort_order = Some(order);
139        self
140    }
141
142    /// Return the configured sort order, if any.
143    ///
144    /// Returns `None` if no sort has been attached via [`Self::with_sort`].
145    pub fn sort_order(&self) -> Option<&[(String, SortOrder)]> {
146        self.sort_order.as_deref()
147    }
148
149    /// Configure automatic partitioning for parallel scan execution.
150    ///
151    /// When set, `scan()` creates up to `n_parallel` partition slots, each
152    /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
153    /// backend.  This allows DataFusion to execute multiple partitions in
154    /// parallel without knowing the total row count ahead of time.
155    ///
156    /// Partitions beyond the actual data silently return empty batches, so
157    /// setting `n_parallel` generously is safe.
158    ///
159    /// - `n_parallel`: maximum number of parallel partitions (e.g. CPU thread count).
160    /// - `target_batch_size`: rows fetched per partition page (e.g. `8192`).
161    ///
162    /// # Example
163    ///
164    /// ```rust,ignore
165    /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
166    ///     .with_auto_partition(4, 8192);
167    /// ```
168    #[must_use]
169    pub fn with_auto_partition(mut self, n_parallel: usize, target_batch_size: usize) -> Self {
170        self.auto_partition_config = Some((n_parallel, target_batch_size));
171        self
172    }
173
174    /// Return the auto-partition configuration `(n_parallel, target_batch_size)`, if any.
175    pub fn auto_partition_config(&self) -> Option<(usize, usize)> {
176        self.auto_partition_config
177    }
178
179    /// Construct from a live [`oxisql_mysql::MyConnection`] for streaming MySQL
180    /// query results through DataFusion.
181    ///
182    /// The connection is wrapped in an `Arc` and used as the underlying
183    /// [`Connection`] trait object.  All filter, projection, limit, and sort
184    /// pushdown features of [`OxiSqlStreamProvider`] are available on the
185    /// resulting provider.
186    ///
187    /// # Feature flag
188    ///
189    /// Only available when the `mysql` Cargo feature of `oxisql-datafusion` is
190    /// enabled.
191    ///
192    /// # Example
193    ///
194    /// ```rust,no_run
195    /// # #[tokio::main]
196    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
197    /// use std::sync::Arc;
198    /// use arrow::datatypes::{DataType, Field, Schema};
199    /// use oxisql_mysql::{MyConnection, TlsMode};
200    /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
201    ///
202    /// let conn = MyConnection::connect(
203    ///     "mysql://root:secret@localhost:3306/mydb",
204    ///     TlsMode::Disabled,
205    /// ).await?;
206    ///
207    /// let schema = Arc::new(Schema::new(vec![
208    ///     Field::new("id",   DataType::Int64, true),
209    ///     Field::new("name", DataType::Utf8,  true),
210    /// ]));
211    ///
212    /// let provider = OxiSqlStreamProvider::from_mysql(conn, "users", schema);
213    /// # Ok(())
214    /// # }
215    /// ```
216    #[cfg(feature = "mysql")]
217    pub fn from_mysql(
218        conn: oxisql_mysql::MyConnection,
219        table_name: impl Into<String>,
220        schema: SchemaRef,
221    ) -> Self {
222        Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
223    }
224
225    /// Construct from a live [`oxisql_postgres::PgConnection`] for streaming
226    /// Postgres query results through DataFusion.
227    ///
228    /// The connection is wrapped in an `Arc` and used as the underlying
229    /// [`Connection`] trait object.  All filter, projection, limit, and sort
230    /// pushdown features of [`OxiSqlStreamProvider`] are available on the
231    /// resulting provider.
232    ///
233    /// # Feature flag
234    ///
235    /// Only available when the `postgres` Cargo feature of `oxisql-datafusion`
236    /// is enabled.
237    ///
238    /// # Example
239    ///
240    /// ```rust,no_run
241    /// # #[tokio::main]
242    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
243    /// use std::sync::Arc;
244    /// use arrow::datatypes::{DataType, Field, Schema};
245    /// use oxisql_postgres::{PgConnection, TlsMode};
246    /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
247    ///
248    /// let conn = PgConnection::connect(
249    ///     "host=localhost user=postgres password=secret dbname=mydb",
250    ///     TlsMode::Disabled,
251    /// ).await?;
252    ///
253    /// let schema = Arc::new(Schema::new(vec![
254    ///     Field::new("id",   DataType::Int64, true),
255    ///     Field::new("name", DataType::Utf8,  true),
256    /// ]));
257    ///
258    /// let provider = OxiSqlStreamProvider::from_postgres(conn, "users", schema);
259    /// # Ok(())
260    /// # }
261    /// ```
262    #[cfg(feature = "postgres")]
263    pub fn from_postgres(
264        conn: oxisql_postgres::PgConnection,
265        table_name: impl Into<String>,
266        schema: SchemaRef,
267    ) -> Self {
268        Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
269    }
270
271    /// Construct from a live [`oxisql_sqlite_compat::SqliteConnection`] for
272    /// streaming SQLite query results through DataFusion.
273    ///
274    /// The connection is wrapped in an `Arc` and used as the underlying
275    /// [`Connection`] trait object.  All filter, projection, limit, and sort
276    /// pushdown features of [`OxiSqlStreamProvider`] are available on the
277    /// resulting provider.
278    ///
279    /// Both in-memory databases (created with
280    /// [`SqliteConnection::open_memory`]) and file-backed databases are
281    /// supported.
282    ///
283    /// # Feature flag
284    ///
285    /// Only available when the `sqlite` Cargo feature of `oxisql-datafusion`
286    /// is enabled.
287    ///
288    /// # Example
289    ///
290    /// ```rust,no_run
291    /// # #[tokio::main]
292    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
293    /// use std::sync::Arc;
294    /// use arrow::datatypes::{DataType, Field, Schema};
295    /// use oxisql_sqlite_compat::SqliteConnection;
296    /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
297    ///
298    /// let conn = SqliteConnection::open_memory().await?;
299    ///
300    /// let schema = Arc::new(Schema::new(vec![
301    ///     Field::new("id",   DataType::Int64, true),
302    ///     Field::new("name", DataType::Utf8,  true),
303    /// ]));
304    ///
305    /// let provider = OxiSqlStreamProvider::from_sqlite(conn, "users", schema);
306    /// # Ok(())
307    /// # }
308    /// ```
309    ///
310    /// [`SqliteConnection::open_memory`]: oxisql_sqlite_compat::SqliteConnection::open_memory
311    #[cfg(feature = "sqlite")]
312    pub fn from_sqlite(
313        conn: oxisql_sqlite_compat::SqliteConnection,
314        table_name: impl Into<String>,
315        schema: SchemaRef,
316    ) -> Self {
317        Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
318    }
319
320    /// Return the SQL SELECT column list for the given projection.
321    fn project_clause(&self, projection: Option<&Vec<usize>>) -> String {
322        match projection {
323            None => "*".to_string(),
324            Some(indices) => indices
325                .iter()
326                .map(|&i| self.schema.field(i).name().as_str())
327                .collect::<Vec<_>>()
328                .join(", "),
329        }
330    }
331
332    /// Return the projected Arrow schema for the given column indices.
333    fn projected_schema(&self, projection: Option<&Vec<usize>>) -> SchemaRef {
334        match projection {
335            None => Arc::clone(&self.schema),
336            Some(indices) => {
337                let fields: Vec<_> = indices
338                    .iter()
339                    .map(|&i| self.schema.field(i).clone())
340                    .collect();
341                Arc::new(arrow::datatypes::Schema::new(fields))
342            }
343        }
344    }
345}
346
347impl fmt::Debug for OxiSqlStreamProvider {
348    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349        f.debug_struct("OxiSqlStreamProvider")
350            .field("table_name", &self.table_name)
351            .field("schema", &self.schema)
352            .finish()
353    }
354}
355
356#[async_trait]
357impl TableProvider for OxiSqlStreamProvider {
358    fn as_any(&self) -> &dyn Any {
359        self
360    }
361
362    fn schema(&self) -> SchemaRef {
363        Arc::clone(&self.schema)
364    }
365
366    fn table_type(&self) -> TableType {
367        TableType::Base
368    }
369
370    async fn scan(
371        &self,
372        _state: &dyn Session,
373        projection: Option<&Vec<usize>>,
374        filters: &[Expr],
375        limit: Option<usize>,
376    ) -> DFResult<Arc<dyn ExecutionPlan>> {
377        let col_clause = self.project_clause(projection);
378        let output_schema = self.projected_schema(projection);
379
380        // Build the base SQL (LIMIT/OFFSET appended per-partition below).
381        let mut base_sql = format!("SELECT {} FROM {}", col_clause, self.table_name);
382
383        // Build WHERE clause from pushed-down filters.
384        let where_parts: Vec<String> = filters.iter().filter_map(expr_to_sql).collect();
385        if !where_parts.is_empty() {
386            base_sql.push_str(" WHERE ");
387            base_sql.push_str(&where_parts.join(" AND "));
388        }
389
390        // Append ORDER BY clause if sort pushdown has been configured.
391        if let Some(ref order) = self.sort_order {
392            if !order.is_empty() {
393                base_sql.push_str(" ORDER BY ");
394                let order_clause = order
395                    .iter()
396                    .map(|(col, dir)| format!("{col} {dir}"))
397                    .collect::<Vec<_>>()
398                    .join(", ");
399                base_sql.push_str(&order_clause);
400            }
401        }
402
403        // When DataFusion pushes a LIMIT down, use a single partition regardless
404        // of auto_partition_config — a LIMIT scan is already cheap and splitting
405        // it would interfere with the fetch count semantics.
406        if let Some(n) = limit {
407            let mut sql = base_sql;
408            sql.push_str(&format!(" LIMIT {n}"));
409            let exec = OxiSqlExecPlan::new(Arc::clone(&self.conn), sql, Arc::clone(&output_schema));
410            return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
411        }
412
413        // Auto-partition: split the scan into N page-based partition slots using
414        // LIMIT/OFFSET pagination so DataFusion can execute them in parallel.
415        if let Some((n_parallel, target_batch_size)) = self.auto_partition_config {
416            if n_parallel > 1 && target_batch_size > 0 {
417                let sqls: Vec<String> = (0..n_parallel)
418                    .map(|i| {
419                        format!(
420                            "{} LIMIT {} OFFSET {}",
421                            base_sql,
422                            target_batch_size,
423                            i * target_batch_size
424                        )
425                    })
426                    .collect();
427                let exec = OxiSqlMultiPartExecPlan::new(
428                    Arc::clone(&self.conn),
429                    sqls,
430                    Arc::clone(&output_schema),
431                );
432                return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
433            }
434        }
435
436        // Default single-partition path.
437        let exec =
438            OxiSqlExecPlan::new(Arc::clone(&self.conn), base_sql, Arc::clone(&output_schema));
439        Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>)
440    }
441
442    fn supports_filters_pushdown(
443        &self,
444        filters: &[&Expr],
445    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
446        Ok(filters
447            .iter()
448            .map(|f| {
449                if can_push_filter(f) {
450                    TableProviderFilterPushDown::Exact
451                } else {
452                    TableProviderFilterPushDown::Unsupported
453                }
454            })
455            .collect())
456    }
457}
458
459// ── Expression translation ────────────────────────────────────────────────────
460
461/// Check whether `expr` can be fully translated to SQL and pushed to the backend.
462///
463/// Returns `true` only for expressions that [`expr_to_sql`] can translate
464/// completely.  The two functions must stay in sync: every pattern that returns
465/// `false` here must also return `None` from [`expr_to_sql`].
466pub fn can_push_filter(expr: &Expr) -> bool {
467    match expr {
468        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
469            matches!(
470                op,
471                Operator::Eq
472                    | Operator::NotEq
473                    | Operator::Lt
474                    | Operator::LtEq
475                    | Operator::Gt
476                    | Operator::GtEq
477                    | Operator::And
478                    | Operator::Or
479            ) && can_push_atom(left)
480                && can_push_atom(right)
481        }
482        Expr::IsNull(inner) | Expr::IsNotNull(inner) => can_push_atom(inner),
483        Expr::Not(inner) => can_push_filter(inner),
484        // `x IN (a, b, c)` / `x NOT IN (...)`
485        Expr::InList(inlist) => {
486            can_push_atom(&inlist.expr) && inlist.list.iter().all(can_push_atom)
487        }
488        // `x BETWEEN low AND high` / `x NOT BETWEEN …`
489        Expr::Between(Between {
490            expr, low, high, ..
491        }) => can_push_atom(expr) && can_push_atom(low) && can_push_atom(high),
492        // `x LIKE 'pat'` / `x ILIKE 'pat'` / negated variants
493        Expr::Like(Like { expr, pattern, .. }) => can_push_atom(expr) && can_push_atom(pattern),
494        _ => false,
495    }
496}
497
498/// Check whether a leaf expression (column or literal) can appear inside a
499/// pushed-down filter.
500fn can_push_atom(expr: &Expr) -> bool {
501    match expr {
502        Expr::Column(_) => true,
503        Expr::Literal(_, _) => true,
504        other => can_push_filter(other),
505    }
506}
507
508/// Translate a DataFusion expression to a SQL WHERE-clause fragment.
509///
510/// Returns `None` for unsupported expressions.
511/// [`can_push_filter`] can be used to check translatability without executing
512/// the translation.
513pub fn expr_to_sql(expr: &Expr) -> Option<String> {
514    match expr {
515        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
516            let l = atom_to_sql(left)?;
517            let r = atom_to_sql(right)?;
518            let op_str = match op {
519                Operator::Eq => "=",
520                Operator::NotEq => "<>",
521                Operator::Lt => "<",
522                Operator::LtEq => "<=",
523                Operator::Gt => ">",
524                Operator::GtEq => ">=",
525                Operator::And => "AND",
526                Operator::Or => "OR",
527                _ => return None,
528            };
529            Some(format!("({l} {op_str} {r})"))
530        }
531        Expr::IsNull(inner) => Some(format!("({} IS NULL)", atom_to_sql(inner)?)),
532        Expr::IsNotNull(inner) => Some(format!("({} IS NOT NULL)", atom_to_sql(inner)?)),
533        Expr::Not(inner) => Some(format!("(NOT {})", expr_to_sql(inner)?)),
534        // `x IN (a, b, c)` / `x NOT IN (...)`
535        Expr::InList(inlist) => {
536            let e = atom_to_sql(&inlist.expr)?;
537            let items: Option<Vec<String>> = inlist.list.iter().map(atom_to_sql).collect();
538            let items = items?;
539            let not_kw = if inlist.negated { "NOT " } else { "" };
540            Some(format!("({e} {not_kw}IN ({}))", items.join(", ")))
541        }
542        // `x BETWEEN low AND high` / `x NOT BETWEEN …`
543        Expr::Between(Between {
544            expr,
545            low,
546            high,
547            negated,
548        }) => {
549            let e = atom_to_sql(expr)?;
550            let lo = atom_to_sql(low)?;
551            let hi = atom_to_sql(high)?;
552            let not_kw = if *negated { "NOT " } else { "" };
553            Some(format!("({e} {not_kw}BETWEEN {lo} AND {hi})"))
554        }
555        // `x LIKE 'pat'` / `x ILIKE 'pat'` and their negations
556        Expr::Like(Like {
557            expr,
558            pattern,
559            negated,
560            case_insensitive,
561            escape_char,
562        }) => {
563            let e = atom_to_sql(expr)?;
564            let p = atom_to_sql(pattern)?;
565            let not_kw = if *negated { "NOT " } else { "" };
566            let like_kw = if *case_insensitive { "ILIKE" } else { "LIKE" };
567            let escape_clause = match escape_char {
568                Some(c) => format!(" ESCAPE '{c}'"),
569                None => String::new(),
570            };
571            Some(format!("({e} {not_kw}{like_kw} {p}{escape_clause})"))
572        }
573        _ => None,
574    }
575}
576
577/// Translate a leaf expression (column or literal) or a nested filter
578/// expression to SQL.
579fn atom_to_sql(expr: &Expr) -> Option<String> {
580    match expr {
581        Expr::Column(col) => Some(col.name.clone()),
582        Expr::Literal(scalar, _metadata) => scalar_to_sql(scalar),
583        other => expr_to_sql(other),
584    }
585}
586
587/// Render a [`ScalarValue`] as a SQL literal fragment.
588///
589/// Returns `None` for types that have no safe, round-trippable SQL literal
590/// representation (e.g. `Decimal128`, `Date64`, `Time32`, complex types).
591fn scalar_to_sql(scalar: &ScalarValue) -> Option<String> {
592    match scalar {
593        ScalarValue::Int8(Some(v)) => Some(v.to_string()),
594        ScalarValue::Int16(Some(v)) => Some(v.to_string()),
595        ScalarValue::Int32(Some(v)) => Some(v.to_string()),
596        ScalarValue::Int64(Some(v)) => Some(v.to_string()),
597        ScalarValue::Float32(Some(v)) => Some(v.to_string()),
598        ScalarValue::Float64(Some(v)) => Some(v.to_string()),
599        ScalarValue::Boolean(Some(v)) => Some(if *v { "TRUE" } else { "FALSE" }.to_string()),
600        ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
601            // SQL-escape single quotes to prevent injection (connection is trusted).
602            Some(format!("'{}'", s.replace('\'', "''")))
603        }
604        ScalarValue::Null => Some("NULL".to_string()),
605        // Typed NULLs (e.g. Int8(None)) are also SQL NULL.
606        ScalarValue::Int8(None)
607        | ScalarValue::Int16(None)
608        | ScalarValue::Int32(None)
609        | ScalarValue::Int64(None)
610        | ScalarValue::Float32(None)
611        | ScalarValue::Float64(None)
612        | ScalarValue::Boolean(None)
613        | ScalarValue::Utf8(None)
614        | ScalarValue::LargeUtf8(None) => Some("NULL".to_string()),
615        _ => None,
616    }
617}
618
619// ── OxiSqlExecPlan ────────────────────────────────────────────────────────────
620
621/// A DataFusion [`ExecutionPlan`] that executes a SQL query lazily against an
622/// OxiSQL connection at physical-plan execution time.
623///
624/// Unlike `MemorySourceConfig`, this defers the SQL query until DataFusion
625/// calls `execute()`, keeping `scan()` cheap and allocation-free at plan time.
626struct OxiSqlExecPlan {
627    schema: SchemaRef,
628    sql: String,
629    conn: Arc<dyn Connection>,
630    cache: Arc<PlanProperties>,
631}
632
633impl fmt::Debug for OxiSqlExecPlan {
634    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
635        f.debug_struct("OxiSqlExecPlan")
636            .field("sql", &self.sql)
637            .field("schema", &self.schema)
638            .finish()
639    }
640}
641
642impl OxiSqlExecPlan {
643    fn new(conn: Arc<dyn Connection>, sql: String, schema: SchemaRef) -> Self {
644        let eq = EquivalenceProperties::new(Arc::clone(&schema));
645        let properties = PlanProperties::new(
646            eq,
647            Partitioning::UnknownPartitioning(1),
648            EmissionType::Incremental,
649            Boundedness::Bounded,
650        );
651        Self {
652            schema,
653            sql,
654            conn,
655            cache: Arc::new(properties),
656        }
657    }
658}
659
660impl DisplayAs for OxiSqlExecPlan {
661    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
662        write!(f, "OxiSqlExecPlan sql={:?}", self.sql)
663    }
664}
665
666impl ExecutionPlan for OxiSqlExecPlan {
667    fn name(&self) -> &'static str {
668        "OxiSqlExecPlan"
669    }
670
671    fn as_any(&self) -> &dyn Any {
672        self
673    }
674
675    fn properties(&self) -> &Arc<PlanProperties> {
676        &self.cache
677    }
678
679    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
680        vec![]
681    }
682
683    fn with_new_children(
684        self: Arc<Self>,
685        children: Vec<Arc<dyn ExecutionPlan>>,
686    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
687        if children.is_empty() {
688            Ok(self)
689        } else {
690            Err(datafusion::error::DataFusionError::Internal(
691                "OxiSqlExecPlan has no children".into(),
692            ))
693        }
694    }
695
696    fn execute(
697        &self,
698        _partition: usize,
699        _context: Arc<TaskContext>,
700    ) -> datafusion::error::Result<SendableRecordBatchStream> {
701        let sql = self.sql.clone();
702        let conn = Arc::clone(&self.conn);
703        let schema = Arc::clone(&self.schema);
704
705        let stream = futures::stream::once(async move {
706            let rows = conn
707                .query(&sql, &[])
708                .await
709                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
710            crate::types::rows_to_record_batch(rows, schema)
711                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
712        });
713
714        Ok(Box::pin(RecordBatchStreamAdapter::new(
715            Arc::clone(&self.schema),
716            stream,
717        )))
718    }
719}
720
721// ── OxiSqlMultiPartExecPlan ───────────────────────────────────────────────────
722
723/// A DataFusion [`ExecutionPlan`] that exposes multiple parallel partitions,
724/// each executing a distinct pre-built SQL query (typically with `LIMIT`/`OFFSET`
725/// pagination) against the same OxiSQL connection.
726///
727/// Used by [`OxiSqlStreamProvider::with_auto_partition`] to split a full-table
728/// scan into `N` independently-executable page queries so DataFusion's thread
729/// pool can run them concurrently.
730struct OxiSqlMultiPartExecPlan {
731    schema: SchemaRef,
732    /// One SQL string per partition.
733    sqls: Vec<String>,
734    conn: Arc<dyn Connection>,
735    cache: Arc<PlanProperties>,
736}
737
738impl fmt::Debug for OxiSqlMultiPartExecPlan {
739    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
740        f.debug_struct("OxiSqlMultiPartExecPlan")
741            .field("partitions", &self.sqls.len())
742            .field("schema", &self.schema)
743            .finish()
744    }
745}
746
747impl OxiSqlMultiPartExecPlan {
748    fn new(conn: Arc<dyn Connection>, sqls: Vec<String>, schema: SchemaRef) -> Self {
749        let n = sqls.len().max(1);
750        let eq = EquivalenceProperties::new(Arc::clone(&schema));
751        let properties = PlanProperties::new(
752            eq,
753            Partitioning::UnknownPartitioning(n),
754            EmissionType::Incremental,
755            Boundedness::Bounded,
756        );
757        Self {
758            schema,
759            sqls,
760            conn,
761            cache: Arc::new(properties),
762        }
763    }
764}
765
766impl DisplayAs for OxiSqlMultiPartExecPlan {
767    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768        write!(f, "OxiSqlMultiPartExecPlan partitions={}", self.sqls.len())
769    }
770}
771
772impl ExecutionPlan for OxiSqlMultiPartExecPlan {
773    fn name(&self) -> &'static str {
774        "OxiSqlMultiPartExecPlan"
775    }
776
777    fn as_any(&self) -> &dyn Any {
778        self
779    }
780
781    fn properties(&self) -> &Arc<PlanProperties> {
782        &self.cache
783    }
784
785    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
786        vec![]
787    }
788
789    fn with_new_children(
790        self: Arc<Self>,
791        children: Vec<Arc<dyn ExecutionPlan>>,
792    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
793        if children.is_empty() {
794            Ok(self)
795        } else {
796            Err(datafusion::error::DataFusionError::Internal(
797                "OxiSqlMultiPartExecPlan has no children".into(),
798            ))
799        }
800    }
801
802    fn execute(
803        &self,
804        partition: usize,
805        _context: Arc<TaskContext>,
806    ) -> datafusion::error::Result<SendableRecordBatchStream> {
807        let sql = self.sqls.get(partition).cloned().ok_or_else(|| {
808            datafusion::error::DataFusionError::Internal(format!(
809                "OxiSqlMultiPartExecPlan: partition index {partition} out of range ({})",
810                self.sqls.len()
811            ))
812        })?;
813        let conn = Arc::clone(&self.conn);
814        let schema = Arc::clone(&self.schema);
815
816        let stream = futures::stream::once(async move {
817            let rows = conn
818                .query(&sql, &[])
819                .await
820                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
821            crate::types::rows_to_record_batch(rows, schema)
822                .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
823        });
824
825        Ok(Box::pin(RecordBatchStreamAdapter::new(
826            Arc::clone(&self.schema),
827            stream,
828        )))
829    }
830}