// datafusion-distributed 2.0.0
//
// Framework for enhancing Apache DataFusion with distributed capabilities.
// Documentation
syntax = "proto3";
package worker;

// gRPC service implemented by workers. A coordinator uses it to ship subplans and work units to a
// worker, to trigger execution of partition ranges of those subplans, and to query worker metadata.
service WorkerService {
  // Establishes a bidirectional message stream between a coordinator and a worker, over which messages
  // can be exchanged at any point during a query's lifetime. One coordinator channel is expected
  // per task.
  rpc CoordinatorChannel(stream CoordinatorToWorkerMsg) returns (stream WorkerToCoordinatorMsg);
  // Executes the requested partition range of a subplan previously sent through the coordinator channel.
  // The results are streamed back as FlightData messages (see the Flight.proto mirrors below).
  rpc ExecuteTask(ExecuteTaskRequest) returns (stream FlightData);
  // Returns metadata about a worker. Currently only used for worker versioning.
  rpc GetWorkerInfo(GetWorkerInfoRequest) returns (GetWorkerInfoResponse);
}

// A message sent from the coordinator to a worker over the CoordinatorChannel stream.
// Exactly one of the variants in `inner` is set per message.
message CoordinatorToWorkerMsg {
  oneof inner {
    // Sends a subplan to a worker so that a future ExecuteTask call can actually execute it.
    // The plan is identified by a TaskKey.
    SetPlanRequest set_plan_request = 1;
    // A batch of work units from the work unit feeds of one node of the plan sent in set_plan_request;
    // the units in a batch may belong to different partitions. A work unit feed is a per-partition
    // stream of information that tells the node what should be executed within a partition, for
    // example, a stream of file addresses that should be read.
    WorkUnitBatch work_unit_batch = 2;
  }
}

// A message sent from a worker back to the coordinator over the CoordinatorChannel stream.
message WorkerToCoordinatorMsg {
  oneof inner {
    // Sends the metrics collected during task execution back to the coordinator.
    // This is sent after all partitions of a task have finished (or been dropped),
    // ensuring metrics are never lost due to early stream termination.
    // task_metrics.pre_order_plan_metrics[i] is the set of metrics for plan node i
    // in pre-order traversal order.
    TaskMetrics task_metrics = 1;
  }
}

// Metrics collected by a worker while executing a single task.
message TaskMetrics {
  // Metrics for a single task's plan nodes in pre-order traversal order:
  // pre_order_plan_metrics[i] is the MetricsSet of the i-th node visited.
  // The TaskKey is implicit — it is determined by the SetPlanRequest that
  // opened this coordinator channel connection.
  repeated MetricsSet pre_order_plan_metrics = 1;
  // Metrics related to the execution of the task as a whole within its stage. These metrics are not
  // associated with a specific node; they are global to the task, such as the time at which the plan
  // was fed by the coordinator to the worker.
  MetricsSet task_metrics = 2;
}

// Request for WorkerService.GetWorkerInfo. Intentionally empty; kept as a dedicated message so
// fields can be added later without changing the RPC signature.
message GetWorkerInfoRequest {}

// Response for WorkerService.GetWorkerInfo.
message GetWorkerInfoResponse {
  // Version of the worker, used for worker versioning.
  // NOTE(review): the version string format is not defined by this schema — confirm with the producer.
  string version = 1;
}

// Sends a subplan to a worker, identified by task_key, so that later ExecuteTask calls can run it.
message SetPlanRequest {
  // The unique identifier of the task to which the subplan belongs.
  TaskKey task_key = 1;
  // The number of tasks that share the same subplan. Necessary for building the DistributedTaskContext during execution.
  uint64 task_count = 2;
  // The serialized subplan the worker is expected to execute on an ExecuteTask gRPC call.
  bytes plan_proto = 3;

  // Declares one node whose work unit feeds will be streamed from the coordinator to the worker.
  message WorkUnitFeedDeclaration {
    // Unique identifier of the node to which work unit feeds are expected to be streamed.
    // NOTE(review): presumably the same value carried in WorkUnit.id for this node's units — confirm.
    bytes id = 1;
    // The number of partitions expected to be streamed.
    uint64 partitions = 2;
  }
  // Information about all the work unit feeds that will be streamed from coordinator to worker.
  // This information is needed here because, at the moment the plan is set, all the appropriate
  // channels for the incoming work unit feeds need to be constructed.
  //
  // If no WorkUnitFeedExec nodes are present in the plan, this should be empty.
  repeated WorkUnitFeedDeclaration work_unit_feed_declarations = 4;
  // The URL of the worker this message is sent to. The receiving worker uses this information to identify
  // itself, and to avoid further gRPC calls in case it needs to call itself for executing remote tasks.
  string target_worker_url = 5;
  // Unix timestamp in nanoseconds at which the query started, as reported by the coordinator. Used for
  // collecting temporal metrics relative to when the query was fired in the coordinator.
  uint64 query_start_time_ns = 6;
}

