ExecutionGraph

Struct ExecutionGraph 

pub struct ExecutionGraph { /* private fields */ }

Represents the DAG for a distributed query plan.

A distributed query plan consists of a set of stages which must be executed sequentially.

Each stage consists of a set of partitions which can be executed in parallel, where each partition represents a Task, which is the basic unit of scheduling in kapot.

As an example, consider a SQL query which performs a simple aggregation:

SELECT id, SUM(gmv) FROM some_table GROUP BY id

This will produce a DataFusion execution plan that looks something like

CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }], 4)
    AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(some_table.gmv)]
      TableScan: some_table

The kapot DistributedPlanner will turn this into a distributed plan by creating a shuffle boundary (called a “Stage”) whenever the underlying plan needs to perform a repartition. In this case we end up with a distributed plan with two stages:

ExecutionGraph[job_id=job, session_id=session, available_tasks=1, complete=false]
=========UnResolvedStage[id=2, children=1]=========
Inputs{1: StageOutput { partition_locations: {}, complete: false }}
ShuffleWriterExec: None
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
    CoalesceBatchesExec: target_batch_size=4096
      UnresolvedShuffleExec
=========ResolvedStage[id=1, partitions=1]=========
ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
  AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
    TableScan: some_table
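The stage split can be illustrated with a toy model. This is a minimal sketch, not kapot's DistributedPlanner: the Op enum and the split_into_stages and example_plan functions are hypothetical names, and the plan is simplified to a linear chain of operators listed from leaf to root.

```rust
/// Hypothetical, simplified operators. The real planner works on DataFusion
/// `ExecutionPlan` trees, not on a flat list like this.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    TableScan,
    PartialAggregate,
    Repartition,
    CoalesceBatches,
    FinalAggregate,
}

/// Splits a leaf-to-root operator chain into stages, starting a new stage at
/// every `Repartition` (the shuffle boundary). The repartition itself is not
/// kept as an operator in either stage of this toy model.
fn split_into_stages(plan: &[Op]) -> Vec<Vec<Op>> {
    let mut stages: Vec<Vec<Op>> = vec![Vec::new()];
    for &op in plan {
        if op == Op::Repartition {
            // Shuffle boundary: everything after this belongs to a new stage.
            stages.push(Vec::new());
        } else if let Some(current) = stages.last_mut() {
            current.push(op);
        }
    }
    stages
}

/// The aggregation example as a leaf-to-root chain (an assumed simplification).
fn example_plan() -> Vec<Op> {
    vec![
        Op::TableScan,
        Op::PartialAggregate,
        Op::Repartition,
        Op::CoalesceBatches,
        Op::FinalAggregate,
    ]
}
```

Calling split_into_stages(&example_plan()) yields two operator lists, mirroring the two stages in the example: the scan and partial aggregation upstream of the shuffle, and the final aggregation downstream of it.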

The DAG structure of this ExecutionGraph is encoded in the stages. Each stage’s input field will indicate which stages it depends on, and each stage’s output_links will indicate which stage it needs to publish its output to.

If a stage’s output_links is empty, then it is the final stage in the query, and it should publish its outputs to the ExecutionGraph’s output_locations, which represent the final query results.
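The way the DAG is encoded in the stages can be sketched with a small stand-in type. ToyStage, final_stages, inputs_ready, and two_stage_graph are hypothetical names for illustration only; kapot's real stage types carry much more state and may name these fields differently.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a stage; not kapot's real type.
#[derive(Debug)]
struct ToyStage {
    id: usize,
    /// IDs of the stages this stage reads shuffle output from.
    inputs: Vec<usize>,
    /// IDs of the stages this stage publishes its output to.
    output_links: Vec<usize>,
}

/// Returns the sorted IDs of stages with no output links, i.e. final stages.
fn final_stages(stages: &HashMap<usize, ToyStage>) -> Vec<usize> {
    let mut ids: Vec<usize> = stages
        .values()
        .filter(|stage| stage.output_links.is_empty())
        .map(|stage| stage.id)
        .collect();
    ids.sort_unstable();
    ids
}

/// A stage can be resolved once every stage it depends on has completed.
fn inputs_ready(stage: &ToyStage, completed: &HashSet<usize>) -> bool {
    stage.inputs.iter().all(|id| completed.contains(id))
}

/// Builds the two-stage graph from the aggregation example:
/// stage 1 writes shuffle output that stage 2 reads.
fn two_stage_graph() -> HashMap<usize, ToyStage> {
    let mut stages = HashMap::new();
    stages.insert(1, ToyStage { id: 1, inputs: vec![], output_links: vec![2] });
    stages.insert(2, ToyStage { id: 2, inputs: vec![1], output_links: vec![] });
    stages
}
```

In this model, stage 2 is the only final stage, and it becomes ready only after stage 1 is recorded as completed.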

Implementations

impl ExecutionGraph

pub fn new(
    scheduler_id: &str,
    job_id: &str,
    job_name: &str,
    session_id: &str,
    plan: Arc<dyn ExecutionPlan>,
    queued_at: u64,
) -> Result<Self>

pub fn job_id(&self) -> &str

pub fn job_name(&self) -> &str

pub fn session_id(&self) -> &str

pub fn status(&self) -> &JobStatus

pub fn start_time(&self) -> u64

pub fn end_time(&self) -> u64

pub fn stage_count(&self) -> usize

pub fn next_task_id(&mut self) -> usize

pub fn is_successful(&self) -> bool

An ExecutionGraph is successful if all of its stages are successful.

pub fn is_complete(&self) -> bool

pub fn revive(&mut self) -> bool

Revive the execution graph by converting its resolved stages to running stages. Returns true if any stage was converted, and false otherwise.

pub fn update_task_status(
    &mut self,
    executor: &ExecutorMetadata,
    task_statuses: Vec<TaskStatus>,
    max_task_failures: usize,
    max_stage_failures: usize,
) -> Result<Vec<QueryStageSchedulerEvent>>

Update task statuses and task metrics in the graph. This will also push shuffle partitions to their respective shuffle read stages.

pub fn running_stages(&self) -> Vec<usize>

Return the IDs of all currently running stages.

pub fn running_tasks(&self) -> Vec<RunningTaskInfo>

Return all currently running tasks, along with the ID of the executor to which each is assigned.

pub fn available_tasks(&self) -> usize

Total number of tasks in this plan that are ready for scheduling.

pub fn pop_next_task(
    &mut self,
    executor_id: &str,
) -> Result<Option<TaskDescription>>

Get the next task that can be assigned to the given executor. This method should only be called when the resulting task is about to be launched immediately, because its status is set to Running and it is no longer available to the scheduler. If the task is not launched, its status must be reset so that the task can be scheduled elsewhere.
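The pop-then-launch contract can be sketched as follows. This is an illustrative model under stated assumptions, not kapot's scheduler: ToyGraph, TaskState, reset_task, and schedule_one are hypothetical, and the real pop_next_task returns a Result<Option<TaskDescription>> rather than a task index.

```rust
/// Hypothetical task states for the sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskState {
    Pending,
    Running,
}

/// Hypothetical stand-in for the graph, tracking only task states.
struct ToyGraph {
    tasks: Vec<TaskState>,
}

impl ToyGraph {
    /// Hands out the first pending task and marks it Running, so it is no
    /// longer visible to later scheduling attempts.
    fn pop_next_task(&mut self) -> Option<usize> {
        let idx = self.tasks.iter().position(|t| *t == TaskState::Pending)?;
        self.tasks[idx] = TaskState::Running;
        Some(idx)
    }

    /// Makes a popped-but-not-launched task schedulable again.
    fn reset_task(&mut self, idx: usize) {
        self.tasks[idx] = TaskState::Pending;
    }

    /// Number of tasks that are ready for scheduling.
    fn available_tasks(&self) -> usize {
        self.tasks.iter().filter(|t| **t == TaskState::Pending).count()
    }
}

/// Pops one task and tries to launch it. If the launch fails, the task is
/// reset so that it can be scheduled elsewhere.
fn schedule_one<F>(graph: &mut ToyGraph, launch: F) -> Option<usize>
where
    F: FnOnce(usize) -> Result<(), String>,
{
    let idx = graph.pop_next_task()?;
    match launch(idx) {
        Ok(()) => Some(idx),
        Err(_) => {
            graph.reset_task(idx);
            None
        }
    }
}
```

The point of the sketch is the ordering: the task leaves the pending pool as soon as it is popped, so a caller that cannot launch it has to put it back explicitly.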

pub fn update_status(&mut self, status: JobStatus)

pub fn output_locations(&self) -> Vec<PartitionLocation>

pub fn reset_stages_on_lost_executor(
    &mut self,
    executor_id: &str,
) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)>

Reset the running and successful stages on a given executor. This first checks the unresolved, resolved, and running stages and resets their running and successful tasks. It then checks each successful stage to see whether any running parent stages need to read shuffle output from it. If so, it resets the successful tasks and rolls back the resolved shuffle recursively.

Returns the IDs of the reset stages and the running tasks that should be killed.

pub fn resolve_stage(&mut self, stage_id: usize) -> Result<bool>

Convert an unresolved stage to a resolved stage.

pub fn succeed_stage(&mut self, stage_id: usize) -> bool

Convert a running stage to a successful stage.

pub fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool

Convert a running stage to a failed stage.

pub fn rollback_running_stage(
    &mut self,
    stage_id: usize,
    failure_reasons: HashSet<String>,
) -> Result<Vec<RunningTaskInfo>>

Convert a running stage to an unresolved stage. Returns a Vec of RunningTaskInfo for the running tasks in this stage.

pub fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool>

Convert a resolved stage to an unresolved stage.

pub fn rerun_successful_stage(&mut self, stage_id: usize) -> bool

Convert a successful stage to a running stage.
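Taken together, the stage methods describe a small state machine. The sketch below encodes only the transitions stated in the method descriptions on this page. StageState, Transition, and apply are hypothetical names, and the real methods also update tasks, metrics, and shuffle locations, which this model ignores.

```rust
/// Stage states named in the method descriptions (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StageState {
    Unresolved,
    Resolved,
    Running,
    Successful,
    Failed,
}

/// One variant per documented transition method (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Transition {
    ResolveStage,
    Revive,
    SucceedStage,
    FailStage,
    RollbackRunningStage,
    RollbackResolvedStage,
    RerunSuccessfulStage,
}

/// Returns the next state, or None if the transition does not apply to a
/// stage in the given state.
fn apply(state: StageState, transition: Transition) -> Option<StageState> {
    use StageState::*;
    use Transition::*;
    match (state, transition) {
        (Unresolved, ResolveStage) => Some(Resolved),
        (Resolved, Revive) => Some(Running),
        (Running, SucceedStage) => Some(Successful),
        (Running, FailStage) => Some(Failed),
        (Running, RollbackRunningStage) => Some(Unresolved),
        (Resolved, RollbackResolvedStage) => Some(Unresolved),
        (Successful, RerunSuccessfulStage) => Some(Running),
        _ => None,
    }
}
```

For example, apply(StageState::Unresolved, Transition::ResolveStage) gives Some(StageState::Resolved), while applying any transition to a Failed stage gives None in this model.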

pub fn fail_job(&mut self, error: String)

Fail the job with the given error message.

pub fn succeed_job(&mut self) -> Result<()>

Mark the job as successful.

Trait Implementations

impl Clone for ExecutionGraph

fn clone(&self) -> ExecutionGraph

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for ExecutionGraph

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl From<&ExecutionGraph> for JobOverview

fn from(value: &ExecutionGraph) -> Self

Converts to this type from the input type.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T
where T: Clone,

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.