oxisql_datafusion/stream.rs
1//! Live-streaming DataFusion table provider backed by a real OxiSQL connection.
2//!
3//! Unlike [`OxiSqlTableProvider`], which materialises all rows at construction
4//! time, [`OxiSqlStreamProvider`] queries the database at plan-execution time,
5//! translating DataFusion filter and projection hints into SQL WHERE / SELECT
6//! clauses so that the backend does the heavy lifting.
7//!
8//! [`OxiSqlTableProvider`]: crate::OxiSqlTableProvider
9
10use std::fmt;
11use std::sync::Arc;
12
13use arrow::datatypes::SchemaRef;
14use async_trait::async_trait;
15use datafusion::catalog::Session;
16use datafusion::datasource::{TableProvider, TableType};
17use datafusion::error::Result as DFResult;
18use datafusion::execution::TaskContext;
19use datafusion::logical_expr::{
20 Between, BinaryExpr, Expr, Like, Operator, TableProviderFilterPushDown,
21};
22use datafusion::physical_expr::EquivalenceProperties;
23use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
24use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
25use datafusion::physical_plan::{
26 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
27 SendableRecordBatchStream,
28};
29use datafusion::scalar::ScalarValue;
30use oxisql_core::Connection;
31
32// ── Sort order ────────────────────────────────────────────────────────────────
33
/// Sort direction for an `ORDER BY` column pushed down to the SQL backend.
///
/// Formats (via [`fmt::Display`]) as the SQL keyword `ASC` or `DESC`, which
/// is how it is spliced into the generated `ORDER BY` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SortOrder {
    /// Sort ascending (default SQL ordering, and this type's `Default`).
    #[default]
    Asc,
    /// Sort descending.
    Desc,
}

