vortex_datafusion/v2/
table.rs1use std::any::Any;
8use std::fmt;
9use std::sync::Arc;
10
11use arrow_schema::SchemaRef;
12use async_trait::async_trait;
13use datafusion_catalog::Session;
14use datafusion_catalog::TableProvider;
15use datafusion_common::ColumnStatistics;
16use datafusion_common::DataFusionError;
17use datafusion_common::Result as DFResult;
18use datafusion_common::Statistics;
19use datafusion_common::stats::Precision;
20use datafusion_datasource::source::DataSourceExec;
21use datafusion_expr::Expr;
22use datafusion_expr::TableType;
23use datafusion_physical_plan::ExecutionPlan;
24use vortex::scan::DataSourceRef;
25use vortex::session::VortexSession;
26
27use crate::v2::source::VortexDataSource;
28
/// A DataFusion [`TableProvider`] backed by a Vortex [`DataSourceRef`].
///
/// Scans of this table are planned as a [`DataSourceExec`] wrapping a
/// [`VortexDataSource`] built from the fields below.
pub struct VortexTable {
    /// The Vortex data source that scans read from; also queried for table statistics.
    data_source: DataSourceRef,
    /// Vortex session handed to the [`VortexDataSource`] builder on every scan.
    session: VortexSession,
    /// Arrow schema reported to DataFusion as this table's schema and passed to the
    /// scan builder.
    arrow_schema: SchemaRef,
}
35
36impl fmt::Debug for VortexTable {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("VortexTable")
39 .field("schema", &self.arrow_schema)
40 .finish()
41 }
42}
43
44impl VortexTable {
45 pub fn new(
50 data_source: DataSourceRef,
51 session: VortexSession,
52 arrow_schema: SchemaRef,
53 ) -> Self {
54 Self {
55 data_source,
56 session,
57 arrow_schema,
58 }
59 }
60}
61
62#[async_trait]
63impl TableProvider for VortexTable {
64 fn as_any(&self) -> &dyn Any {
65 self
66 }
67
68 fn schema(&self) -> SchemaRef {
69 Arc::clone(&self.arrow_schema)
70 }
71
72 fn table_type(&self) -> TableType {
73 TableType::Base
74 }
75
76 async fn scan(
77 &self,
78 _state: &dyn Session,
79 projection: Option<&Vec<usize>>,
80 _filters: &[Expr],
81 _limit: Option<usize>,
82 ) -> DFResult<Arc<dyn ExecutionPlan>> {
83 let data_source =
85 VortexDataSource::builder(Arc::clone(&self.data_source), self.session.clone())
86 .with_arrow_schema(Arc::clone(&self.arrow_schema))
87 .with_some_projection(projection.cloned())
90 .build()
99 .await
100 .map_err(|e| DataFusionError::External(Box::new(e)))?;
101
102 Ok(DataSourceExec::from_data_source(data_source))
103 }
104
105 fn statistics(&self) -> Option<Statistics> {
113 let num_rows = match self.data_source.row_count() {
114 Some(vortex::expr::stats::Precision::Exact(v)) => {
115 usize::try_from(v).map(Precision::Exact).unwrap_or_default()
116 }
117 _ => Precision::Absent,
118 };
119
120 let total_byte_size = match self.data_source.byte_size() {
121 Some(vortex::expr::stats::Precision::Exact(v)) => {
122 usize::try_from(v).map(Precision::Exact).unwrap_or_default()
123 }
124 _ => Precision::Absent,
125 };
126
127 let column_statistics =
128 vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()];
129
130 Some(Statistics {
131 num_rows,
132 total_byte_size,
133 column_statistics,
134 })
135 }
136}