oxisql_datafusion/stream.rs
1//! Live-streaming DataFusion table provider backed by a real OxiSQL connection.
2//!
3//! Unlike [`OxiSqlTableProvider`], which materialises all rows at construction
4//! time, [`OxiSqlStreamProvider`] queries the database at plan-execution time,
5//! translating DataFusion filter and projection hints into SQL WHERE / SELECT
6//! clauses so that the backend does the heavy lifting.
7//!
8//! [`OxiSqlTableProvider`]: crate::OxiSqlTableProvider
9
10use std::any::Any;
11use std::fmt;
12use std::sync::Arc;
13
14use arrow::datatypes::SchemaRef;
15use async_trait::async_trait;
16use datafusion::catalog::Session;
17use datafusion::datasource::{TableProvider, TableType};
18use datafusion::error::Result as DFResult;
19use datafusion::execution::TaskContext;
20use datafusion::logical_expr::{
21 Between, BinaryExpr, Expr, Like, Operator, TableProviderFilterPushDown,
22};
23use datafusion::physical_expr::EquivalenceProperties;
24use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
25use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
26use datafusion::physical_plan::{
27 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
28 SendableRecordBatchStream,
29};
30use datafusion::scalar::ScalarValue;
31use oxisql_core::Connection;
32
// ── Sort order ────────────────────────────────────────────────────────────────

/// Direction of a single `ORDER BY` key pushed down to the SQL backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
    /// Ascending order — what SQL uses when no direction is written.
    Asc,
    /// Descending order.
    Desc,
}

