Skip to main content

vortex_datafusion/v2/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Use [`VortexDataSource`] to adapt an existing Vortex [`DataSourceRef`] into
5//! a DataFusion [`DataSource`] without going through file discovery.
6//!
7//! [`VortexDataSource`] is responsible for:
8//!
9//! - exposing an Arrow schema and output statistics to DataFusion,
10//! - translating DataFusion projection, filter, and limit pushdown into a
11//!   Vortex [`ScanRequest`],
12//! - executing the Vortex scan and converting the results into Arrow
13//!   `RecordBatch` values.
14//!
15//! # Example: Create a `DataSourceExec`
16//!
17//! ```no_run
18//! use std::sync::Arc;
19//!
20//! use arrow_schema::Schema;
21//! use datafusion_datasource::source::DataSourceExec;
22//! use vortex::VortexSessionDefault;
23//! use vortex::scan::DataSourceRef;
24//! use vortex::session::VortexSession;
25//! use vortex_datafusion::v2::VortexDataSource;
26//!
27//! # #[tokio::main]
28//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//! # let data_source: DataSourceRef = todo!();
30//! let data_source = VortexDataSource::builder(data_source, VortexSession::default())
31//!     .with_arrow_schema(Arc::new(Schema::empty()))
32//!     .build()
33//!     .await?;
34//!
35//! let exec = DataSourceExec::from_data_source(data_source);
36//! # let _ = exec;
37//! # Ok(())
38//! # }
39//! ```
40//!
41//! # Execution Flow
42//!
43//! ```text
44//!             ▲
45//!             │  RecordBatch stream
46//!             │
47//! ┌───────────────────────┐
48//! │     DataSourceExec    │
49//! └───────────────────────┘
50//!             ▲
51//!             │  DataFusion pushdown
52//!             │  (projection/filter/limit)
53//! ┌───────────────────────┐
54//! │   VortexDataSource    │
55//! └───────────────────────┘
56//!             ▲
57//!             │  final ScanRequest
58//! ┌───────────────────────┐
59//! │    DataSourceRef      │
60//! └───────────────────────┘
61//! ```
62//!
63//! Compared with [`crate::VortexSource`], this path starts from an existing
64//! Vortex source rather than from DataFusion-managed file discovery.
65//!
66//! [`DataSource`]: datafusion_datasource::source::DataSource
67//! [`DataSourceRef`]: vortex::scan::DataSourceRef
68//! [`ScanRequest`]: vortex::scan::ScanRequest
69
70use std::any::Any;
71use std::fmt;
72use std::fmt::Formatter;
73use std::sync::Arc;
74
75use arrow_schema::Field;
76use arrow_schema::Schema;
77use arrow_schema::SchemaRef;
78use datafusion_common::ColumnStatistics;
79use datafusion_common::DataFusionError;
80use datafusion_common::Result as DFResult;
81use datafusion_common::Statistics;
82use datafusion_common::arrow::array::AsArray;
83use datafusion_common::arrow::array::RecordBatch;
84use datafusion_common::stats::Precision as DFPrecision;
85use datafusion_datasource::source::DataSource;
86use datafusion_execution::SendableRecordBatchStream;
87use datafusion_execution::TaskContext;
88use datafusion_physical_expr::EquivalenceProperties;
89use datafusion_physical_expr::Partitioning;
90use datafusion_physical_expr::PhysicalExpr;
91use datafusion_physical_expr::projection::ProjectionExprs;
92use datafusion_physical_expr::utils::reassign_expr_columns;
93use datafusion_physical_expr_common::sort_expr::LexOrdering;
94use datafusion_physical_plan::DisplayFormatType;
95use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
96use datafusion_physical_plan::filter_pushdown::PushedDown;
97use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
98use futures::StreamExt;
99use futures::TryStreamExt;
100use futures::future::try_join_all;
101use vortex::array::VortexSessionExecute;
102use vortex::array::arrow::ArrowSessionExt;
103use vortex::dtype::DType;
104use vortex::dtype::FieldPath;
105use vortex::dtype::Nullability;
106use vortex::error::VortexResult;
107use vortex::error::vortex_bail;
108use vortex::expr::Expression;
109use vortex::expr::and as vx_and;
110use vortex::expr::get_item;
111use vortex::expr::pack;
112use vortex::expr::root;
113use vortex::expr::stats::Precision;
114use vortex::expr::transform::replace;
115use vortex::io::session::RuntimeSessionExt;
116use vortex::scan::DataSourceRef;
117use vortex::scan::ScanRequest;
118use vortex::session::VortexSession;
119use vortex_utils::parallelism::get_available_parallelism;
120
121use crate::convert::exprs::DefaultExpressionConvertor;
122use crate::convert::exprs::ExpressionConvertor;
123use crate::convert::exprs::ProcessedProjection;
124use crate::convert::exprs::make_vortex_predicate;
125use crate::convert::stats::stats_set_to_df;
126
/// Builder for [`VortexDataSource`].
///
/// Use the builder to declare how an existing Vortex
/// [`DataSourceRef`] should appear to DataFusion.
/// In particular, it lets you choose:
///
/// - the Arrow schema DataFusion should see,
/// - an initial top-level projection if the embedding system already knows
///   which columns are needed.
///
/// Both settings are optional. When no Arrow schema is given, one is derived
/// from the Vortex dtype of the source; when no projection is given, all
/// columns are exposed.
///
/// The resulting [`VortexDataSource`] is ready to plug into
/// [`DataSourceExec`] or other DataFusion physical planning code.
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
///
/// use arrow_schema::Schema;
/// use vortex::VortexSessionDefault;
/// use vortex::scan::DataSourceRef;
/// use vortex::session::VortexSession;
/// use vortex_datafusion::v2::VortexDataSource;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let data_source: DataSourceRef = todo!();
/// let data_source = VortexDataSource::builder(data_source, VortexSession::default())
///     .with_arrow_schema(Arc::new(Schema::empty()))
///     .with_projection(vec![0])
///     .build()
///     .await?;
/// # let _ = data_source;
/// # Ok(())
/// # }
/// ```
///
/// [`DataSourceRef`]: vortex::scan::DataSourceRef
/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
pub struct VortexDataSourceBuilder {
    /// The Vortex source being adapted for DataFusion.
    data_source: DataSourceRef,
    /// Session used to derive an Arrow schema when none is supplied; it is also
    /// handed over to the built [`VortexDataSource`].
    session: VortexSession,