// A group of work units sent together in a single CoordinatorToWorkerMsg.
message WorkUnitBatch {
  // A batch of WorkUnits.
  repeated WorkUnit batch = 1;
}

// A single item of a work unit feed: user-defined data telling one partition of one node what to
// execute, plus timestamps tracking its journey from coordinator to worker.
message WorkUnit {
  // Identifier of the node to which this work unit feed belongs.
  bytes id = 1;
  // The partition index within the node to which the work unit feed belongs.
  uint64 partition = 2;
  // Arbitrary user-defined data (e.g., a file address) necessary during execution.
  bytes body = 3;
  // Unix timestamp in nanoseconds at which this message was created in the coordinator.
  uint64 created_timestamp_unix_nanos = 4;
  // Unix timestamp in nanoseconds at which this message was sent by the coordinator.
  uint64 sent_timestamp_unix_nanos = 5;
  // Unix timestamp in nanoseconds at which this message was received by a worker.
  uint64 received_timestamp_unix_nanos = 6;
  // Unix timestamp in nanoseconds at which this message started being processed.
  uint64 processed_timestamp_unix_nanos = 7;
}

// Request for WorkerService.ExecuteTask: runs a partition range of a task whose plan was previously
// sent through a SetPlanRequest.
message ExecuteTaskRequest {
  // The unique identifier of the task that is going to get executed.
  TaskKey task_key = 1;
  // The start of the partition range of the specified task that is going to be executed.
  uint64 target_partition_start = 2;
  // The end of the partition range of the specified task that is going to be executed.
  // NOTE(review): whether this bound is inclusive or exclusive is not stated here — confirm and document.
  uint64 target_partition_end = 3;

  // The head node the requested task should have. Depending on the network boundary executing
  // the task, the head node should be prepared differently, for example:
  // - A RepartitionExecHead implies a RepartitionExec at the head of the task.
  // - A BroadcastExecHead implies a BroadcastExec at the head of the task.
  // - A NoneHead does not need any specific head.
  // NOTE(review): field numbers 4 and 5 are unused; if they belonged to removed fields, reserve them.
  oneof producer_head {
    // The boundary executing the task is a NetworkCoalesceExec.
    NoneHead none = 6;
    // The boundary executing the task is a NetworkBroadcastExec.
    BroadcastExecHead broadcast = 7;
    // The boundary executing the task is a NetworkShuffleExec.
    RepartitionExecHead repartition = 8;
  }
}

// Head needed by a NetworkCoalesceExec. No specific head node is required, so this message
// carries no fields; its presence in the producer_head oneof is the whole signal.
message NoneHead {}

// Head needed by a NetworkBroadcastExec: a BroadcastExec is placed at the head of the task.
message BroadcastExecHead {
  // The number of output partitions.
  uint64 output_partitions = 1;
}

// Head needed by a NetworkShuffleExec: a RepartitionExec is placed at the head of the task.
message RepartitionExecHead {
  // Serialized `Partitioning` message from datafusion.proto describing how to repartition.
  bytes partitioning = 1;
}

// A key that uniquely identifies a task in a query.
message TaskKey {
  // Identifier of the query the task belongs to.
  bytes query_id = 1;
  // Identifier of the stage, within the query, that the task belongs to.
  uint64 stage_id = 2;
  // The task number within the stage.
  uint64 task_number = 3;
}

// Messages from https://github.com/apache/arrow/blob/main/format/Flight.proto. These
// will match the structs shipped by the `arrow-flight` crate, so that we can use its
// tools for dealing with FlightData streams.

