exon/datasources/gtf/
table_provider.rs1use std::{any::Any, sync::Arc};
16
17use arrow::datatypes::{Field, Schema, SchemaRef};
18use async_trait::async_trait;
19use datafusion::{
20 catalog::Session,
21 datasource::{
22 file_format::file_compression_type::FileCompressionType, physical_plan::FileScanConfig,
23 TableProvider,
24 },
25 error::Result,
26 logical_expr::{TableProviderFilterPushDown, TableType},
27 physical_plan::{empty::EmptyExec, ExecutionPlan},
28 prelude::Expr,
29};
30use exon_common::TableSchema;
31use exon_gtf::new_gtf_schema_builder;
32use futures::TryStreamExt;
33
34use crate::{
35 datasources::{
36 exon_listing_table_options::{ExonListingConfig, ExonListingOptions},
37 hive_partition::filter_matches_partition_cols,
38 ExonFileType,
39 },
40 physical_plan::{
41 file_scan_config_builder::FileScanConfigBuilder, object_store::pruned_partition_list,
42 },
43};
44
45use super::GTFScan;
46
47#[derive(Debug, Clone)]
48pub struct ListingGTFTableOptions {
50 file_extension: String,
52
53 file_compression_type: FileCompressionType,
55
56 table_partition_cols: Vec<Field>,
58}
59
60#[async_trait]
61impl ExonListingOptions for ListingGTFTableOptions {
62 fn table_partition_cols(&self) -> &[Field] {
63 &self.table_partition_cols
64 }
65
66 fn file_extension(&self) -> &str {
67 &self.file_extension
68 }
69
70 fn file_compression_type(&self) -> FileCompressionType {
71 self.file_compression_type
72 }
73
74 async fn create_physical_plan(
75 &self,
76 conf: FileScanConfig,
77 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
78 let scan = GTFScan::new(conf.clone(), self.file_compression_type);
79
80 Ok(Arc::new(scan))
81 }
82}
83
84impl Default for ListingGTFTableOptions {
85 fn default() -> Self {
86 Self::new(FileCompressionType::UNCOMPRESSED)
87 }
88}
89
90impl ListingGTFTableOptions {
91 pub fn new(file_compression_type: FileCompressionType) -> Self {
93 let file_extension = ExonFileType::GTF.get_file_extension(file_compression_type);
94
95 Self {
96 file_extension,
97 file_compression_type,
98 table_partition_cols: Vec::new(),
99 }
100 }
101
102 pub fn with_table_partition_cols(self, table_partition_cols: Vec<Field>) -> Self {
104 Self {
105 table_partition_cols,
106 ..self
107 }
108 }
109
110 pub fn infer_schema(&self) -> TableSchema {
112 let builder =
113 new_gtf_schema_builder().add_partition_fields(self.table_partition_cols.clone());
114
115 builder.build()
116 }
117}
118
119#[derive(Debug, Clone)]
120pub struct ListingGTFTable<T> {
122 table_schema: TableSchema,
123
124 config: ExonListingConfig<T>,
125}
126
127impl<T> ListingGTFTable<T> {
128 pub fn new(config: ExonListingConfig<T>, table_schema: TableSchema) -> Self {
130 Self {
131 config,
132 table_schema,
133 }
134 }
135}
136
137#[async_trait]
138impl<T: ExonListingOptions + 'static> TableProvider for ListingGTFTable<T> {
139 fn as_any(&self) -> &dyn Any {
140 self
141 }
142
143 fn schema(&self) -> SchemaRef {
144 Arc::clone(&self.table_schema.table_schema())
145 }
146
147 fn table_type(&self) -> TableType {
148 TableType::Base
149 }
150
151 fn supports_filters_pushdown(
152 &self,
153 filters: &[&Expr],
154 ) -> Result<Vec<TableProviderFilterPushDown>> {
155 Ok(filters
156 .iter()
157 .map(|f| filter_matches_partition_cols(f, self.config.options.table_partition_cols()))
158 .collect())
159 }
160
161 async fn scan(
162 &self,
163 state: &dyn Session,
164 projection: Option<&Vec<usize>>,
165 filters: &[Expr],
166 limit: Option<usize>,
167 ) -> Result<Arc<dyn ExecutionPlan>> {
168 let url = if let Some(url) = self.config.inner.table_paths.first() {
169 url
170 } else {
171 return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
172 };
173
174 let object_store = state.runtime_env().object_store(url.object_store())?;
175
176 let file_list = pruned_partition_list(
177 &object_store,
178 url,
179 filters,
180 self.config.options.file_extension(),
181 self.config.options.table_partition_cols(),
182 )
183 .await?
184 .try_collect::<Vec<_>>()
185 .await?;
186
187 let file_schema = self.table_schema.file_schema()?;
188 let file_scan_config =
189 FileScanConfigBuilder::new(url.object_store(), file_schema, vec![file_list])
190 .projection_option(projection.cloned())
191 .table_partition_cols(self.config.options.table_partition_cols().to_vec())
192 .limit_option(limit)
193 .build();
194
195 let plan = self
196 .config
197 .options
198 .create_physical_plan(file_scan_config)
199 .await?;
200
201 Ok(plan)
202 }
203}