exon/datasources/gtf/
table_provider.rs

1// Copyright 2023 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{any::Any, sync::Arc};
16
17use arrow::datatypes::{Field, Schema, SchemaRef};
18use async_trait::async_trait;
19use datafusion::{
20    catalog::Session,
21    datasource::{
22        file_format::file_compression_type::FileCompressionType, physical_plan::FileScanConfig,
23        TableProvider,
24    },
25    error::Result,
26    logical_expr::{TableProviderFilterPushDown, TableType},
27    physical_plan::{empty::EmptyExec, ExecutionPlan},
28    prelude::Expr,
29};
30use exon_common::TableSchema;
31use exon_gtf::new_gtf_schema_builder;
32use futures::TryStreamExt;
33
34use crate::{
35    datasources::{
36        exon_listing_table_options::{ExonListingConfig, ExonListingOptions},
37        hive_partition::filter_matches_partition_cols,
38        ExonFileType,
39    },
40    physical_plan::{
41        file_scan_config_builder::FileScanConfigBuilder, object_store::pruned_partition_list,
42    },
43};
44
45use super::GTFScan;
46
47#[derive(Debug, Clone)]
48/// Listing options for a GTF table
49pub struct ListingGTFTableOptions {
50    /// The file extension for the table including the compression type
51    file_extension: String,
52
53    /// The compression type of the file
54    file_compression_type: FileCompressionType,
55
56    /// The partition columns
57    table_partition_cols: Vec<Field>,
58}
59
60#[async_trait]
61impl ExonListingOptions for ListingGTFTableOptions {
62    fn table_partition_cols(&self) -> &[Field] {
63        &self.table_partition_cols
64    }
65
66    fn file_extension(&self) -> &str {
67        &self.file_extension
68    }
69
70    fn file_compression_type(&self) -> FileCompressionType {
71        self.file_compression_type
72    }
73
74    async fn create_physical_plan(
75        &self,
76        conf: FileScanConfig,
77    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
78        let scan = GTFScan::new(conf.clone(), self.file_compression_type);
79
80        Ok(Arc::new(scan))
81    }
82}
83
84impl Default for ListingGTFTableOptions {
85    fn default() -> Self {
86        Self::new(FileCompressionType::UNCOMPRESSED)
87    }
88}
89
90impl ListingGTFTableOptions {
91    /// Create a new set of options
92    pub fn new(file_compression_type: FileCompressionType) -> Self {
93        let file_extension = ExonFileType::GTF.get_file_extension(file_compression_type);
94
95        Self {
96            file_extension,
97            file_compression_type,
98            table_partition_cols: Vec::new(),
99        }
100    }
101
102    /// Set the table partition columns
103    pub fn with_table_partition_cols(self, table_partition_cols: Vec<Field>) -> Self {
104        Self {
105            table_partition_cols,
106            ..self
107        }
108    }
109
110    /// Infer the schema for the table
111    pub fn infer_schema(&self) -> TableSchema {
112        let builder =
113            new_gtf_schema_builder().add_partition_fields(self.table_partition_cols.clone());
114
115        builder.build()
116    }
117}
118
119#[derive(Debug, Clone)]
120/// A GTF listing table
121pub struct ListingGTFTable<T> {
122    table_schema: TableSchema,
123
124    config: ExonListingConfig<T>,
125}
126
127impl<T> ListingGTFTable<T> {
128    /// Create a new VCF listing table
129    pub fn new(config: ExonListingConfig<T>, table_schema: TableSchema) -> Self {
130        Self {
131            config,
132            table_schema,
133        }
134    }
135}
136
137#[async_trait]
138impl<T: ExonListingOptions + 'static> TableProvider for ListingGTFTable<T> {
139    fn as_any(&self) -> &dyn Any {
140        self
141    }
142
143    fn schema(&self) -> SchemaRef {
144        Arc::clone(&self.table_schema.table_schema())
145    }
146
147    fn table_type(&self) -> TableType {
148        TableType::Base
149    }
150
151    fn supports_filters_pushdown(
152        &self,
153        filters: &[&Expr],
154    ) -> Result<Vec<TableProviderFilterPushDown>> {
155        Ok(filters
156            .iter()
157            .map(|f| filter_matches_partition_cols(f, self.config.options.table_partition_cols()))
158            .collect())
159    }
160
161    async fn scan(
162        &self,
163        state: &dyn Session,
164        projection: Option<&Vec<usize>>,
165        filters: &[Expr],
166        limit: Option<usize>,
167    ) -> Result<Arc<dyn ExecutionPlan>> {
168        let url = if let Some(url) = self.config.inner.table_paths.first() {
169            url
170        } else {
171            return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
172        };
173
174        let object_store = state.runtime_env().object_store(url.object_store())?;
175
176        let file_list = pruned_partition_list(
177            &object_store,
178            url,
179            filters,
180            self.config.options.file_extension(),
181            self.config.options.table_partition_cols(),
182        )
183        .await?
184        .try_collect::<Vec<_>>()
185        .await?;
186
187        let file_schema = self.table_schema.file_schema()?;
188        let file_scan_config =
189            FileScanConfigBuilder::new(url.object_store(), file_schema, vec![file_list])
190                .projection_option(projection.cloned())
191                .table_partition_cols(self.config.options.table_partition_cols().to_vec())
192                .limit_option(limit)
193                .build();
194
195        let plan = self
196            .config
197            .options
198            .create_physical_plan(file_scan_config)
199            .await?;
200
201        Ok(plan)
202    }
203}