datafusion_datasource/
source.rs1use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
26use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
27use datafusion_physical_plan::projection::ProjectionExec;
28use datafusion_physical_plan::{
29 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
30};
31
32use crate::file_scan_config::FileScanConfig;
33use datafusion_common::config::ConfigOptions;
34use datafusion_common::{Constraints, Statistics};
35use datafusion_execution::{SendableRecordBatchStream, TaskContext};
36use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
37use datafusion_physical_expr_common::sort_expr::LexOrdering;
38
39pub trait DataSource: Send + Sync + Debug {
50 fn open(
51 &self,
52 partition: usize,
53 context: Arc<TaskContext>,
54 ) -> datafusion_common::Result<SendableRecordBatchStream>;
55 fn as_any(&self) -> &dyn Any;
56 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
58
59 fn repartitioned(
61 &self,
62 _target_partitions: usize,
63 _repartition_file_min_size: usize,
64 _output_ordering: Option<LexOrdering>,
65 ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
66 Ok(None)
67 }
68
69 fn output_partitioning(&self) -> Partitioning;
70 fn eq_properties(&self) -> EquivalenceProperties;
71 fn statistics(&self) -> datafusion_common::Result<Statistics>;
72 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
74 fn fetch(&self) -> Option<usize>;
75 fn metrics(&self) -> ExecutionPlanMetricsSet {
76 ExecutionPlanMetricsSet::new()
77 }
78 fn try_swapping_with_projection(
79 &self,
80 _projection: &ProjectionExec,
81 ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
82}
83
/// [`ExecutionPlan`] leaf node that reads its output from a [`DataSource`].
///
/// Most `ExecutionPlan` methods are forwarded to `data_source`. The plan
/// properties are computed from the source when the node is built and are
/// stored in `cache` so `properties()` can hand out a reference.
#[derive(Clone, Debug)]
pub struct DataSourceExec {
    // The source that performs the actual scan.
    data_source: Arc<dyn DataSource>,
    // Cached plan properties (equivalence properties, partitioning, emission
    // type, boundedness), returned by reference from `properties()`.
    cache: PlanProperties,
}
101
102impl DisplayAs for DataSourceExec {
103 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
104 match t {
105 DisplayFormatType::Default | DisplayFormatType::Verbose => {
106 write!(f, "DataSourceExec: ")?;
107 }
108 DisplayFormatType::TreeRender => {}
109 }
110 self.data_source.fmt_as(t, f)
111 }
112}
113
114impl ExecutionPlan for DataSourceExec {
115 fn name(&self) -> &'static str {
116 "DataSourceExec"
117 }
118
119 fn as_any(&self) -> &dyn Any {
120 self
121 }
122
123 fn properties(&self) -> &PlanProperties {
124 &self.cache
125 }
126
127 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
128 Vec::new()
129 }
130
131 fn with_new_children(
132 self: Arc<Self>,
133 _: Vec<Arc<dyn ExecutionPlan>>,
134 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
135 Ok(self)
136 }
137
138 fn repartitioned(
139 &self,
140 target_partitions: usize,
141 config: &ConfigOptions,
142 ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
143 let data_source = self.data_source.repartitioned(
144 target_partitions,
145 config.optimizer.repartition_file_min_size,
146 self.properties().eq_properties.output_ordering(),
147 )?;
148
149 if let Some(source) = data_source {
150 let output_partitioning = source.output_partitioning();
151 let plan = self
152 .clone()
153 .with_data_source(source)
154 .with_partitioning(output_partitioning);
156 Ok(Some(Arc::new(plan)))
157 } else {
158 Ok(Some(Arc::new(self.clone())))
159 }
160 }
161
162 fn execute(
163 &self,
164 partition: usize,
165 context: Arc<TaskContext>,
166 ) -> datafusion_common::Result<SendableRecordBatchStream> {
167 self.data_source.open(partition, context)
168 }
169
170 fn metrics(&self) -> Option<MetricsSet> {
171 Some(self.data_source.metrics().clone_inner())
172 }
173
174 fn statistics(&self) -> datafusion_common::Result<Statistics> {
175 self.data_source.statistics()
176 }
177
178 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
179 let data_source = self.data_source.with_fetch(limit)?;
180 let cache = self.cache.clone();
181
182 Some(Arc::new(Self { data_source, cache }))
183 }
184
185 fn fetch(&self) -> Option<usize> {
186 self.data_source.fetch()
187 }
188
189 fn try_swapping_with_projection(
190 &self,
191 projection: &ProjectionExec,
192 ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
193 self.data_source.try_swapping_with_projection(projection)
194 }
195}
196
197impl DataSourceExec {
198 pub fn from_data_source(data_source: impl DataSource + 'static) -> Arc<Self> {
199 Arc::new(Self::new(Arc::new(data_source)))
200 }
201
202 pub fn new(data_source: Arc<dyn DataSource>) -> Self {
203 let cache = Self::compute_properties(Arc::clone(&data_source));
204 Self { data_source, cache }
205 }
206
207 pub fn data_source(&self) -> &Arc<dyn DataSource> {
209 &self.data_source
210 }
211
212 pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
213 self.cache = Self::compute_properties(Arc::clone(&data_source));
214 self.data_source = data_source;
215 self
216 }
217
218 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
220 self.cache = self.cache.with_constraints(constraints);
221 self
222 }
223
224 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
226 self.cache = self.cache.with_partitioning(partitioning);
227 self
228 }
229
230 fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
231 PlanProperties::new(
232 data_source.eq_properties(),
233 data_source.output_partitioning(),
234 EmissionType::Incremental,
235 Boundedness::Bounded,
236 )
237 }
238
239 pub fn downcast_to_file_source<T: 'static>(&self) -> Option<(&FileScanConfig, &T)> {
245 self.data_source()
246 .as_any()
247 .downcast_ref::<FileScanConfig>()
248 .and_then(|file_scan_conf| {
249 file_scan_conf
250 .file_source()
251 .as_any()
252 .downcast_ref::<T>()
253 .map(|source| (file_scan_conf, source))
254 })
255 }
256}