    /// Caller-supplied Arrow schema; `None` means "derive it from the Vortex dtype".
    arrow_schema: Option<SchemaRef>,
    /// Indices into the Arrow schema selecting the initial columns; `None` keeps all columns.
    projection: Option<Vec<usize>>,
}
173
174impl VortexDataSourceBuilder {
175    /// Sets the Arrow schema exposed to DataFusion.
176    ///
177    /// If not specified, the builder derives an Arrow schema from the Vortex
178    /// dtype.
179    ///
180    /// Note that this schema is not validated against the Vortex DType so any errors will be
181    /// deferred until read time.
182    pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
183        self.arrow_schema = Some(arrow_schema);
184        self
185    }
186
187    /// Configures an initial top-level projection.
188    ///
189    /// This is useful when the embedding system already knows which columns are
190    /// needed before DataFusion applies its own optimizer pushdown.
191    pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
192        self.projection = Some(indices);
193        self
194    }
195
196    /// Like [`Self::with_projection`], but accepts an optional projection.
197    pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
198        self.projection = indices;
199        self
200    }
201
202    /// Builds the [`VortexDataSource`].
203    ///
204    /// The builder eagerly resolves statistics for the initial projection
205    /// columns because DataFusion expects the `DataSource` to report output
206    /// statistics before execution begins.
207    pub async fn build(self) -> VortexResult<VortexDataSource> {
208        // The projection expression
209        let mut projection = root();
210
211        // Resolve the Arrow schema
212        let mut arrow_schema = match self.arrow_schema {
213            Some(schema) => schema,
214            None => Arc::new(
215                self.session
216                    .arrow()
217                    .to_arrow_schema(self.data_source.dtype())?,
218            ),
219        };
220
221        // Apply any selection and create a projection expression.
222        if let Some(indices) = self.projection {
223            let fields = indices.iter().map(|&i| {
224                let name = arrow_schema.field(i).name().clone();
225                let expr = get_item(name.as_str(), root());
226                (name, expr)
227            });
228
229            // Update the projection expression
230            projection = pack(fields, Nullability::NonNullable);
231
232            // Update the arrow schema
233            arrow_schema = Arc::new(Schema::new(
234                indices
235                    .iter()
236                    .map(|&i| arrow_schema.field(i).clone())
237                    .collect::<Vec<_>>(),
238            ));
239        }
240
241        let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
242            vortex_bail!("Projection does not evaluate to a struct");
243        };
244
245        // We now compute initial statistics.
246        let field_paths: Vec<_> = fields
247            .names()
248            .iter()
249            .cloned()
250            .map(FieldPath::from_name)
251            .collect();
252        let statistics = try_join_all(
253            field_paths
254                .iter()
255                .map(|path| self.data_source.field_statistics(path)),
256        )
257        .await?
258        .iter()
259        .zip(fields.fields())
260        .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
261        .collect::<VortexResult<Vec<_>>>()?;
262
263        Ok(VortexDataSource {
264            data_source: self.data_source,
265            session: self.session,
266            initial_schema: Arc::clone(&arrow_schema),
267            initial_projection: projection.clone(),
268            initial_statistics: statistics.clone(),
269            projected_projection: projection.clone(),
270            projected_schema: Arc::clone(&arrow_schema),
271            projected_statistics: statistics.clone(),
272            leftover_projection: None,
273            leftover_schema: arrow_schema,
274            leftover_statistics: statistics,
275            filter: None,
276            limit: None,
277            ordered: false,
278            num_partitions: get_available_parallelism().unwrap_or(1),
279        })
280    }
281}
282
283impl VortexDataSource {
284    /// Create a builder for a [`VortexDataSource`].
285    pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
286        VortexDataSourceBuilder {
287            data_source,
288            session,
289            arrow_schema: None,
290            projection: None,
291        }
292    }
293}
294
/// DataFusion [`DataSource`] backed by a Vortex [`DataSourceRef`].
///
/// `VortexDataSource` is the core execution adapter for the `v2` integration.
/// It presents DataFusion with a scannable Arrow data source while preserving the
/// underlying Vortex source until execution time.
///
/// During planning, it reports the current output schema and column statistics.
/// During execution, it builds the final Vortex [`ScanRequest`] from the
/// current projection, pushed filters, ordering hints, and row limit.
///
/// This integration intentionally reports a single DataFusion output partition.
/// Vortex then handles split-level concurrency internally by polling multiple
/// split streams concurrently.
///
/// Use [`crate::VortexSource`] instead when DataFusion should discover and plan
/// `.vortex` files on its own.
#[derive(Clone)]
pub struct VortexDataSource {
    /// The Vortex data source.
    data_source: DataSourceRef,
    /// Vortex session handle.
    session: VortexSession,

