exon/datasources/gff/
scanner.rs

1// Copyright 2024 WHERE TRUE Technologies.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{any::Any, sync::Arc};
16
17use arrow::datatypes::SchemaRef;
18use datafusion::{
19    common::Statistics,
20    datasource::{
21        file_format::file_compression_type::FileCompressionType,
22        physical_plan::{FileScanConfig, FileStream},
23    },
24    physical_plan::{
25        metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
26        PlanProperties, SendableRecordBatchStream,
27    },
28};
29use exon_gff::GFFConfig;
30
31use crate::datasources::ExonFileScanConfig;
32
33use super::file_opener::GFFOpener;
34
35#[derive(Debug, Clone)]
36/// Implements a datafusion `ExecutionPlan` for GFF files.
37pub struct GFFScan {
38    /// The base configuration for the file scan.
39    base_config: FileScanConfig,
40
41    /// The projected schema for the scan.
42    projected_schema: SchemaRef,
43
44    /// The compression type of the file.
45    file_compression_type: FileCompressionType,
46
47    /// Metrics for the execution plan.
48    metrics: ExecutionPlanMetricsSet,
49
50    /// The plan properties cache.
51    properties: PlanProperties,
52
53    /// The statistics for the scan.
54    statistics: Statistics,
55}
56
57impl GFFScan {
58    /// Create a new GFF scan.
59    pub fn new(base_config: FileScanConfig, file_compression_type: FileCompressionType) -> Self {
60        let (projected_schema, statistics, properties) = base_config.project_with_properties();
61
62        Self {
63            base_config,
64            projected_schema,
65            file_compression_type,
66            metrics: ExecutionPlanMetricsSet::new(),
67            properties,
68            statistics,
69        }
70    }
71}
72
73impl DisplayAs for GFFScan {
74    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
75        write!(f, "GFFScan")
76    }
77}
78
79impl ExecutionPlan for GFFScan {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    fn name(&self) -> &str {
85        "GFFScan"
86    }
87
88    fn properties(&self) -> &PlanProperties {
89        &self.properties
90    }
91
92    fn statistics(&self) -> datafusion::error::Result<Statistics> {
93        Ok(self.statistics.clone())
94    }
95
96    fn repartitioned(
97        &self,
98        target_partitions: usize,
99        _config: &datafusion::config::ConfigOptions,
100    ) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
101        if target_partitions == 1 || self.base_config.file_groups.is_empty() {
102            return Ok(None);
103        }
104
105        let file_groups = self.base_config.regroup_files_by_size(target_partitions);
106
107        let mut new_plan = self.clone();
108        new_plan.base_config.file_groups = file_groups;
109
110        new_plan.properties = new_plan.properties.with_partitioning(
111            datafusion::physical_plan::Partitioning::UnknownPartitioning(
112                new_plan.base_config.file_groups.len(),
113            ),
114        );
115
116        Ok(Some(Arc::new(new_plan)))
117    }
118
119    fn schema(&self) -> SchemaRef {
120        Arc::clone(&self.projected_schema)
121    }
122
123    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
124        vec![]
125    }
126
127    fn with_new_children(
128        self: Arc<Self>,
129        _children: Vec<Arc<dyn ExecutionPlan>>,
130    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
131        Ok(self)
132    }
133
134    fn execute(
135        &self,
136        partition: usize,
137        context: Arc<datafusion::execution::context::TaskContext>,
138    ) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
139        let object_store = context
140            .runtime_env()
141            .object_store(&self.base_config.object_store_url)?;
142
143        let config = GFFConfig::new(object_store, Arc::clone(&self.base_config.file_schema))
144            .with_batch_size(context.session_config().batch_size())
145            .with_projection(self.base_config.file_projection());
146
147        let opener = GFFOpener::new(Arc::new(config), self.file_compression_type);
148
149        // this should have the pc_projector, which would project the scalar fields from the PartitionFile to the RecordBatch
150        let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
151
152        Ok(Box::pin(stream) as SendableRecordBatchStream)
153    }
154}