Skip to main content

vortex_datafusion/v2/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Use [`VortexDataSource`] to adapt an existing Vortex [`DataSourceRef`] into
5//! a DataFusion [`DataSource`] without going through file discovery.
6//!
7//! [`VortexDataSource`] is responsible for:
8//!
9//! - exposing an Arrow schema and output statistics to DataFusion,
10//! - translating DataFusion projection, filter, and limit pushdown into a
11//!   Vortex [`ScanRequest`],
12//! - executing the Vortex scan and converting the results into Arrow
13//!   `RecordBatch` values.
14//!
15//! # Example: Create a `DataSourceExec`
16//!
17//! ```no_run
18//! use std::sync::Arc;
19//!
20//! use arrow_schema::Schema;
21//! use datafusion_datasource::source::DataSourceExec;
22//! use vortex::VortexSessionDefault;
23//! use vortex::scan::DataSourceRef;
24//! use vortex::session::VortexSession;
25//! use vortex_datafusion::v2::VortexDataSource;
26//!
27//! # #[tokio::main]
28//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//! # let data_source: DataSourceRef = todo!();
30//! let data_source = VortexDataSource::builder(data_source, VortexSession::default())
31//!     .with_arrow_schema(Arc::new(Schema::empty()))
32//!     .build()
33//!     .await?;
34//!
35//! let exec = DataSourceExec::from_data_source(data_source);
36//! # let _ = exec;
37//! # Ok(())
38//! # }
39//! ```
40//!
41//! # Execution Flow
42//!
43//! ```text
44//!             ▲
45//!             │  RecordBatch stream
46//!             │
47//! ┌───────────────────────┐
48//! │     DataSourceExec    │
49//! └───────────────────────┘
50//!             ▲
51//!             │  DataFusion pushdown
52//!             │  (projection/filter/limit)
53//! ┌───────────────────────┐
54//! │   VortexDataSource    │
55//! └───────────────────────┘
56//!             ▲
57//!             │  final ScanRequest
58//! ┌───────────────────────┐
59//! │    DataSourceRef      │
60//! └───────────────────────┘
61//! ```
62//!
63//! Compared with [`crate::VortexSource`], this path starts from an existing
64//! Vortex source rather than from DataFusion-managed file discovery.
65//!
66//! [`DataSource`]: datafusion_datasource::source::DataSource
67//! [`DataSourceRef`]: vortex::scan::DataSourceRef
68//! [`ScanRequest`]: vortex::scan::ScanRequest
69
70use std::fmt;
71use std::fmt::Formatter;
72use std::sync::Arc;
73
74use arrow_schema::Field;
75use arrow_schema::Schema;
76use arrow_schema::SchemaRef;
77use datafusion_common::ColumnStatistics;
78use datafusion_common::DataFusionError;
79use datafusion_common::Result as DFResult;
80use datafusion_common::Statistics;
81use datafusion_common::arrow::array::AsArray;
82use datafusion_common::arrow::array::RecordBatch;
83use datafusion_common::stats::Precision as DFPrecision;
84use datafusion_datasource::source::DataSource;
85use datafusion_execution::SendableRecordBatchStream;
86use datafusion_execution::TaskContext;
87use datafusion_physical_expr::EquivalenceProperties;
88use datafusion_physical_expr::Partitioning;
89use datafusion_physical_expr::PhysicalExpr;
90use datafusion_physical_expr::projection::ProjectionExprs;
91use datafusion_physical_expr::utils::reassign_expr_columns;
92use datafusion_physical_expr_common::sort_expr::LexOrdering;
93use datafusion_physical_plan::DisplayFormatType;
94use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
95use datafusion_physical_plan::filter_pushdown::PushedDown;
96use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
97use futures::StreamExt;
98use futures::TryStreamExt;
99use futures::future::try_join_all;
100use vortex::array::VortexSessionExecute;
101use vortex::array::arrow::ArrowSessionExt;
102use vortex::dtype::DType;
103use vortex::dtype::FieldPath;
104use vortex::dtype::Nullability;
105use vortex::error::VortexResult;
106use vortex::error::vortex_bail;
107use vortex::expr::Expression;
108use vortex::expr::and as vx_and;
109use vortex::expr::get_item;
110use vortex::expr::pack;
111use vortex::expr::root;
112use vortex::expr::stats::Precision;
113use vortex::expr::transform::replace;
114use vortex::io::session::RuntimeSessionExt;
115use vortex::scan::DataSourceRef;
116use vortex::scan::ScanRequest;
117use vortex::session::VortexSession;
118use vortex_utils::parallelism::get_available_parallelism;
119
120use crate::convert::exprs::DefaultExpressionConvertor;
121use crate::convert::exprs::ExpressionConvertor;
122use crate::convert::exprs::ProcessedProjection;
123use crate::convert::exprs::make_vortex_predicate;
124use crate::convert::stats::stats_set_to_df;
125
126/// Builder for [`VortexDataSource`].
127///
128/// Use the builder to declare how an existing Vortex
129/// [`DataSourceRef`] should appear to DataFusion.
130/// In particular, it lets you choose:
131///
132/// - the Arrow schema DataFusion should see,
133/// - an initial top-level projection if the embedding system already knows
134///   which columns are needed.
135///
136/// The resulting [`VortexDataSource`] is ready to plug into
137/// [`DataSourceExec`] or other DataFusion physical planning code.
138///
139/// # Example
140///
141/// ```no_run
142/// use std::sync::Arc;
143///
144/// use arrow_schema::Schema;
145/// use vortex::VortexSessionDefault;
146/// use vortex::scan::DataSourceRef;
147/// use vortex::session::VortexSession;
148/// use vortex_datafusion::v2::VortexDataSource;
149///
150/// # #[tokio::main]
151/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
152/// # let data_source: DataSourceRef = todo!();
153/// let data_source = VortexDataSource::builder(data_source, VortexSession::default())
154///     .with_arrow_schema(Arc::new(Schema::empty()))
155///     .with_projection(vec![0])
156///     .build()
157///     .await?;
158/// # let _ = data_source;
159/// # Ok(())
160/// # }
161/// ```
162///
163/// [`DataSourceRef`]: vortex::scan::DataSourceRef
164/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
pub struct VortexDataSourceBuilder {
    /// The Vortex data source that the built [`VortexDataSource`] will scan.
    data_source: DataSourceRef,
    /// Vortex session; used by `build` to derive an Arrow schema when none was
    /// supplied, and handed on to the built source.
    session: VortexSession,