impl fmt::Display for SortOrder {
    /// Writes the SQL keyword for this direction: `ASC` or `DESC`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Self::Asc => "ASC",
            Self::Desc => "DESC",
        };
        f.write_str(keyword)
    }
}
52
53/// A DataFusion [`TableProvider`] that executes a live SQL query at scan time,
54/// pushing filters and projections down to the OxiSQL backend.
55///
56/// # Filter pushdown
57///
58/// Simple comparison predicates (`=`, `<>`, `<`, `<=`, `>`, `>=`, `AND`, `OR`,
59/// `IS NULL`, `IS NOT NULL`, `NOT`) are converted to SQL WHERE clauses.
60/// Complex expressions (functions, subqueries, arithmetic) are left to
61/// DataFusion's post-scan filtering.
62///
63/// # Projection pushdown
64///
65/// When DataFusion requests specific columns, the provider generates
66/// `SELECT col1, col2, ...` instead of `SELECT *`.
67///
68/// # Limit pushdown
69///
70/// When DataFusion specifies a limit, the provider appends `LIMIT N` to the
71/// query, reducing the number of rows transferred.
72///
73/// # Sort pushdown
74///
75/// An optional `ORDER BY` clause can be injected into the generated SQL via
76/// [`Self::with_sort`]. Each entry is a `(column_name, SortOrder)` pair.
77///
78/// # Automatic partitioning
79///
80/// [`Self::with_auto_partition`] splits the scan into multiple parallel
81/// partitions using `LIMIT` / `OFFSET` pagination, each fetching at most
82/// `target_batch_size` rows. At most `n_parallel` partitions are created.
83pub struct OxiSqlStreamProvider {
84 schema: SchemaRef,
85 table_name: String,
86 conn: Arc<dyn Connection>,
87 /// Optional sort order to push to the backend as an `ORDER BY` clause.
88 sort_order: Option<Vec<(String, SortOrder)>>,
89 /// Optional auto-partition configuration `(n_parallel, target_batch_size)`.
90 ///
91 /// When set, `scan()` spawns up to `n_parallel` partition slots, each
92 /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
93 /// backend for parallel execution.
94 auto_partition_config: Option<(usize, usize)>,
95}
96
97impl OxiSqlStreamProvider {
98 /// Construct from a connection, table name, and Arrow schema.
99 ///
100 /// The `schema` must match the column layout returned by the backend for
101 /// `SELECT * FROM table_name`. Column names in the schema are used
102 /// verbatim as SQL column references.
103 pub fn new(
104 conn: Arc<dyn Connection>,
105 table_name: impl Into<String>,
106 schema: SchemaRef,
107 ) -> Self {
108 Self {
109 schema,
110 table_name: table_name.into(),
111 conn,
112 sort_order: None,
113 auto_partition_config: None,
114 }
115 }
116
117 /// Attach an `ORDER BY` clause to the SQL generated at scan time.
118 ///
119 /// Each entry is a `(column_name, SortOrder)` pair. The resulting SQL
120 /// fragment is appended between the WHERE clause and any LIMIT clause.
121 ///
122 /// This is a "sort-into-SQL" approach: the backend database performs the
123 /// ordering, reducing the work DataFusion must do on the result set.
124 ///
125 /// # Example
126 ///
127 /// ```rust,ignore
128 /// use oxisql_datafusion::stream::{OxiSqlStreamProvider, SortOrder};
129 ///
130 /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
131 /// .with_sort(vec![
132 /// ("score".into(), SortOrder::Desc),
133 /// ("id".into(), SortOrder::Asc),
134 /// ]);
135 /// ```
136 #[must_use]
137 pub fn with_sort(mut self, order: Vec<(String, SortOrder)>) -> Self {
138 self.sort_order = Some(order);
139 self
140 }
141
142 /// Return the configured sort order, if any.
143 ///
144 /// Returns `None` if no sort has been attached via [`Self::with_sort`].
145 pub fn sort_order(&self) -> Option<&[(String, SortOrder)]> {
146 self.sort_order.as_deref()
147 }
148
149 /// Configure automatic partitioning for parallel scan execution.
150 ///
151 /// When set, `scan()` creates up to `n_parallel` partition slots, each
152 /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
153 /// backend. This allows DataFusion to execute multiple partitions in
154 /// parallel without knowing the total row count ahead of time.
155 ///
156 /// Partitions beyond the actual data silently return empty batches, so
157 /// setting `n_parallel` generously is safe.
158 ///
159 /// - `n_parallel`: maximum number of parallel partitions (e.g. CPU thread count).
160 /// - `target_batch_size`: rows fetched per partition page (e.g. `8192`).
161 ///
162 /// # Example
163 ///
164 /// ```rust,ignore
165 /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
166 /// .with_auto_partition(4, 8192);
167 /// ```
168 #[must_use]
169 pub fn with_auto_partition(mut self, n_parallel: usize, target_batch_size: usize) -> Self {
170 self.auto_partition_config = Some((n_parallel, target_batch_size));
171 self
172 }
173
174 /// Return the auto-partition configuration `(n_parallel, target_batch_size)`, if any.
175 pub fn auto_partition_config(&self) -> Option<(usize, usize)> {
176 self.auto_partition_config
177 }
178
179 /// Construct from a live [`oxisql_mysql::MyConnection`] for streaming MySQL
180 /// query results through DataFusion.
181 ///
182 /// The connection is wrapped in an `Arc` and used as the underlying
183 /// [`Connection`] trait object. All filter, projection, limit, and sort
184 /// pushdown features of [`OxiSqlStreamProvider`] are available on the
185 /// resulting provider.
186 ///
187 /// # Feature flag
188 ///
189 /// Only available when the `mysql` Cargo feature of `oxisql-datafusion` is
190 /// enabled.
191 ///
192 /// # Example
193 ///
194 /// ```rust,no_run
195 /// # #[tokio::main]
196 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
197 /// use std::sync::Arc;
198 /// use arrow::datatypes::{DataType, Field, Schema};
199 /// use oxisql_mysql::{MyConnection, TlsMode};
200 /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
201 ///
202 /// let conn = MyConnection::connect(
203 /// "mysql://root:secret@localhost:3306/mydb",
204 /// TlsMode::Disabled,
205 /// ).await?;
206 ///
207 /// let schema = Arc::new(Schema::new(vec![
208 /// Field::new("id", DataType::Int64, true),
209 /// Field::new("name", DataType::Utf8, true),
210 /// ]));
211 ///
212 /// let provider = OxiSqlStreamProvider::from_mysql(conn, "users", schema);
213 /// # Ok(())
214 /// # }
215 /// ```
216 #[cfg(feature = "mysql")]
217 pub fn from_mysql(
218 conn: oxisql_mysql::MyConnection,
219 table_name: impl Into<String>,
220 schema: SchemaRef,
221 ) -> Self {
222 Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
223 }
224
225 /// Construct from a live [`oxisql_postgres::PgConnection`] for streaming
226 /// Postgres query results through DataFusion.
227 ///
228 /// The connection is wrapped in an `Arc` and used as the underlying
229 /// [`Connection`] trait object. All filter, projection, limit, and sort
230 /// pushdown features of [`OxiSqlStreamProvider`] are available on the
231 /// resulting provider.
232 ///
233 /// # Feature flag
234 ///
235 /// Only available when the `postgres` Cargo feature of `oxisql-datafusion`
236 /// is enabled.
237 ///
238 /// # Example
239 ///
240 /// ```rust,no_run
241 /// # #[tokio::main]
242 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
243 /// use std::sync::Arc;
244 /// use arrow::datatypes::{DataType, Field, Schema};
245 /// use oxisql_postgres::{PgConnection, TlsMode};
246 /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
247 ///
248 /// let conn = PgConnection::connect(
249 /// "host=localhost user=postgres password=secret dbname=mydb",
250 /// TlsMode::Disabled,
251 /// ).await?;
252 ///
253 /// let schema = Arc::new(Schema::new(vec![
254 /// Field::new("id", DataType::Int64, true),
255 /// Field::new("name", DataType::Utf8, true),
256 /// ]));
257 ///
258 /// let provider = OxiSqlStreamProvider::from_postgres(conn, "users", schema);
259 /// # Ok(())
260 /// # }
261 /// ```
262 #[cfg(feature = "postgres")]
263 pub fn from_postgres(
264 conn: oxisql_postgres::PgConnection,
265 table_name: impl Into<String>,
266 schema: SchemaRef,
267 ) -> Self {
268 Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
269 }
270
271 /// Construct from a live [`oxisql_sqlite_compat::SqliteConnection`] for
272 /// streaming SQLite query results through DataFusion.
273 ///
274 /// The connection is wrapped in an `Arc` and used as the underlying
275 /// [`Connection`] trait object. All filter, projection, limit, and sort
276 /// pushdown features of [`OxiSqlStreamProvider`] are available on the
277 /// resulting provider.
278 ///
279 /// Both in-memory databases (created with
280 /// [`SqliteConnection::open_memory`]) and file-backed databases are
281 /// supported.
282 ///
283 /// # Feature flag
284 ///
285 /// Only available when the `sqlite` Cargo feature of `oxisql-datafusion`
286 /// is enabled.
287 ///
288 /// # Example
289 ///
290 /// ```rust,no_run
291 /// # #[tokio::main]
292 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
293 /// use std::sync::Arc;
294 /// use arrow::datatypes::{DataType, Field, Schema};
295 /// use oxisql_sqlite_compat::SqliteConnection;
296 /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
297 ///
298 /// let conn = SqliteConnection::open_memory().await?;
299 ///
300 /// let schema = Arc::new(Schema::new(vec![
301 /// Field::new("id", DataType::Int64, true),
302 /// Field::new("name", DataType::Utf8, true),
303 /// ]));
304 ///
305 /// let provider = OxiSqlStreamProvider::from_sqlite(conn, "users", schema);
306 /// # Ok(())
307 /// # }
308 /// ```
309 ///
310 /// [`SqliteConnection::open_memory`]: oxisql_sqlite_compat::SqliteConnection::open_memory
311 #[cfg(feature = "sqlite")]
312 pub fn from_sqlite(
313 conn: oxisql_sqlite_compat::SqliteConnection,
314 table_name: impl Into<String>,
315 schema: SchemaRef,
316 ) -> Self {
317 Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
318 }
319
320 /// Return the SQL SELECT column list for the given projection.
321 fn project_clause(&self, projection: Option<&Vec<usize>>) -> String {
322 match projection {
323 None => "*".to_string(),
324 Some(indices) => indices
325 .iter()
326 .map(|&i| self.schema.field(i).name().as_str())
327 .collect::<Vec<_>>()
328 .join(", "),
329 }
330 }
331
332 /// Return the projected Arrow schema for the given column indices.
333 fn projected_schema(&self, projection: Option<&Vec<usize>>) -> SchemaRef {
334 match projection {
335 None => Arc::clone(&self.schema),
336 Some(indices) => {
337 let fields: Vec<_> = indices
338 .iter()
339 .map(|&i| self.schema.field(i).clone())
340 .collect();
341 Arc::new(arrow::datatypes::Schema::new(fields))
342 }
343 }
344 }
345}
346
347impl fmt::Debug for OxiSqlStreamProvider {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 f.debug_struct("OxiSqlStreamProvider")
350 .field("table_name", &self.table_name)
351 .field("schema", &self.schema)
352 .finish()
353 }
354}
355
356#[async_trait]
357impl TableProvider for OxiSqlStreamProvider {
358 fn as_any(&self) -> &dyn Any {
359 self
360 }
361
362 fn schema(&self) -> SchemaRef {
363 Arc::clone(&self.schema)
364 }
365
366 fn table_type(&self) -> TableType {
367 TableType::Base
368 }
369
370 async fn scan(
371 &self,
372 _state: &dyn Session,
373 projection: Option<&Vec<usize>>,
374 filters: &[Expr],
375 limit: Option<usize>,
376 ) -> DFResult<Arc<dyn ExecutionPlan>> {
377 let col_clause = self.project_clause(projection);
378 let output_schema = self.projected_schema(projection);
379
380 // Build the base SQL (LIMIT/OFFSET appended per-partition below).
381 let mut base_sql = format!("SELECT {} FROM {}", col_clause, self.table_name);
382
383 // Build WHERE clause from pushed-down filters.
384 let where_parts: Vec<String> = filters.iter().filter_map(expr_to_sql).collect();
385 if !where_parts.is_empty() {
386 base_sql.push_str(" WHERE ");
387 base_sql.push_str(&where_parts.join(" AND "));
388 }
389
390 // Append ORDER BY clause if sort pushdown has been configured.
391 if let Some(ref order) = self.sort_order {
392 if !order.is_empty() {
393 base_sql.push_str(" ORDER BY ");
394 let order_clause = order
395 .iter()
396 .map(|(col, dir)| format!("{col} {dir}"))
397 .collect::<Vec<_>>()
398 .join(", ");
399 base_sql.push_str(&order_clause);
400 }
401 }
402
403 // When DataFusion pushes a LIMIT down, use a single partition regardless
404 // of auto_partition_config — a LIMIT scan is already cheap and splitting
405 // it would interfere with the fetch count semantics.
406 if let Some(n) = limit {
407 let mut sql = base_sql;
408 sql.push_str(&format!(" LIMIT {n}"));
409 let exec = OxiSqlExecPlan::new(Arc::clone(&self.conn), sql, Arc::clone(&output_schema));
410 return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
411 }
412
413 // Auto-partition: split the scan into N page-based partition slots using
414 // LIMIT/OFFSET pagination so DataFusion can execute them in parallel.
415 if let Some((n_parallel, target_batch_size)) = self.auto_partition_config {
416 if n_parallel > 1 && target_batch_size > 0 {
417 let sqls: Vec<String> = (0..n_parallel)
418 .map(|i| {
419 format!(
420 "{} LIMIT {} OFFSET {}",
421 base_sql,
422 target_batch_size,
423 i * target_batch_size
424 )
425 })
426 .collect();
427 let exec = OxiSqlMultiPartExecPlan::new(
428 Arc::clone(&self.conn),
429 sqls,
430 Arc::clone(&output_schema),
431 );
432 return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
433 }
434 }
435
436 // Default single-partition path.
437 let exec =
438 OxiSqlExecPlan::new(Arc::clone(&self.conn), base_sql, Arc::clone(&output_schema));
439 Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>)
440 }
441
442 fn supports_filters_pushdown(
443 &self,
444 filters: &[&Expr],
445 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
446 Ok(filters
447 .iter()
448 .map(|f| {
449 if can_push_filter(f) {
450 TableProviderFilterPushDown::Exact
451 } else {
452 TableProviderFilterPushDown::Unsupported
453 }
454 })
455 .collect())
456 }
457}
458
459// ── Expression translation ────────────────────────────────────────────────────
460
461/// Check whether `expr` can be fully translated to SQL and pushed to the backend.
462///
463/// Returns `true` only for expressions that [`expr_to_sql`] can translate
464/// completely. The two functions must stay in sync: every pattern that returns
465/// `false` here must also return `None` from [`expr_to_sql`].
466pub fn can_push_filter(expr: &Expr) -> bool {
467 match expr {
468 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
469 matches!(
470 op,
471 Operator::Eq
472 | Operator::NotEq
473 | Operator::Lt
474 | Operator::LtEq
475 | Operator::Gt
476 | Operator::GtEq
477 | Operator::And
478 | Operator::Or
479 ) && can_push_atom(left)
480 && can_push_atom(right)
481 }
482 Expr::IsNull(inner) | Expr::IsNotNull(inner) => can_push_atom(inner),
483 Expr::Not(inner) => can_push_filter(inner),
484 // `x IN (a, b, c)` / `x NOT IN (...)`
485 Expr::InList(inlist) => {
486 can_push_atom(&inlist.expr) && inlist.list.iter().all(can_push_atom)
487 }
488 // `x BETWEEN low AND high` / `x NOT BETWEEN …`
489 Expr::Between(Between {
490 expr, low, high, ..
491 }) => can_push_atom(expr) && can_push_atom(low) && can_push_atom(high),
492 // `x LIKE 'pat'` / `x ILIKE 'pat'` / negated variants
493 Expr::Like(Like { expr, pattern, .. }) => can_push_atom(expr) && can_push_atom(pattern),
494 _ => false,
495 }
496}
497
498/// Check whether a leaf expression (column or literal) can appear inside a
499/// pushed-down filter.
500fn can_push_atom(expr: &Expr) -> bool {
501 match expr {
502 Expr::Column(_) => true,
503 Expr::Literal(_, _) => true,
504 other => can_push_filter(other),
505 }
506}
507
508/// Translate a DataFusion expression to a SQL WHERE-clause fragment.
509///
510/// Returns `None` for unsupported expressions.
511/// [`can_push_filter`] can be used to check translatability without executing
512/// the translation.
513pub fn expr_to_sql(expr: &Expr) -> Option<String> {
514 match expr {
515 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
516 let l = atom_to_sql(left)?;
517 let r = atom_to_sql(right)?;
518 let op_str = match op {
519 Operator::Eq => "=",
520 Operator::NotEq => "<>",
521 Operator::Lt => "<",
522 Operator::LtEq => "<=",
523 Operator::Gt => ">",
524 Operator::GtEq => ">=",
525 Operator::And => "AND",
526 Operator::Or => "OR",
527 _ => return None,
528 };
529 Some(format!("({l} {op_str} {r})"))
530 }
531 Expr::IsNull(inner) => Some(format!("({} IS NULL)", atom_to_sql(inner)?)),
532 Expr::IsNotNull(inner) => Some(format!("({} IS NOT NULL)", atom_to_sql(inner)?)),
533 Expr::Not(inner) => Some(format!("(NOT {})", expr_to_sql(inner)?)),
534 // `x IN (a, b, c)` / `x NOT IN (...)`
535 Expr::InList(inlist) => {
536 let e = atom_to_sql(&inlist.expr)?;
537 let items: Option<Vec<String>> = inlist.list.iter().map(atom_to_sql).collect();
538 let items = items?;
539 let not_kw = if inlist.negated { "NOT " } else { "" };
540 Some(format!("({e} {not_kw}IN ({}))", items.join(", ")))
541 }
542 // `x BETWEEN low AND high` / `x NOT BETWEEN …`
543 Expr::Between(Between {
544 expr,
545 low,
546 high,
547 negated,
548 }) => {
549 let e = atom_to_sql(expr)?;
550 let lo = atom_to_sql(low)?;
551 let hi = atom_to_sql(high)?;
552 let not_kw = if *negated { "NOT " } else { "" };
553 Some(format!("({e} {not_kw}BETWEEN {lo} AND {hi})"))
554 }
555 // `x LIKE 'pat'` / `x ILIKE 'pat'` and their negations
556 Expr::Like(Like {
557 expr,
558 pattern,
559 negated,
560 case_insensitive,
561 escape_char,
562 }) => {
563 let e = atom_to_sql(expr)?;
564 let p = atom_to_sql(pattern)?;
565 let not_kw = if *negated { "NOT " } else { "" };
566 let like_kw = if *case_insensitive { "ILIKE" } else { "LIKE" };
567 let escape_clause = match escape_char {
568 Some(c) => format!(" ESCAPE '{c}'"),
569 None => String::new(),
570 };
571 Some(format!("({e} {not_kw}{like_kw} {p}{escape_clause})"))
572 }
573 _ => None,
574 }
575}
576
577/// Translate a leaf expression (column or literal) or a nested filter
578/// expression to SQL.
579fn atom_to_sql(expr: &Expr) -> Option<String> {
580 match expr {
581 Expr::Column(col) => Some(col.name.clone()),
582 Expr::Literal(scalar, _metadata) => scalar_to_sql(scalar),
583 other => expr_to_sql(other),
584 }
585}
586
587/// Render a [`ScalarValue`] as a SQL literal fragment.
588///
589/// Returns `None` for types that have no safe, round-trippable SQL literal
590/// representation (e.g. `Decimal128`, `Date64`, `Time32`, complex types).
591fn scalar_to_sql(scalar: &ScalarValue) -> Option<String> {
592 match scalar {
593 ScalarValue::Int8(Some(v)) => Some(v.to_string()),
594 ScalarValue::Int16(Some(v)) => Some(v.to_string()),
595 ScalarValue::Int32(Some(v)) => Some(v.to_string()),
596 ScalarValue::Int64(Some(v)) => Some(v.to_string()),
597 ScalarValue::Float32(Some(v)) => Some(v.to_string()),
598 ScalarValue::Float64(Some(v)) => Some(v.to_string()),
599 ScalarValue::Boolean(Some(v)) => Some(if *v { "TRUE" } else { "FALSE" }.to_string()),
600 ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
601 // SQL-escape single quotes to prevent injection (connection is trusted).
602 Some(format!("'{}'", s.replace('\'', "''")))
603 }
604 ScalarValue::Null => Some("NULL".to_string()),
605 // Typed NULLs (e.g. Int8(None)) are also SQL NULL.
606 ScalarValue::Int8(None)
607 | ScalarValue::Int16(None)
608 | ScalarValue::Int32(None)
609 | ScalarValue::Int64(None)
610 | ScalarValue::Float32(None)
611 | ScalarValue::Float64(None)
612 | ScalarValue::Boolean(None)
613 | ScalarValue::Utf8(None)
614 | ScalarValue::LargeUtf8(None) => Some("NULL".to_string()),
615 _ => None,
616 }
617}
618
619// ── OxiSqlExecPlan ────────────────────────────────────────────────────────────
620
621/// A DataFusion [`ExecutionPlan`] that executes a SQL query lazily against an
622/// OxiSQL connection at physical-plan execution time.
623///
624/// Unlike `MemorySourceConfig`, this defers the SQL query until DataFusion
625/// calls `execute()`, keeping `scan()` cheap and allocation-free at plan time.
626struct OxiSqlExecPlan {
627 schema: SchemaRef,
628 sql: String,
629 conn: Arc<dyn Connection>,
630 cache: Arc<PlanProperties>,
631}
632
633impl fmt::Debug for OxiSqlExecPlan {
634 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
635 f.debug_struct("OxiSqlExecPlan")
636 .field("sql", &self.sql)
637 .field("schema", &self.schema)
638 .finish()
639 }
640}
641
642impl OxiSqlExecPlan {
643 fn new(conn: Arc<dyn Connection>, sql: String, schema: SchemaRef) -> Self {
644 let eq = EquivalenceProperties::new(Arc::clone(&schema));
645 let properties = PlanProperties::new(
646 eq,
647 Partitioning::UnknownPartitioning(1),
648 EmissionType::Incremental,
649 Boundedness::Bounded,
650 );
651 Self {
652 schema,
653 sql,
654 conn,
655 cache: Arc::new(properties),
656 }
657 }
658}
659
660impl DisplayAs for OxiSqlExecPlan {
661 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
662 write!(f, "OxiSqlExecPlan sql={:?}", self.sql)
663 }
664}
665
666impl ExecutionPlan for OxiSqlExecPlan {
667 fn name(&self) -> &'static str {
668 "OxiSqlExecPlan"
669 }
670
671 fn as_any(&self) -> &dyn Any {
672 self
673 }
674
675 fn properties(&self) -> &Arc<PlanProperties> {
676 &self.cache
677 }
678
679 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
680 vec![]
681 }
682
683 fn with_new_children(
684 self: Arc<Self>,
685 children: Vec<Arc<dyn ExecutionPlan>>,
686 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
687 if children.is_empty() {
688 Ok(self)
689 } else {
690 Err(datafusion::error::DataFusionError::Internal(
691 "OxiSqlExecPlan has no children".into(),
692 ))
693 }
694 }
695
696 fn execute(
697 &self,
698 _partition: usize,
699 _context: Arc<TaskContext>,
700 ) -> datafusion::error::Result<SendableRecordBatchStream> {
701 let sql = self.sql.clone();
702 let conn = Arc::clone(&self.conn);
703 let schema = Arc::clone(&self.schema);
704
705 let stream = futures::stream::once(async move {
706 let rows = conn
707 .query(&sql, &[])
708 .await
709 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
710 crate::types::rows_to_record_batch(rows, schema)
711 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
712 });
713
714 Ok(Box::pin(RecordBatchStreamAdapter::new(
715 Arc::clone(&self.schema),
716 stream,
717 )))
718 }
719}
720
721// ── OxiSqlMultiPartExecPlan ───────────────────────────────────────────────────
722
723/// A DataFusion [`ExecutionPlan`] that exposes multiple parallel partitions,
724/// each executing a distinct pre-built SQL query (typically with `LIMIT`/`OFFSET`
725/// pagination) against the same OxiSQL connection.
726///
727/// Used by [`OxiSqlStreamProvider::with_auto_partition`] to split a full-table
728/// scan into `N` independently-executable page queries so DataFusion's thread
729/// pool can run them concurrently.
730struct OxiSqlMultiPartExecPlan {
731 schema: SchemaRef,
732 /// One SQL string per partition.
733 sqls: Vec<String>,
734 conn: Arc<dyn Connection>,
735 cache: Arc<PlanProperties>,
736}
737
738impl fmt::Debug for OxiSqlMultiPartExecPlan {
739 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
740 f.debug_struct("OxiSqlMultiPartExecPlan")
741 .field("partitions", &self.sqls.len())
742 .field("schema", &self.schema)
743 .finish()
744 }
745}
746
747impl OxiSqlMultiPartExecPlan {
748 fn new(conn: Arc<dyn Connection>, sqls: Vec<String>, schema: SchemaRef) -> Self {
749 let n = sqls.len().max(1);
750 let eq = EquivalenceProperties::new(Arc::clone(&schema));
751 let properties = PlanProperties::new(
752 eq,
753 Partitioning::UnknownPartitioning(n),
754 EmissionType::Incremental,
755 Boundedness::Bounded,
756 );
757 Self {
758 schema,
759 sqls,
760 conn,
761 cache: Arc::new(properties),
762 }
763 }
764}
765
766impl DisplayAs for OxiSqlMultiPartExecPlan {
767 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
768 write!(f, "OxiSqlMultiPartExecPlan partitions={}", self.sqls.len())
769 }
770}
771
772impl ExecutionPlan for OxiSqlMultiPartExecPlan {
773 fn name(&self) -> &'static str {
774 "OxiSqlMultiPartExecPlan"
775 }
776
777 fn as_any(&self) -> &dyn Any {
778 self
779 }
780
781 fn properties(&self) -> &Arc<PlanProperties> {
782 &self.cache
783 }
784
785 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
786 vec![]
787 }
788
789 fn with_new_children(
790 self: Arc<Self>,
791 children: Vec<Arc<dyn ExecutionPlan>>,
792 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
793 if children.is_empty() {
794 Ok(self)
795 } else {
796 Err(datafusion::error::DataFusionError::Internal(
797 "OxiSqlMultiPartExecPlan has no children".into(),
798 ))
799 }
800 }
801
802 fn execute(
803 &self,
804 partition: usize,
805 _context: Arc<TaskContext>,
806 ) -> datafusion::error::Result<SendableRecordBatchStream> {
807 let sql = self.sqls.get(partition).cloned().ok_or_else(|| {
808 datafusion::error::DataFusionError::Internal(format!(
809 "OxiSqlMultiPartExecPlan: partition index {partition} out of range ({})",
810 self.sqls.len()
811 ))
812 })?;
813 let conn = Arc::clone(&self.conn);
814 let schema = Arc::clone(&self.schema);
815
816 let stream = futures::stream::once(async move {
817 let rows = conn
818 .query(&sql, &[])
819 .await
820 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
821 crate::types::rows_to_record_batch(rows, schema)
822 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
823 });
824
825 Ok(Box::pin(RecordBatchStreamAdapter::new(
826 Arc::clone(&self.schema),
827 stream,
828 )))
829 }
830}