impl fmt::Display for SortOrder {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Emit the bare SQL keyword; callers prepend the column name.
        f.write_str(match self {
            SortOrder::Asc => "ASC",
            SortOrder::Desc => "DESC",
        })
    }
}
51
52/// A DataFusion [`TableProvider`] that executes a live SQL query at scan time,
53/// pushing filters and projections down to the OxiSQL backend.
54///
55/// # Filter pushdown
56///
57/// Simple comparison predicates (`=`, `<>`, `<`, `<=`, `>`, `>=`, `AND`, `OR`,
58/// `IS NULL`, `IS NOT NULL`, `NOT`) are converted to SQL WHERE clauses.
59/// Complex expressions (functions, subqueries, arithmetic) are left to
60/// DataFusion's post-scan filtering.
61///
62/// # Projection pushdown
63///
64/// When DataFusion requests specific columns, the provider generates
65/// `SELECT col1, col2, ...` instead of `SELECT *`.
66///
67/// # Limit pushdown
68///
69/// When DataFusion specifies a limit, the provider appends `LIMIT N` to the
70/// query, reducing the number of rows transferred.
71///
72/// # Sort pushdown
73///
74/// An optional `ORDER BY` clause can be injected into the generated SQL via
75/// [`Self::with_sort`]. Each entry is a `(column_name, SortOrder)` pair.
76///
77/// # Automatic partitioning
78///
79/// [`Self::with_auto_partition`] splits the scan into multiple parallel
80/// partitions using `LIMIT` / `OFFSET` pagination, each fetching at most
81/// `target_batch_size` rows. At most `n_parallel` partitions are created.
82pub struct OxiSqlStreamProvider {
83 schema: SchemaRef,
84 table_name: String,
85 conn: Arc<dyn Connection>,
86 /// Optional sort order to push to the backend as an `ORDER BY` clause.
87 sort_order: Option<Vec<(String, SortOrder)>>,
88 /// Optional auto-partition configuration `(n_parallel, target_batch_size)`.
89 ///
90 /// When set, `scan()` spawns up to `n_parallel` partition slots, each
91 /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
92 /// backend for parallel execution.
93 auto_partition_config: Option<(usize, usize)>,
94}
95
96impl OxiSqlStreamProvider {
97 /// Construct from a connection, table name, and Arrow schema.
98 ///
99 /// The `schema` must match the column layout returned by the backend for
100 /// `SELECT * FROM table_name`. Column names in the schema are used
101 /// verbatim as SQL column references.
102 pub fn new(
103 conn: Arc<dyn Connection>,
104 table_name: impl Into<String>,
105 schema: SchemaRef,
106 ) -> Self {
107 Self {
108 schema,
109 table_name: table_name.into(),
110 conn,
111 sort_order: None,
112 auto_partition_config: None,
113 }
114 }
115
116 /// Attach an `ORDER BY` clause to the SQL generated at scan time.
117 ///
118 /// Each entry is a `(column_name, SortOrder)` pair. The resulting SQL
119 /// fragment is appended between the WHERE clause and any LIMIT clause.
120 ///
121 /// This is a "sort-into-SQL" approach: the backend database performs the
122 /// ordering, reducing the work DataFusion must do on the result set.
123 ///
124 /// # Example
125 ///
126 /// ```rust,ignore
127 /// use oxisql_datafusion::stream::{OxiSqlStreamProvider, SortOrder};
128 ///
129 /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
130 /// .with_sort(vec![
131 /// ("score".into(), SortOrder::Desc),
132 /// ("id".into(), SortOrder::Asc),
133 /// ]);
134 /// ```
135 #[must_use]
136 pub fn with_sort(mut self, order: Vec<(String, SortOrder)>) -> Self {
137 self.sort_order = Some(order);
138 self
139 }
140
141 /// Return the configured sort order, if any.
142 ///
143 /// Returns `None` if no sort has been attached via [`Self::with_sort`].
144 pub fn sort_order(&self) -> Option<&[(String, SortOrder)]> {
145 self.sort_order.as_deref()
146 }
147
148 /// Configure automatic partitioning for parallel scan execution.
149 ///
150 /// When set, `scan()` creates up to `n_parallel` partition slots, each
151 /// issuing `LIMIT target_batch_size OFFSET i * target_batch_size` to the
152 /// backend. This allows DataFusion to execute multiple partitions in
153 /// parallel without knowing the total row count ahead of time.
154 ///
155 /// Partitions beyond the actual data silently return empty batches, so
156 /// setting `n_parallel` generously is safe.
157 ///
158 /// - `n_parallel`: maximum number of parallel partitions (e.g. CPU thread count).
159 /// - `target_batch_size`: rows fetched per partition page (e.g. `8192`).
160 ///
161 /// # Example
162 ///
163 /// ```rust,ignore
164 /// let provider = OxiSqlStreamProvider::new(conn, "users", schema)
165 /// .with_auto_partition(4, 8192);
166 /// ```
167 #[must_use]
168 pub fn with_auto_partition(mut self, n_parallel: usize, target_batch_size: usize) -> Self {
169 self.auto_partition_config = Some((n_parallel, target_batch_size));
170 self
171 }
172
173 /// Return the auto-partition configuration `(n_parallel, target_batch_size)`, if any.
174 pub fn auto_partition_config(&self) -> Option<(usize, usize)> {
175 self.auto_partition_config
176 }
177
178 /// Construct from a live [`oxisql_mysql::MyConnection`] for streaming MySQL
179 /// query results through DataFusion.
180 ///
181 /// The connection is wrapped in an `Arc` and used as the underlying
182 /// [`Connection`] trait object. All filter, projection, limit, and sort
183 /// pushdown features of [`OxiSqlStreamProvider`] are available on the
184 /// resulting provider.
185 ///
186 /// # Feature flag
187 ///
188 /// Only available when the `mysql` Cargo feature of `oxisql-datafusion` is
189 /// enabled.
190 ///
191 /// # Example
192 ///
193 /// ```rust,no_run
194 /// # #[tokio::main]
195 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
196 /// use std::sync::Arc;
197 /// use arrow::datatypes::{DataType, Field, Schema};
198 /// use oxisql_mysql::{MyConnection, TlsMode};
199 /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
200 ///
201 /// let conn = MyConnection::connect(
202 /// "mysql://root:secret@localhost:3306/mydb",
203 /// TlsMode::Disabled,
204 /// ).await?;
205 ///
206 /// let schema = Arc::new(Schema::new(vec![
207 /// Field::new("id", DataType::Int64, true),
208 /// Field::new("name", DataType::Utf8, true),
209 /// ]));
210 ///
211 /// let provider = OxiSqlStreamProvider::from_mysql(conn, "users", schema);
212 /// # Ok(())
213 /// # }
214 /// ```
215 #[cfg(feature = "mysql")]
216 pub fn from_mysql(
217 conn: oxisql_mysql::MyConnection,
218 table_name: impl Into<String>,
219 schema: SchemaRef,
220 ) -> Self {
221 Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
222 }
223
224 /// Construct from a live [`oxisql_postgres::PgConnection`] for streaming
225 /// Postgres query results through DataFusion.
226 ///
227 /// The connection is wrapped in an `Arc` and used as the underlying
228 /// [`Connection`] trait object. All filter, projection, limit, and sort
229 /// pushdown features of [`OxiSqlStreamProvider`] are available on the
230 /// resulting provider.
231 ///
232 /// # Feature flag
233 ///
234 /// Only available when the `postgres` Cargo feature of `oxisql-datafusion`
235 /// is enabled.
236 ///
237 /// # Example
238 ///
239 /// ```rust,no_run
240 /// # #[tokio::main]
241 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
242 /// use std::sync::Arc;
243 /// use arrow::datatypes::{DataType, Field, Schema};
244 /// use oxisql_postgres::{PgConnection, TlsMode};
245 /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
246 ///
247 /// let conn = PgConnection::connect(
248 /// "host=localhost user=postgres password=secret dbname=mydb",
249 /// TlsMode::Disabled,
250 /// ).await?;
251 ///
252 /// let schema = Arc::new(Schema::new(vec![
253 /// Field::new("id", DataType::Int64, true),
254 /// Field::new("name", DataType::Utf8, true),
255 /// ]));
256 ///
257 /// let provider = OxiSqlStreamProvider::from_postgres(conn, "users", schema);
258 /// # Ok(())
259 /// # }
260 /// ```
261 #[cfg(feature = "postgres")]
262 pub fn from_postgres(
263 conn: oxisql_postgres::PgConnection,
264 table_name: impl Into<String>,
265 schema: SchemaRef,
266 ) -> Self {
267 Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
268 }
269
270 /// Construct from a live [`oxisql_sqlite_compat::SqliteConnection`] for
271 /// streaming SQLite query results through DataFusion.
272 ///
273 /// The connection is wrapped in an `Arc` and used as the underlying
274 /// [`Connection`] trait object. All filter, projection, limit, and sort
275 /// pushdown features of [`OxiSqlStreamProvider`] are available on the
276 /// resulting provider.
277 ///
278 /// Both in-memory databases (created with
279 /// [`SqliteConnection::open_memory`]) and file-backed databases are
280 /// supported.
281 ///
282 /// # Feature flag
283 ///
284 /// Only available when the `sqlite` Cargo feature of `oxisql-datafusion`
285 /// is enabled.
286 ///
287 /// # Example
288 ///
289 /// ```rust,no_run
290 /// # #[tokio::main]
291 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
292 /// use std::sync::Arc;
293 /// use arrow::datatypes::{DataType, Field, Schema};
294 /// use oxisql_sqlite_compat::SqliteConnection;
295 /// use oxisql_datafusion::stream::OxiSqlStreamProvider;
296 ///
297 /// let conn = SqliteConnection::open_memory().await?;
298 ///
299 /// let schema = Arc::new(Schema::new(vec![
300 /// Field::new("id", DataType::Int64, true),
301 /// Field::new("name", DataType::Utf8, true),
302 /// ]));
303 ///
304 /// let provider = OxiSqlStreamProvider::from_sqlite(conn, "users", schema);
305 /// # Ok(())
306 /// # }
307 /// ```
308 ///
309 /// [`SqliteConnection::open_memory`]: oxisql_sqlite_compat::SqliteConnection::open_memory
310 #[cfg(feature = "sqlite")]
311 pub fn from_sqlite(
312 conn: oxisql_sqlite_compat::SqliteConnection,
313 table_name: impl Into<String>,
314 schema: SchemaRef,
315 ) -> Self {
316 Self::new(Arc::new(conn) as Arc<dyn Connection>, table_name, schema)
317 }
318
319 /// Return the SQL SELECT column list for the given projection.
320 fn project_clause(&self, projection: Option<&Vec<usize>>) -> String {
321 match projection {
322 None => "*".to_string(),
323 Some(indices) => indices
324 .iter()
325 .map(|&i| self.schema.field(i).name().as_str())
326 .collect::<Vec<_>>()
327 .join(", "),
328 }
329 }
330
331 /// Return the projected Arrow schema for the given column indices.
332 fn projected_schema(&self, projection: Option<&Vec<usize>>) -> SchemaRef {
333 match projection {
334 None => Arc::clone(&self.schema),
335 Some(indices) => {
336 let fields: Vec<_> = indices
337 .iter()
338 .map(|&i| self.schema.field(i).clone())
339 .collect();
340 Arc::new(arrow::datatypes::Schema::new(fields))
341 }
342 }
343 }
344}
345
346impl fmt::Debug for OxiSqlStreamProvider {
347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348 f.debug_struct("OxiSqlStreamProvider")
349 .field("table_name", &self.table_name)
350 .field("schema", &self.schema)
351 .finish()
352 }
353}
354
355#[async_trait]
356impl TableProvider for OxiSqlStreamProvider {
357 fn schema(&self) -> SchemaRef {
358 Arc::clone(&self.schema)
359 }
360
361 fn table_type(&self) -> TableType {
362 TableType::Base
363 }
364
365 async fn scan(
366 &self,
367 _state: &dyn Session,
368 projection: Option<&Vec<usize>>,
369 filters: &[Expr],
370 limit: Option<usize>,
371 ) -> DFResult<Arc<dyn ExecutionPlan>> {
372 let col_clause = self.project_clause(projection);
373 let output_schema = self.projected_schema(projection);
374
375 // Build the base SQL (LIMIT/OFFSET appended per-partition below).
376 let mut base_sql = format!("SELECT {} FROM {}", col_clause, self.table_name);
377
378 // Build WHERE clause from pushed-down filters.
379 let where_parts: Vec<String> = filters.iter().filter_map(expr_to_sql).collect();
380 if !where_parts.is_empty() {
381 base_sql.push_str(" WHERE ");
382 base_sql.push_str(&where_parts.join(" AND "));
383 }
384
385 // Append ORDER BY clause if sort pushdown has been configured.
386 if let Some(ref order) = self.sort_order {
387 if !order.is_empty() {
388 base_sql.push_str(" ORDER BY ");
389 let order_clause = order
390 .iter()
391 .map(|(col, dir)| format!("{col} {dir}"))
392 .collect::<Vec<_>>()
393 .join(", ");
394 base_sql.push_str(&order_clause);
395 }
396 }
397
398 // When DataFusion pushes a LIMIT down, use a single partition regardless
399 // of auto_partition_config — a LIMIT scan is already cheap and splitting
400 // it would interfere with the fetch count semantics.
401 if let Some(n) = limit {
402 let mut sql = base_sql;
403 sql.push_str(&format!(" LIMIT {n}"));
404 let exec = OxiSqlExecPlan::new(Arc::clone(&self.conn), sql, Arc::clone(&output_schema));
405 return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
406 }
407
408 // Auto-partition: split the scan into N page-based partition slots using
409 // LIMIT/OFFSET pagination so DataFusion can execute them in parallel.
410 if let Some((n_parallel, target_batch_size)) = self.auto_partition_config {
411 if n_parallel > 1 && target_batch_size > 0 {
412 let sqls: Vec<String> = (0..n_parallel)
413 .map(|i| {
414 format!(
415 "{} LIMIT {} OFFSET {}",
416 base_sql,
417 target_batch_size,
418 i * target_batch_size
419 )
420 })
421 .collect();
422 let exec = OxiSqlMultiPartExecPlan::new(
423 Arc::clone(&self.conn),
424 sqls,
425 Arc::clone(&output_schema),
426 );
427 return Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>);
428 }
429 }
430
431 // Default single-partition path.
432 let exec =
433 OxiSqlExecPlan::new(Arc::clone(&self.conn), base_sql, Arc::clone(&output_schema));
434 Ok(Arc::new(exec) as Arc<dyn ExecutionPlan>)
435 }
436
437 fn supports_filters_pushdown(
438 &self,
439 filters: &[&Expr],
440 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
441 Ok(filters
442 .iter()
443 .map(|f| {
444 if can_push_filter(f) {
445 TableProviderFilterPushDown::Exact
446 } else {
447 TableProviderFilterPushDown::Unsupported
448 }
449 })
450 .collect())
451 }
452}
453
454// ── Expression translation ────────────────────────────────────────────────────
455
456/// Check whether `expr` can be fully translated to SQL and pushed to the backend.
457///
458/// Returns `true` only for expressions that [`expr_to_sql`] can translate
459/// completely. The two functions must stay in sync: every pattern that returns
460/// `false` here must also return `None` from [`expr_to_sql`].
461pub fn can_push_filter(expr: &Expr) -> bool {
462 match expr {
463 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
464 matches!(
465 op,
466 Operator::Eq
467 | Operator::NotEq
468 | Operator::Lt
469 | Operator::LtEq
470 | Operator::Gt
471 | Operator::GtEq
472 | Operator::And
473 | Operator::Or
474 ) && can_push_atom(left)
475 && can_push_atom(right)
476 }
477 Expr::IsNull(inner) | Expr::IsNotNull(inner) => can_push_atom(inner),
478 Expr::Not(inner) => can_push_filter(inner),
479 // `x IN (a, b, c)` / `x NOT IN (...)`
480 Expr::InList(inlist) => {
481 can_push_atom(&inlist.expr) && inlist.list.iter().all(can_push_atom)
482 }
483 // `x BETWEEN low AND high` / `x NOT BETWEEN …`
484 Expr::Between(Between {
485 expr, low, high, ..
486 }) => can_push_atom(expr) && can_push_atom(low) && can_push_atom(high),
487 // `x LIKE 'pat'` / `x ILIKE 'pat'` / negated variants
488 Expr::Like(Like { expr, pattern, .. }) => can_push_atom(expr) && can_push_atom(pattern),
489 _ => false,
490 }
491}
492
493/// Check whether a leaf expression (column or literal) can appear inside a
494/// pushed-down filter.
495fn can_push_atom(expr: &Expr) -> bool {
496 match expr {
497 Expr::Column(_) => true,
498 Expr::Literal(_, _) => true,
499 other => can_push_filter(other),
500 }
501}
502
503/// Translate a DataFusion expression to a SQL WHERE-clause fragment.
504///
505/// Returns `None` for unsupported expressions.
506/// [`can_push_filter`] can be used to check translatability without executing
507/// the translation.
508pub fn expr_to_sql(expr: &Expr) -> Option<String> {
509 match expr {
510 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
511 let l = atom_to_sql(left)?;
512 let r = atom_to_sql(right)?;
513 let op_str = match op {
514 Operator::Eq => "=",
515 Operator::NotEq => "<>",
516 Operator::Lt => "<",
517 Operator::LtEq => "<=",
518 Operator::Gt => ">",
519 Operator::GtEq => ">=",
520 Operator::And => "AND",
521 Operator::Or => "OR",
522 _ => return None,
523 };
524 Some(format!("({l} {op_str} {r})"))
525 }
526 Expr::IsNull(inner) => Some(format!("({} IS NULL)", atom_to_sql(inner)?)),
527 Expr::IsNotNull(inner) => Some(format!("({} IS NOT NULL)", atom_to_sql(inner)?)),
528 Expr::Not(inner) => Some(format!("(NOT {})", expr_to_sql(inner)?)),
529 // `x IN (a, b, c)` / `x NOT IN (...)`
530 Expr::InList(inlist) => {
531 let e = atom_to_sql(&inlist.expr)?;
532 let items: Option<Vec<String>> = inlist.list.iter().map(atom_to_sql).collect();
533 let items = items?;
534 let not_kw = if inlist.negated { "NOT " } else { "" };
535 Some(format!("({e} {not_kw}IN ({}))", items.join(", ")))
536 }
537 // `x BETWEEN low AND high` / `x NOT BETWEEN …`
538 Expr::Between(Between {
539 expr,
540 low,
541 high,
542 negated,
543 }) => {
544 let e = atom_to_sql(expr)?;
545 let lo = atom_to_sql(low)?;
546 let hi = atom_to_sql(high)?;
547 let not_kw = if *negated { "NOT " } else { "" };
548 Some(format!("({e} {not_kw}BETWEEN {lo} AND {hi})"))
549 }
550 // `x LIKE 'pat'` / `x ILIKE 'pat'` and their negations
551 Expr::Like(Like {
552 expr,
553 pattern,
554 negated,
555 case_insensitive,
556 escape_char,
557 }) => {
558 let e = atom_to_sql(expr)?;
559 let p = atom_to_sql(pattern)?;
560 let not_kw = if *negated { "NOT " } else { "" };
561 let like_kw = if *case_insensitive { "ILIKE" } else { "LIKE" };
562 let escape_clause = match escape_char {
563 Some(c) => format!(" ESCAPE '{c}'"),
564 None => String::new(),
565 };
566 Some(format!("({e} {not_kw}{like_kw} {p}{escape_clause})"))
567 }
568 _ => None,
569 }
570}
571
572/// Translate a leaf expression (column or literal) or a nested filter
573/// expression to SQL.
574fn atom_to_sql(expr: &Expr) -> Option<String> {
575 match expr {
576 Expr::Column(col) => Some(col.name.clone()),
577 Expr::Literal(scalar, _metadata) => scalar_to_sql(scalar),
578 other => expr_to_sql(other),
579 }
580}
581
582/// Render a [`ScalarValue`] as a SQL literal fragment.
583///
584/// Returns `None` for types that have no safe, round-trippable SQL literal
585/// representation (e.g. `Decimal128`, `Date64`, `Time32`, complex types).
586fn scalar_to_sql(scalar: &ScalarValue) -> Option<String> {
587 match scalar {
588 ScalarValue::Int8(Some(v)) => Some(v.to_string()),
589 ScalarValue::Int16(Some(v)) => Some(v.to_string()),
590 ScalarValue::Int32(Some(v)) => Some(v.to_string()),
591 ScalarValue::Int64(Some(v)) => Some(v.to_string()),
592 ScalarValue::Float32(Some(v)) => Some(v.to_string()),
593 ScalarValue::Float64(Some(v)) => Some(v.to_string()),
594 ScalarValue::Boolean(Some(v)) => Some(if *v { "TRUE" } else { "FALSE" }.to_string()),
595 ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
596 // SQL-escape single quotes to prevent injection (connection is trusted).
597 Some(format!("'{}'", s.replace('\'', "''")))
598 }
599 ScalarValue::Null => Some("NULL".to_string()),
600 // Typed NULLs (e.g. Int8(None)) are also SQL NULL.
601 ScalarValue::Int8(None)
602 | ScalarValue::Int16(None)
603 | ScalarValue::Int32(None)
604 | ScalarValue::Int64(None)
605 | ScalarValue::Float32(None)
606 | ScalarValue::Float64(None)
607 | ScalarValue::Boolean(None)
608 | ScalarValue::Utf8(None)
609 | ScalarValue::LargeUtf8(None) => Some("NULL".to_string()),
610 _ => None,
611 }
612}
613
614// ── OxiSqlExecPlan ────────────────────────────────────────────────────────────
615
616/// A DataFusion [`ExecutionPlan`] that executes a SQL query lazily against an
617/// OxiSQL connection at physical-plan execution time.
618///
619/// Unlike `MemorySourceConfig`, this defers the SQL query until DataFusion
620/// calls `execute()`, keeping `scan()` cheap and allocation-free at plan time.
621struct OxiSqlExecPlan {
622 schema: SchemaRef,
623 sql: String,
624 conn: Arc<dyn Connection>,
625 cache: Arc<PlanProperties>,
626}
627
628impl fmt::Debug for OxiSqlExecPlan {
629 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
630 f.debug_struct("OxiSqlExecPlan")
631 .field("sql", &self.sql)
632 .field("schema", &self.schema)
633 .finish()
634 }
635}
636
637impl OxiSqlExecPlan {
638 fn new(conn: Arc<dyn Connection>, sql: String, schema: SchemaRef) -> Self {
639 let eq = EquivalenceProperties::new(Arc::clone(&schema));
640 let properties = PlanProperties::new(
641 eq,
642 Partitioning::UnknownPartitioning(1),
643 EmissionType::Incremental,
644 Boundedness::Bounded,
645 );
646 Self {
647 schema,
648 sql,
649 conn,
650 cache: Arc::new(properties),
651 }
652 }
653}
654
655impl DisplayAs for OxiSqlExecPlan {
656 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
657 write!(f, "OxiSqlExecPlan sql={:?}", self.sql)
658 }
659}
660
661impl ExecutionPlan for OxiSqlExecPlan {
662 fn name(&self) -> &'static str {
663 "OxiSqlExecPlan"
664 }
665
666 fn properties(&self) -> &Arc<PlanProperties> {
667 &self.cache
668 }
669
670 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
671 vec![]
672 }
673
674 fn with_new_children(
675 self: Arc<Self>,
676 children: Vec<Arc<dyn ExecutionPlan>>,
677 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
678 if children.is_empty() {
679 Ok(self)
680 } else {
681 Err(datafusion::error::DataFusionError::Internal(
682 "OxiSqlExecPlan has no children".into(),
683 ))
684 }
685 }
686
687 fn execute(
688 &self,
689 _partition: usize,
690 _context: Arc<TaskContext>,
691 ) -> datafusion::error::Result<SendableRecordBatchStream> {
692 let sql = self.sql.clone();
693 let conn = Arc::clone(&self.conn);
694 let schema = Arc::clone(&self.schema);
695
696 let stream = futures::stream::once(async move {
697 let rows = conn
698 .query(&sql, &[])
699 .await
700 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
701 crate::types::rows_to_record_batch(rows, schema)
702 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
703 });
704
705 Ok(Box::pin(RecordBatchStreamAdapter::new(
706 Arc::clone(&self.schema),
707 stream,
708 )))
709 }
710}
711
712// ── OxiSqlMultiPartExecPlan ───────────────────────────────────────────────────
713
714/// A DataFusion [`ExecutionPlan`] that exposes multiple parallel partitions,
715/// each executing a distinct pre-built SQL query (typically with `LIMIT`/`OFFSET`
716/// pagination) against the same OxiSQL connection.
717///
718/// Used by [`OxiSqlStreamProvider::with_auto_partition`] to split a full-table
719/// scan into `N` independently-executable page queries so DataFusion's thread
720/// pool can run them concurrently.
721struct OxiSqlMultiPartExecPlan {
722 schema: SchemaRef,
723 /// One SQL string per partition.
724 sqls: Vec<String>,
725 conn: Arc<dyn Connection>,
726 cache: Arc<PlanProperties>,
727}
728
729impl fmt::Debug for OxiSqlMultiPartExecPlan {
730 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
731 f.debug_struct("OxiSqlMultiPartExecPlan")
732 .field("partitions", &self.sqls.len())
733 .field("schema", &self.schema)
734 .finish()
735 }
736}
737
738impl OxiSqlMultiPartExecPlan {
739 fn new(conn: Arc<dyn Connection>, sqls: Vec<String>, schema: SchemaRef) -> Self {
740 let n = sqls.len().max(1);
741 let eq = EquivalenceProperties::new(Arc::clone(&schema));
742 let properties = PlanProperties::new(
743 eq,
744 Partitioning::UnknownPartitioning(n),
745 EmissionType::Incremental,
746 Boundedness::Bounded,
747 );
748 Self {
749 schema,
750 sqls,
751 conn,
752 cache: Arc::new(properties),
753 }
754 }
755}
756
757impl DisplayAs for OxiSqlMultiPartExecPlan {
758 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
759 write!(f, "OxiSqlMultiPartExecPlan partitions={}", self.sqls.len())
760 }
761}
762
763impl ExecutionPlan for OxiSqlMultiPartExecPlan {
764 fn name(&self) -> &'static str {
765 "OxiSqlMultiPartExecPlan"
766 }
767
768 fn properties(&self) -> &Arc<PlanProperties> {
769 &self.cache
770 }
771
772 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
773 vec![]
774 }
775
776 fn with_new_children(
777 self: Arc<Self>,
778 children: Vec<Arc<dyn ExecutionPlan>>,
779 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
780 if children.is_empty() {
781 Ok(self)
782 } else {
783 Err(datafusion::error::DataFusionError::Internal(
784 "OxiSqlMultiPartExecPlan has no children".into(),
785 ))
786 }
787 }
788
789 fn execute(
790 &self,
791 partition: usize,
792 _context: Arc<TaskContext>,
793 ) -> datafusion::error::Result<SendableRecordBatchStream> {
794 let sql = self.sqls.get(partition).cloned().ok_or_else(|| {
795 datafusion::error::DataFusionError::Internal(format!(
796 "OxiSqlMultiPartExecPlan: partition index {partition} out of range ({})",
797 self.sqls.len()
798 ))
799 })?;
800 let conn = Arc::clone(&self.conn);
801 let schema = Arc::clone(&self.schema);
802
803 let stream = futures::stream::once(async move {
804 let rows = conn
805 .query(&sql, &[])
806 .await
807 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
808 crate::types::rows_to_record_batch(rows, schema)
809 .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))
810 });
811
812 Ok(Box::pin(RecordBatchStreamAdapter::new(
813 Arc::clone(&self.schema),
814 stream,
815 )))
816 }
817}