    /// Arrow schema to expose to DataFusion. `None` means `build` derives it
    /// from the Vortex dtype.
    arrow_schema: Option<SchemaRef>,
    /// Optional initial projection, as field indices into the Arrow schema.
    projection: Option<Vec<usize>>,
}
172
173impl VortexDataSourceBuilder {
174    /// Sets the Arrow schema exposed to DataFusion.
175    ///
176    /// If not specified, the builder derives an Arrow schema from the Vortex
177    /// dtype.
178    ///
179    /// Note that this schema is not validated against the Vortex DType so any errors will be
180    /// deferred until read time.
181    pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
182        self.arrow_schema = Some(arrow_schema);
183        self
184    }
185
186    /// Configures an initial top-level projection.
187    ///
188    /// This is useful when the embedding system already knows which columns are
189    /// needed before DataFusion applies its own optimizer pushdown.
190    pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
191        self.projection = Some(indices);
192        self
193    }
194
195    /// Like [`Self::with_projection`], but accepts an optional projection.
196    pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
197        self.projection = indices;
198        self
199    }
200
201    /// Builds the [`VortexDataSource`].
202    ///
203    /// The builder eagerly resolves statistics for the initial projection
204    /// columns because DataFusion expects the `DataSource` to report output
205    /// statistics before execution begins.
206    pub async fn build(self) -> VortexResult<VortexDataSource> {
207        // The projection expression
208        let mut projection = root();
209
210        // Resolve the Arrow schema
211        let mut arrow_schema = match self.arrow_schema {
212            Some(schema) => schema,
213            None => Arc::new(
214                self.session
215                    .arrow()
216                    .to_arrow_schema(self.data_source.dtype())?,
217            ),
218        };
219
220        // Apply any selection and create a projection expression.
221        if let Some(indices) = self.projection {
222            let fields = indices.iter().map(|&i| {
223                let name = arrow_schema.field(i).name().clone();
224                let expr = get_item(name.as_str(), root());
225                (name, expr)
226            });
227
228            // Update the projection expression
229            projection = pack(fields, Nullability::NonNullable);
230
231            // Update the arrow schema
232            arrow_schema = Arc::new(Schema::new(
233                indices
234                    .iter()
235                    .map(|&i| arrow_schema.field(i).clone())
236                    .collect::<Vec<_>>(),
237            ));
238        }
239
240        let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
241            vortex_bail!("Projection does not evaluate to a struct");
242        };
243
244        // We now compute initial statistics.
245        let field_paths: Vec<_> = fields
246            .names()
247            .iter()
248            .cloned()
249            .map(FieldPath::from_name)
250            .collect();
251        let statistics = try_join_all(
252            field_paths
253                .iter()
254                .map(|path| self.data_source.field_statistics(path)),
255        )
256        .await?
257        .iter()
258        .zip(fields.fields())
259        .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
260        .collect::<VortexResult<Vec<_>>>()?;
261
262        Ok(VortexDataSource {
263            data_source: self.data_source,
264            session: self.session,
265            initial_schema: Arc::clone(&arrow_schema),
266            initial_projection: projection.clone(),
267            initial_statistics: statistics.clone(),
268            projected_projection: projection.clone(),
269            projected_schema: Arc::clone(&arrow_schema),
270            projected_statistics: statistics.clone(),
271            leftover_projection: None,
272            leftover_schema: arrow_schema,
273            leftover_statistics: statistics,
274            filter: None,
275            limit: None,
276            ordered: false,
277            num_partitions: get_available_parallelism().unwrap_or(1),
278        })
279    }
280}
281
impl VortexDataSource {
    /// Create a builder for a [`VortexDataSource`].
    ///
    /// The returned builder starts with no Arrow schema override and no
    /// initial projection; set them on the builder before calling `build`.
    pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
        VortexDataSourceBuilder {
            data_source,
            session,
            arrow_schema: None,
            projection: None,
        }
    }
}
293
/// DataFusion [`DataSource`] backed by a Vortex [`DataSourceRef`].
///
/// `VortexDataSource` is the core execution adapter for the `v2` integration.
/// It presents DataFusion with a scannable Arrow data source while preserving the
/// underlying Vortex source until execution time.
///
/// During planning, it reports the current output schema and column statistics.
/// During execution, it builds the final Vortex [`ScanRequest`] from the
/// current projection, pushed filters, ordering hints, and row limit.
///
/// This integration intentionally reports a single DataFusion output partition.
/// Vortex then handles split-level concurrency internally by polling multiple
/// split streams concurrently.
///
/// Use [`crate::VortexSource`] instead when DataFusion should discover and plan
/// `.vortex` files on its own.
#[derive(Clone)]
pub struct VortexDataSource {
    /// The Vortex data source.
    data_source: DataSourceRef,
    /// Vortex session handle.
    session: VortexSession,

