exon/datasources/gff/
scanner.rs1use std::{any::Any, sync::Arc};
16
17use arrow::datatypes::SchemaRef;
18use datafusion::{
19 common::Statistics,
20 datasource::{
21 file_format::file_compression_type::FileCompressionType,
22 physical_plan::{FileScanConfig, FileStream},
23 },
24 physical_plan::{
25 metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
26 PlanProperties, SendableRecordBatchStream,
27 },
28};
29use exon_gff::GFFConfig;
30
31use crate::datasources::ExonFileScanConfig;
32
33use super::file_opener::GFFOpener;
34
35#[derive(Debug, Clone)]
36pub struct GFFScan {
38 base_config: FileScanConfig,
40
41 projected_schema: SchemaRef,
43
44 file_compression_type: FileCompressionType,
46
47 metrics: ExecutionPlanMetricsSet,
49
50 properties: PlanProperties,
52
53 statistics: Statistics,
55}
56
57impl GFFScan {
58 pub fn new(base_config: FileScanConfig, file_compression_type: FileCompressionType) -> Self {
60 let (projected_schema, statistics, properties) = base_config.project_with_properties();
61
62 Self {
63 base_config,
64 projected_schema,
65 file_compression_type,
66 metrics: ExecutionPlanMetricsSet::new(),
67 properties,
68 statistics,
69 }
70 }
71}
72
73impl DisplayAs for GFFScan {
74 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
75 write!(f, "GFFScan")
76 }
77}
78
79impl ExecutionPlan for GFFScan {
80 fn as_any(&self) -> &dyn Any {
81 self
82 }
83
84 fn name(&self) -> &str {
85 "GFFScan"
86 }
87
88 fn properties(&self) -> &PlanProperties {
89 &self.properties
90 }
91
92 fn statistics(&self) -> datafusion::error::Result<Statistics> {
93 Ok(self.statistics.clone())
94 }
95
96 fn repartitioned(
97 &self,
98 target_partitions: usize,
99 _config: &datafusion::config::ConfigOptions,
100 ) -> datafusion::error::Result<Option<Arc<dyn ExecutionPlan>>> {
101 if target_partitions == 1 || self.base_config.file_groups.is_empty() {
102 return Ok(None);
103 }
104
105 let file_groups = self.base_config.regroup_files_by_size(target_partitions);
106
107 let mut new_plan = self.clone();
108 new_plan.base_config.file_groups = file_groups;
109
110 new_plan.properties = new_plan.properties.with_partitioning(
111 datafusion::physical_plan::Partitioning::UnknownPartitioning(
112 new_plan.base_config.file_groups.len(),
113 ),
114 );
115
116 Ok(Some(Arc::new(new_plan)))
117 }
118
119 fn schema(&self) -> SchemaRef {
120 Arc::clone(&self.projected_schema)
121 }
122
123 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
124 vec![]
125 }
126
127 fn with_new_children(
128 self: Arc<Self>,
129 _children: Vec<Arc<dyn ExecutionPlan>>,
130 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
131 Ok(self)
132 }
133
134 fn execute(
135 &self,
136 partition: usize,
137 context: Arc<datafusion::execution::context::TaskContext>,
138 ) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream> {
139 let object_store = context
140 .runtime_env()
141 .object_store(&self.base_config.object_store_url)?;
142
143 let config = GFFConfig::new(object_store, Arc::clone(&self.base_config.file_schema))
144 .with_batch_size(context.session_config().batch_size())
145 .with_projection(self.base_config.file_projection());
146
147 let opener = GFFOpener::new(Arc::new(config), self.file_compression_type);
148
149 let stream = FileStream::new(&self.base_config, partition, opener, &self.metrics)?;
151
152 Ok(Box::pin(stream) as SendableRecordBatchStream)
153 }
154}