// Matches arrow.flight.protocol.FlightDescriptor from Flight.proto.
// Mapped to arrow_flight::FlightDescriptor via extern_path at codegen time.
// Field names, numbers and the unprefixed enum value names are kept identical to Flight.proto on
// purpose; do not rename or renumber them. See Flight.proto for the semantics of each field.
message FlightDescriptor {
  // Mirrors Flight.proto's FlightDescriptor.DescriptorType.
  enum DescriptorType {
    UNKNOWN = 0;
    PATH = 1;
    CMD = 2;
  }

  DescriptorType type = 1;
  bytes cmd = 2;
  repeated string path = 3;
}

// Matches arrow.flight.protocol.FlightData from Flight.proto.
// Mapped to arrow_flight::FlightData via extern_path at codegen time.
// Field numbers (including 1000 for data_body) must stay identical to Flight.proto so the
// extern_path mapping remains wire-compatible.
message FlightData {
  FlightDescriptor flight_descriptor = 1;
  bytes data_header = 2;
  // Carries a serialized FlightAppMetadata in the distributed execution.
  bytes app_metadata = 3;
  bytes data_body = 1000;
}

// --- App metadata piggybacked on FlightData messages ---

// FlightAppMetadata represents all types of app_metadata used in the distributed execution.
// It is piggybacked on FlightData messages through their app_metadata field.
message FlightAppMetadata {
  // NOTE(review): presumably the partition index (within the executed range) that the carrying
  // FlightData message belongs to — confirm.
  uint64 partition = 1;
  // Unix timestamp in nanoseconds at which this message was created.
  uint64 created_timestamp_unix_nanos = 2;
}

// --- Metric serialization ---

// A Label mirrors datafusion::physical_plan::metrics::Label: a name/value pair attached to a Metric.
message Label {
  // Label name.
  string name = 1;
  // Label value.
  string value = 2;
}

// A Metric is a protobuf mirror of datafusion::physical_plan::metrics::Metric.
message Metric {
  // Labels attached to the metric.
  repeated Label labels = 1;
  // Partition the metric belongs to. `optional` so that "no partition" can be distinguished
  // from partition 0.
  optional uint64 partition = 2;
  // The metric's value; the selected case identifies the kind of metric.
  // The custom_* cases presumably carry metric types defined by this project rather than by
  // DataFusion — NOTE(review): confirm. The four percentile cases share the PercentileLatency
  // message and are told apart only by the case.
  // NOTE(review): field numbers 3–9 are unused, presumably left free for future non-oneof fields.
  oneof value {
    OutputRows output_rows = 10;
    ElapsedCompute elapsed_compute = 11;
    SpillCount spill_count = 12;
    SpilledBytes spilled_bytes = 13;
    SpilledRows spilled_rows = 14;
    CurrentMemoryUsage current_memory_usage = 15;
    NamedCount count = 16;
    NamedGauge gauge = 17;
    NamedTime time = 18;
    StartTimestamp start_timestamp = 19;
    EndTimestamp end_timestamp = 20;
    OutputBytes output_bytes = 21;
    OutputBatches output_batches = 22;
    NamedPruningMetrics pruning_metrics = 23;
    NamedRatio ratio = 24;
    MinLatency custom_min_latency = 25;
    MaxLatency custom_max_latency = 26;
    AvgLatency custom_avg_latency = 27;
    FirstLatency custom_first_latency = 28;
    BytesCount custom_bytes_count = 29;
    PercentileLatency custom_p50_latency = 30;
    PercentileLatency custom_p75_latency = 31;
    PercentileLatency custom_p95_latency = 32;
    PercentileLatency custom_p99_latency = 33;
    MaxGauge custom_max_gauge = 34;
  }
}

// A MetricsSet is a protobuf mirror of datafusion::physical_plan::metrics::MetricsSet. It represents
// a collection of metrics for one ExecutionPlan node (or, in TaskMetrics.task_metrics, for a whole task).
message MetricsSet {
  // All metrics recorded for the node or task.
  repeated Metric metrics = 1;
}

// Value of the output_rows metric (number of rows produced).
message OutputRows {
  uint64 value = 1;
}

// Value of the elapsed_compute metric.
// NOTE(review): presumably nanoseconds, as in DataFusion's `Time` metric — confirm.
message ElapsedCompute {
  uint64 value = 1;
}

// Value of the spill_count metric (number of spills).
message SpillCount {
  uint64 value = 1;
}

// Value of the spilled_bytes metric (bytes spilled).
message SpilledBytes {
  uint64 value = 1;
}

