syntax = "proto3";
package worker;
// RPCs used to hand subplans to a worker, execute them, and query worker
// metadata.
service WorkerService {
  // Opens a bidirectional message stream between a coordinator and a worker.
  // Messages may be exchanged over it at any point during a query's lifetime.
  // One coordinator channel is expected per task.
  rpc CoordinatorChannel(stream CoordinatorToWorkerMsg)
      returns (stream WorkerToCoordinatorMsg);

  // Executes the requested partition range of a subplan that was previously
  // sent through the coordinator channel. Results are streamed back as
  // FlightData messages.
  rpc ExecuteTask(ExecuteTaskRequest) returns (stream FlightData);

  // Returns metadata about a worker. Currently only used for worker
  // versioning.
  rpc GetWorkerInfo(GetWorkerInfoRequest) returns (GetWorkerInfoResponse);
}
// Envelope for every message the coordinator sends to a worker over the
// CoordinatorChannel stream.
message CoordinatorToWorkerMsg {
  // The concrete message carried by this envelope.
  oneof inner {
    // Ships a subplan to the worker so that a later ExecuteTask call can
    // actually execute it. The plan is identified by a TaskKey.
    SetPlanRequest set_plan_request = 1;
  }
}
// Envelope for every message a worker sends back to the coordinator over the
// CoordinatorChannel stream.
message WorkerToCoordinatorMsg {
  // Intentionally empty: for now, no messages flow back from worker to
  // coordinator.
}
// Request message of the GetWorkerInfo RPC. It currently carries no fields.
message GetWorkerInfoRequest {}
// Response message of the GetWorkerInfo RPC.
message GetWorkerInfoResponse {
  // Version reported by the worker.
  // NOTE(review): the format of this string is not specified in this file.
  string version = 1;
}
// Delivers a subplan to a worker ahead of its execution. Sent from the
// coordinator wrapped in a CoordinatorToWorkerMsg.
message SetPlanRequest {
  // Unique identifier of the task to which the subplan belongs.
  TaskKey task_key = 1;

  // Number of tasks that share the same subplan. Necessary for building the
  // DistributedTaskContext during execution.
  uint64 task_count = 2;

  // Serialized subplan the worker is expected to execute on an ExecuteTask
  // gRPC call.
  bytes plan_proto = 3;
}
// Request message of the ExecuteTask RPC: selects a task and the range of its
// partitions to execute.
message ExecuteTaskRequest {
  // Unique identifier of the task that is going to be executed.
  TaskKey task_key = 1;

  // Start of the partition range of the specified task to execute.
  uint64 target_partition_start = 2;

  // End of the partition range of the specified task to execute.
  // NOTE(review): this file does not state whether the end bound is inclusive
  // or exclusive -- confirm against the worker implementation.
  uint64 target_partition_end = 3;
}
// Key that uniquely identifies a task within a query.
message TaskKey {
  // Identifier of the query.
  bytes query_id = 1;

  // Identifier of the stage.
  uint64 stage_id = 2;

  // Number of the task within the stage.
  uint64 task_number = 3;
}
// Messages from https://github.com/apache/arrow/blob/main/format/Flight.proto. These
// will match the structs shipped by the `arrow-flight` crate, so that we can use its
// tools for dealing with FlightData streams.
// Matches arrow.flight.protocol.FlightDescriptor from Flight.proto.
// Mapped to arrow_flight::FlightDescriptor via extern_path at codegen time.
//
// Because this message has to match the upstream definition, its names and
// field numbers are kept exactly as they are; see the upstream Flight.proto
// for the semantics of each field.
message FlightDescriptor {
  // Kind of descriptor.
  enum DescriptorType {
    UNKNOWN = 0;
    PATH = 1;
    CMD = 2;
  }

  DescriptorType type = 1;
  bytes cmd = 2;
  repeated string path = 3;
}
// Matches arrow.flight.protocol.FlightData from Flight.proto.
// Mapped to arrow_flight::FlightData via extern_path at codegen time.
//
// This is the message type streamed back by the ExecuteTask RPC. Field names
// and numbers (including the non-sequential 1000) must stay as they are so
// that the message keeps matching the upstream definition.
message FlightData {
  FlightDescriptor flight_descriptor = 1;
  bytes data_header = 2;

  // Application-defined metadata.
  // NOTE(review): presumably carries a serialized FlightAppMetadata in this
  // project -- confirm against the encoding code.
  bytes app_metadata = 3;

  bytes data_body = 1000;
}
// --- App metadata piggybacked on FlightData messages ---
// FlightAppMetadata represents every kind of app_metadata used in the
// distributed execution.
message FlightAppMetadata {
  // NOTE(review): presumably the partition the enclosing FlightData message
  // belongs to -- not documented here, confirm against the producer.
  uint64 partition = 1;

  // Unix timestamp, in nanoseconds, at which this message was created.
  uint64 created_timestamp_unix_nanos = 2;

  // Expected to always be set; it is only optional because protobuf does not
  // allow a oneof to be required.
  oneof content {
    MetricsCollection metrics_collection = 10;
  }
}
// Metrics for a set of tasks in an ExecutionPlan.
message MetricsCollection {
  // One entry per task. Each entry should have a distinct TaskKey.
  repeated TaskMetrics tasks = 1;
}
// Metrics gathered for a single task.
message TaskMetrics {
  // Uniquely identifies the task these metrics belong to. Always present;
  // it only appears optional because message-typed fields cannot be required
  // in protobuf.
  TaskKey task_key = 1;

  // metrics[i] holds the metrics of plan node i, where plan nodes are numbered
  // in pre-order traversal order.
  repeated MetricsSet metrics = 2;
}
// --- Metric serialization ---
// Protobuf mirror of datafusion::physical_plan::metrics::Label: a name/value
// pair attached to a Metric.
message Label {
  // Name of the label.
  string name = 1;

  // Value of the label.
  string value = 2;
}
// Protobuf mirror of datafusion::physical_plan::metrics::Metric.
message Metric {
  // Labels attached to this metric.
  repeated Label labels = 1;

  // Partition associated with this metric, if any. Declared `optional` so that
  // an unset partition can be told apart from partition 0.
  optional uint64 partition = 2;

  // The measured value. Exactly one variant is set per metric.
  oneof value {
    OutputRows output_rows = 10;
    ElapsedCompute elapsed_compute = 11;
    SpillCount spill_count = 12;
    SpilledBytes spilled_bytes = 13;
    SpilledRows spilled_rows = 14;
    CurrentMemoryUsage current_memory_usage = 15;
    NamedCount count = 16;
    NamedGauge gauge = 17;
    NamedTime time = 18;
    StartTimestamp start_timestamp = 19;
    EndTimestamp end_timestamp = 20;
    OutputBytes output_bytes = 21;
    OutputBatches output_batches = 22;
    NamedPruningMetrics pruning_metrics = 23;
    NamedRatio ratio = 24;

    // Variants prefixed with `custom_`.
    MinLatency custom_min_latency = 25;
    MaxLatency custom_max_latency = 26;
    AvgLatency custom_avg_latency = 27;
    FirstLatency custom_first_latency = 28;
    BytesCount custom_bytes_count = 29;

    // The four percentile variants share the PercentileLatency payload type.
    PercentileLatency custom_p50_latency = 30;
    PercentileLatency custom_p75_latency = 31;
    PercentileLatency custom_p95_latency = 32;
    PercentileLatency custom_p99_latency = 33;
  }
}
// Protobuf mirror of datafusion::physical_plan::metrics::MetricsSet: the
// collection of metrics recorded for one ExecutionPlan node.
message MetricsSet {
  // Metrics belonging to the node.
  repeated Metric metrics = 1;
}
// Payload of the `output_rows` variant of Metric.value.
message OutputRows {
  // The recorded value.
  uint64 value = 1;
}
// Payload of the `elapsed_compute` variant of Metric.value.
message ElapsedCompute {
  // The recorded value.
  // NOTE(review): the time unit is not stated in this file (presumably
  // nanoseconds) -- confirm against the conversion code.
  uint64 value = 1;
}
// Payload of the `spill_count` variant of Metric.value.
message SpillCount {
  // The recorded value.
  uint64 value = 1;
}
// Payload of the `spilled_bytes` variant of Metric.value.
message SpilledBytes {
  // The recorded value.
  uint64 value = 1;
}
// Payload of the `spilled_rows` variant of Metric.value.
message SpilledRows {
  // The recorded value.
  uint64 value = 1;
}
// Payload of the `current_memory_usage` variant of Metric.value.
message CurrentMemoryUsage {
  // The recorded value.
  uint64 value = 1;
}
// Payload of the `count` variant of Metric.value: a count identified by name.
message NamedCount {
  // Name of the count.
  string name = 1;

  // The recorded value.
  uint64 value = 2;
}
// Payload of the `gauge` variant of Metric.value: a gauge identified by name.
message NamedGauge {
  // Name of the gauge.
  string name = 1;

  // The recorded value.
  uint64 value = 2;
}
// Payload of the `time` variant of Metric.value: a time measurement
// identified by name.
message NamedTime {
  // Name of the time measurement.
  string name = 1;

  // The recorded value.
  // NOTE(review): the time unit is not stated in this file -- confirm against
  // the conversion code.
  uint64 value = 2;
}
// Payload of the `start_timestamp` variant of Metric.value.
message StartTimestamp {
  // The timestamp, if one was recorded. Declared `optional` so that an unset
  // timestamp can be told apart from the value 0.
  // NOTE(review): unit and epoch are not stated in this file -- confirm.
  optional int64 value = 1;
}
// Payload of the `end_timestamp` variant of Metric.value.
message EndTimestamp {
  // The timestamp, if one was recorded. Declared `optional` so that an unset
  // timestamp can be told apart from the value 0.
  // NOTE(review): unit and epoch are not stated in this file -- confirm.
  optional int64 value = 1;
}
// Payload of the `output_bytes` variant of Metric.value.
message OutputBytes {
  // The recorded value.
  uint64 value = 1;
}
// Payload of the `output_batches` variant of Metric.value.
message OutputBatches {
  // The recorded value.
  uint64 value = 1;
}
// Payload of the `pruning_metrics` variant of Metric.value: a pair of
// pruned/matched counters identified by name.
message NamedPruningMetrics {
  // Name of the pruning metric.
  string name = 1;

  // The `pruned` counter.
  uint64 pruned = 2;

  // The `matched` counter.
  uint64 matched = 3;
}
// Payload of the `ratio` variant of Metric.value: a ratio identified by name
// and stored as its two components.
message NamedRatio {
  // Name of the ratio.
  string name = 1;

  // The `part` component of the ratio.
  uint64 part = 2;

  // The `total` component of the ratio.
  uint64 total = 3;
}
// Payload of the `custom_bytes_count` variant of Metric.value.
message BytesCount {
  // Name of the metric.
  string name = 1;

  // The recorded value.
  uint64 value = 2;
}
// Payload of the `custom_min_latency` variant of Metric.value.
message MinLatency {
  // Name of the metric.
  string name = 1;

  // The recorded value.
  // NOTE(review): the time unit is not stated in this file -- confirm.
  uint64 value = 2;
}
// Payload of the `custom_max_latency` variant of Metric.value.
message MaxLatency {
  // Name of the metric.
  string name = 1;

  // The recorded value.
  // NOTE(review): the time unit is not stated in this file -- confirm.
  uint64 value = 2;
}
// Payload of the `custom_avg_latency` variant of Metric.value. The average is
// stored as a running sum and a count rather than as a single number.
message AvgLatency {
  // Name of the metric.
  string name = 1;

  // Sum of the latencies, in nanoseconds (per the field name).
  uint64 nanos_sum = 2;

  // NOTE(review): presumably the number of samples accumulated in nanos_sum,
  // so that the average is nanos_sum / count -- confirm.
  uint64 count = 3;
}
// Payload of the `custom_first_latency` variant of Metric.value.
message FirstLatency {
  // Name of the metric.
  string name = 1;

  // The recorded value.
  // NOTE(review): the time unit is not stated in this file -- confirm.
  uint64 value = 2;
}
// Payload shared by the `custom_p50_latency`, `custom_p75_latency`,
// `custom_p95_latency` and `custom_p99_latency` variants of Metric.value.
//
// NOTE(review): field numbers 2 and 3 are skipped. If they belonged to fields
// that were removed, add `reserved 2, 3;` so they cannot be reused.
message PercentileLatency {
  // Name of the metric.
  string name = 1;

  // Sketch in serialized form.
  // NOTE(review): the sketch type and its encoding are not specified in this
  // file -- confirm against the code that produces it.
  bytes sketch_bytes = 4;
}