    // --- Phase 1: Initial (from the builder, before any optimizer pushdown) ---
    /// The Arrow schema of the data source before any DataFusion projection pushdown.
    initial_schema: SchemaRef,
    /// The initial Vortex projection expression (e.g. column selection from the builder).
    initial_projection: Expression,
    /// Column statistics for the initial projection columns.
    /// Currently only written by the builder and never read, hence the lint expectation.
    #[expect(dead_code)]
    initial_statistics: Vec<ColumnStatistics>,

    // --- Phase 2: Projected (pushed into the Vortex scan) ---
    /// The Vortex projection expression sent in the [`ScanRequest`].
    /// Composed with `initial_projection` so it operates on the original source columns.
    projected_projection: Expression,
    /// The Arrow schema of the Vortex scan output (before any leftover projection).
    projected_schema: SchemaRef,
    /// Column statistics for the projected (scan output) columns.
    projected_statistics: Vec<ColumnStatistics>,

    // --- Phase 3: Leftover (applied by DataFusion after the scan) ---
    /// DataFusion projection expressions that could not be pushed into the Vortex scan.
    /// Applied after converting arrays to record batches in [`DataSource::open`].
    /// `None` until a projection has been swapped into this source.
    leftover_projection: Option<ProjectionExprs>,
    /// The Arrow schema after applying the leftover projection.
    /// This is the output schema seen by DataFusion.
    leftover_schema: SchemaRef,
    /// Column statistics matching `leftover_schema`.
    leftover_statistics: Vec<ColumnStatistics>,

