use std::{any::Any, sync::Arc};
use crate::datasources::ExonFileScanConfig;
use arrow::datatypes::SchemaRef;
use datafusion::{
common::Statistics,
datasource::{
file_format::file_compression_type::FileCompressionType,
physical_plan::{FileScanConfig, FileStream},
},
error::Result as DataFusionResult,
execution::SendableRecordBatchStream,
physical_plan::{
metrics::ExecutionPlanMetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan,
PlanProperties,
},
};
use exon_fasta::FASTAConfig;
use super::indexed_file_opener::IndexedFASTAOpener;
/// Physical plan node (`ExecutionPlan`) that streams record batches produced
/// by an `IndexedFASTAOpener` over the files described by a `FileScanConfig`.
#[derive(Debug, Clone)]
pub struct IndexedFASTAScanner {
/// Scan configuration: supplies the file groups, object store URL, file
/// schema and file projection used when executing the plan.
base_config: FileScanConfig,
/// Compression type handed to the `IndexedFASTAOpener` at execution time.
file_compression_type: FileCompressionType,
/// Schema returned by `schema()`; obtained from
/// `base_config.project_with_properties()` in `new`.
projected_schema: SchemaRef,
/// Metrics set passed to the `FileStream` created in `execute`.
metrics: ExecutionPlanMetricsSet,
/// Value forwarded to `FASTAConfig::with_fasta_sequence_buffer_capacity`.
fasta_sequence_buffer_capacity: usize,
/// Plan properties returned by `properties()`; the partitioning is replaced
/// when the plan is repartitioned.
properties: PlanProperties,
/// Statistics returned (cloned) by `statistics()`.
statistics: Statistics,
}
impl IndexedFASTAScanner {
    /// Builds a scanner from a file scan configuration.
    ///
    /// The projected schema, statistics and plan properties are taken from
    /// `base_config.project_with_properties()`, and the scanner starts with a
    /// freshly created metrics set.
    ///
    /// * `base_config` - scan configuration describing the files to read.
    /// * `file_compression_type` - compression type later passed to the file opener.
    /// * `fasta_sequence_buffer_capacity` - capacity later forwarded to the
    ///   `FASTAConfig` built in `execute`.
    pub fn new(
        base_config: FileScanConfig,
        file_compression_type: FileCompressionType,
        fasta_sequence_buffer_capacity: usize,
    ) -> Self {
        let metrics = ExecutionPlanMetricsSet::new();
        let (schema, stats, plan_properties) = base_config.project_with_properties();

        Self {
            base_config,
            file_compression_type,
            projected_schema: schema,
            metrics,
            fasta_sequence_buffer_capacity,
            properties: plan_properties,
            statistics: stats,
        }
    }
}
impl DisplayAs for IndexedFASTAScanner {
    /// Writes `IndexedFASTAScanner: <n>`, where `<n>` is the number of file
    /// groups in the scan configuration. The requested display format is not
    /// consulted; every format yields the same text.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let group_count = self.base_config.file_groups.len();
        write!(f, "IndexedFASTAScanner: {}", group_count)
    }
}
impl ExecutionPlan for IndexedFASTAScanner {
    /// Exposes the scanner as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Static name of this plan node.
    fn name(&self) -> &str {
        "IndexedFASTAScanner"
    }

    /// Returns a shared handle to the projected schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }

    /// This node has no child plans, so the list is always empty.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Returns the plan properties stored on the scanner.
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Returns a copy of the statistics stored on the scanner.
    fn statistics(&self) -> datafusion::error::Result<Statistics> {
        let stats = self.statistics.clone();
        Ok(stats)
    }

    /// Returns the plan unchanged; any supplied children are ignored.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Produces a copy of this plan whose file groups have been regrouped for
    /// `target_partitions`.
    ///
    /// Returns `Ok(None)` when `target_partitions` is 1 or when there are no
    /// file groups. Otherwise the new groups come from
    /// `regroup_files_by_size`, and the copy's partitioning is set to
    /// `UnknownPartitioning` with the resulting number of groups.
    fn repartitioned(
        &self,
        target_partitions: usize,
        _config: &datafusion::config::ConfigOptions,
    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
        let nothing_to_split = self.base_config.file_groups.is_empty();
        if nothing_to_split || target_partitions == 1 {
            return Ok(None);
        }

        let regrouped = self.base_config.regroup_files_by_size(target_partitions);
        let partitioning =
            datafusion::physical_plan::Partitioning::UnknownPartitioning(regrouped.len());

        let mut repartitioned_plan = self.clone();
        repartitioned_plan.base_config.file_groups = regrouped;
        repartitioned_plan.properties =
            repartitioned_plan.properties.with_partitioning(partitioning);

        Ok(Some(Arc::new(repartitioned_plan)))
    }

    /// Opens a record batch stream for the given `partition`.
    ///
    /// The object store is looked up in the task's runtime environment using
    /// the scan configuration's object store URL. A `FASTAConfig` is then
    /// built from the file schema, the session batch size, the configured
    /// sequence buffer capacity and the file projection, and handed to an
    /// `IndexedFASTAOpener` that drives a `FileStream`.
    ///
    /// # Errors
    ///
    /// Propagates failures from the object store lookup and from
    /// `FileStream::new`.
    fn execute(
        &self,
        partition: usize,
        context: Arc<datafusion::execution::context::TaskContext>,
    ) -> DataFusionResult<datafusion::physical_plan::SendableRecordBatchStream> {
        let store = context
            .runtime_env()
            .object_store(&self.base_config.object_store_url)?;

        let file_schema = Arc::clone(&self.base_config.file_schema);
        let batch_size = context.session_config().batch_size();

        let fasta_config = FASTAConfig::new(store, file_schema)
            .with_batch_size(batch_size)
            .with_fasta_sequence_buffer_capacity(self.fasta_sequence_buffer_capacity)
            .with_projection(self.base_config.file_projection());

        let file_opener =
            IndexedFASTAOpener::new(Arc::new(fasta_config), self.file_compression_type);
        let file_stream =
            FileStream::new(&self.base_config, partition, file_opener, &self.metrics)?;

        Ok(Box::pin(file_stream) as SendableRecordBatchStream)
    }
}