Struct WorkUnitFeed 

pub struct WorkUnitFeed<T: WorkUnitFeedProvider> { /* private fields */ }

The WorkUnitFeed is created with a user-provided WorkUnitFeedProvider and is embedded in any custom datafusion::physical_plan::ExecutionPlan implementation as a field.

It exposes the WorkUnitFeed::feed method, which users are expected to call in their datafusion::physical_plan::ExecutionPlan::execute implementation. The method returns a stream of crate::WorkUnits, each representing an individual unit of work (e.g., a file address) at runtime. This is useful when the units of work cannot be known at planning time and are instead discovered and streamed at execution time, as the query makes progress.
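As a rough illustration of the idea only, the sketch below models a per-partition feed with standard-library iterators standing in for async streams. ToyProvider, FileListingProvider, ToyFeed and execute_partition are hypothetical names, not part of this crate; the real WorkUnitFeed::feed also takes a TaskContext and returns a fallible BoxStream.

```rust
/// Hypothetical stand-in for `WorkUnitFeedProvider`: yields the work units
/// that belong to one partition.
trait ToyProvider {
    type WorkUnit: 'static;
    fn feed(&self, partition: usize) -> Box<dyn Iterator<Item = Self::WorkUnit>>;
}

/// Hypothetical provider whose work units are file addresses.
/// A real provider would discover them lazily at execution time; here the
/// list is fixed to keep the sketch small.
struct FileListingProvider {
    partitions: usize,
    files: Vec<String>,
}

impl ToyProvider for FileListingProvider {
    type WorkUnit = String;

    fn feed(&self, partition: usize) -> Box<dyn Iterator<Item = String>> {
        let partitions = self.partitions;
        let files = self.files.clone();
        // Round-robin assignment: file `i` goes to partition `i % partitions`.
        Box::new(
            files
                .into_iter()
                .enumerate()
                .filter(move |(i, _)| i % partitions == partition)
                .map(|(_, file)| file),
        )
    }
}

/// Hypothetical stand-in for `WorkUnitFeed`: wraps the provider and would be
/// stored as a field of the plan node.
struct ToyFeed<P: ToyProvider> {
    provider: P,
}

impl<P: ToyProvider> ToyFeed<P> {
    fn new(provider: P) -> Self {
        Self { provider }
    }

    fn feed(&self, partition: usize) -> Box<dyn Iterator<Item = P::WorkUnit>> {
        self.provider.feed(partition)
    }
}

/// Models what an `execute(partition)` implementation would do: pull work
/// units from the feed and process each one as it arrives.
fn execute_partition(feed: &ToyFeed<FileListingProvider>, partition: usize) -> Vec<String> {
    feed.feed(partition)
        .map(|file| format!("scanned {file}"))
        .collect()
}
```

Each call to execute_partition only sees the work units routed to its own partition, which mirrors the one-stream-per-partition shape of the diagrams in this page.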

What makes this structure special is that it works automatically in distributed scenarios:

  • The feeds are streamed from the coordinator to the workers, so the WorkUnitFeedProvider::feed method is never called from a remote worker.
  • When a plan containing a WorkUnitFeed is deserialized on a remote worker, a gRPC-backed remote streaming version of the WorkUnitFeed is produced instead, which streams the contents of the original WorkUnitFeed.

For the distributed layer to find the feed inside a leaf plan, register a getter closure via crate::DistributedExt::set_distributed_work_unit_feed.

Keep in mind that, when interacting with a WorkUnitFeed within a node, there is no compile-time guarantee that it is not in “remote” mode. It is guaranteed, however, that this mode only applies after the datafusion::physical_plan::ExecutionPlan has been deserialized.

When serializing or deserializing a plan containing a WorkUnitFeed, use the WorkUnitFeed::to_proto and WorkUnitFeed::from_proto methods, respectively.

Example of WorkUnitFeed in single-node execution

┌──────────────────────────────────────────────────────┐
│                    ExecutionPlan                     │
│                                                      │
│                                                      │
│┌────────────────────────────────────────────────────┐│
││                    WorkUnitFeed                    ││
││ ┌───────────┐     ┌───────────┐     ┌───────────┐  ││
││ │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │  ││
││ └────┬──────┘     └──┬────────┘     └──┬────────┘  ││
│└──────┼───────────────┼─────────────────┼───────────┘│  .─.
│┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐│ (   ) WorkUnit
││      │P0       ││   .▼. P1       ││  (   )P2       ││  `─'  (e.g., a file address)
││     .▼.        ││  (   )         ││   `┬'          ││
││    (   )       ││   `┬'          ││    │           ││
││     `─'        ││    │           ││   .▼.          ││
││      │         ││   .▼.          ││  (   )         ││
││      │         ││  (   )         ││   `┬'          ││
││      │         ││   `─'          ││    │           ││
││      ▼         ││    ▼           ││    ▼           ││
││  processing... ││  processing... ││  processing... ││
││      │         ││    │           ││    │           ││
││      │         ││    │           ││    │           ││
│└──────┼─────────┘└────┼───────────┘└────┼───────────┘│
└───────┼───────────────┼─────────────────┼────────────┘
  ┌─────▼─────┐     ┌───▼───────┐      ┌──▼────────┐
  │RecordBatch│     │RecordBatch│      │RecordBatch│
  └───────────┘     └───────────┘      └───────────┘

Example of WorkUnitFeed during distributed execution

                                                                                                    ┌──────────────────┐
                                                                                                    │Coordinating Stage│
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴──────────────────┘
                                                                                                                       │
│
 ┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐│
││                                                    WorkUnitFeed                                                    │
 │  ┌───────────┐     ┌───────────┐     ┌───────────┐            ┌───────────┐      ┌───────────┐    ┌───────────┐    ││
││  │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │            │ .feed(3)  │      │ .feed(4)  │    │ .feed(5)  │    │
 │  └────┬──────┘     └──┬────────┘     └──┬────────┘            └─────┬─────┘      └──┬────────┘    └───┬───────┘    ││
│└───────┼───────────────┼─────────────────┼───────────────────────────┼───────────────┼─────────────────┼────────────┘
         │               │                 │                           │               │                .┴.            │
└ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ .┴. ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─(   )─ ─ ─ ─ ─ ─
         │              .┴.                │                         (   )             │                `┬'
         │             (   )               │                          `┬'              │                .┴.
        .┴.             `┬'               .┴.                          │               │               (   )
       (   )             │               (   )                         │              .┴.               `┬'
        `┬'             .┴.               `┬'────────────┐             │             (   )               │┌────────────┐
         │             (   )               ││  Worker 1  │             │              `┬'                ││  Worker 2  │
┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ `┬' ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘    ┌ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ─│┴────────────┘
 ┌───────┼───────────────┼─────────────────┼────────────┐│     ┌───────┼───────────────┼─────────────────┼────────────┐│
││       │            Exe│utionPlan        │            │     ││       │            Exe│utionPlan        │            │
 │       │               │                 │            ││     │       │               │                 │            ││
││┌──────┼───────────────┼─────────────────┼───────────┐│     ││┌──────┼───────────────┼─────────────────┼───────────┐│
 ││      │          Remot│WorkUnitFeed     │           │││     ││      │          Remot│WorkUnitFeed     │           │││
│││ ┌────▼──────┐     ┌──▼────────┐     ┌──▼────────┐  ││     │││ ┌────▼──────┐     ┌──▼────────┐     ┌──▼────────┐  ││
 ││ │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │  │││     ││ │ .feed(0)  │     │ .feed(1)  │     │ .feed(2)  │  │││
│││ └────┬──────┘     └──┬────────┘     └──┬────────┘  ││     │││ └────┬──────┘     └──┬────────┘     └──┬────────┘  ││
 │└──────┼───────────────┼─────────────────┼───────────┘││     │└──────┼───────────────┼─────────────────┼───────────┘││
││       │               │                 │            │     ││       │               │                 │            │
 │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││     │┌──────┼─────────┐┌────┼───────────┐┌───.▼.──────────┐││
│││      │P0       ││   .▼. P1       ││  (   )P2       ││     │││      │P0       ││   .▼. P1       ││  (   )P2       ││
 ││     .▼.        ││  (   )         ││   `┬'          │││     ││     .▼.        ││  (   )         ││   `┬'          │││
│││    (   )       ││   `┬'          ││    │           ││     │││    (   )       ││   `┬'          ││    │           ││
 ││     `─'        ││    ┼           ││   .▼.          │││     ││     `─'        ││    ┼           ││   .▼.          │││
│││      │         ││   .▼.          ││  (   )         ││     │││      │         ││   .▼.          ││  (   )         ││
 ││      │         ││  (   )         ││   `┬'          │││     ││      │         ││  (   )         ││   `┬'          │││
│││      │         ││   `─'          ││    │           ││     │││      │         ││   `─'          ││    │           ││
 ││      ▼         ││                ││    ▼           │││     ││      ▼         ││                ││    ▼           │││
│││  processing... ││  processing... ││  processing... ││     │││  processing... ││  processing... ││  processing... ││
 ││      │         ││    │           ││    │           │││     ││      │         ││    │           ││    │           │││
│││      │         ││    │           ││    │           ││     │││      │         ││    │           ││    │           ││
 │└──────┼─────────┘└────┼───────────┘└────┼───────────┘││     │└──────┼─────────┘└────┼───────────┘└────┼───────────┘││
│└───────┼───────────────┼─────────────────┼────────────┘     │└───────┼───────────────┼─────────────────┼────────────┘
   ┌─────▼─────┐     ┌───▼───────┐      ┌──▼────────┐    │       ┌─────▼─────┐     ┌───▼───────┐      ┌──▼────────┐    │
│  │RecordBatch│     │RecordBatch│      │RecordBatch│         │  │RecordBatch│     │RecordBatch│      │RecordBatch│
   └───────────┘     └───────────┘      └───────────┘    │       └───────────┘     └───────────┘      └───────────┘    │
└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─     └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─

Implementations

impl<T: WorkUnitFeedProvider> WorkUnitFeed<T>

pub fn new(provider: T) -> Self

Builds a new local WorkUnitFeed backed by the given provider. Store the resulting feed as a field of your leaf datafusion::physical_plan::ExecutionPlan and register a getter with crate::DistributedExt::set_distributed_work_unit_feed so the distributed layer can find it.

pub fn from_proto(proto: WorkUnitFeedProto) -> Result<Self>

Reconstructs a WorkUnitFeed from its serialized form. The resulting feed is in the remote variant: it reads work units off the network using the RemoteWorkUnitFeedRegistry installed in the worker’s session config. Used by physical plan codecs when deserializing a plan on a worker.

pub fn to_proto(&self) -> WorkUnitFeedProto

Serializes only the feed’s identifier. The concrete provider is never sent over the wire: the coordinator keeps the local provider to produce work units, and the worker rebuilds a remote-variant feed via WorkUnitFeed::from_proto that reads from the network.
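The round trip described above can be sketched with a toy model in which only an identifier crosses the wire. Everything here is hypothetical and chosen for illustration: ToyFeedProto, ToyRegistry, the u64 identifier, and the closure-based provider do not reflect the real WorkUnitFeedProto or RemoteWorkUnitFeedRegistry, and the real from_proto is fallible.

```rust
use std::collections::HashMap;

/// Hypothetical wire form: carries only an identifier, never the provider.
#[derive(Debug, Clone, PartialEq)]
struct ToyFeedProto {
    id: u64,
}

/// Hypothetical worker-side registry mapping feed ids to the work units
/// received from the coordinator.
struct ToyRegistry {
    incoming: HashMap<u64, Vec<String>>,
}

/// Hypothetical two-variant feed: `Local` owns a provider, `Remote` only
/// knows which feed id to read from.
enum ToyFeed<P> {
    Local { id: u64, provider: P },
    Remote { id: u64 },
}

impl<P> ToyFeed<P> {
    fn new(id: u64, provider: P) -> Self {
        ToyFeed::Local { id, provider }
    }

    /// Serializes just the id, regardless of the variant.
    fn to_proto(&self) -> ToyFeedProto {
        match self {
            ToyFeed::Local { id, .. } | ToyFeed::Remote { id } => ToyFeedProto { id: *id },
        }
    }

    /// Always produces the remote variant: no provider travelled with the proto.
    fn from_proto(proto: ToyFeedProto) -> Self {
        ToyFeed::Remote { id: proto.id }
    }

    fn is_remote(&self) -> bool {
        matches!(self, ToyFeed::Remote { .. })
    }
}

impl<P: Fn() -> Vec<String>> ToyFeed<P> {
    /// Local feeds ask the provider; remote feeds look up the registry.
    fn units(&self, registry: &ToyRegistry) -> Result<Vec<String>, String> {
        match self {
            ToyFeed::Local { provider, .. } => Ok(provider()),
            ToyFeed::Remote { id } => registry
                .incoming
                .get(id)
                .cloned()
                .ok_or_else(|| format!("no remote feed registered for id {id}")),
        }
    }
}
```

In this model a feed rebuilt from a proto can never call the provider, because the provider was never serialized in the first place.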

pub fn try_into_inner(self) -> Result<T>

Consumes the feed and returns the user-provided WorkUnitFeedProvider if this feed is in the local variant. Returns an error if the feed is remote (i.e. we’re on a worker and there is no local provider to extract).

pub fn into_inner(self) -> Option<T>

Consumes the feed and returns the user-provided WorkUnitFeedProvider if this feed is in the local variant. Returns None otherwise.

pub fn inner(&self) -> Option<&T>

Returns a reference to the inner WorkUnitFeedProvider if this feed is in the local variant. Returns None otherwise.

pub fn inner_mut(&mut self) -> Option<&mut T>

Returns a mutable reference to the inner WorkUnitFeedProvider if this feed is in the local variant. Returns None otherwise.

pub fn try_inner(&self) -> Result<&T>

Returns a reference to the inner WorkUnitFeedProvider if this feed is in the local variant. Returns an error if the feed is remote (i.e. we’re on a worker and there is no local provider to extract).

pub fn try_inner_mut(&mut self) -> Result<&mut T>

Returns a mutable reference to the inner WorkUnitFeedProvider if this feed is in the local variant. Returns an error if the feed is remote (i.e. we’re on a worker and there is no local provider to extract).
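The accessors above come in two flavours: Option-returning (inner, inner_mut, into_inner) and Result-returning (try_inner, try_inner_mut, try_into_inner). The sketch below reproduces that shape on a hypothetical two-variant ToyFeed enum, with String standing in for the crate's error type; it is a model of the semantics described here, not the crate's implementation.

```rust
/// Hypothetical two-variant feed mirroring the local/remote split.
enum ToyFeed<P> {
    Local(P),
    Remote,
}

impl<P> ToyFeed<P> {
    fn inner(&self) -> Option<&P> {
        match self {
            ToyFeed::Local(p) => Some(p),
            ToyFeed::Remote => None,
        }
    }

    fn inner_mut(&mut self) -> Option<&mut P> {
        match self {
            ToyFeed::Local(p) => Some(p),
            ToyFeed::Remote => None,
        }
    }

    fn into_inner(self) -> Option<P> {
        match self {
            ToyFeed::Local(p) => Some(p),
            ToyFeed::Remote => None,
        }
    }

    /// The `try_*` variants turn "remote" into an error instead of `None`.
    fn try_inner(&self) -> Result<&P, String> {
        self.inner()
            .ok_or_else(|| "feed is remote: no local provider".to_string())
    }

    fn try_into_inner(self) -> Result<P, String> {
        self.into_inner()
            .ok_or_else(|| "feed is remote: no local provider".to_string())
    }
}

/// A caller cannot know the mode at compile time, so it branches at runtime.
fn describe(feed: &ToyFeed<Vec<String>>) -> String {
    match feed.inner() {
        Some(files) => format!("local provider with {} files", files.len()),
        None => "remote feed".to_string(),
    }
}
```

The Option flavour suits code that has a sensible fallback for remote mode, while the Result flavour fits call sites that can propagate the error with `?`.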

pub fn feed(&self, partition: usize, ctx: Arc<TaskContext>) -> Result<BoxStream<'static, Result<T::WorkUnit>>>

Returns the stream of WorkUnits for the given partition. Refer to the WorkUnitFeed docs for more details about how this works.

pub fn metrics(&self) -> ExecutionPlanMetricsSet

Returns the DataFusion metrics collected at runtime while streaming WorkUnits through Self::feed.

Trait Implementations

impl<T: Clone + WorkUnitFeedProvider> Clone for WorkUnitFeed<T>

fn clone(&self) -> WorkUnitFeed<T>

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<T: Debug + WorkUnitFeedProvider> Debug for WorkUnitFeed<T>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

impl<T> !RefUnwindSafe for WorkUnitFeed<T>

impl<T> !UnwindSafe for WorkUnitFeed<T>

impl<T> Freeze for WorkUnitFeed<T>
where T: Freeze,

impl<T> Send for WorkUnitFeed<T>

impl<T> Sync for WorkUnitFeed<T>

impl<T> Unpin for WorkUnitFeed<T>
where T: Unpin,

impl<T> UnsafeUnpin for WorkUnitFeed<T>
where T: UnsafeUnpin,

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T
where T: Clone,

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wraps the input message T in a tonic::Request.

impl<L> LayerExt<L> for L

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.