    /// An optional filter expression.
    /// Populated by [`DataSource::try_pushdown_filters`] when DataFusion pushes filters down.
    filter: Option<Expression>,
    /// An optional row limit populated by [`DataSource::with_fetch`].
    limit: Option<usize>,
    /// Whether to preserve the order of the output rows.
    /// Set by [`DataSource::repartitioned`] when an output ordering is requested.
    ordered: bool,

    /// The requested partition count from DataFusion, populated by [`DataSource::repartitioned`].
    /// We use this as a hint for how many splits to execute concurrently in `open()`, but we
    /// always declare to DataFusion that we only have a single partition so that we can
    /// internally manage concurrency and fix the problem of partition skew.
    num_partitions: usize,
}
360
361impl fmt::Debug for VortexDataSource {
362    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
363        f.debug_struct("VortexScanSource")
364            .field("schema", &self.leftover_schema)
365            .field("projection", &format!("{}", &self.projected_projection))
366            .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
367            .field("limit", &self.limit)
368            .finish()
369    }
370}
371
372impl DataSource for VortexDataSource {
    /// Opens the single output partition as a stream of Arrow `RecordBatch`es.
    ///
    /// Only partition `0` is valid; any other index yields an internal error.
    /// The Vortex scan itself is started lazily, on the first poll of the
    /// returned stream. Errors from building the leftover projector are
    /// returned immediately; scan and conversion errors are surfaced through
    /// the stream as `DataFusionError::External`.
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // VortexDataSource always uses a single partition since Vortex handles parallelism
        // and concurrency internally.
        if partition != 0 {
            return Err(DataFusionError::Internal(format!(
                "VortexScanSource: expected partition 0, got {partition}"
            )));
        }

        // Build the scan request with pushed-down projection, filter, and limit.
        // The projection is included so the scan can prune columns at the I/O level.
        let scan_request = ScanRequest {
            projection: self.projected_projection.clone(),
            filter: self.filter.clone(),
            // A `usize` limit that does not fit in `u64` saturates to `u64::MAX`.
            limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
            ordered: self.ordered,
            ..Default::default()
        };

        let data_source = Arc::clone(&self.data_source);
        let projected_schema = Arc::clone(&self.projected_schema);
        // Scan chunks are converted to Arrow against a non-nullable struct field
        // whose children are the scan output columns.
        let projected_target_field = Arc::new(Field::new_struct(
            "",
            projected_schema.fields().clone(),
            false,
        ));
        let session = self.session.clone();
        let num_partitions = self.num_partitions;

        // Pre-build the leftover projector (if any) so we can apply it after batch conversion.
        let leftover_projector = self
            .leftover_projection
            .as_ref()
            .map(|proj| proj.make_projector(&self.projected_schema))
            .transpose()?;

        // Defer the async DataSource::scan() call to the first poll of the stream.
        let stream = futures::stream::once(async move {
            let scan = data_source
                .scan(scan_request)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;

            // Each split.execute() returns a lazy stream whose early polls do preparation
            // work (expression resolution, layout traversal, first I/O spawns). We use
            // try_flatten_unordered to poll multiple split streams concurrently so that
            // the next split is already warm when the current one finishes.
            //
            // NOTE(review): `ordered` is forwarded in the ScanRequest, but chunks from
            // different splits are merged with `try_flatten_unordered` here; confirm that
            // cross-split row order is still preserved when `ordered` is set.
            let scan_streams = scan.partitions().map(|split_result| {
                let split = split_result?;
                split.execute()
            });

            let handle = session.handle();
            let stream = scan_streams
                .try_flatten_unordered(Some(num_partitions * 2))
                .map(move |result| {
                    let session = session.clone();
                    let target_field = Arc::clone(&projected_target_field);
                    // Convert each chunk to Arrow via `spawn_cpu` rather than inline
                    // in the stream's poll.
                    handle.spawn_cpu(move || {
                        let mut ctx = session.create_execution_ctx();
                        result.and_then(|chunk| {
                            let arrow = session.arrow().execute_arrow(
                                chunk,
                                Some(target_field.as_ref()),
                                &mut ctx,
                            )?;
                            Ok(RecordBatch::from(arrow.as_struct().clone()))
                        })
                    })
                })
                // Keep up to `num_partitions` conversions in flight; `buffered` yields
                // them in the order they were submitted.
                .buffered(num_partitions)
                .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));

            // Apply leftover projection (expressions that couldn't be pushed into Vortex).
            let stream = if let Some(projector) = leftover_projector {
                stream
                    .map(move |batch_result| {
                        batch_result.and_then(|batch| projector.project_batch(&batch))
                    })
                    .boxed()
            } else {
                stream.boxed()
            };

            Ok::<_, DataFusionError>(stream)
        })
        .try_flatten();

        // The adapter advertises the final (post-leftover-projection) schema.
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.leftover_schema),
            stream,
        )))
    }
