Skip to main content

datafusion_datasource/morsel/
mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
//! Structures for Morsel Driven IO.
//!
//! NOTE: As of DataFusion 54.0.0, these are experimental APIs that may change
//! substantially.
//!
//! Morsel Driven IO is a technique for parallelizing the reading of large files
//! by dividing them into smaller "morsels" that are processed independently.
//!
//! It is inspired by the paper [Morsel-Driven Parallelism: A NUMA-Aware Query
//! Evaluation Framework for the Many-Core Age](https://db.in.tum.de/~leis/papers/morsels.pdf).
28
29mod adapters;
30#[cfg(test)]
31pub(crate) mod mocks;
32
33use crate::PartitionedFile;
34pub(crate) use adapters::FileOpenerMorselizer;
35use arrow::array::RecordBatch;
36use datafusion_common::Result;
37use futures::FutureExt;
38use futures::future::BoxFuture;
39use futures::stream::BoxStream;
40use std::fmt::Debug;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43
44/// A Morsel of work ready to resolve to a stream of [`RecordBatch`]es.
45///
46/// This represents a single morsel of work that is ready to be processed. It
47/// has all data necessary (does not need any I/O) and is ready to be turned
48/// into a stream of [`RecordBatch`]es for processing by the execution engine.
49pub trait Morsel: Send + Debug {
50    /// Consume this morsel and produce a stream of [`RecordBatch`]es for processing.
51    ///
52    /// Note: This may do CPU work to decode already-loaded data, but should not
53    /// do any I/O work such as reading from the file.
54    fn into_stream(self: Box<Self>) -> BoxStream<'static, Result<RecordBatch>>;
55}
56
57/// A Morselizer takes a single [`PartitionedFile`] and creates the initial planner
58/// for that file.
59///
60/// This is the entry point for morsel driven I/O.
61pub trait Morselizer: Send + Sync + Debug {
62    /// Return the initial [`MorselPlanner`] for this file.
63    ///
64    /// Morselizing a file may involve CPU work, such as parsing parquet
65    /// metadata and evaluating pruning predicates. It should NOT do any I/O
66    /// work, such as reading from the file. Any needed I/O should be done using
67    /// [`MorselPlan::with_pending_planner`].
68    fn plan_file(&self, file: PartitionedFile) -> Result<Box<dyn MorselPlanner>>;
69}
70
71/// A Morsel Planner is responsible for creating morsels for a given scan.
72///
73/// The [`MorselPlanner`] is the unit of I/O. There is only ever a single I/O
74/// outstanding for a specific planner. DataFusion may run
75/// multiple planners in parallel, which corresponds to multiple parallel
76/// I/O requests.
77///
78/// It is not a Rust `Stream` so that it can explicitly separate CPU bound
79/// work from I/O work.
80///
81/// The design is similar to `ParquetPushDecoder`: when `plan` is called, it
82/// should do CPU work to produce the next morsels or discover the next I/O
83/// phase.
84///
85/// Best practice is to spawn I/O in a Tokio task on a separate runtime to
86/// ensure that CPU work doesn't block or slow down I/O work, but this is not
87/// strictly required by the API.
88pub trait MorselPlanner: Send + Debug {
89    /// Attempt to plan morsels. This may involve CPU work, such as parsing
90    /// parquet metadata and evaluating pruning predicates.
91    ///
92    /// It should NOT do any I/O work, such as reading from the file. If I/O is
93    /// required, the returned [`MorselPlan`] should contain a pending planner
94    /// future that the caller polls to drive the I/O work to completion. Once
95    /// that future resolves, it yields a planner ready for work.
96    ///
97    /// Note this function is **not async** to make it explicitly clear that if
98    /// I/O is required, it should be done in the returned `io_future`.
99    ///
100    /// Returns `None` if the planner has no more work to do.
101    ///
102    /// # Empty Morsel Plans
103    ///
104    /// It may return `None`, which means no batches will be read from the file
105    /// (e.g. due to late-pruning based on statistics).
106    ///
107    /// # Output Ordering
108    ///
109    /// See the comments on [`MorselPlan`] for the logical output order.
110    fn plan(self: Box<Self>) -> Result<Option<MorselPlan>>;
111}
112
113/// Return result of [`MorselPlanner::plan`].
114///
115/// # Logical Ordering
116///
117/// For plans where the output order of rows is maintained, the output order of
118/// a [`MorselPlanner`] is logically defined as follows:
119/// 1. All morsels that are directly produced
120/// 2. Recursively, all morsels produced by the returned `planners`
121#[derive(Default)]
122pub struct MorselPlan {
123    /// Morsels ready for CPU work
124    morsels: Vec<Box<dyn Morsel>>,
125    /// Planners that are ready for CPU work.
126    ready_planners: Vec<Box<dyn MorselPlanner>>,
127    /// A future with planner I/O that resolves to a CPU ready planner.
128    ///
129    /// DataFusion will poll this future occasionally to drive the I/O work to
130    /// completion. Once it resolves, planning continues with the returned
131    /// planner.
132    pending_planner: Option<PendingMorselPlanner>,
133}
134
135impl MorselPlan {
136    /// Create an empty morsel plan.
137    pub fn new() -> Self {
138        Self::default()
139    }
140
141    /// Set the ready morsels.
142    pub fn with_morsels(mut self, morsels: Vec<Box<dyn Morsel>>) -> Self {
143        self.morsels = morsels;
144        self
145    }
146
147    /// Set the ready child planners.
148    pub fn with_planners(mut self, planners: Vec<Box<dyn MorselPlanner>>) -> Self {
149        self.ready_planners = planners;
150        self
151    }
152
153    /// Set the pending planner for an I/O phase.
154    pub fn with_pending_planner<F>(mut self, io_future: F) -> Self
155    where
156        F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
157    {
158        self.pending_planner = Some(PendingMorselPlanner::new(io_future));
159        self
160    }
161
162    /// Set the pending planner  for an I/O phase.
163    pub fn set_pending_planner<F>(&mut self, io_future: F)
164    where
165        F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
166    {
167        self.pending_planner = Some(PendingMorselPlanner::new(io_future));
168    }
169
170    /// Take the ready morsels.
171    pub fn take_morsels(&mut self) -> Vec<Box<dyn Morsel>> {
172        std::mem::take(&mut self.morsels)
173    }
174
175    /// Take the ready child planners.
176    pub fn take_ready_planners(&mut self) -> Vec<Box<dyn MorselPlanner>> {
177        std::mem::take(&mut self.ready_planners)
178    }
179
180    /// Take the pending I/O future, if any.
181    pub fn take_pending_planner(&mut self) -> Option<PendingMorselPlanner> {
182        self.pending_planner.take()
183    }
184
185    /// Returns `true` if this plan contains an I/O future.
186    pub fn has_io_future(&self) -> bool {
187        self.pending_planner.is_some()
188    }
189}
190
191/// Wrapper for I/O that must complete before planning can continue.
192pub struct PendingMorselPlanner {
193    future: BoxFuture<'static, Result<Box<dyn MorselPlanner>>>,
194}
195
196impl PendingMorselPlanner {
197    /// Create a new pending planner future.
198    ///
199    /// Example
200    /// ```
201    /// # use datafusion_common::DataFusionError;
202    /// # use datafusion_datasource::morsel::{MorselPlanner, PendingMorselPlanner};
203    /// let work = async move {
204    ///  let planner: Box<dyn MorselPlanner> = {
205    ///   // Do I/O work here, then return the next planner to run.
206    ///  # unimplemented!();
207    ///   };
208    ///   Ok(planner) as Result<_, DataFusionError>;
209    /// };
210    /// let pending_io = PendingMorselPlanner::new(work);
211    /// ```
212    pub fn new<F>(future: F) -> Self
213    where
214        F: Future<Output = Result<Box<dyn MorselPlanner>>> + Send + 'static,
215    {
216        Self {
217            future: future.boxed(),
218        }
219    }
220
221    /// Consume this wrapper and return the underlying future.
222    pub fn into_future(self) -> BoxFuture<'static, Result<Box<dyn MorselPlanner>>> {
223        self.future
224    }
225}
226
227/// Forwards polling to the underlying future.
228impl Future for PendingMorselPlanner {
229    type Output = Result<Box<dyn MorselPlanner>>;
230    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231        // forward request to inner
232        self.future.as_mut().poll(cx)
233    }
234}