Skip to main content

datafusion_distributed/work_unit_feed/
work_unit_feed.rs

1use crate::WorkUnitFeedProvider;
2use crate::common::{deserialize_uuid, serialize_uuid};
3use crate::work_unit_feed::remote_work_unit_feed::RemoteFeedProvider;
4use datafusion::common::{Result, internal_err};
5use datafusion::execution::TaskContext;
6use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
7use futures::stream::BoxStream;
8use std::fmt::Debug;
9use std::sync::Arc;
10use uuid::Uuid;
11
12/// The [WorkUnitFeed] is created with a user-provided [WorkUnitFeedProvider] and is embedded
13/// in any custom [datafusion::physical_plan::ExecutionPlan] implementation as a field.
14///
15/// It exposes the [WorkUnitFeed::feed] method that users are expected to call in their
16/// [datafusion::physical_plan::ExecutionPlan::execute] implementation, which provides a stream
17/// of [crate::WorkUnit]s, representing individual units of work (e.g., file addresses) at runtime.
18/// This is useful for when these units of work cannot be known at planning time, and are
/// expected to be discovered and streamed at execution time instead, as the query makes progress.
20///
/// What makes this structure special is that it automatically works in distributed
22/// scenarios:
23/// - The feeds are streamed from coordinator to workers, so the [WorkUnitFeedProvider::feed] method
24///   is never called from a remote worker.
25/// - When deserializing a plan containing a [WorkUnitFeed] in a remote worker, a gRPC remote
26///   streaming version of the [WorkUnitFeed] is deserialized instead, streaming back the contents
27///   from the original [WorkUnitFeed].
28///
29/// For the distributed layer to find the feed inside a leaf plan, register a getter
30/// closure via [`crate::DistributedExt::set_distributed_work_unit_feed`].
31///
32/// Keep in mind that, while interacting with [WorkUnitFeed] within a node, there's no compile-time
33/// guarantee that it will not be in "remote" mode, although it's guaranteed that this mode only
34/// applies after the [datafusion::physical_plan::ExecutionPlan] has been deserialized.
35///
/// When serializing or deserializing a plan containing a [WorkUnitFeed], use the
/// [WorkUnitFeed::to_proto] and [WorkUnitFeed::from_proto] methods, respectively.
38///
39/// # Example of [WorkUnitFeed] in single-node
40///
41/// ```text
42/// ┌──────────────────────────────────────────────────────┐
43/// │                    ExecutionPlan                     │
44/// │                                                      │
45/// │                                                      │
46/// │┌────────────────────────────────────────────────────┐│
47/// ││                    WorkUnitFeed                    ││
48/// ││ ┌───────────┐     ┌───────────┐     ┌───────────┐  ││
49/// ││ │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │  ││
50/// ││ └────┬──────┘     └──┬────────┘     └──┬────────┘  ││
51/// │└──────┼───────────────┼─────────────────┼───────────┘│  .─.
52/// │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐│ (   ) WorkUnit
53/// ││      │P0       ││   .▼. P1       ││  (   )P2       ││  `─'  (e.g., a file address)
54/// ││     .▼.        ││  (   )         ││   `┬'          ││
55/// ││    (   )       ││   `┬'          ││    │           ││
56/// ││     `─'        ││    │           ││   .▼.          ││
57/// ││      │         ││   .▼.          ││  (   )         ││
58/// ││      │         ││  (   )         ││   `┬'          ││
59/// ││      │         ││   `─'          ││    │           ││
60/// ││      ▼         ││    ▼           ││    ▼           ││
61/// ││  processing... ││  processing... ││  processing... ││
62/// ││      │         ││    │           ││    │           ││
63/// ││      │         ││    │           ││    │           ││
64/// │└──────┼─────────┘└────┼───────────┘└────┼───────────┘│
65/// └───────┼───────────────┼─────────────────┼────────────┘
66///   ┌─────▼─────┐     ┌───▼───────┐      ┌──▼────────┐
67///   │RecordBatch│     │RecordBatch│      │RecordBatch│
68///   └───────────┘     └───────────┘      └───────────┘
69/// ```
70///
71///
72/// # Example of [WorkUnitFeed] during distributed execution
73///
74/// ```text
75///                                                                                                     ┌──────────────────┐
76///                                                                                                     │Coordinating Stage│
77/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴──────────────────┘
78///                                                                                                                        │
79/// │
80///  ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐│
81/// ││                                                    WorkUnitFeed                                                    │
82///  │  ┌───────────┐     ┌───────────┐     ┌───────────┐            ┌───────────┐      ┌───────────┐    ┌───────────┐    ││
83/// ││  │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │            │ .feed(3)  │      │ .feed(4)  │    │ .feed(5)  │    │
84///  │  └────┬──────┘     └──┬────────┘     └──┬────────┘            └─────┬─────┘      └──┬────────┘    └───┬───────┘    ││
85/// │└───────┼───────────────┼─────────────────┼───────────────────────────┼───────────────┼─────────────────┼────────────┘
86///          │               │                 │                           │               │                .┴.            │
87/// └ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ .┴. ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─(   )─ ─ ─ ─ ─ ─
88///          │              .┴.                │                         (   )             │                `┬'
89///          │             (   )               │                          `┬'              │                .┴.
90///         .┴.             `┬'               .┴.                          │               │               (   )
91///        (   )             │               (   )                         │              .┴.               `┬'
92///         `┬'             .┴.               `┬'────────────┐             │             (   )               │┌────────────┐
93///          │             (   )               ││  Worker 1  │             │              `┬'                ││  Worker 2  │
94/// ┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ `┬' ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘    ┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘
95///  ┌───────┼───────────────┼─────────────────┼────────────┐│     ┌───────┼───────────────┼─────────────────┼────────────┐│
96/// ││       │            Exe│utionPlan        │            │     ││       │            Exe│utionPlan        │            │
97///  │       │               │                 │            ││     │       │               │                 │            ││
98/// ││┌──────┼───────────────┼─────────────────┼───────────┐│     ││┌──────┼───────────────┼─────────────────┼───────────┐│
99///  ││      │          Remot│WorkUnitFeed     │           │││     ││      │          Remot│WorkUnitFeed     │           │││
100/// │││ ┌────▼──────┐     ┌──▼────────┐     ┌──▼────────┐  ││     │││ ┌────▼──────┐     ┌──▼────────┐     ┌──▼────────┐  ││
101///  ││ │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │  │││     ││ │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │  │││
102/// │││ └────┬──────┘     └──┬────────┘     └──┬────────┘  ││     │││ └────┬──────┘     └──┬────────┘     └──┬────────┘  ││
103///  │└──────┼───────────────┼─────────────────┼───────────┘││     │└──────┼───────────────┼─────────────────┼───────────┘││
104/// ││       │               │                 │            │     ││       │               │                 │            │
105///  │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││     │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││
106/// │││      │P0       ││   .▼. P1       ││  (   )P2       ││     │││      │P0       ││   .▼. P1       ││  (   )P2       ││
107///  ││     .▼.        ││  (   )         ││   `┬'          │││     ││     .▼.        ││  (   )         ││   `┬'          │││
108/// │││    (   )       ││   `┬'          ││    │           ││     │││    (   )       ││   `┬'          ││    │           ││
109///  ││     `─'        ││    ┼           ││   .▼.          │││     ││     `─'        ││    ┼           ││   .▼.          │││
110/// │││      │         ││   .▼.          ││  (   )         ││     │││      │         ││   .▼.          ││  (   )         ││
111///  ││      │         ││  (   )         ││   `┬'          │││     ││      │         ││  (   )         ││   `┬'          │││
112/// │││      │         ││   `─'          ││    │           ││     │││      │         ││   `─'          ││    │           ││
113///  ││      ▼         ││                ││    ▼           │││     ││      ▼         ││                ││    ▼           │││
114/// │││  processing... ││  processing... ││  processing... ││     │││  processing... ││  processing... ││  processing... ││
115///  ││      │         ││    │           ││    │           │││     ││      │         ││    │           ││    │           │││
116/// │││      │         ││    │           ││    │           ││     │││      │         ││    │           ││    │           ││
117///  │└──────┼─────────┘└────┼───────────┘└────┼───────────┘││     │└──────┼─────────┘└────┼───────────┘└────┼───────────┘││
118/// │└───────┼───────────────┼─────────────────┼────────────┘     │└───────┼───────────────┼─────────────────┼────────────┘
119///    ┌─────▼─────┐     ┌───▼───────┐      ┌──▼────────┐    │       ┌─────▼─────┐     ┌───▼───────┐      ┌──▼────────┐    │
120/// │  │RecordBatch│     │RecordBatch│      │RecordBatch│         │  │RecordBatch│     │RecordBatch│      │RecordBatch│
121///    └───────────┘     └───────────┘      └───────────┘    │       └───────────┘     └───────────┘      └───────────┘    │
122/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─     └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
123/// ```
124#[derive(Debug, Clone)]
125pub struct WorkUnitFeed<T: WorkUnitFeedProvider> {
126    pub(crate) id: Uuid,
127    pub(crate) provider: RemoteOrLocalProvider<T>,
128}
129
130#[derive(Debug, Clone)]
131pub enum RemoteOrLocalProvider<T: WorkUnitFeedProvider> {
132    Local(T),
133    Remote(RemoteFeedProvider),
134}
135
136impl<T: WorkUnitFeedProvider> RemoteOrLocalProvider<T> {
137    fn feed(
138        &self,
139        partition: usize,
140        ctx: Arc<TaskContext>,
141    ) -> Result<BoxStream<'static, Result<T::WorkUnit>>> {
142        match self {
143            Self::Local(local) => local.feed(partition, ctx),
144            Self::Remote(remote) => Ok(remote.feed::<T::WorkUnit>(partition, ctx)?),
145        }
146    }
147
148    fn metrics(&self) -> ExecutionPlanMetricsSet {
149        match self {
150            Self::Local(local) => local.metrics(),
151            Self::Remote(remote) => remote.metrics(),
152        }
153    }
154}
155
156#[derive(Clone, PartialEq, ::prost::Message)]
157pub struct WorkUnitFeedProto {
158    #[prost(bytes, tag = "1")]
159    pub id: Vec<u8>,
160}
161
162impl<T: WorkUnitFeedProvider> WorkUnitFeed<T> {
163    /// Builds a new local [`WorkUnitFeed`] backed by the given `provider`. Store the
164    /// resulting feed as a field of your leaf [`datafusion::physical_plan::ExecutionPlan`]
165    /// and register a getter with [`crate::DistributedExt::set_distributed_work_unit_feed`]
166    /// so the distributed layer can find it.
167    pub fn new(provider: T) -> Self {
168        Self {
169            id: Uuid::new_v4(),
170            provider: RemoteOrLocalProvider::Local(provider),
171        }
172    }
173
174    /// Reconstructs a [`WorkUnitFeed`] from its serialized form. The resulting feed is in
175    /// the **remote** variant — it will read work units off the network using the
176    /// `RemoteWorkUnitFeedRegistry` installed in the worker's session config. Used by
177    /// physical plan codecs when deserializing a plan on a worker.
178    pub fn from_proto(proto: WorkUnitFeedProto) -> Result<Self> {
179        let id = deserialize_uuid(&proto.id)?;
180        Ok(WorkUnitFeed {
181            id,
182            provider: RemoteOrLocalProvider::Remote(RemoteFeedProvider {
183                id,
184                metrics: ExecutionPlanMetricsSet::new(),
185            }),
186        })
187    }
188
189    /// Serializes just the feed's identifier. The concrete provider is never sent over the
190    /// wire — the coordinator keeps the local provider to produce work units, and the
191    /// worker rebuilds a remote-variant feed via [`WorkUnitFeed::from_proto`] that reads
192    /// from the network.
193    pub fn to_proto(&self) -> WorkUnitFeedProto {
194        WorkUnitFeedProto {
195            id: serialize_uuid(&self.id),
196        }
197    }
198
199    /// Consumes the feed and returns the user-provided [`WorkUnitFeedProvider`] if this
200    /// feed is in the local variant. Returns an error if the feed is remote (i.e. we're on
201    /// a worker and there is no local provider to extract).
202    pub fn try_into_inner(self) -> Result<T> {
203        match self.provider {
204            RemoteOrLocalProvider::Local(local) => Ok(local),
205            RemoteOrLocalProvider::Remote(_) => {
206                internal_err!(
207                    "Cannot get the inner local provider, as the remote variant was already set"
208                )
209            }
210        }
211    }
212
213    /// Consumes the feed and returns the user-provided [`WorkUnitFeedProvider`] if this
214    /// feed is in the local variant. Returns None otherwise.
215    pub fn into_inner(self) -> Option<T> {
216        match self.provider {
217            RemoteOrLocalProvider::Local(local) => Some(local),
218            RemoteOrLocalProvider::Remote(_) => None,
219        }
220    }
221
222    /// Returns a reference to the inner [`WorkUnitFeedProvider`] if this feed is
223    /// in the local variant. Returns None otherwise
224    pub fn inner(&self) -> Option<&T> {
225        match &self.provider {
226            RemoteOrLocalProvider::Local(local) => Some(local),
227            RemoteOrLocalProvider::Remote(_) => None,
228        }
229    }
230
231    /// Returns a mutable reference to the inner [`WorkUnitFeedProvider`] if this feed is
232    /// in the local variant. Returns None otherwise
233    pub fn inner_mut(&mut self) -> Option<&mut T> {
234        match &mut self.provider {
235            RemoteOrLocalProvider::Local(local) => Some(local),
236            RemoteOrLocalProvider::Remote(_) => None,
237        }
238    }
239
240    /// Returns a reference to the inner [`WorkUnitFeedProvider`] if this feed is
241    /// in the local variant. Returns an error if the feed is remote (i.e. we're on
242    /// a worker and there is no local provider to extract).
243    pub fn try_inner(&self) -> Result<&T> {
244        match &self.provider {
245            RemoteOrLocalProvider::Local(local) => Ok(local),
246            RemoteOrLocalProvider::Remote(_) => {
247                internal_err!(
248                    "Cannot get the inner local provider, as the remote variant was already set"
249                )
250            }
251        }
252    }
253
254    /// Returns a mutable reference to the inner [`WorkUnitFeedProvider`] if this feed is
255    /// in the local variant. Returns an error if the feed is remote (i.e. we're on
256    /// a worker and there is no local provider to extract).
257    pub fn try_inner_mut(&mut self) -> Result<&mut T> {
258        match &mut self.provider {
259            RemoteOrLocalProvider::Local(local) => Ok(local),
260            RemoteOrLocalProvider::Remote(_) => {
261                internal_err!(
262                    "Cannot get the inner local provider, as the remote variant was already set"
263                )
264            }
265        }
266    }
267
268    /// Returns the per-partition stream of [`WorkUnit`]s for `partition`. Refer to the
269    /// [WorkUnitFeed] docs for more details about how this works.
270    pub fn feed(
271        &self,
272        partition: usize,
273        ctx: Arc<TaskContext>,
274    ) -> Result<BoxStream<'static, Result<T::WorkUnit>>> {
275        self.provider.feed(partition, ctx)
276    }
277
278    /// DataFusion metrics collected at runtime while streaming [WorkUnit]s through [Self::feed].
279    pub fn metrics(&self) -> ExecutionPlanMetricsSet {
280        self.provider.metrics()
281    }
282}