syntax = "proto3";
package worker;
service WorkerService {
  // Opens a long-lived bidirectional stream between the coordinator and a
  // worker. Either side may send messages over it at any point during the
  // query's lifetime. One coordinator channel is expected per task.
  rpc CoordinatorChannel(stream CoordinatorToWorkerMsg) returns (stream WorkerToCoordinatorMsg);

  // Runs a partition range of a subplan that was previously delivered over
  // the coordinator channel, streaming the results back as FlightData.
  rpc ExecuteTask(ExecuteTaskRequest) returns (stream FlightData);

  // Reports metadata about this worker. At present it is only used for worker
  // versioning.
  rpc GetWorkerInfo(GetWorkerInfoRequest) returns (GetWorkerInfoResponse);
}
// A message travelling from the coordinator to a worker over CoordinatorChannel.
message CoordinatorToWorkerMsg {
  oneof inner {
    // Hands a subplan, identified by its TaskKey, to the worker so that a
    // later ExecuteTask call can run it.
    SetPlanRequest set_plan_request = 1;

    // A batch of messages from the work unit feeds of one node of the plan
    // delivered in set_plan_request; the batch may span several of that
    // node's partitions. A work unit feed is a per-partition stream that tells
    // the node what to execute within that partition, e.g. a stream of file
    // addresses to read.
    WorkUnitBatch work_unit_batch = 2;
  }
}
// A message travelling from a worker back to the coordinator over
// CoordinatorChannel.
message WorkerToCoordinatorMsg {
  oneof inner {
    // The metrics collected while executing the task. Sent only after every
    // partition of the task has finished (or been dropped), so metrics are not
    // lost when a result stream terminates early.
    // task_metrics.pre_order_plan_metrics[i] holds the metrics of plan node i,
    // with nodes numbered in pre-order traversal.
    TaskMetrics task_metrics = 1;
  }
}
// Metrics collected while executing a single task.
message TaskMetrics {
  // Per-node metrics: entry i belongs to the i-th node of the task's plan in
  // pre-order traversal.
  // The TaskKey is implicit: it is the one carried by the SetPlanRequest that
  // opened this coordinator channel connection.
  repeated MetricsSet pre_order_plan_metrics = 1;

  // Task-level metrics. These are not associated with any specific plan node;
  // they apply to the task as a whole, for example the time at which the
  // coordinator fed the plan to the worker.
  MetricsSet task_metrics = 2;
}
// Request for WorkerService.GetWorkerInfo. Currently carries no fields.
message GetWorkerInfoRequest {}
// Response for WorkerService.GetWorkerInfo.
message GetWorkerInfoResponse {
  // The worker's version string (the RPC is currently only used for worker
  // versioning).
  string version = 1;
}
// Delivers a subplan to a worker ahead of the ExecuteTask calls that will run it.
message SetPlanRequest {
  // Declares one work unit feed that the coordinator will stream to the worker.
  message WorkUnitFeedDeclaration {
    // Unique identifier of the node that the work unit feeds will be streamed to.
    bytes id = 1;
    // Number of partitions that will be streamed for that node.
    uint64 partitions = 2;
  }

  // Unique identifier of the task this subplan belongs to.
  TaskKey task_key = 1;

  // Number of tasks that share this same subplan. Required to build the
  // DistributedTaskContext during execution.
  uint64 task_count = 2;

  // The serialized subplan that the worker will execute on an ExecuteTask call.
  bytes plan_proto = 3;

  // Every work unit feed that will be streamed from the coordinator to the
  // worker. It has to be known when the plan is set, because the channels for
  // the incoming work unit feeds are constructed at that moment.
  //
  // Empty when the plan contains no WorkUnitFeedExec nodes.
  repeated WorkUnitFeedDeclaration work_unit_feed_declarations = 4;

  // URL of the worker this message is addressed to. The receiving worker uses
  // it to identify itself, so it can skip gRPC calls to itself when it needs
  // to execute remote tasks.
  string target_worker_url = 5;

  // Query start time as reported by the coordinator, in Unix nanoseconds. Used
  // to express temporal metrics relative to when the query was fired in the
  // coordinator.
  uint64 query_start_time_ns = 6;
}
// A group of work units shipped together in one CoordinatorToWorkerMsg.
message WorkUnitBatch {
  // The work units contained in this batch.
  repeated WorkUnit batch = 1;
}
// A single item of a work unit feed, addressed to one partition of one plan
// node, together with timestamps recorded along its way.
message WorkUnit {
  // Identifier of the node whose work unit feed this item belongs to.
  bytes id = 1;
  // Index of the partition, within that node, whose feed this item belongs to.
  uint64 partition = 2;
  // Arbitrary user-defined data needed during execution (e.g. a file address).
  bytes body = 3;
  // Unix timestamp, in nanoseconds, at which the coordinator created this message.
  uint64 created_timestamp_unix_nanos = 4;
  // Unix timestamp, in nanoseconds, at which the coordinator sent this message.
  uint64 sent_timestamp_unix_nanos = 5;
  // Unix timestamp, in nanoseconds, at which a worker received this message.
  uint64 received_timestamp_unix_nanos = 6;
  // Unix timestamp, in nanoseconds, at which processing of this message began.
  uint64 processed_timestamp_unix_nanos = 7;
}
// Request for WorkerService.ExecuteTask.
message ExecuteTaskRequest {
  // Field numbers 4 and 5 are not used by any field in this message.
  // NOTE(review): presumably they belonged to removed fields; they are reserved
  // so they cannot be reused with a different meaning and silently
  // misinterpret data from older peers. Confirm against history.
  reserved 4, 5;

  // Unique identifier of the task to execute.
  TaskKey task_key = 1;
  // Start of the partition range of the task to execute.
  uint64 target_partition_start = 2;
  // End of the partition range of the task to execute.
  // NOTE(review): whether this bound is inclusive or exclusive is not stated
  // here; confirm against the worker implementation.
  uint64 target_partition_end = 3;

  // The head node that the requested task must have. How the head is prepared
  // depends on the network boundary executing the task:
  //   - RepartitionExecHead: a RepartitionExec at the head of the task.
  //   - BroadcastExecHead: a BroadcastExec at the head of the task.
  //   - NoneHead: no specific head is needed.
  oneof producer_head {
    // The boundary executing the task is a NetworkCoalesceExec.
    NoneHead none = 6;
    // The boundary executing the task is a NetworkBroadcastExec.
    BroadcastExecHead broadcast = 7;
    // The boundary executing the task is a NetworkShuffleExec.
    RepartitionExecHead repartition = 8;
  }
}
// Producer head used when a NetworkCoalesceExec executes the task: no specific
// head node is required.
message NoneHead {}
// Producer head used when a NetworkBroadcastExec executes the task: the task is
// topped with a BroadcastExec.
message BroadcastExecHead {
  // Number of output partitions.
  uint64 output_partitions = 1;
}
// Producer head used when a NetworkShuffleExec executes the task: the task is
// topped with a RepartitionExec.
message RepartitionExecHead {
  // A serialized `Partitioning` message from datafusion.proto.
  bytes partitioning = 1;
}
// Uniquely identifies a task within a query.
message TaskKey {
  // Identifier of the query the task belongs to.
  bytes query_id = 1;
  // Identifier of the stage, within that query, that the task belongs to.
  uint64 stage_id = 2;
  // Number of the task within its stage.
  uint64 task_number = 3;
}
// Messages copied from https://github.com/apache/arrow/blob/main/format/Flight.proto.
// They must stay identical to the structs shipped by the `arrow-flight` crate so
// that its tooling can be used to handle FlightData streams.
// Mirror of arrow.flight.protocol.FlightDescriptor from Flight.proto, mapped to
// arrow_flight::FlightDescriptor via extern_path at codegen time. Because of
// that mapping, names and field numbers here track upstream exactly; this is
// why the enum values below are not prefixed with the enum type name.
message FlightDescriptor {
  enum DescriptorType {
    UNKNOWN = 0;
    PATH = 1;
    CMD = 2;
  }

  DescriptorType type = 1;
  bytes cmd = 2;
  repeated string path = 3;
}
// Mirror of arrow.flight.protocol.FlightData from Flight.proto, mapped to
// arrow_flight::FlightData via extern_path at codegen time. Field numbers,
// including 1000 for data_body, track upstream exactly.
message FlightData {
  FlightDescriptor flight_descriptor = 1;
  bytes data_header = 2;
  bytes app_metadata = 3;
  bytes data_body = 1000;
}
// --- App metadata piggybacked on FlightData messages ---
// Covers every kind of app_metadata that distributed execution piggybacks on
// FlightData messages.
message FlightAppMetadata {
  // NOTE(review): presumably the partition the accompanying FlightData belongs
  // to; confirm against the producer.
  uint64 partition = 1;
  // Unix timestamp, in nanoseconds, at which this message was created.
  uint64 created_timestamp_unix_nanos = 2;
}
// --- Metric serialization ---
// Protobuf mirror of datafusion::physical_plan::metrics::Label: a name/value
// pair attached to a metric.
message Label {
  string name = 1;
  string value = 2;
}
// Protobuf mirror of datafusion::physical_plan::metrics::Metric.
message Metric {
  // Labels attached to this metric.
  repeated Label labels = 1;
  // Partition this metric belongs to. Has explicit presence, so "no
  // partition" is distinguishable from partition 0.
  optional uint64 partition = 2;

  // The metric's value; exactly one variant is set. Variants prefixed with
  // custom_ carry custom metric types.
  oneof value {
    OutputRows output_rows = 10;
    ElapsedCompute elapsed_compute = 11;
    SpillCount spill_count = 12;
    SpilledBytes spilled_bytes = 13;
    SpilledRows spilled_rows = 14;
    CurrentMemoryUsage current_memory_usage = 15;
    NamedCount count = 16;
    NamedGauge gauge = 17;
    NamedTime time = 18;
    StartTimestamp start_timestamp = 19;
    EndTimestamp end_timestamp = 20;
    OutputBytes output_bytes = 21;
    OutputBatches output_batches = 22;
    NamedPruningMetrics pruning_metrics = 23;
    NamedRatio ratio = 24;
    MinLatency custom_min_latency = 25;
    MaxLatency custom_max_latency = 26;
    AvgLatency custom_avg_latency = 27;
    FirstLatency custom_first_latency = 28;
    BytesCount custom_bytes_count = 29;
    PercentileLatency custom_p50_latency = 30;
    PercentileLatency custom_p75_latency = 31;
    PercentileLatency custom_p95_latency = 32;
    PercentileLatency custom_p99_latency = 33;
    MaxGauge custom_max_gauge = 34;
  }
}
// Protobuf mirror of datafusion::physical_plan::metrics::MetricsSet: every
// metric collected for a single ExecutionPlan node.
message MetricsSet {
  repeated Metric metrics = 1;
}
// Value of an output_rows metric: the number of rows output.
message OutputRows {
  uint64 value = 1;
}
// Value of an elapsed_compute metric.
// NOTE(review): presumably nanoseconds, as in DataFusion's Time metric; confirm.
message ElapsedCompute {
  uint64 value = 1;
}
// Value of a spill_count metric: the number of spills.
message SpillCount {
  uint64 value = 1;
}
// Value of a spilled_bytes metric: the number of bytes spilled.
message SpilledBytes {
  uint64 value = 1;
}
// Value of a spilled_rows metric: the number of rows spilled.
message SpilledRows {
  uint64 value = 1;
}
// Value of a current_memory_usage metric.
// NOTE(review): presumably measured in bytes; confirm.
message CurrentMemoryUsage {
  uint64 value = 1;
}
// A counter metric identified by name.
message NamedCount {
  string name = 1;
  uint64 value = 2;
}
// A gauge metric identified by name.
message NamedGauge {
  string name = 1;
  uint64 value = 2;
}
// A time metric identified by name.
// NOTE(review): value is presumably nanoseconds; confirm.
message NamedTime {
  string name = 1;
  uint64 value = 2;
}
// Value of a start_timestamp metric. Has explicit presence, so an unset
// timestamp is distinguishable from 0.
message StartTimestamp {
  optional int64 value = 1;
}
// Value of an end_timestamp metric. Has explicit presence, so an unset
// timestamp is distinguishable from 0.
message EndTimestamp {
  optional int64 value = 1;
}
// Value of an output_bytes metric: the number of bytes output.
message OutputBytes {
  uint64 value = 1;
}
// Value of an output_batches metric: the number of batches output.
message OutputBatches {
  uint64 value = 1;
}
// A pruning metric identified by name, tracking pruned and matched counts.
message NamedPruningMetrics {
  string name = 1;
  uint64 pruned = 2;
  uint64 matched = 3;
}
// A ratio metric identified by name, stored as its part and total components.
message NamedRatio {
  string name = 1;
  uint64 part = 2;
  uint64 total = 3;
}
// A custom byte-count metric identified by name.
message BytesCount {
  string name = 1;
  uint64 value = 2;
}
// A custom minimum-latency metric identified by name.
message MinLatency {
  string name = 1;
  uint64 value = 2;
}
// A custom maximum-latency metric identified by name.
message MaxLatency {
  string name = 1;
  uint64 value = 2;
}
// A custom average-latency metric identified by name, stored as a running sum
// in nanoseconds plus a sample count.
// NOTE(review): the average is presumably nanos_sum / count; confirm.
message AvgLatency {
  string name = 1;
  uint64 nanos_sum = 2;
  uint64 count = 3;
}
// A custom first-latency metric identified by name.
message FirstLatency {
  string name = 1;
  uint64 value = 2;
}
// A custom percentile-latency metric identified by name. Used for the p50,
// p75, p95 and p99 latency variants of Metric.
message PercentileLatency {
  // Field numbers 2 and 3 are not used by any field in this message.
  // NOTE(review): presumably they belonged to removed fields; they are reserved
  // so they cannot be reused with a different meaning. Confirm against history.
  reserved 2, 3;

  string name = 1;
  // Serialized sketch the percentile is derived from.
  // NOTE(review): the sketch encoding is not defined in this file; confirm.
  bytes sketch_bytes = 4;
}
// A custom max-gauge metric identified by name.
message MaxGauge {
  string name = 1;
  uint64 value = 2;
}