datafusion_distributed/work_unit_feed/work_unit_feed.rs
1use crate::WorkUnitFeedProvider;
2use crate::common::{deserialize_uuid, serialize_uuid};
3use crate::work_unit_feed::remote_work_unit_feed::RemoteFeedProvider;
4use datafusion::common::{Result, internal_err};
5use datafusion::execution::TaskContext;
6use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
7use futures::stream::BoxStream;
8use std::fmt::Debug;
9use std::sync::Arc;
10use uuid::Uuid;
11
12/// The [WorkUnitFeed] is created with a user-provided [WorkUnitFeedProvider] and is embedded
13/// in any custom [datafusion::physical_plan::ExecutionPlan] implementation as a field.
14///
15/// It exposes the [WorkUnitFeed::feed] method that users are expected to call in their
16/// [datafusion::physical_plan::ExecutionPlan::execute] implementation, which provides a stream
17/// of [crate::WorkUnit]s, representing individual units of work (e.g., file addresses) at runtime.
/// This is useful when these units of work cannot be known at planning time and are instead
/// expected to be discovered and streamed at execution time, as the query makes progress.
20///
/// What makes this structure special is that it automatically works in distributed
/// scenarios:
23/// - The feeds are streamed from coordinator to workers, so the [WorkUnitFeedProvider::feed] method
24/// is never called from a remote worker.
25/// - When deserializing a plan containing a [WorkUnitFeed] in a remote worker, a gRPC remote
26/// streaming version of the [WorkUnitFeed] is deserialized instead, streaming back the contents
27/// from the original [WorkUnitFeed].
28///
29/// For the distributed layer to find the feed inside a leaf plan, register a getter
30/// closure via [`crate::DistributedExt::set_distributed_work_unit_feed`].
31///
32/// Keep in mind that, while interacting with [WorkUnitFeed] within a node, there's no compile-time
33/// guarantee that it will not be in "remote" mode, although it's guaranteed that this mode only
34/// applies after the [datafusion::physical_plan::ExecutionPlan] has been deserialized.
35///
/// When serializing or deserializing a plan containing a [WorkUnitFeed], use the
/// [WorkUnitFeed::to_proto] and [WorkUnitFeed::from_proto] methods, respectively.
38///
39/// # Example of [WorkUnitFeed] in single-node
40///
41/// ```text
42/// ┌──────────────────────────────────────────────────────┐
43/// │ ExecutionPlan │
44/// │ │
45/// │ │
46/// │┌────────────────────────────────────────────────────┐│
47/// ││ WorkUnitFeed ││
48/// ││ ┌───────────┐ ┌───────────┐ ┌───────────┐ ││
49/// ││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ ││
50/// ││ └────┬──────┘ └──┬────────┘ └──┬────────┘ ││
51/// │└──────┼───────────────┼─────────────────┼───────────┘│ .─.
52/// │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐│ ( ) WorkUnit
53/// ││ │P0 ││ .▼. P1 ││ ( )P2 ││ `─' (e.g., a file address)
54/// ││ .▼. ││ ( ) ││ `┬' ││
55/// ││ ( ) ││ `┬' ││ │ ││
56/// ││ `─' ││ │ ││ .▼. ││
57/// ││ │ ││ .▼. ││ ( ) ││
58/// ││ │ ││ ( ) ││ `┬' ││
59/// ││ │ ││ `─' ││ │ ││
60/// ││ ▼ ││ ▼ ││ ▼ ││
61/// ││ processing... ││ processing... ││ processing... ││
62/// ││ │ ││ │ ││ │ ││
63/// ││ │ ││ │ ││ │ ││
64/// │└──────┼─────────┘└────┼───────────┘└────┼───────────┘│
65/// └───────┼───────────────┼─────────────────┼────────────┘
66/// ┌─────▼─────┐ ┌───▼───────┐ ┌──▼────────┐
67/// │RecordBatch│ │RecordBatch│ │RecordBatch│
68/// └───────────┘ └───────────┘ └───────────┘
69/// ```
70///
71///
72/// # Example of [WorkUnitFeed] during distributed execution
73///
74/// ```text
75/// ┌──────────────────┐
76/// │Coordinating Stage│
77/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴──────────────────┘
78/// │
79/// │
80/// ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐│
81/// ││ WorkUnitFeed │
82/// │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ ││
83/// ││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ │ .feed(3) │ │ .feed(4) │ │ .feed(5) │ │
84/// │ └────┬──────┘ └──┬────────┘ └──┬────────┘ └─────┬─────┘ └──┬────────┘ └───┬───────┘ ││
85/// │└───────┼───────────────┼─────────────────┼───────────────────────────┼───────────────┼─────────────────┼────────────┘
86/// │ │ │ │ │ .┴. │
87/// └ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ .┴. ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─( )─ ─ ─ ─ ─ ─
88/// │ .┴. │ ( ) │ `┬'
89/// │ ( ) │ `┬' │ .┴.
90/// .┴. `┬' .┴. │ │ ( )
91/// ( ) │ ( ) │ .┴. `┬'
92/// `┬' .┴. `┬'────────────┐ │ ( ) │┌────────────┐
93/// │ ( ) ││ Worker 1 │ │ `┬' ││ Worker 2 │
94/// ┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ `┬' ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘ ┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘
95/// ┌───────┼───────────────┼─────────────────┼────────────┐│ ┌───────┼───────────────┼─────────────────┼────────────┐│
96/// ││ │ Exe│utionPlan │ │ ││ │ Exe│utionPlan │ │
97/// │ │ │ │ ││ │ │ │ │ ││
98/// ││┌──────┼───────────────┼─────────────────┼───────────┐│ ││┌──────┼───────────────┼─────────────────┼───────────┐│
99/// ││ │ Remot│WorkUnitFeed │ │││ ││ │ Remot│WorkUnitFeed │ │││
100/// │││ ┌────▼──────┐ ┌──▼────────┐ ┌──▼────────┐ ││ │││ ┌────▼──────┐ ┌──▼────────┐ ┌──▼────────┐ ││
101/// ││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ │││ ││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ │││
102/// │││ └────┬──────┘ └──┬────────┘ └──┬────────┘ ││ │││ └────┬──────┘ └──┬────────┘ └──┬────────┘ ││
103/// │└──────┼───────────────┼─────────────────┼───────────┘││ │└──────┼───────────────┼─────────────────┼───────────┘││
104/// ││ │ │ │ │ ││ │ │ │ │
105/// │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││ │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││
106/// │││ │P0 ││ .▼. P1 ││ ( )P2 ││ │││ │P0 ││ .▼. P1 ││ ( )P2 ││
107/// ││ .▼. ││ ( ) ││ `┬' │││ ││ .▼. ││ ( ) ││ `┬' │││
108/// │││ ( ) ││ `┬' ││ │ ││ │││ ( ) ││ `┬' ││ │ ││
109/// ││ `─' ││ ┼ ││ .▼. │││ ││ `─' ││ ┼ ││ .▼. │││
110/// │││ │ ││ .▼. ││ ( ) ││ │││ │ ││ .▼. ││ ( ) ││
111/// ││ │ ││ ( ) ││ `┬' │││ ││ │ ││ ( ) ││ `┬' │││
112/// │││ │ ││ `─' ││ │ ││ │││ │ ││ `─' ││ │ ││
113/// ││ ▼ ││ ││ ▼ │││ ││ ▼ ││ ││ ▼ │││
114/// │││ processing... ││ processing... ││ processing... ││ │││ processing... ││ processing... ││ processing... ││
115/// ││ │ ││ │ ││ │ │││ ││ │ ││ │ ││ │ │││
116/// │││ │ ││ │ ││ │ ││ │││ │ ││ │ ││ │ ││
117/// │└──────┼─────────┘└────┼───────────┘└────┼───────────┘││ │└──────┼─────────┘└────┼───────────┘└────┼───────────┘││
118/// │└───────┼───────────────┼─────────────────┼────────────┘ │└───────┼───────────────┼─────────────────┼────────────┘
119/// ┌─────▼─────┐ ┌───▼───────┐ ┌──▼────────┐ │ ┌─────▼─────┐ ┌───▼───────┐ ┌──▼────────┐ │
120/// │ │RecordBatch│ │RecordBatch│ │RecordBatch│ │ │RecordBatch│ │RecordBatch│ │RecordBatch│
121/// └───────────┘ └───────────┘ └───────────┘ │ └───────────┘ └───────────┘ └───────────┘ │
122/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
123/// ```
124#[derive(Debug, Clone)]
125pub struct WorkUnitFeed<T: WorkUnitFeedProvider> {
126 pub(crate) id: Uuid,
127 pub(crate) provider: RemoteOrLocalProvider<T>,
128}
129
130#[derive(Debug, Clone)]
131pub enum RemoteOrLocalProvider<T: WorkUnitFeedProvider> {
132 Local(T),
133 Remote(RemoteFeedProvider),
134}
135
136impl<T: WorkUnitFeedProvider> RemoteOrLocalProvider<T> {
137 fn feed(
138 &self,
139 partition: usize,
140 ctx: Arc<TaskContext>,
141 ) -> Result<BoxStream<'static, Result<T::WorkUnit>>> {
142 match self {
143 Self::Local(local) => local.feed(partition, ctx),
144 Self::Remote(remote) => Ok(remote.feed::<T::WorkUnit>(partition, ctx)?),
145 }
146 }
147
148 fn metrics(&self) -> ExecutionPlanMetricsSet {
149 match self {
150 Self::Local(local) => local.metrics(),
151 Self::Remote(remote) => remote.metrics(),
152 }
153 }
154}
155
156#[derive(Clone, PartialEq, ::prost::Message)]
157pub struct WorkUnitFeedProto {
158 #[prost(bytes, tag = "1")]
159 pub id: Vec<u8>,
160}
161
162impl<T: WorkUnitFeedProvider> WorkUnitFeed<T> {
163 /// Builds a new local [`WorkUnitFeed`] backed by the given `provider`. Store the
164 /// resulting feed as a field of your leaf [`datafusion::physical_plan::ExecutionPlan`]
165 /// and register a getter with [`crate::DistributedExt::set_distributed_work_unit_feed`]
166 /// so the distributed layer can find it.
167 pub fn new(provider: T) -> Self {
168 Self {
169 id: Uuid::new_v4(),
170 provider: RemoteOrLocalProvider::Local(provider),
171 }
172 }
173
174 /// Reconstructs a [`WorkUnitFeed`] from its serialized form. The resulting feed is in
175 /// the **remote** variant — it will read work units off the network using the
176 /// `RemoteWorkUnitFeedRegistry` installed in the worker's session config. Used by
177 /// physical plan codecs when deserializing a plan on a worker.
178 pub fn from_proto(proto: WorkUnitFeedProto) -> Result<Self> {
179 let id = deserialize_uuid(&proto.id)?;
180 Ok(WorkUnitFeed {
181 id,
182 provider: RemoteOrLocalProvider::Remote(RemoteFeedProvider {
183 id,
184 metrics: ExecutionPlanMetricsSet::new(),
185 }),
186 })
187 }
188
189 /// Serializes just the feed's identifier. The concrete provider is never sent over the
190 /// wire — the coordinator keeps the local provider to produce work units, and the
191 /// worker rebuilds a remote-variant feed via [`WorkUnitFeed::from_proto`] that reads
192 /// from the network.
193 pub fn to_proto(&self) -> WorkUnitFeedProto {
194 WorkUnitFeedProto {
195 id: serialize_uuid(&self.id),
196 }
197 }
198
199 /// Consumes the feed and returns the user-provided [`WorkUnitFeedProvider`] if this
200 /// feed is in the local variant. Returns an error if the feed is remote (i.e. we're on
201 /// a worker and there is no local provider to extract).
202 pub fn try_into_inner(self) -> Result<T> {
203 match self.provider {
204 RemoteOrLocalProvider::Local(local) => Ok(local),
205 RemoteOrLocalProvider::Remote(_) => {
206 internal_err!(
207 "Cannot get the inner local provider, as the remote variant was already set"
208 )
209 }
210 }
211 }
212
213 /// Consumes the feed and returns the user-provided [`WorkUnitFeedProvider`] if this
214 /// feed is in the local variant. Returns None otherwise.
215 pub fn into_inner(self) -> Option<T> {
216 match self.provider {
217 RemoteOrLocalProvider::Local(local) => Some(local),
218 RemoteOrLocalProvider::Remote(_) => None,
219 }
220 }
221
222 /// Returns a reference to the inner [`WorkUnitFeedProvider`] if this feed is
223 /// in the local variant. Returns None otherwise
224 pub fn inner(&self) -> Option<&T> {
225 match &self.provider {
226 RemoteOrLocalProvider::Local(local) => Some(local),
227 RemoteOrLocalProvider::Remote(_) => None,
228 }
229 }
230
231 /// Returns a mutable reference to the inner [`WorkUnitFeedProvider`] if this feed is
232 /// in the local variant. Returns None otherwise
233 pub fn inner_mut(&mut self) -> Option<&mut T> {
234 match &mut self.provider {
235 RemoteOrLocalProvider::Local(local) => Some(local),
236 RemoteOrLocalProvider::Remote(_) => None,
237 }
238 }
239
240 /// Returns a reference to the inner [`WorkUnitFeedProvider`] if this feed is
241 /// in the local variant. Returns an error if the feed is remote (i.e. we're on
242 /// a worker and there is no local provider to extract).
243 pub fn try_inner(&self) -> Result<&T> {
244 match &self.provider {
245 RemoteOrLocalProvider::Local(local) => Ok(local),
246 RemoteOrLocalProvider::Remote(_) => {
247 internal_err!(
248 "Cannot get the inner local provider, as the remote variant was already set"
249 )
250 }
251 }
252 }
253
254 /// Returns a mutable reference to the inner [`WorkUnitFeedProvider`] if this feed is
255 /// in the local variant. Returns an error if the feed is remote (i.e. we're on
256 /// a worker and there is no local provider to extract).
257 pub fn try_inner_mut(&mut self) -> Result<&mut T> {
258 match &mut self.provider {
259 RemoteOrLocalProvider::Local(local) => Ok(local),
260 RemoteOrLocalProvider::Remote(_) => {
261 internal_err!(
262 "Cannot get the inner local provider, as the remote variant was already set"
263 )
264 }
265 }
266 }
267
268 /// Returns the per-partition stream of [`WorkUnit`]s for `partition`. Refer to the
269 /// [WorkUnitFeed] docs for more details about how this works.
270 pub fn feed(
271 &self,
272 partition: usize,
273 ctx: Arc<TaskContext>,
274 ) -> Result<BoxStream<'static, Result<T::WorkUnit>>> {
275 self.provider.feed(partition, ctx)
276 }
277
278 /// DataFusion metrics collected at runtime while streaming [WorkUnit]s through [Self::feed].
279 pub fn metrics(&self) -> ExecutionPlanMetricsSet {
280 self.provider.metrics()
281 }
282}