pub struct WorkUnitFeed<T: WorkUnitFeedProvider> { /* private fields */ }
The WorkUnitFeed is created with a user-provided WorkUnitFeedProvider and is embedded in any custom datafusion::physical_plan::ExecutionPlan implementation as a field.
It exposes the WorkUnitFeed::feed method, which users are expected to call in their datafusion::physical_plan::ExecutionPlan::execute implementation. That method provides a stream of crate::WorkUnit items, each representing an individual unit of work (e.g., a file address) at runtime. This is useful when these units of work cannot be known at planning time and are instead expected to be discovered and streamed at execution time, as the query makes progress.
What makes this structure special is that it works automatically in distributed scenarios:
- The feeds are streamed from coordinator to workers, so the WorkUnitFeedProvider::feed method is never called from a remote worker.
- When deserializing a plan containing a WorkUnitFeed in a remote worker, a gRPC remote streaming version of the WorkUnitFeed is deserialized instead, streaming back the contents from the original WorkUnitFeed.
For the distributed layer to find the feed inside a leaf plan, register a getter
closure via crate::DistributedExt::set_distributed_work_unit_feed.
Keep in mind that, when interacting with a WorkUnitFeed inside a node, there is no compile-time guarantee that it is not in “remote” mode. The only guarantee is that remote mode applies exclusively after the datafusion::physical_plan::ExecutionPlan has been deserialized.
When serializing or deserializing a plan containing a WorkUnitFeed, use WorkUnitFeed::to_proto and WorkUnitFeed::from_proto, respectively.
§Example of WorkUnitFeed in single-node execution
┌──────────────────────────────────────────────────────┐
│ ExecutionPlan │
│ │
│ │
│┌────────────────────────────────────────────────────┐│
││ WorkUnitFeed ││
││ ┌───────────┐ ┌───────────┐ ┌───────────┐ ││
││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ ││
││ └────┬──────┘ └──┬────────┘ └──┬────────┘ ││
│└──────┼───────────────┼─────────────────┼───────────┘│ .─.
│┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐│ ( ) WorkUnit
││ │P0 ││ .▼. P1 ││ ( )P2 ││ `─' (e.g., a file address)
││ .▼. ││ ( ) ││ `┬' ││
││ ( ) ││ `┬' ││ │ ││
││ `─' ││ │ ││ .▼. ││
││ │ ││ .▼. ││ ( ) ││
││ │ ││ ( ) ││ `┬' ││
││ │ ││ `─' ││ │ ││
││ ▼ ││ ▼ ││ ▼ ││
││ processing... ││ processing... ││ processing... ││
││ │ ││ │ ││ │ ││
││ │ ││ │ ││ │ ││
│└──────┼─────────┘└────┼───────────┘└────┼───────────┘│
└───────┼───────────────┼─────────────────┼────────────┘
┌─────▼─────┐ ┌───▼───────┐ ┌──▼────────┐
│RecordBatch│ │RecordBatch│ │RecordBatch│
└───────────┘ └───────────┘ └───────────┘
§Example of WorkUnitFeed during distributed execution
┌──────────────────┐
│Coordinating Stage│
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴──────────────────┘
│
│
┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐│
││ WorkUnitFeed │
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ ││
││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ │ .feed(3) │ │ .feed(4) │ │ .feed(5) │ │
│ └────┬──────┘ └──┬────────┘ └──┬────────┘ └─────┬─────┘ └──┬────────┘ └───┬───────┘ ││
│└───────┼───────────────┼─────────────────┼───────────────────────────┼───────────────┼─────────────────┼────────────┘
│ │ │ │ │ .┴. │
└ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ .┴. ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─( )─ ─ ─ ─ ─ ─
│ .┴. │ ( ) │ `┬'
│ ( ) │ `┬' │ .┴.
.┴. `┬' .┴. │ │ ( )
( ) │ ( ) │ .┴. `┬'
`┬' .┴. `┬'────────────┐ │ ( ) │┌────────────┐
│ ( ) ││ Worker 1 │ │ `┬' ││ Worker 2 │
┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ `┬' ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘ ┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘
┌───────┼───────────────┼─────────────────┼────────────┐│ ┌───────┼───────────────┼─────────────────┼────────────┐│
││ │ Exe│utionPlan │ │ ││ │ Exe│utionPlan │ │
│ │ │ │ ││ │ │ │ │ ││
││┌──────┼───────────────┼─────────────────┼───────────┐│ ││┌──────┼───────────────┼─────────────────┼───────────┐│
││ │ Remot│WorkUnitFeed │ │││ ││ │ Remot│WorkUnitFeed │ │││
│││ ┌────▼──────┐ ┌──▼────────┐ ┌──▼────────┐ ││ │││ ┌────▼──────┐ ┌──▼────────┐ ┌──▼────────┐ ││
││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ │││ ││ │ .feed(0) │ │ .feed(1) │ │ .feed(2) │ │││
│││ └────┬──────┘ └──┬────────┘ └──┬────────┘ ││ │││ └────┬──────┘ └──┬────────┘ └──┬────────┘ ││
│└──────┼───────────────┼─────────────────┼───────────┘││ │└──────┼───────────────┼─────────────────┼───────────┘││
││ │ │ │ │ ││ │ │ │ │
│┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││ │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││
│││ │P0 ││ .▼. P1 ││ ( )P2 ││ │││ │P0 ││ .▼. P1 ││ ( )P2 ││
││ .▼. ││ ( ) ││ `┬' │││ ││ .▼. ││ ( ) ││ `┬' │││
│││ ( ) ││ `┬' ││ │ ││ │││ ( ) ││ `┬' ││ │ ││
││ `─' ││ ┼ ││ .▼. │││ ││ `─' ││ ┼ ││ .▼. │││
│││ │ ││ .▼. ││ ( ) ││ │││ │ ││ .▼. ││ ( ) ││
││ │ ││ ( ) ││ `┬' │││ ││ │ ││ ( ) ││ `┬' │││
│││ │ ││ `─' ││ │ ││ │││ │ ││ `─' ││ │ ││
││ ▼ ││ ││ ▼ │││ ││ ▼ ││ ││ ▼ │││
│││ processing... ││ processing... ││ processing... ││ │││ processing... ││ processing... ││ processing... ││
││ │ ││ │ ││ │ │││ ││ │ ││ │ ││ │ │││
│││ │ ││ │ ││ │ ││ │││ │ ││ │ ││ │ ││
│└──────┼─────────┘└────┼───────────┘└────┼───────────┘││ │└──────┼─────────┘└────┼───────────┘└────┼───────────┘││
│└───────┼───────────────┼─────────────────┼────────────┘ │└───────┼───────────────┼─────────────────┼────────────┘
┌─────▼─────┐ ┌───▼───────┐ ┌──▼────────┐ │ ┌─────▼─────┐ ┌───▼───────┐ ┌──▼────────┐ │
│ │RecordBatch│ │RecordBatch│ │RecordBatch│ │ │RecordBatch│ │RecordBatch│ │RecordBatch│
└───────────┘ └───────────┘ └───────────┘ │ └───────────┘ └───────────┘ └───────────┘ │
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─Implementations§
Source§impl<T: WorkUnitFeedProvider> WorkUnitFeed<T>
impl<T: WorkUnitFeedProvider> WorkUnitFeed<T>
Sourcepub fn new(provider: T) -> Self
pub fn new(provider: T) -> Self
Builds a new local WorkUnitFeed backed by the given provider. Store the
resulting feed as a field of your leaf datafusion::physical_plan::ExecutionPlan
and register a getter with crate::DistributedExt::set_distributed_work_unit_feed
so the distributed layer can find it.
Sourcepub fn from_proto(proto: WorkUnitFeedProto) -> Result<Self>
pub fn from_proto(proto: WorkUnitFeedProto) -> Result<Self>
Reconstructs a WorkUnitFeed from its serialized form. The resulting feed is in
the remote variant — it will read work units off the network using the
RemoteWorkUnitFeedRegistry installed in the worker’s session config. Used by
physical plan codecs when deserializing a plan on a worker.
Sourcepub fn to_proto(&self) -> WorkUnitFeedProto
pub fn to_proto(&self) -> WorkUnitFeedProto
Serializes just the feed’s identifier. The concrete provider is never sent over the
wire — the coordinator keeps the local provider to produce work units, and the
worker rebuilds a remote-variant feed via WorkUnitFeed::from_proto that reads
from the network.
Sourcepub fn try_into_inner(self) -> Result<T>
pub fn try_into_inner(self) -> Result<T>
Consumes the feed and returns the user-provided WorkUnitFeedProvider if this
feed is in the local variant. Returns an error if the feed is remote (i.e. we’re on
a worker and there is no local provider to extract).
Sourcepub fn into_inner(self) -> Option<T>
pub fn into_inner(self) -> Option<T>
Consumes the feed and returns the user-provided WorkUnitFeedProvider if this
feed is in the local variant. Returns None otherwise.
Sourcepub fn inner(&self) -> Option<&T>
pub fn inner(&self) -> Option<&T>
Returns a reference to the inner WorkUnitFeedProvider if this feed is
in the local variant. Returns None otherwise.
Sourcepub fn inner_mut(&mut self) -> Option<&mut T>
pub fn inner_mut(&mut self) -> Option<&mut T>
Returns a mutable reference to the inner WorkUnitFeedProvider if this feed is
in the local variant. Returns None otherwise.
Sourcepub fn try_inner(&self) -> Result<&T>
pub fn try_inner(&self) -> Result<&T>
Returns a reference to the inner WorkUnitFeedProvider if this feed is
in the local variant. Returns an error if the feed is remote (i.e. we’re on
a worker and there is no local provider to extract).
Sourcepub fn try_inner_mut(&mut self) -> Result<&mut T>
pub fn try_inner_mut(&mut self) -> Result<&mut T>
Returns a mutable reference to the inner WorkUnitFeedProvider if this feed is
in the local variant. Returns an error if the feed is remote (i.e. we’re on
a worker and there is no local provider to extract).
Sourcepub fn feed(
&self,
partition: usize,
ctx: Arc<TaskContext>,
) -> Result<BoxStream<'static, Result<T::WorkUnit>>>
pub fn feed( &self, partition: usize, ctx: Arc<TaskContext>, ) -> Result<BoxStream<'static, Result<T::WorkUnit>>>
Returns the stream of work units (T::WorkUnit) for the given partition. Refer to the
WorkUnitFeed docs for more details about how this works.
Sourcepub fn metrics(&self) -> ExecutionPlanMetricsSet
pub fn metrics(&self) -> ExecutionPlanMetricsSet
DataFusion metrics collected at runtime while streaming work units (T::WorkUnit) through Self::feed.
Trait Implementations§
Source§impl<T: Clone + WorkUnitFeedProvider> Clone for WorkUnitFeed<T>
impl<T: Clone + WorkUnitFeedProvider> Clone for WorkUnitFeed<T>
Source§fn clone(&self) -> WorkUnitFeed<T>
fn clone(&self) -> WorkUnitFeed<T>
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Auto Trait Implementations§
impl<T> !RefUnwindSafe for WorkUnitFeed<T>
impl<T> !UnwindSafe for WorkUnitFeed<T>
impl<T> Freeze for WorkUnitFeed<T>where
T: Freeze,
impl<T> Send for WorkUnitFeed<T>
impl<T> Sync for WorkUnitFeed<T>
impl<T> Unpin for WorkUnitFeed<T>where
T: Unpin,
impl<T> UnsafeUnpin for WorkUnitFeed<T>where
T: UnsafeUnpin,
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request