datafusion_datasource/
source.rs1use std::any::Any;
21use std::fmt;
22use std::fmt::{Debug, Formatter};
23use std::sync::Arc;
24
25use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
26use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
27use datafusion_physical_plan::projection::ProjectionExec;
28use datafusion_physical_plan::{
29 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
30};
31
32use datafusion_common::config::ConfigOptions;
33use datafusion_common::{Constraints, Statistics};
34use datafusion_execution::{SendableRecordBatchStream, TaskContext};
35use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
36use datafusion_physical_expr_common::sort_expr::LexOrdering;
37
38pub trait DataSource: Send + Sync + Debug {
49 fn open(
50 &self,
51 partition: usize,
52 context: Arc<TaskContext>,
53 ) -> datafusion_common::Result<SendableRecordBatchStream>;
54 fn as_any(&self) -> &dyn Any;
55 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
56
57 fn repartitioned(
59 &self,
60 _target_partitions: usize,
61 _repartition_file_min_size: usize,
62 _output_ordering: Option<LexOrdering>,
63 ) -> datafusion_common::Result<Option<Arc<dyn DataSource>>> {
64 Ok(None)
65 }
66
67 fn output_partitioning(&self) -> Partitioning;
68 fn eq_properties(&self) -> EquivalenceProperties;
69 fn statistics(&self) -> datafusion_common::Result<Statistics>;
70 fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
72 fn fetch(&self) -> Option<usize>;
73 fn metrics(&self) -> ExecutionPlanMetricsSet {
74 ExecutionPlanMetricsSet::new()
75 }
76 fn try_swapping_with_projection(
77 &self,
78 _projection: &ProjectionExec,
79 ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>>;
80}
81
82#[derive(Clone, Debug)]
93pub struct DataSourceExec {
94 data_source: Arc<dyn DataSource>,
96 cache: PlanProperties,
98}
99
100impl DisplayAs for DataSourceExec {
101 fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
102 write!(f, "DataSourceExec: ")?;
103 self.data_source.fmt_as(t, f)
104 }
105}
106
107impl ExecutionPlan for DataSourceExec {
108 fn name(&self) -> &'static str {
109 "DataSourceExec"
110 }
111
112 fn as_any(&self) -> &dyn Any {
113 self
114 }
115
116 fn properties(&self) -> &PlanProperties {
117 &self.cache
118 }
119
120 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
121 Vec::new()
122 }
123
124 fn with_new_children(
125 self: Arc<Self>,
126 _: Vec<Arc<dyn ExecutionPlan>>,
127 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
128 Ok(self)
129 }
130
131 fn repartitioned(
132 &self,
133 target_partitions: usize,
134 config: &ConfigOptions,
135 ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
136 let data_source = self.data_source.repartitioned(
137 target_partitions,
138 config.optimizer.repartition_file_min_size,
139 self.properties().eq_properties.output_ordering(),
140 )?;
141
142 if let Some(source) = data_source {
143 let output_partitioning = source.output_partitioning();
144 let plan = self
145 .clone()
146 .with_data_source(source)
147 .with_partitioning(output_partitioning);
149 Ok(Some(Arc::new(plan)))
150 } else {
151 Ok(Some(Arc::new(self.clone())))
152 }
153 }
154
155 fn execute(
156 &self,
157 partition: usize,
158 context: Arc<TaskContext>,
159 ) -> datafusion_common::Result<SendableRecordBatchStream> {
160 self.data_source.open(partition, context)
161 }
162
163 fn metrics(&self) -> Option<MetricsSet> {
164 Some(self.data_source.metrics().clone_inner())
165 }
166
167 fn statistics(&self) -> datafusion_common::Result<Statistics> {
168 self.data_source.statistics()
169 }
170
171 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
172 let data_source = self.data_source.with_fetch(limit)?;
173 let cache = self.cache.clone();
174
175 Some(Arc::new(Self { data_source, cache }))
176 }
177
178 fn fetch(&self) -> Option<usize> {
179 self.data_source.fetch()
180 }
181
182 fn try_swapping_with_projection(
183 &self,
184 projection: &ProjectionExec,
185 ) -> datafusion_common::Result<Option<Arc<dyn ExecutionPlan>>> {
186 self.data_source.try_swapping_with_projection(projection)
187 }
188}
189
190impl DataSourceExec {
191 pub fn new(data_source: Arc<dyn DataSource>) -> Self {
192 let cache = Self::compute_properties(Arc::clone(&data_source));
193 Self { data_source, cache }
194 }
195
196 pub fn data_source(&self) -> &Arc<dyn DataSource> {
198 &self.data_source
199 }
200
201 pub fn with_data_source(mut self, data_source: Arc<dyn DataSource>) -> Self {
202 self.cache = Self::compute_properties(Arc::clone(&data_source));
203 self.data_source = data_source;
204 self
205 }
206
207 pub fn with_constraints(mut self, constraints: Constraints) -> Self {
209 self.cache = self.cache.with_constraints(constraints);
210 self
211 }
212
213 pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
215 self.cache = self.cache.with_partitioning(partitioning);
216 self
217 }
218
219 fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
220 PlanProperties::new(
221 data_source.eq_properties(),
222 data_source.output_partitioning(),
223 EmissionType::Incremental,
224 Boundedness::Bounded,
225 )
226 }
227}