vortex_datafusion/v2/
table.rs1use std::any::Any;
8use std::fmt;
9use std::sync::Arc;
10
11use arrow_schema::SchemaRef;
12use async_trait::async_trait;
13use datafusion_catalog::Session;
14use datafusion_catalog::TableProvider;
15use datafusion_common::ColumnStatistics;
16use datafusion_common::DataFusionError;
17use datafusion_common::Result as DFResult;
18use datafusion_common::Statistics;
19use datafusion_common::stats::Precision;
20use datafusion_datasource::source::DataSourceExec;
21use datafusion_expr::Expr;
22use datafusion_expr::TableType;
23use datafusion_physical_plan::ExecutionPlan;
24use vortex::scan::DataSourceRef;
25use vortex::session::VortexSession;
26
27use crate::v2::source::VortexDataSource;
28
/// A DataFusion [`TableProvider`] backed by a Vortex [`DataSourceRef`].
pub struct VortexTable {
    /// The Vortex source that `scan` builds plans over and `statistics` queries.
    data_source: DataSourceRef,
    /// Session handed to each [`VortexDataSource`] builder created by `scan`.
    session: VortexSession,
    /// Arrow schema reported to DataFusion for this table.
    arrow_schema: SchemaRef,
}
35
36impl fmt::Debug for VortexTable {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("VortexTable")
39 .field("schema", &self.arrow_schema)
40 .finish()
41 }
42}
43
44impl VortexTable {
45 pub fn new(
50 data_source: DataSourceRef,
51 session: VortexSession,
52 arrow_schema: SchemaRef,
53 ) -> Self {
54 Self {
55 data_source,
56 session,
57 arrow_schema,
58 }
59 }
60}
61
62#[async_trait]
63impl TableProvider for VortexTable {
64 fn as_any(&self) -> &dyn Any {
65 self
66 }
67
68 fn schema(&self) -> SchemaRef {
69 self.arrow_schema.clone()
70 }
71
72 fn table_type(&self) -> TableType {
73 TableType::Base
74 }
75
76 async fn scan(
77 &self,
78 _state: &dyn Session,
79 projection: Option<&Vec<usize>>,
80 _filters: &[Expr],
81 _limit: Option<usize>,
82 ) -> DFResult<Arc<dyn ExecutionPlan>> {
83 let data_source = VortexDataSource::builder(self.data_source.clone(), self.session.clone())
85 .with_arrow_schema(self.arrow_schema.clone())
86 .with_some_projection(projection.cloned())
89 .build()
98 .await
99 .map_err(|e| DataFusionError::External(Box::new(e)))?;
100
101 Ok(DataSourceExec::from_data_source(data_source))
102 }
103
104 fn statistics(&self) -> Option<Statistics> {
112 let num_rows = match self.data_source.row_count() {
113 Some(vortex::expr::stats::Precision::Exact(v)) => {
114 usize::try_from(v).map(Precision::Exact).unwrap_or_default()
115 }
116 _ => Precision::Absent,
117 };
118
119 let total_byte_size = match self.data_source.byte_size() {
120 Some(vortex::expr::stats::Precision::Exact(v)) => {
121 usize::try_from(v).map(Precision::Exact).unwrap_or_default()
122 }
123 _ => Precision::Absent,
124 };
125
126 let column_statistics =
127 vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()];
128
129 Some(Statistics {
130 num_rows,
131 total_byte_size,
132 column_statistics,
133 })
134 }
135}