    // --- Phase 1: Initial (from the builder, before any optimizer pushdown) ---
    /// The Arrow schema of the data source before any DataFusion projection pushdown.
    initial_schema: SchemaRef,
    /// The initial Vortex projection expression (e.g. column selection from the builder).
    initial_projection: Expression,
    /// Column statistics for the initial projection columns.
    /// Currently stored but not read, hence the `dead_code` expectation.
    #[expect(dead_code)]
    initial_statistics: Vec<ColumnStatistics>,

    // --- Phase 2: Projected (pushed into the Vortex scan) ---
    /// The Vortex projection expression sent in the [`ScanRequest`].
    /// Composed with `initial_projection` so it operates on the original source columns.
    projected_projection: Expression,
    /// The Arrow schema of the Vortex scan output (before any leftover projection).
    projected_schema: SchemaRef,
    /// Column statistics for the projected (scan output) columns.
    projected_statistics: Vec<ColumnStatistics>,

    // --- Phase 3: Leftover (applied by DataFusion after the scan) ---
    /// DataFusion projection expressions that could not be pushed into the Vortex scan.
    /// Applied after converting arrays to record batches in [`DataSource::open`].
    /// `None` when all projection expressions were successfully pushed down.
    leftover_projection: Option<ProjectionExprs>,
    /// The Arrow schema after applying the leftover projection.
    /// This is the output schema seen by DataFusion.
    leftover_schema: SchemaRef,
    /// Column statistics matching `leftover_schema`.
    leftover_statistics: Vec<ColumnStatistics>,

