datafusion_datasource/morsel/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Structures for Morsel Driven IO.
19//!
20//! NOTE: As of DataFusion 54.0.0, these are experimental APIs that may change
21//! substantially.
22//!
23//! Morsel Driven IO is a technique for parallelizing the reading of large files
24//! by dividing them into smaller "morsels" that are processed independently.
25//!
26//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query
27//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf).
28
29mod adapters;
30#[cfg(test)]
31pub(crate) mod mocks;
32
33use crate::PartitionedFile;
34pub(crate) use adapters::FileOpenerMorselizer;
35use arrow::array::RecordBatch;
36use datafusion_common::Result;
37use futures::FutureExt;
38use futures::future::BoxFuture;
39use futures::stream::BoxStream;
40use std::fmt::Debug;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43
44/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es.
45///
46/// This represents a single morsel of work that is ready to be processed. It
47/// has all data necessary (does not need any I/O) and is ready to be turned
48/// into a stream of [`RecordBatch`]es for processing by the execution engine.
49pub trait Morsel: Send + Debug {
50 /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing.
51 ///
52 /// Note: This may do CPU work to decode already-loaded data, but should not
53 /// do any I/O work such as reading from the file.
54 fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>>;
55}
56
57/// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner
58/// for that file.
59///
60/// This is the entry point for morsel driven I/O.
61pub trait Morselizer: Send + Sync + Debug {
62 /// Return the initial [`MorselPlanner`] for this file.
63 ///
64 /// Morselizing a file may involve CPU work, such as parsing parquet
65 /// metadata and evaluating pruning predicates. It should NOT do any I/O
66 /// work, such as reading from the file. Any needed I/O should be done using
67 /// [`MorselPlan::with_pending_planner`].
68 fn plan_file(&self, file: PartitionedFile) -> Result<Box<dyn MorselPlanner>>;
69}
70
71/// A Morsel Planner is responsible for creating morsels for a given scan.
72///
73/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O
74/// outstanding for a specific planner. DataFusion may run
75/// multiple planners in parallel, which corresponds to multiple parallel
76/// I/O requests.
77///
78/// It is not a Rust `Stream` so that it can explicitly separate CPU bound
79/// work from I/O work.
80///
81/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it
82/// should do CPU work to produce the next morsels or discover the next I/O
83/// phase.
84///
85/// Best practice is to spawn I/O in a Tokio task on a separate runtime to
86/// ensure that CPU work doesn't block or slow down I/O work, but this is not
87/// strictly required by the API.
88pub trait MorselPlanner: Send + Debug {
89 /// Attempt to plan morsels. This may involve CPU work, such as parsing
90 /// parquet metadata and evaluating pruning predicates.
91 ///
92 /// It should NOT do any I/O work, such as reading from the file. If I/O is
93 /// required, the returned [`MorselPlan`] should contain a pending planner
94 /// future that the caller polls to drive the I/O work to completion. Once
95 /// that future resolves, it yields a planner ready for work.
96 ///
97 /// Note this function is **not async** to make it explicitly clear that if
98 /// I/O is required, it should be done in the returned `io_future`.
99 ///
100 /// Returns `None` if the planner has no more work to do.
101 ///
102 /// # Empty Morsel Plans
103 ///
104 /// It may return `None`, which means no batches will be read from the file
105 /// (e.g. due to late-pruning based on statistics).
106 ///
107 /// # Output Ordering
108 ///
109 /// See the comments on [`MorselPlan`] for the logical output order.
110 fn plan(self: Box<Self>) -> Result<Option<MorselPlan>>;
111}
112
113/// Return result of [`MorselPlanner::plan`].
114///
115/// # Logical Ordering
116///
117/// For plans where the output order of rows is maintained, the output order of
118/// a [`MorselPlanner`] is logically defined as follows:
119/// 1. All morsels that are directly produced
120/// 2. Recursively, all morsels produced by the returned `planners`
121#[derive(Default)]
122pub struct MorselPlan {
123 /// Morsels ready for CPU work
124 morsels: Vec<Box<dyn Morsel>>,
125 /// Planners that are ready for CPU work.
126 ready_planners: Vec<Box<dyn MorselPlanner>>,
127 /// A future with planner I/O that resolves to a CPU ready planner.
128 ///
129 /// DataFusion will poll this future occasionally to drive the I/O work to
130 /// completion. Once it resolves, planning continues with the returned
131 /// planner.
132 pending_planner: Option<PendingMorselPlanner>,
133}
134
135impl MorselPlan {
136 /// Create an empty morsel plan.
137 pub fn new() -> Self {
138 Self::default()
139 }
140
141 /// Set the ready morsels.
142 pub fn with_morsels(mut self, morsels: Vec<Box<dyn Morsel>>) -> Self {
143 self.morsels = morsels;
144 self
145 }
146
147 /// Set the ready child planners.
148 pub fn with_planners(mut self, planners: Vec<Box<dyn MorselPlanner>>) -> Self {
149 self.ready_planners = planners;
150 self
151 }
152
153 /// Set the pending planner for an I/O phase.
154 pub fn with_pending_planner<F>(mut self, io_future: F) -> Self
155 where
156 F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
157 {
158 self.pending_planner = Some(PendingMorselPlanner::new(io_future));
159 self
160 }
161
162 /// Set the pending planner for an I/O phase.
163 pub fn set_pending_planner<F>(&mut self, io_future: F)
164 where
165 F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
166 {
167 self.pending_planner = Some(PendingMorselPlanner::new(io_future));
168 }
169
170 /// Take the ready morsels.
171 pub fn take_morsels(&mut self) -> Vec<Box<dyn Morsel>> {
172 std::mem::take(&mut self.morsels)
173 }
174
175 /// Take the ready child planners.
176 pub fn take_ready_planners(&mut self) -> Vec<Box<dyn MorselPlanner>> {
177 std::mem::take(&mut self.ready_planners)
178 }
179
180 /// Take the pending I/O future, if any.
181 pub fn take_pending_planner(&mut self) -> Option<PendingMorselPlanner> {
182 self.pending_planner.take()
183 }
184
185 /// Returns `true` if this plan contains an I/O future.
186 pub fn has_io_future(&self) -> bool {
187 self.pending_planner.is_some()
188 }
189}
190
191/// Wrapper for I/O that must complete before planning can continue.
192pub struct PendingMorselPlanner {
193 future: BoxFuture<'static, Result<Box<dyn MorselPlanner>>>,
194}
195
196impl PendingMorselPlanner {
197 /// Create a new pending planner future.
198 ///
199 /// Example
200 /// ```
201 /// # use datafusion_common::DataFusionError;
202 /// # use datafusion_datasource::morsel::{MorselPlanner, PendingMorselPlanner};
203 /// let work = async move {
204 /// let planner: Box<dyn MorselPlanner> = {
205 /// // Do I/O work here, then return the next planner to run.
206 /// # unimplemented!();
207 /// };
208 /// Ok(planner) as Result<_, DataFusionError>;
209 /// };
210 /// let pending_io = PendingMorselPlanner::new(work);
211 /// ```
212 pub fn new<F>(future: F) -> Self
213 where
214 F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
215 {
216 Self {
217 future: future.boxed(),
218 }
219 }
220
221 /// Consume this wrapper and return the underlying future.
222 pub fn into_future(self) -> BoxFuture<'static, Result<Box<dyn MorselPlanner>>> {
223 self.future
224 }
225}
226
227/// Forwards polling to the underlying future.
228impl Future for PendingMorselPlanner {
229 type Output = Result<Box<dyn MorselPlanner>>;
230 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231 // forward request to inner
232 self.future.as_mut().poll(cx)
233 }
234}