pub struct ExecutionGraph { /* private fields */ }
Represents the DAG for a distributed query plan.
A distributed query plan consists of a set of stages which must be executed sequentially.
Each stage consists of a set of partitions which can be executed in parallel, where each partition
represents a Task, which is the basic unit of scheduling in kapot.
As an example, consider a SQL query which performs a simple aggregation:
SELECT id, SUM(gmv) FROM some_table GROUP BY id
This will produce a DataFusion execution plan that looks something like:
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "id", index: 0 }], 4)
    AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(some_table.gmv)]
      TableScan: some_table
The kapot DistributedPlanner will turn this into a distributed plan by creating a shuffle
boundary (called a “Stage”) whenever the underlying plan needs to perform a repartition.
In this case we end up with a distributed plan with two stages:
ExecutionGraph[job_id=job, session_id=session, available_tasks=1, complete=false]
=========UnResolvedStage[id=2, children=1]=========
Inputs{1: StageOutput { partition_locations: {}, complete: false }}
ShuffleWriterExec: None
  AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
    CoalesceBatchesExec: target_batch_size=4096
      UnresolvedShuffleExec
=========ResolvedStage[id=1, partitions=1]=========
ShuffleWriterExec: Some(Hash([Column { name: "id", index: 0 }], 4))
  AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[SUM(?table?.gmv)]
    TableScan: some_table
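The idea of cutting a plan at each repartition can be illustrated with a toy sketch. This is not the DistributedPlanner's code: the function split_into_stages is hypothetical, the plan is assumed to be a linear chain of operator names listed from the leaf upward, and operators are plain strings rather than DataFusion plan nodes.

```rust
/// Toy illustration only: splits a linear plan into stages at each repartition.
/// `ops_leaf_first` lists operator names from the leaf (scan) upward.
/// The function name and the string-based plan are assumptions for this sketch.
fn split_into_stages(ops_leaf_first: &[&str]) -> Vec<Vec<String>> {
    let mut stages = Vec::new();
    let mut current: Vec<String> = Vec::new();
    for op in ops_leaf_first {
        if op.starts_with("RepartitionExec") {
            // Shuffle boundary: the current stage ends with a shuffle writer.
            current.push("ShuffleWriterExec".to_string());
            stages.push(std::mem::take(&mut current));
            // The next stage starts by reading the not-yet-available shuffle output.
            current.push("UnresolvedShuffleExec".to_string());
        } else {
            current.push(op.to_string());
        }
    }
    // The last stage also writes its output through a shuffle writer.
    current.push("ShuffleWriterExec".to_string());
    stages.push(current);
    stages
}
```

Feeding it the operators of the aggregation example, leaf first, yields two stages: one ending at the partial aggregate and one starting from an unresolved shuffle read.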
The DAG structure of this ExecutionGraph is encoded in the stages. Each stage’s input field
will indicate which stages it depends on, and each stage’s output_links will indicate which
stage it needs to publish its output to.
If a stage's output_links is empty, then it is the final stage in this query, and it should
publish its outputs to the ExecutionGraph's output_locations, which represent the final query results.
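As a rough illustration of how inputs and output links encode the DAG, the sketch below models the two-stage example with a simplified, hypothetical StageSketch type. The real stage types are private to the crate and carry much more state; only the field names inputs and output_links echo the description above.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a stage in the graph.
#[derive(Debug)]
struct StageSketch {
    stage_id: usize,
    /// Ids of the stages this stage depends on.
    inputs: Vec<usize>,
    /// Ids of the stages this stage publishes its output to.
    output_links: Vec<usize>,
}

/// Returns the sorted ids of stages with no output links, i.e. the final stages.
fn final_stages(stages: &HashMap<usize, StageSketch>) -> Vec<usize> {
    let mut ids: Vec<usize> = stages
        .values()
        .filter(|s| s.output_links.is_empty())
        .map(|s| s.stage_id)
        .collect();
    ids.sort_unstable();
    ids
}

/// Builds the two-stage graph from the aggregation example:
/// stage 1 (partial aggregate) feeds stage 2 (final aggregate).
fn two_stage_example() -> HashMap<usize, StageSketch> {
    let mut stages = HashMap::new();
    stages.insert(1, StageSketch { stage_id: 1, inputs: vec![], output_links: vec![2] });
    stages.insert(2, StageSketch { stage_id: 2, inputs: vec![1], output_links: vec![] });
    stages
}
```

In this model, stage 2 is the only stage with empty output_links, so it is the one whose output would become the query result.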
Implementations
impl ExecutionGraph
pub fn new(scheduler_id: &str, job_id: &str, job_name: &str, session_id: &str, plan: Arc<dyn ExecutionPlan>, queued_at: u64) -> Result<Self>
pub fn job_id(&self) -> &str
pub fn job_name(&self) -> &str
pub fn session_id(&self) -> &str
pub fn status(&self) -> &JobStatus
pub fn start_time(&self) -> u64
pub fn end_time(&self) -> u64
pub fn stage_count(&self) -> usize
pub fn next_task_id(&mut self) -> usize
pub fn is_successful(&self) -> bool
An ExecutionGraph is successful if all its stages are successful
pub fn is_complete(&self) -> bool
pub fn revive(&mut self) -> bool
Revive the execution graph by converting the resolved stages to running stages. Returns true if any stages were converted, false otherwise.
pub fn update_task_status(&mut self, executor: &ExecutorMetadata, task_statuses: Vec<TaskStatus>, max_task_failures: usize, max_stage_failures: usize) -> Result<Vec<QueryStageSchedulerEvent>>
Update task statuses and task metrics in the graph. This will also push shuffle partitions to their respective shuffle read stages.
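To make "pushing shuffle partitions to the shuffle read stages" more concrete, here is a minimal sketch under stated assumptions. The types StageOutputSketch and DownstreamStage and their methods are hypothetical; they loosely mirror the Inputs{1: StageOutput { partition_locations: {}, complete: false }} shape printed in the example graph, and locations are reduced to plain executor names.

```rust
use std::collections::HashMap;

/// Hypothetical model of one upstream stage's output as seen by a reader stage.
#[derive(Debug, Default)]
struct StageOutputSketch {
    /// Output partition id -> where that partition's data can be fetched (assumed shape).
    partition_locations: HashMap<usize, Vec<String>>,
    /// Set once the upstream stage has finished all of its tasks.
    complete: bool,
}

/// Hypothetical shuffle read stage, keyed by the id of each upstream stage.
#[derive(Debug, Default)]
struct DownstreamStage {
    inputs: HashMap<usize, StageOutputSketch>,
}

impl DownstreamStage {
    /// Record that a finished upstream task wrote `partition` at `location`.
    fn add_input_partition(&mut self, from_stage: usize, partition: usize, location: &str) {
        self.inputs
            .entry(from_stage)
            .or_default()
            .partition_locations
            .entry(partition)
            .or_default()
            .push(location.to_string());
    }

    /// Mark an upstream stage as fully finished.
    fn complete_input(&mut self, from_stage: usize) {
        self.inputs.entry(from_stage).or_default().complete = true;
    }

    /// In this sketch a stage can be resolved once every input is complete.
    /// A stage with no inputs trivially satisfies this.
    fn resolvable(&self) -> bool {
        self.inputs.values().all(|input| input.complete)
    }
}
```

The design point is that the reader stage accumulates locations task by task and only becomes eligible for resolution once its inputs are marked complete.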
pub fn running_stages(&self) -> Vec<usize>
Return all the currently running stage ids
pub fn running_tasks(&self) -> Vec<RunningTaskInfo>
Return all currently running tasks along with the ID of the executor to which each is assigned
pub fn available_tasks(&self) -> usize
Total number of tasks in this plan that are ready for scheduling
pub fn pop_next_task(&mut self, executor_id: &str) -> Result<Option<TaskDescription>>
Get the next task that can be assigned to the given executor. This method should only be called when the resulting task is about to be launched, because the task's status will be set to Running and it will no longer be available to the scheduler. If the task is not launched, its status must be reset so that the task can be scheduled elsewhere.
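The pop-then-reset contract described for pop_next_task can be sketched with a small stand-alone model. Everything here is hypothetical (TaskSlots, SlotState, reset_task, schedule_one) and exists only to show the pattern: a popped task is marked Running at once, so a caller whose launch fails has to put it back.

```rust
/// Hypothetical per-partition task state; not the crate's own type.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SlotState {
    Pending,
    Running { executor_id: String },
}

/// Hypothetical container for the tasks of one stage.
struct TaskSlots {
    slots: Vec<SlotState>,
}

impl TaskSlots {
    fn new(partitions: usize) -> Self {
        Self { slots: vec![SlotState::Pending; partitions] }
    }

    /// Number of tasks that are still ready for scheduling.
    fn available_tasks(&self) -> usize {
        self.slots.iter().filter(|s| **s == SlotState::Pending).count()
    }

    /// Hands out the next pending partition and marks it Running immediately.
    fn pop_next_task(&mut self, executor_id: &str) -> Option<usize> {
        let partition = self.slots.iter().position(|s| *s == SlotState::Pending)?;
        self.slots[partition] = SlotState::Running { executor_id: executor_id.to_string() };
        Some(partition)
    }

    /// Puts a popped task back so it can be scheduled elsewhere.
    fn reset_task(&mut self, partition: usize) {
        if let Some(slot) = self.slots.get_mut(partition) {
            *slot = SlotState::Pending;
        }
    }
}

/// Pops one task and tries to launch it; on launch failure the task is reset.
fn schedule_one<F>(slots: &mut TaskSlots, executor_id: &str, launch: F) -> Option<usize>
where
    F: FnOnce(usize) -> Result<(), String>,
{
    let partition = slots.pop_next_task(executor_id)?;
    match launch(partition) {
        Ok(()) => Some(partition),
        Err(_) => {
            slots.reset_task(partition);
            None
        }
    }
}
```

Without the reset in the error branch, the failed task would stay marked as Running and never be offered to another executor.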
pub fn update_status(&mut self, status: JobStatus)
pub fn output_locations(&self) -> Vec<PartitionLocation>
pub fn reset_stages_on_lost_executor(&mut self, executor_id: &str) -> Result<(HashSet<usize>, Vec<RunningTaskInfo>)>
Reset running and successful stages on a given executor. This first checks the unresolved, resolved, and running stages and resets their running and successful tasks. It then checks each successful stage to see whether any running parent stages still need to read shuffle output from it. If so, it resets the successful tasks and rolls back the resolved shuffle recursively.
Returns the ids of the reset stages and the running tasks that should be killed.
pub fn resolve_stage(&mut self, stage_id: usize) -> Result<bool>
Convert an unresolved stage to a resolved stage
pub fn succeed_stage(&mut self, stage_id: usize) -> bool
Convert a running stage to a successful stage
pub fn fail_stage(&mut self, stage_id: usize, err_msg: String) -> bool
Convert a running stage to a failed stage
pub fn rollback_running_stage(&mut self, stage_id: usize, failure_reasons: HashSet<String>) -> Result<Vec<RunningTaskInfo>>
Convert a running stage to an unresolved stage. Returns a Vec of RunningTaskInfo for the running tasks in this stage.
pub fn rollback_resolved_stage(&mut self, stage_id: usize) -> Result<bool>
Convert a resolved stage to an unresolved stage
pub fn rerun_successful_stage(&mut self, stage_id: usize) -> bool
Convert a successful stage to a running stage
pub fn succeed_job(&mut self) -> Result<()>
Mark the job as successful
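The stage conversions documented by the methods above can be summarized as a small state machine. The sketch below is only a reading aid: StageState and transition_method are hypothetical names, and the table is derived from the method descriptions on this page, not from the crate's source. Note that revive acts on the whole graph and converts every resolved stage.

```rust
/// Hypothetical summary of the stage states named in the method docs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StageState {
    Unresolved,
    Resolved,
    Running,
    Successful,
    Failed,
}

/// Returns the documented method that performs the given transition, if any.
fn transition_method(from: StageState, to: StageState) -> Option<&'static str> {
    use StageState::*;
    match (from, to) {
        (Unresolved, Resolved) => Some("resolve_stage"),
        (Resolved, Running) => Some("revive"),
        (Running, Successful) => Some("succeed_stage"),
        (Running, Failed) => Some("fail_stage"),
        (Running, Unresolved) => Some("rollback_running_stage"),
        (Resolved, Unresolved) => Some("rollback_resolved_stage"),
        (Successful, Running) => Some("rerun_successful_stage"),
        // Any other pair has no corresponding method documented here.
        _ => None,
    }
}
```

For example, there is no documented direct path from Unresolved to Running; a stage is first resolved and then revived.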
Trait Implementations
impl Clone for ExecutionGraph
fn clone(&self) -> ExecutionGraph
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for ExecutionGraph
impl From<&ExecutionGraph> for JobOverview
fn from(value: &ExecutionGraph) -> Self
Auto Trait Implementations
impl Freeze for ExecutionGraph
impl !RefUnwindSafe for ExecutionGraph
impl Send for ExecutionGraph
impl Sync for ExecutionGraph
impl Unpin for ExecutionGraph
impl !UnwindSafe for ExecutionGraph
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request