470
471    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
472        write!(
473            f,
474            "VortexScanSource: projection={}",
475            self.projected_projection
476        )?;
477        if let Some(filter) = &self.filter {
478            write!(f, ", filter={filter}")?;
479        }
480        if let Some(limit) = self.limit {
481            write!(f, ", limit={limit}")?;
482        }
483        Ok(())
484    }
485
486    fn repartitioned(
487        &self,
488        target_partitions: usize,
489        _repartition_file_min_size: usize,
490        output_ordering: Option<LexOrdering>,
491    ) -> DFResult<Option<Arc<dyn DataSource>>> {
492        // Vortex handles parallelism internally — always use a single partition.
493        let mut this = self.clone();
494        this.num_partitions = target_partitions;
495        this.ordered |= output_ordering.is_some();
496        Ok(Some(Arc::new(this)))
497    }
498
    /// Always declares exactly one output partition of unknown partitioning,
    /// regardless of the stored `num_partitions` hint.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }
502
    /// Returns fresh equivalence properties built from the output schema
    /// (`leftover_schema`) alone; nothing else is added here.
    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new(Arc::clone(&self.leftover_schema))
    }
506
507    fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Arc<Statistics>> {
508        // FIXME(ngates): this should be adjusted based on filters. See DuckDB for heuristics,
509        //  and in the future, store the selectivity stats in the session.
510        let num_rows = estimate_to_df_precision(&self.data_source.row_count());
511
512        // FIXME(ngates): byte size should be adjusted for the initial projection...
513        let total_byte_size = estimate_to_df_precision(&self.data_source.byte_size());
514
515        // Column statistics must match the output schema (leftover_schema), which may differ
516        // from the initial schema after try_swapping_with_projection adds computed columns.
517        let column_statistics = self.leftover_statistics.clone();
518
519        Ok(Arc::new(Statistics {
520            num_rows,
521            total_byte_size,
522            column_statistics,
523        }))
524    }
525
526    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
527        let mut this = self.clone();
528        this.limit = limit;
529        Some(Arc::new(this))
530    }
531
    /// Returns the row limit currently configured on this source, if any.
    fn fetch(&self) -> Option<usize> {
        self.limit
    }
