Skip to main content

vortex_datafusion/v2/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Use [`VortexDataSource`] to adapt an existing Vortex [`DataSourceRef`] into
5//! a DataFusion [`DataSource`] without going through file discovery.
6//!
7//! [`VortexDataSource`] is responsible for:
8//!
9//! - exposing an Arrow schema and output statistics to DataFusion,
10//! - translating DataFusion projection, filter, and limit pushdown into a
11//!   Vortex [`ScanRequest`],
12//! - executing the Vortex scan and converting the results into Arrow
13//!   `RecordBatch` values.
14//!
15//! # Example: Create a `DataSourceExec`
16//!
17//! ```no_run
18//! use std::sync::Arc;
19//!
20//! use arrow_schema::Schema;
21//! use datafusion_datasource::source::DataSourceExec;
22//! use vortex::VortexSessionDefault;
23//! use vortex::scan::DataSourceRef;
24//! use vortex::session::VortexSession;
25//! use vortex_datafusion::v2::VortexDataSource;
26//!
27//! # #[tokio::main]
28//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//! # let data_source: DataSourceRef = todo!();
30//! let data_source = VortexDataSource::builder(data_source, VortexSession::default())
31//!     .with_arrow_schema(Arc::new(Schema::empty()))
32//!     .build()
33//!     .await?;
34//!
35//! let exec = DataSourceExec::from_data_source(data_source);
36//! # let _ = exec;
37//! # Ok(())
38//! # }
39//! ```
40//!
41//! # Execution Flow
42//!
43//! ```text
44//!             ▲
45//!             │  RecordBatch stream
46//!             │
47//! ┌───────────────────────┐
48//! │     DataSourceExec    │
49//! └───────────────────────┘
50//!             ▲
51//!             │  DataFusion pushdown
52//!             │  (projection/filter/limit)
53//! ┌───────────────────────┐
54//! │   VortexDataSource    │
55//! └───────────────────────┘
56//!             ▲
57//!             │  final ScanRequest
58//! ┌───────────────────────┐
59//! │    DataSourceRef      │
60//! └───────────────────────┘
61//! ```
62//!
63//! Compared with [`crate::VortexSource`], this path starts from an existing
64//! Vortex source rather than from DataFusion-managed file discovery.
65//!
66//! [`DataSource`]: datafusion_datasource::source::DataSource
67//! [`DataSourceRef`]: vortex::scan::DataSourceRef
68//! [`ScanRequest`]: vortex::scan::ScanRequest
69
70use std::any::Any;
71use std::fmt;
72use std::fmt::Formatter;
73use std::sync::Arc;
74
75use arrow_schema::DataType;
76use arrow_schema::Schema;
77use arrow_schema::SchemaRef;
78use datafusion_common::ColumnStatistics;
79use datafusion_common::DataFusionError;
80use datafusion_common::Result as DFResult;
81use datafusion_common::Statistics;
82use datafusion_common::stats::Precision as DFPrecision;
83use datafusion_datasource::source::DataSource;
84use datafusion_execution::SendableRecordBatchStream;
85use datafusion_execution::TaskContext;
86use datafusion_physical_expr::EquivalenceProperties;
87use datafusion_physical_expr::Partitioning;
88use datafusion_physical_expr::PhysicalExpr;
89use datafusion_physical_expr::projection::ProjectionExprs;
90use datafusion_physical_expr::utils::reassign_expr_columns;
91use datafusion_physical_expr_common::sort_expr::LexOrdering;
92use datafusion_physical_plan::DisplayFormatType;
93use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
94use datafusion_physical_plan::filter_pushdown::PushedDown;
95use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
96use futures::StreamExt;
97use futures::TryStreamExt;
98use futures::future::try_join_all;
99use vortex::array::VortexSessionExecute;
100use vortex::array::arrow::ArrowArrayExecutor;
101use vortex::dtype::DType;
102use vortex::dtype::FieldPath;
103use vortex::dtype::Nullability;
104use vortex::error::VortexResult;
105use vortex::error::vortex_bail;
106use vortex::expr::Expression;
107use vortex::expr::and as vx_and;
108use vortex::expr::get_item;
109use vortex::expr::pack;
110use vortex::expr::root;
111use vortex::expr::stats::Precision;
112use vortex::expr::transform::replace;
113use vortex::io::session::RuntimeSessionExt;
114use vortex::scan::DataSourceRef;
115use vortex::scan::ScanRequest;
116use vortex::session::VortexSession;
117use vortex_utils::parallelism::get_available_parallelism;
118
119use crate::convert::exprs::DefaultExpressionConvertor;
120use crate::convert::exprs::ExpressionConvertor;
121use crate::convert::exprs::ProcessedProjection;
122use crate::convert::exprs::make_vortex_predicate;
123use crate::convert::stats::stats_set_to_df;
124
125/// Builder for [`VortexDataSource`].
126///
127/// Use the builder to declare how an existing Vortex
128/// [`DataSourceRef`] should appear to DataFusion.
129/// In particular, it lets you choose:
130///
131/// - the Arrow schema DataFusion should see,
132/// - an initial top-level projection if the embedding system already knows
133///   which columns are needed.
134///
135/// The resulting [`VortexDataSource`] is ready to plug into
136/// [`DataSourceExec`] or other DataFusion physical planning code.
137///
138/// # Example
139///
140/// ```no_run
141/// use std::sync::Arc;
142///
143/// use arrow_schema::Schema;
144/// use vortex::VortexSessionDefault;
145/// use vortex::scan::DataSourceRef;
146/// use vortex::session::VortexSession;
147/// use vortex_datafusion::v2::VortexDataSource;
148///
149/// # #[tokio::main]
150/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
151/// # let data_source: DataSourceRef = todo!();
152/// let data_source = VortexDataSource::builder(data_source, VortexSession::default())
153///     .with_arrow_schema(Arc::new(Schema::empty()))
154///     .with_projection(vec![0])
155///     .build()
156///     .await?;
157/// # let _ = data_source;
158/// # Ok(())
159/// # }
160/// ```
161///
162/// [`DataSourceRef`]: vortex::scan::DataSourceRef
163/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
164pub struct VortexDataSourceBuilder {
165    data_source: DataSourceRef,
166    session: VortexSession,
167
168    arrow_schema: Option<SchemaRef>,
169    projection: Option<Vec<usize>>,
170}
171
172impl VortexDataSourceBuilder {
173    /// Sets the Arrow schema exposed to DataFusion.
174    ///
175    /// If not specified, the builder derives an Arrow schema from the Vortex
176    /// dtype.
177    ///
178    /// Note that this schema is not validated against the Vortex DType so any errors will be
179    /// deferred until read time.
180    pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
181        self.arrow_schema = Some(arrow_schema);
182        self
183    }
184
185    /// Configures an initial top-level projection.
186    ///
187    /// This is useful when the embedding system already knows which columns are
188    /// needed before DataFusion applies its own optimizer pushdown.
189    pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
190        self.projection = Some(indices);
191        self
192    }
193
194    /// Like [`Self::with_projection`], but accepts an optional projection.
195    pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
196        self.projection = indices;
197        self
198    }
199
200    /// Builds the [`VortexDataSource`].
201    ///
202    /// The builder eagerly resolves statistics for the initial projection
203    /// columns because DataFusion expects the `DataSource` to report output
204    /// statistics before execution begins.
205    pub async fn build(self) -> VortexResult<VortexDataSource> {
206        // The projection expression
207        let mut projection = root();
208
209        // Resolve the Arrow schema
210        let mut arrow_schema = match self.arrow_schema {
211            Some(schema) => schema,
212            None => {
213                let data_type = self.data_source.dtype().to_arrow_dtype()?;
214                let DataType::Struct(fields) = data_type else {
215                    vortex_bail!("Expected a struct-like DataType, found {}", data_type);
216                };
217                Arc::new(Schema::new(fields))
218            }
219        };
220
221        // Apply any selection and create a projection expression.
222        if let Some(indices) = self.projection {
223            let fields = indices.iter().map(|&i| {
224                let name = arrow_schema.field(i).name().clone();
225                let expr = get_item(name.as_str(), root());
226                (name, expr)
227            });
228
229            // Update the projection expression
230            projection = pack(fields, Nullability::NonNullable);
231
232            // Update the arrow schema
233            arrow_schema = Arc::new(Schema::new(
234                indices
235                    .iter()
236                    .map(|&i| arrow_schema.field(i).clone())
237                    .collect::<Vec<_>>(),
238            ));
239        }
240
241        let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
242            vortex_bail!("Projection does not evaluate to a struct");
243        };
244
245        // We now compute initial statistics.
246        let field_paths: Vec<_> = fields
247            .names()
248            .iter()
249            .cloned()
250            .map(FieldPath::from_name)
251            .collect();
252        let statistics = try_join_all(
253            field_paths
254                .iter()
255                .map(|path| self.data_source.field_statistics(path)),
256        )
257        .await?
258        .iter()
259        .zip(fields.fields())
260        .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
261        .collect::<VortexResult<Vec<_>>>()?;
262
263        Ok(VortexDataSource {
264            data_source: self.data_source,
265            session: self.session,
266            initial_schema: Arc::clone(&arrow_schema),
267            initial_projection: projection.clone(),
268            initial_statistics: statistics.clone(),
269            projected_projection: projection.clone(),
270            projected_schema: Arc::clone(&arrow_schema),
271            projected_statistics: statistics.clone(),
272            leftover_projection: None,
273            leftover_schema: arrow_schema,
274            leftover_statistics: statistics,
275            filter: None,
276            limit: None,
277            ordered: false,
278            num_partitions: get_available_parallelism().unwrap_or(1),
279        })
280    }
281}
282
283impl VortexDataSource {
284    /// Create a builder for a [`VortexDataSource`].
285    pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
286        VortexDataSourceBuilder {
287            data_source,
288            session,
289            arrow_schema: None,
290            projection: None,
291        }
292    }
293}
294
295/// DataFusion [`DataSource`] backed by a Vortex [`DataSourceRef`].
296///
297/// `VortexDataSource` is the core execution adapter for the `v2` integration.
298/// It presents DataFusion with a scanable Arrow data source while preserving the
299/// underlying Vortex source until execution time.
300///
301/// During planning, it reports the current output schema and column statistics.
302/// During execution, it builds the final Vortex [`ScanRequest`] from the
303/// current projection, pushed filters, ordering hints, and row limit.
304///
305/// This integration intentionally reports a single DataFusion output partition.
306/// Vortex then handles split-level concurrency internally by polling multiple
307/// split streams concurrently.
308///
309/// Use [`crate::VortexSource`] instead when DataFusion should discover and plan
310/// `.vortex` files on its own.
311#[derive(Clone)]
312pub struct VortexDataSource {
313    /// The Vortex data source.
314    data_source: DataSourceRef,
315    /// Vortex session handle.
316    session: VortexSession,
317
318    // --- Phase 1: Initial (from the builder, before any optimizer pushdown) ---
319    /// The Arrow schema of the data source before any DataFusion projection pushdown.
320    initial_schema: SchemaRef,
321    /// The initial Vortex projection expression (e.g. column selection from the builder).
322    initial_projection: Expression,
323    /// Column statistics for the initial projection columns.
324    #[expect(dead_code)]
325    initial_statistics: Vec<ColumnStatistics>,
326
327    // --- Phase 2: Projected (pushed into the Vortex scan) ---
328    /// The Vortex projection expression sent in the [`ScanRequest`].
329    /// Composed with `initial_projection` so it operates on the original source columns.
330    projected_projection: Expression,
331    /// The Arrow schema of the Vortex scan output (before any leftover projection).
332    projected_schema: SchemaRef,
333    /// Column statistics for the projected (scan output) columns.
334    projected_statistics: Vec<ColumnStatistics>,
335
336    // --- Phase 3: Leftover (applied by DataFusion after the scan) ---
337    /// DataFusion projection expressions that could not be pushed into the Vortex scan.
338    /// Applied after converting arrays to record batches in [`DataSource::open`].
339    /// `None` when all projection expressions were successfully pushed down.
340    leftover_projection: Option<ProjectionExprs>,
341    /// The Arrow schema after applying the leftover projection.
342    /// This is the output schema seen by DataFusion.
343    leftover_schema: SchemaRef,
344    /// Column statistics matching `leftover_schema`.
345    leftover_statistics: Vec<ColumnStatistics>,
346
347    /// An optional filter expression.
348    /// Populated by [`DataSource::try_pushdown_filters`] when DataFusion pushes filters down.
349    filter: Option<Expression>,
350    /// An optional row limit populated by [`DataSource::with_fetch`].
351    limit: Option<usize>,
352    /// Whether to preserve the order of the output rows.
353    ordered: bool,
354
355    /// The requested partition count from DataFusion, populated by [`DataSource::repartitioned`].
356    /// We use this as a hint for how many splits to execute concurrently in `open()`, but we
357    /// always declare to DataFusion that we only have a single partition so that we can
358    /// internally manage concurrency and fix the problem of partition skew.
359    num_partitions: usize,
360}
361
362impl fmt::Debug for VortexDataSource {
363    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
364        f.debug_struct("VortexScanSource")
365            .field("schema", &self.leftover_schema)
366            .field("projection", &format!("{}", &self.projected_projection))
367            .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
368            .field("limit", &self.limit)
369            .finish()
370    }
371}
372
373impl DataSource for VortexDataSource {
374    fn open(
375        &self,
376        partition: usize,
377        _context: Arc<TaskContext>,
378    ) -> DFResult<SendableRecordBatchStream> {
379        // VortexScanSource always uses a single partition since Vortex handles parallelism
380        // and concurrency internally.
381        if partition != 0 {
382            return Err(DataFusionError::Internal(format!(
383                "VortexScanSource: expected partition 0, got {partition}"
384            )));
385        }
386
387        // Build the scan request with pushed-down projection, filter, and limit.
388        // The projection is included so the scan can prune columns at the I/O level.
389        let scan_request = ScanRequest {
390            projection: self.projected_projection.clone(),
391            filter: self.filter.clone(),
392            limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
393            ordered: self.ordered,
394            ..Default::default()
395        };
396
397        let data_source = Arc::clone(&self.data_source);
398        let projected_schema = Arc::clone(&self.projected_schema);
399        let session = self.session.clone();
400        let num_partitions = self.num_partitions;
401
402        // Pre-build the leftover projector (if any) so we can apply it after batch conversion.
403        let leftover_projector = self
404            .leftover_projection
405            .as_ref()
406            .map(|proj| proj.make_projector(&self.projected_schema))
407            .transpose()?;
408
409        // Defer the async DataSource::scan() call to the first poll of the stream.
410        let stream = futures::stream::once(async move {
411            let scan = data_source
412                .scan(scan_request)
413                .await
414                .map_err(|e| DataFusionError::External(Box::new(e)))?;
415
416            // Each split.execute() returns a lazy stream whose early polls do preparation
417            // work (expression resolution, layout traversal, first I/O spawns). We use
418            // try_flatten_unordered to poll multiple split streams concurrently so that
419            // the next split is already warm when the current one finishes.
420            let scan_streams = scan.partitions().map(|split_result| {
421                let split = split_result?;
422                split.execute()
423            });
424
425            let handle = session.handle();
426            let stream = scan_streams
427                .try_flatten_unordered(Some(num_partitions * 2))
428                .map(move |result| {
429                    let session = session.clone();
430                    let schema = Arc::clone(&projected_schema);
431                    handle.spawn_cpu(move || {
432                        let mut ctx = session.create_execution_ctx();
433                        result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
434                    })
435                })
436                .buffered(num_partitions)
437                .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
438
439            // Apply leftover projection (expressions that couldn't be pushed into Vortex).
440            let stream = if let Some(projector) = leftover_projector {
441                stream
442                    .map(move |batch_result| {
443                        batch_result.and_then(|batch| projector.project_batch(&batch))
444                    })
445                    .boxed()
446            } else {
447                stream.boxed()
448            };
449
450            Ok::<_, DataFusionError>(stream)
451        })
452        .try_flatten();
453
454        Ok(Box::pin(RecordBatchStreamAdapter::new(
455            Arc::clone(&self.leftover_schema),
456            stream,
457        )))
458    }
459
460    fn as_any(&self) -> &dyn Any {
461        self
462    }
463
464    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
465        write!(
466            f,
467            "VortexScanSource: projection={}",
468            self.projected_projection
469        )?;
470        if let Some(filter) = &self.filter {
471            write!(f, ", filter={filter}")?;
472        }
473        if let Some(limit) = self.limit {
474            write!(f, ", limit={limit}")?;
475        }
476        Ok(())
477    }
478
479    fn repartitioned(
480        &self,
481        target_partitions: usize,
482        _repartition_file_min_size: usize,
483        output_ordering: Option<LexOrdering>,
484    ) -> DFResult<Option<Arc<dyn DataSource>>> {
485        // Vortex handles parallelism internally — always use a single partition.
486        let mut this = self.clone();
487        this.num_partitions = target_partitions;
488        this.ordered |= output_ordering.is_some();
489        Ok(Some(Arc::new(this)))
490    }
491
492    fn output_partitioning(&self) -> Partitioning {
493        Partitioning::UnknownPartitioning(1)
494    }
495
496    fn eq_properties(&self) -> EquivalenceProperties {
497        EquivalenceProperties::new(Arc::clone(&self.leftover_schema))
498    }
499
500    fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
501        // FIXME(ngates): this should be adjusted based on filters. See DuckDB for heuristics,
502        //  and in the future, store the selectivity stats in the session.
503        let num_rows = estimate_to_df_precision(self.data_source.row_count().as_ref());
504
505        // FIXME(ngates): byte size should be adjusted for the initial projection...
506        let total_byte_size = estimate_to_df_precision(self.data_source.byte_size().as_ref());
507
508        // Column statistics must match the output schema (leftover_schema), which may differ
509        // from the initial schema after try_swapping_with_projection adds computed columns.
510        let column_statistics = self.leftover_statistics.clone();
511
512        Ok(Statistics {
513            num_rows,
514            total_byte_size,
515            column_statistics,
516        })
517    }
518
519    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
520        let mut this = self.clone();
521        this.limit = limit;
522        Some(Arc::new(this))
523    }
524
525    fn fetch(&self) -> Option<usize> {
526        self.limit
527    }
528
529    // Note that we're explicitly "swapping" the projection. That means everything we do must
530    // be computed over the original input schema, rather than the projected output schema.
531    fn try_swapping_with_projection(
532        &self,
533        projection: &ProjectionExprs,
534    ) -> DFResult<Option<Arc<dyn DataSource>>> {
535        tracing::debug!(
536            "VortexScanSource: trying to swap with projection: {}",
537            projection
538        );
539
540        let convertor = DefaultExpressionConvertor::default();
541        let input_schema = self.initial_schema.as_ref();
542        let projected_schema = projection.project_schema(input_schema)?;
543
544        // Use the shared ExpressionConvertor to split the projection into a Vortex
545        // scan_projection and a leftover DataFusion projection for expressions that
546        // can't be pushed down (e.g., unsupported scalar functions, decimal binary).
547        let ProcessedProjection {
548            scan_projection,
549            leftover_projection,
550        } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;
551
552        // Compose with the initial projection so the scan operates on the original
553        // source columns, not the initial projection's output columns.
554        let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());
555
556        // Compute the scan output schema from the Vortex expression's return dtype.
557        let scan_dtype = scan_projection
558            .return_dtype(self.data_source.dtype())
559            .map_err(|e| DataFusionError::External(Box::new(e)))?;
560        let scan_arrow_type = scan_dtype
561            .to_arrow_dtype()
562            .map_err(|e| DataFusionError::External(Box::new(e)))?;
563        let DataType::Struct(scan_fields) = scan_arrow_type else {
564            return Err(DataFusionError::Internal(
565                "Scan projection must produce a struct type".to_string(),
566            ));
567        };
568        let scan_output_schema = Arc::new(Schema::new(scan_fields));
569
570        // Remap the leftover column references to match the scan output schema.
571        let leftover_projection = leftover_projection
572            .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;
573
574        let final_schema = Arc::new(projected_schema);
575
576        let mut this = self.clone();
577        this.projected_projection = scan_projection;
578        this.projected_schema = Arc::clone(&scan_output_schema);
579        this.projected_statistics =
580            vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
581        this.leftover_projection = Some(leftover_projection);
582        this.leftover_schema = Arc::clone(&final_schema);
583        this.leftover_statistics =
584            vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];
585
586        Ok(Some(Arc::new(this)))
587    }
588
589    fn try_pushdown_filters(
590        &self,
591        filters: Vec<Arc<dyn PhysicalExpr>>,
592        _config: &datafusion_common::config::ConfigOptions,
593    ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
594        if filters.is_empty() {
595            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
596                vec![],
597            ));
598        }
599
600        let convertor = DefaultExpressionConvertor::default();
601        let input_schema = self.initial_schema.as_ref();
602
603        // Classify each filter: pushable filters are passed into the ScanRequest in open(),
604        // so we can safely claim PushedDown::Yes for them.
605        let pushdown_results: Vec<PushedDown> = filters
606            .iter()
607            .map(|expr| {
608                if convertor.can_be_pushed_down(expr, input_schema) {
609                    PushedDown::Yes
610                } else {
611                    PushedDown::No
612                }
613            })
614            .collect();
615
616        // If nothing can be pushed down, return early.
617        if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
618            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
619                pushdown_results,
620            ));
621        }
622
623        // Collect the pushable filter expressions.
624        let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
625            .iter()
626            .zip(pushdown_results.iter())
627            .filter_map(|(expr, pushed)| match pushed {
628                PushedDown::Yes => Some(Arc::clone(expr)),
629                PushedDown::No => None,
630            })
631            .collect();
632
633        // Convert to Vortex conjunction.
634        let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;
635
636        // Combine with existing filter.
637        let new_filter = match (&self.filter, vortex_pred) {
638            (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
639            (Some(existing), None) => Some(existing.clone()),
640            (None, Some(new_pred)) => Some(new_pred),
641            (None, None) => None,
642        };
643
644        let mut this = self.clone();
645        this.filter = new_filter;
646        Ok(
647            FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
648                .with_updated_node(Arc::new(this) as _),
649        )
650    }
651}
652
653/// Convert a Vortex [`Option<Precision>`] to a DataFusion
654/// [`DataFusionPrecision`].
655///
656/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
657fn estimate_to_df_precision(est: Option<&Precision<u64>>) -> DFPrecision<usize> {
658    match est {
659        Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
660        Some(Precision::Inexact(v)) => {
661            DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX))
662        }
663        None => DFPrecision::Absent,
664    }
665}