// iceberg_datafusion/physical_plan/scan.rs

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::vec;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
use datafusion::prelude::Expr;
use futures::{Stream, TryStreamExt};
use iceberg::expr::Predicate;
use iceberg::table::Table;

use super::expr_to_predicate::convert_filters_to_predicate;
use crate::to_datafusion_error;

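/// An [`ExecutionPlan`] that scans an Iceberg [`Table`] and yields Arrow record batches.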
#[derive(Debug)]
pub struct IcebergTableScan {
    /// The Iceberg table to scan.
    table: Table,
    /// Snapshot to scan; when `None`, the table's current snapshot is used.
    snapshot_id: Option<i64>,
    /// Plan properties reported to the DataFusion optimizer.
    plan_properties: PlanProperties,
    /// Projected column names; `None` means all columns.
    projection: Option<Vec<String>>,
    /// Predicate pushed down from DataFusion filter expressions, if any.
    predicates: Option<Predicate>,
}

impl IcebergTableScan {
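    /// Creates a new scan over `table`, optionally pinned to `snapshot_id`, with an
    /// optional column projection and DataFusion filter expressions that are converted
    /// into an Iceberg [`Predicate`] for pushdown.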
    pub(crate) fn new(
        table: Table,
        snapshot_id: Option<i64>,
        schema: ArrowSchemaRef,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
    ) -> Self {
        let output_schema = match projection {
            None => schema.clone(),
            Some(projection) => Arc::new(schema.project(projection).unwrap()),
        };
        let plan_properties = Self::compute_properties(output_schema.clone());
        let projection = get_column_names(schema.clone(), projection);
        let predicates = convert_filters_to_predicate(filters);

        Self {
            table,
            snapshot_id,
            plan_properties,
            projection,
            predicates,
        }
    }

    /// Returns the table being scanned.
    pub fn table(&self) -> &Table {
        &self.table
    }

    /// Returns the snapshot id this scan is pinned to, if any.
    pub fn snapshot_id(&self) -> Option<i64> {
        self.snapshot_id
    }

    /// Returns the projected column names, if a projection was pushed down.
    pub fn projection(&self) -> Option<&[String]> {
        self.projection.as_deref()
    }

    /// Returns the pushed-down predicate, if any.
    pub fn predicates(&self) -> Option<&Predicate> {
        self.predicates.as_ref()
    }

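    /// Computes the [`PlanProperties`] (schema, partitioning, emission type, and
    /// boundedness) that this plan reports to DataFusion.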
    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }
}

impl ExecutionPlan for IcebergTableScan {
    fn name(&self) -> &str {
        "IcebergTableScan"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn children(&self) -> Vec<&Arc<(dyn ExecutionPlan + 'static)>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // Build the Iceberg Arrow stream lazily: the future is wrapped in a
        // single-element stream and flattened, so no scan work happens until polled.
        let fut = get_batch_stream(
            self.table.clone(),
            self.snapshot_id,
            self.projection.clone(),
            self.predicates.clone(),
        );
        let stream = futures::stream::once(fut).try_flatten();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            stream,
        )))
    }
}

impl DisplayAs for IcebergTableScan {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(
            f,
            "IcebergTableScan projection:[{}] predicate:[{}]",
            self.projection
                .as_ref()
                .map_or(String::new(), |v| v.join(",")),
            self.predicates
                .as_ref()
                .map_or(String::new(), |p| p.to_string())
        )
    }
}

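/// Asynchronously builds an Iceberg table scan for the given snapshot, projection,
/// and predicate, then returns the resulting stream of Arrow [`RecordBatch`]es with
/// errors mapped into DataFusion errors.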
async fn get_batch_stream(
    table: Table,
    snapshot_id: Option<i64>,
    column_names: Option<Vec<String>>,
    predicates: Option<Predicate>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
    // Pin the scan to the requested snapshot, or scan the current one.
    let scan_builder = match snapshot_id {
        Some(snapshot_id) => table.scan().snapshot_id(snapshot_id),
        None => table.scan(),
    };

    // Apply the column projection and predicate pushed down from DataFusion.
    let mut scan_builder = match column_names {
        Some(column_names) => scan_builder.select(column_names),
        None => scan_builder.select_all(),
    };
    if let Some(pred) = predicates {
        scan_builder = scan_builder.with_filter(pred);
    }
    let table_scan = scan_builder.build().map_err(to_datafusion_error)?;

    let stream = table_scan
        .to_arrow()
        .await
        .map_err(to_datafusion_error)?
        .map_err(to_datafusion_error);
    Ok(Box::pin(stream))
}

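/// Maps projection indices to their column names in the given Arrow schema.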
fn get_column_names(
    schema: ArrowSchemaRef,
    projection: Option<&Vec<usize>>,
) -> Option<Vec<String>> {
    projection.map(|v| {
        v.iter()
            .map(|p| schema.field(*p).name().clone())
            .collect::<Vec<String>>()
    })
}