535
536    // Note that we're explicitly "swapping" the projection. That means everything we do must
537    // be computed over the original input schema, rather than the projected output schema.
538    fn try_swapping_with_projection(
539        &self,
540        projection: &ProjectionExprs,
541    ) -> DFResult<Option<Arc<dyn DataSource>>> {
542        tracing::debug!(
543            "VortexScanSource: trying to swap with projection: {}",
544            projection
545        );
546
547        let convertor = DefaultExpressionConvertor::default();
548        let input_schema = self.initial_schema.as_ref();
549        let projected_schema = projection.project_schema(input_schema)?;
550
551        // Use the shared ExpressionConvertor to split the projection into a Vortex
552        // scan_projection and a leftover DataFusion projection for expressions that
553        // can't be pushed down (e.g., unsupported scalar functions, decimal binary).
554        let ProcessedProjection {
555            scan_projection,
556            leftover_projection,
557        } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;
558
559        // Compose with the initial projection so the scan operates on the original
560        // source columns, not the initial projection's output columns.
561        let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());
562
563        // Compute the scan output schema from the Vortex expression's return dtype.
564        let scan_dtype = scan_projection
565            .return_dtype(self.data_source.dtype())
566            .map_err(|e| DataFusionError::External(Box::new(e)))?;
567        let scan_output_schema = Arc::new(
568            self.session
569                .arrow()
570                .to_arrow_schema(&scan_dtype)
571                .map_err(|e| DataFusionError::External(Box::new(e)))?,
572        );
573
574        // Remap the leftover column references to match the scan output schema.
575        let leftover_projection = leftover_projection
576            .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;
577
578        let final_schema = Arc::new(projected_schema);
579
580        let mut this = self.clone();
581        this.projected_projection = scan_projection;
582        this.projected_schema = Arc::clone(&scan_output_schema);
583        this.projected_statistics =
584            vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
585        this.leftover_projection = Some(leftover_projection);
586        this.leftover_schema = Arc::clone(&final_schema);
587        this.leftover_statistics =
588            vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];
589
590        Ok(Some(Arc::new(this)))
591    }
592
593    fn try_pushdown_filters(
594        &self,
595        filters: Vec<Arc<dyn PhysicalExpr>>,
596        _config: &datafusion_common::config::ConfigOptions,
597    ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
598        if filters.is_empty() {
599            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
600                vec![],
601            ));
602        }
603
604        let convertor = DefaultExpressionConvertor::default();
605        let input_schema = self.initial_schema.as_ref();
606
607        // Classify each filter: pushable filters are passed into the ScanRequest in open(),
608        // so we can safely claim PushedDown::Yes for them.
609        let pushdown_results: Vec<PushedDown> = filters
610            .iter()
611            .map(|expr| {
612                if convertor.can_be_pushed_down(expr, input_schema) {
613                    PushedDown::Yes
614                } else {
615                    PushedDown::No
616                }
617            })
618            .collect();
619
620        // If nothing can be pushed down, return early.
621        if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
622            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
623                pushdown_results,
624            ));
625        }
626
627        // Collect the pushable filter expressions.
628        let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
629            .iter()
630            .zip(pushdown_results.iter())
631            .filter_map(|(expr, pushed)| match pushed {
632                PushedDown::Yes => Some(Arc::clone(expr)),
633                PushedDown::No => None,
634            })
635            .collect();
636
637        // Convert to Vortex conjunction.
638        let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;
639
640        // Combine with existing filter.
641        let new_filter = match (&self.filter, vortex_pred) {
642            (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
643            (Some(existing), None) => Some(existing.clone()),
644            (None, Some(new_pred)) => Some(new_pred),
645            (None, None) => None,
646        };
647
648        let mut this = self.clone();
649        this.filter = new_filter;
650        Ok(
651            FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
652                .with_updated_node(Arc::new(this) as _),
653        )
654    }
655}
656
657/// Convert a Vortex [`Option<Precision>`] to a DataFusion
658/// [`DataFusionPrecision`].
659///
660/// [`DataFusionPrecision`]: datafusion_common::stats::Precision
661fn estimate_to_df_precision(est: &Precision<u64>) -> DFPrecision<usize> {
662    match est {
663        Precision::Exact(v) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
664        Precision::Inexact(v) => DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX)),
665        Precision::Absent => DFPrecision::Absent,
666    }
667}