// Value of the spilled_rows metric (rows spilled).
message SpilledRows {
  uint64 value = 1;
}

// Value of the current_memory_usage metric.
// NOTE(review): presumably bytes — confirm.
message CurrentMemoryUsage {
  uint64 value = 1;
}

// A user-named counter metric.
message NamedCount {
  // Name of the counter.
  string name = 1;
  // Counter value.
  uint64 value = 2;
}

// A user-named gauge metric.
message NamedGauge {
  // Name of the gauge.
  string name = 1;
  // Gauge value.
  uint64 value = 2;
}

// A user-named time metric.
message NamedTime {
  // Name of the timer.
  string name = 1;
  // Accumulated time. NOTE(review): presumably nanoseconds, as in DataFusion's `Time` — confirm.
  uint64 value = 2;
}

// Value of the start_timestamp metric.
message StartTimestamp {
  // `optional` so that an unrecorded timestamp can be distinguished from 0.
  // NOTE(review): presumably Unix nanoseconds — confirm.
  optional int64 value = 1;
}

// Value of the end_timestamp metric.
message EndTimestamp {
  // `optional` so that an unrecorded timestamp can be distinguished from 0.
  // NOTE(review): presumably Unix nanoseconds — confirm.
  optional int64 value = 1;
}

// Value of the output_bytes metric (bytes produced).
message OutputBytes {
  uint64 value = 1;
}

// Value of the output_batches metric (batches produced).
message OutputBatches {
  uint64 value = 1;
}

// A user-named pruning metric: how many candidates were pruned versus matched.
message NamedPruningMetrics {
  // Name of the pruning metric.
  string name = 1;
  // Number of items pruned.
  uint64 pruned = 2;
  // Number of items matched (not pruned).
  uint64 matched = 3;
}

// A user-named ratio metric, stored as its numerator and denominator.
message NamedRatio {
  // Name of the ratio.
  string name = 1;
  // Numerator of the ratio.
  uint64 part = 2;
  // Denominator of the ratio.
  uint64 total = 3;
}

// A named byte-count metric (Metric.custom_bytes_count).
message BytesCount {
  // Name of the metric.
  string name = 1;
  // Number of bytes.
  uint64 value = 2;
}

// A named minimum-latency metric (Metric.custom_min_latency).
message MinLatency {
  // Name of the metric.
  string name = 1;
  // Minimum latency observed. NOTE(review): presumably nanoseconds (cf. AvgLatency.nanos_sum) — confirm.
  uint64 value = 2;
}

// A named maximum-latency metric (Metric.custom_max_latency).
message MaxLatency {
  // Name of the metric.
  string name = 1;
  // Maximum latency observed. NOTE(review): presumably nanoseconds (cf. AvgLatency.nanos_sum) — confirm.
  uint64 value = 2;
}

// A named average-latency metric (Metric.custom_avg_latency), stored as a sum and a count so
// that values from several sources can be combined before dividing.
message AvgLatency {
  // Name of the metric.
  string name = 1;
  // Sum of all recorded latencies, in nanoseconds.
  uint64 nanos_sum = 2;
  // Number of latencies recorded; the average is nanos_sum / count.
  uint64 count = 3;
}

// A named first-latency metric (Metric.custom_first_latency).
message FirstLatency {
  // Name of the metric.
  string name = 1;
  // The first latency recorded. NOTE(review): presumably nanoseconds (cf. AvgLatency.nanos_sum) — confirm.
  uint64 value = 2;
}

// A named percentile-latency metric. The same message is used for the p50, p75, p95 and p99
// cases of Metric.value; the oneof case identifies which percentile it represents.
message PercentileLatency {
  // Field numbers 2 and 3 are skipped by this message. Reserve them so they can never be reused
  // with a different meaning (reusing a number silently reinterprets old data on the wire).
  // NOTE(review): presumably they belonged to removed fields — confirm against history.
  reserved 2, 3;
  // Name of the metric.
  string name = 1;
  // Serialized sketch from which the percentile is computed.
  // NOTE(review): the sketch encoding is defined by the producing/consuming code, not by this schema.
  bytes sketch_bytes = 4;
}

// A named max-gauge metric (Metric.custom_max_gauge).
message MaxGauge {
  // Name of the metric.
  string name = 1;
  // NOTE(review): presumably the maximum gauge value observed — confirm.
  uint64 value = 2;
}