    /// An optional filter expression.
    /// Populated by [`DataSource::try_pushdown_filters`] when DataFusion pushes filters down;
    /// repeated pushdowns are AND-ed together.
    filter: Option<Expression>,
    /// An optional row limit populated by [`DataSource::with_fetch`].
    limit: Option<usize>,
    /// Whether to preserve the order of the output rows.
    /// Starts as `false` and is switched on when [`DataSource::repartitioned`] is called
    /// with an output ordering.
    ordered: bool,

    /// The requested partition count from DataFusion, populated by [`DataSource::repartitioned`].
    /// We use this as a hint for how many splits to execute concurrently in `open()`, but we
    /// always declare to DataFusion that we only have a single partition so that we can
    /// internally manage concurrency and fix the problem of partition skew.
    /// The builder seeds it with the available parallelism, falling back to 1.
    num_partitions: usize,
}
361
362impl fmt::Debug for VortexDataSource {
363    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
364        f.debug_struct("VortexScanSource")
365            .field("schema", &self.leftover_schema)
366            .field("projection", &format!("{}", &self.projected_projection))
367            .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
368            .field("limit", &self.limit)
369            .finish()
370    }
371}
372
373impl DataSource for VortexDataSource {
374    fn open(
375        &self,
376        partition: usize,
377        _context: Arc<TaskContext>,
378    ) -> DFResult<SendableRecordBatchStream> {
379        // VortexScanSource always uses a single partition since Vortex handles parallelism
380        // and concurrency internally.
381        if partition != 0 {
382            return Err(DataFusionError::Internal(format!(
383                "VortexScanSource: expected partition 0, got {partition}"
384            )));
385        }
386
387        // Build the scan request with pushed-down projection, filter, and limit.
388        // The projection is included so the scan can prune columns at the I/O level.
389        let scan_request = ScanRequest {
390            projection: self.projected_projection.clone(),
391            filter: self.filter.clone(),
392            limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
393            ordered: self.ordered,
394            ..Default::default()
395        };
396
397        let data_source = Arc::clone(&self.data_source);
398        let projected_schema = Arc::clone(&self.projected_schema);
399        let projected_target_field = Arc::new(Field::new_struct(
400            "",
401            projected_schema.fields().clone(),
402            false,
403        ));
404        let session = self.session.clone();
405        let num_partitions = self.num_partitions;
406
407        // Pre-build the leftover projector (if any) so we can apply it after batch conversion.
408        let leftover_projector = self
409            .leftover_projection
410            .as_ref()
411            .map(|proj| proj.make_projector(&self.projected_schema))
412            .transpose()?;
413
414        // Defer the async DataSource::scan() call to the first poll of the stream.
415        let stream = futures::stream::once(async move {
416            let scan = data_source
417                .scan(scan_request)
418                .await
419                .map_err(|e| DataFusionError::External(Box::new(e)))?;
420
421            // Each split.execute() returns a lazy stream whose early polls do preparation
422            // work (expression resolution, layout traversal, first I/O spawns). We use
423            // try_flatten_unordered to poll multiple split streams concurrently so that
424            // the next split is already warm when the current one finishes.
425            let scan_streams = scan.partitions().map(|split_result| {
426                let split = split_result?;
427                split.execute()
428            });
429
430            let handle = session.handle();
431            let stream = scan_streams
432                .try_flatten_unordered(Some(num_partitions * 2))
433                .map(move |result| {
434                    let session = session.clone();
435                    let target_field = Arc::clone(&projected_target_field);
436                    handle.spawn_cpu(move || {
437                        let mut ctx = session.create_execution_ctx();
438                        result.and_then(|chunk| {
439                            let arrow = session.arrow().execute_arrow(
440                                chunk,
441                                Some(target_field.as_ref()),
442                                &mut ctx,
443                            )?;
444                            Ok(RecordBatch::from(arrow.as_struct().clone()))
445                        })
446                    })
447                })
448                .buffered(num_partitions)
449                .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
450
451            // Apply leftover projection (expressions that couldn't be pushed into Vortex).
452            let stream = if let Some(projector) = leftover_projector {
453                stream
454                    .map(move |batch_result| {
455                        batch_result.and_then(|batch| projector.project_batch(&batch))
456                    })
457                    .boxed()
458            } else {
459                stream.boxed()
460            };
461
462            Ok::<_, DataFusionError>(stream)
463        })
464        .try_flatten();
465
466        Ok(Box::pin(RecordBatchStreamAdapter::new(
467            Arc::clone(&self.leftover_schema),
468            stream,
469        )))
470    }
471
    /// Exposes this source as `&dyn Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }
475
476    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
477        write!(
478            f,
479            "VortexScanSource: projection={}",
480            self.projected_projection
481        )?;
482        if let Some(filter) = &self.filter {
483            write!(f, ", filter={filter}")?;
484        }
485        if let Some(limit) = self.limit {
486            write!(f, ", limit={limit}")?;
487        }
488        Ok(())
489    }
490
491    fn repartitioned(
492        &self,
493        target_partitions: usize,
494        _repartition_file_min_size: usize,
495        output_ordering: Option<LexOrdering>,
496    ) -> DFResult<Option<Arc<dyn DataSource>>> {
497        // Vortex handles parallelism internally — always use a single partition.
498        let mut this = self.clone();
499        this.num_partitions = target_partitions;
500        this.ordered |= output_ordering.is_some();
501        Ok(Some(Arc::new(this)))
502    }
503
    /// Always declares exactly one output partition with unknown partitioning,
    /// regardless of the concurrency hint stored in `num_partitions`.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }
507
508    fn eq_properties(&self) -> EquivalenceProperties {
509        EquivalenceProperties::new(Arc::clone(&self.leftover_schema))
510    }
511
512    fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
513        // FIXME(ngates): this should be adjusted based on filters. See DuckDB for heuristics,
514        //  and in the future, store the selectivity stats in the session.
515        let num_rows = estimate_to_df_precision(self.data_source.row_count().as_ref());
516
517        // FIXME(ngates): byte size should be adjusted for the initial projection...
518        let total_byte_size = estimate_to_df_precision(self.data_source.byte_size().as_ref());
519
520        // Column statistics must match the output schema (leftover_schema), which may differ
521        // from the initial schema after try_swapping_with_projection adds computed columns.
522        let column_statistics = self.leftover_statistics.clone();
523
524        Ok(Statistics {
525            num_rows,
526            total_byte_size,
527            column_statistics,
528        })
529    }
530
531    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
532        let mut this = self.clone();
533        this.limit = limit;
534        Some(Arc::new(this))
535    }
536
    /// Returns the row limit currently set on this source, if any.
    fn fetch(&self) -> Option<usize> {
        self.limit
    }
540
541    // Note that we're explicitly "swapping" the projection. That means everything we do must
542    // be computed over the original input schema, rather than the projected output schema.
543    fn try_swapping_with_projection(
544        &self,
545        projection: &ProjectionExprs,
546    ) -> DFResult<Option<Arc<dyn DataSource>>> {
547        tracing::debug!(
548            "VortexScanSource: trying to swap with projection: {}",
549            projection
550        );
551
552        let convertor = DefaultExpressionConvertor::default();
553        let input_schema = self.initial_schema.as_ref();
554        let projected_schema = projection.project_schema(input_schema)?;
555
556        // Use the shared ExpressionConvertor to split the projection into a Vortex
557        // scan_projection and a leftover DataFusion projection for expressions that
558        // can't be pushed down (e.g., unsupported scalar functions, decimal binary).
559        let ProcessedProjection {
560            scan_projection,
561            leftover_projection,
562        } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;
563
564        // Compose with the initial projection so the scan operates on the original
565        // source columns, not the initial projection's output columns.
566        let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());
567
568        // Compute the scan output schema from the Vortex expression's return dtype.
569        let scan_dtype = scan_projection
570            .return_dtype(self.data_source.dtype())
571            .map_err(|e| DataFusionError::External(Box::new(e)))?;
572        let scan_output_schema = Arc::new(
573            self.session
574                .arrow()
575                .to_arrow_schema(&scan_dtype)
576                .map_err(|e| DataFusionError::External(Box::new(e)))?,
577        );
578
579        // Remap the leftover column references to match the scan output schema.
580        let leftover_projection = leftover_projection
581            .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;
582
583        let final_schema = Arc::new(projected_schema);
584
585        let mut this = self.clone();
586        this.projected_projection = scan_projection;
587        this.projected_schema = Arc::clone(&scan_output_schema);
588        this.projected_statistics =
589            vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
590        this.leftover_projection = Some(leftover_projection);
591        this.leftover_schema = Arc::clone(&final_schema);
592        this.leftover_statistics =
593            vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];
594
595        Ok(Some(Arc::new(this)))
596    }
597
598    fn try_pushdown_filters(
599        &self,
600        filters: Vec<Arc<dyn PhysicalExpr>>,
601        _config: &datafusion_common::config::ConfigOptions,
602    ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
603        if filters.is_empty() {
604            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
605                vec![],
606            ));
607        }
608
609        let convertor = DefaultExpressionConvertor::default();
610        let input_schema = self.initial_schema.as_ref();
611
612        // Classify each filter: pushable filters are passed into the ScanRequest in open(),
613        // so we can safely claim PushedDown::Yes for them.
614        let pushdown_results: Vec<PushedDown> = filters
615            .iter()
616            .map(|expr| {
617                if convertor.can_be_pushed_down(expr, input_schema) {
618                    PushedDown::Yes
619                } else {
620                    PushedDown::No
621                }
622            })
623            .collect();
624
625        // If nothing can be pushed down, return early.
626        if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
627            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
628                pushdown_results,
629            ));
630        }
631
632        // Collect the pushable filter expressions.
633        let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
634            .iter()
635            .zip(pushdown_results.iter())
636            .filter_map(|(expr, pushed)| match pushed {
637                PushedDown::Yes => Some(Arc::clone(expr)),
638                PushedDown::No => None,
639            })
640            .collect();
641
642        // Convert to Vortex conjunction.
643        let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;
644
645        // Combine with existing filter.
646        let new_filter = match (&self.filter, vortex_pred) {
647            (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
648            (Some(existing), None) => Some(existing.clone()),
649            (None, Some(new_pred)) => Some(new_pred),
650            (None, None) => None,
651        };
652
653        let mut this = self.clone();
654        this.filter = new_filter;
655        Ok(
656            FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
657                .with_updated_node(Arc::new(this) as _),
658        )
659    }
660}
661
662/// Convert a Vortex [`Option<Precision>`] to a DataFusion
663/// [`DataFusionPrecision`].
664///
665/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
666fn estimate_to_df_precision(est: Option<&Precision<u64>>) -> DFPrecision<usize> {
667    match est {
668        Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
669        Some(Precision::Inexact(v)) => {
670            DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX))
671        }
672        None => DFPrecision::Absent,
673    }
674}