Skip to main content

datafusion_distributed/worker/generated/
worker.rs

1// This file is @generated by prost-build.
2#[derive(Clone, PartialEq, ::prost::Message)]
3pub struct CoordinatorToWorkerMsg {
4    #[prost(oneof = "coordinator_to_worker_msg::Inner", tags = "1, 2")]
5    pub inner: ::core::option::Option<coordinator_to_worker_msg::Inner>,
6}
7/// Nested message and enum types in `CoordinatorToWorkerMsg`.
8pub mod coordinator_to_worker_msg {
9    #[derive(Clone, PartialEq, ::prost::Oneof)]
10    pub enum Inner {
11        /// Sends a subplan to a worker so that a future ExecuteTask call can actually execute it.
12        /// The plan is identified by a TaskKey.
13        #[prost(message, tag = "1")]
14        SetPlanRequest(super::SetPlanRequest),
15        /// A batch of messages from a work unit feed belonging to different partitions from one node from the plan set in
16        /// set_plan_request. A work unit feed is a per-partition stream of information that tells the node what should
17        /// be executed within a partition, for example, a stream of file addresses that should be read.
18        #[prost(message, tag = "2")]
19        WorkUnitBatch(super::WorkUnitBatch),
20    }
21}
22#[derive(Clone, PartialEq, ::prost::Message)]
23pub struct WorkerToCoordinatorMsg {
24    #[prost(oneof = "worker_to_coordinator_msg::Inner", tags = "1")]
25    pub inner: ::core::option::Option<worker_to_coordinator_msg::Inner>,
26}
27/// Nested message and enum types in `WorkerToCoordinatorMsg`.
28pub mod worker_to_coordinator_msg {
29    #[derive(Clone, PartialEq, ::prost::Oneof)]
30    pub enum Inner {
31        /// Sends the metrics collected during task execution back to the coordinator.
32        /// This is sent after all partitions of a task have finished (or been dropped),
33        /// ensuring metrics are never lost due to early stream termination.
34        /// metrics\[i\] is the set of metrics for plan node i in pre-order traversal order.
35        #[prost(message, tag = "1")]
36        TaskMetrics(super::TaskMetrics),
37    }
38}
39#[derive(Clone, PartialEq, ::prost::Message)]
40pub struct TaskMetrics {
41    /// Metrics for a single task's plan nodes in pre-order traversal order.
42    /// The TaskKey is implicit — it is determined by the SetPlanRequest that
43    /// opened this coordinator channel connection.
44    #[prost(message, repeated, tag = "1")]
45    pub pre_order_plan_metrics: ::prost::alloc::vec::Vec<MetricsSet>,
46    /// Metrics related to the execution of a task within a stage. This metrics, instead of being
47    /// associated to a specific node, they are global to the task, like the time at which the plan
48    /// was fed by the coordinator to the worker.
49    #[prost(message, optional, tag = "2")]
50    pub task_metrics: ::core::option::Option<MetricsSet>,
51}
52#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
53pub struct GetWorkerInfoRequest {}
54#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
55pub struct GetWorkerInfoResponse {
56    #[prost(string, tag = "1")]
57    pub version: ::prost::alloc::string::String,
58}
59#[derive(Clone, PartialEq, ::prost::Message)]
60pub struct SetPlanRequest {
61    /// The unique identifier of the task to which the subplan belongs to.
62    #[prost(message, optional, tag = "1")]
63    pub task_key: ::core::option::Option<TaskKey>,
64    /// The amount of tasks that share the same subplan. Necessary for building the DistributedTaskContext during execution.
65    #[prost(uint64, tag = "2")]
66    pub task_count: u64,
67    /// The serialized subplan the worker is expected to execute on an ExecuteTask gRPC call.
68    #[prost(bytes = "vec", tag = "3")]
69    pub plan_proto: ::prost::alloc::vec::Vec<u8>,
70    /// Information about all the work unit feeds that will be streamed from coordinator to worker.
71    /// This information is needed here because at the moment of setting the plan, all the appropriate
72    /// channels for the incoming work unit feeds need to be constructed.
73    ///
74    /// If no WorkUnitFeedExec nodes are present in the plan, this should be empty.
75    #[prost(message, repeated, tag = "4")]
76    pub work_unit_feed_declarations:
77        ::prost::alloc::vec::Vec<set_plan_request::WorkUnitFeedDeclaration>,
78    /// The worker URL to which this message will go. The receiving worker will use this information to identify
79    /// itself, and avoid further gRPC calls in case it needs to call itself for executing remote tasks.
80    #[prost(string, tag = "5")]
81    pub target_worker_url: ::prost::alloc::string::String,
82    /// Unix nanos when the query started as reported by the coordinator. Used for collecting temporal metrics
83    /// relative to when the query was fired in the coordinator.
84    #[prost(uint64, tag = "6")]
85    pub query_start_time_ns: u64,
86}
87/// Nested message and enum types in `SetPlanRequest`.
88pub mod set_plan_request {
89    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
90    pub struct WorkUnitFeedDeclaration {
91        /// Unique identifier of the node to which work unit feeds are expected to be streamed.
92        #[prost(bytes = "vec", tag = "1")]
93        pub id: ::prost::alloc::vec::Vec<u8>,
94        /// The amount of partitions expected to be streamed.
95        #[prost(uint64, tag = "2")]
96        pub partitions: u64,
97    }
98}
99#[derive(Clone, PartialEq, ::prost::Message)]
100pub struct WorkUnitBatch {
101    /// A batch of WorkUnits.
102    #[prost(message, repeated, tag = "1")]
103    pub batch: ::prost::alloc::vec::Vec<WorkUnit>,
104}
105#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
106pub struct WorkUnit {
107    /// Identifier of the node to which this work unit feed belongs to.
108    #[prost(bytes = "vec", tag = "1")]
109    pub id: ::prost::alloc::vec::Vec<u8>,
110    /// The partition index within the node to which the work unit feed belongs to.
111    #[prost(uint64, tag = "2")]
112    pub partition: u64,
113    /// Arbitrary user-defined data (e.g., a file address) necessary during execution.
114    #[prost(bytes = "vec", tag = "3")]
115    pub body: ::prost::alloc::vec::Vec<u8>,
116    /// Unix timestamp in nanoseconds at which this message was created in the coordinator.
117    #[prost(uint64, tag = "4")]
118    pub created_timestamp_unix_nanos: u64,
119    /// Unix timestamp in nanoseconds at which this message was sent by the coordinator.
120    #[prost(uint64, tag = "5")]
121    pub sent_timestamp_unix_nanos: u64,
122    /// Unix timestamp in nanoseconds at which this message was received by a worker.
123    #[prost(uint64, tag = "6")]
124    pub received_timestamp_unix_nanos: u64,
125    /// Unix timestamp in nanoseconds at which this message started being processed.
126    #[prost(uint64, tag = "7")]
127    pub processed_timestamp_unix_nanos: u64,
128}
129#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
130pub struct ExecuteTaskRequest {
131    /// The unique identifier of the task that is going to get executed.
132    #[prost(message, optional, tag = "1")]
133    pub task_key: ::core::option::Option<TaskKey>,
134    /// The start of the partition range of the specified task that is going to be executed.
135    #[prost(uint64, tag = "2")]
136    pub target_partition_start: u64,
137    /// The end of the partition range of the specified task that is going to be executed.
138    #[prost(uint64, tag = "3")]
139    pub target_partition_end: u64,
140    /// The head node the requested task should have. Depending on the network boundary executing
141    /// the task, the head node should be prepared differently, for example:
142    ///
143    /// * A RepartitionExecHead implies a RepartitionExec at the head of the task.
144    /// * A BroadcastExecHead implies a BroadcastExec at the head of the task.
145    /// * A NoneHead does not need any specific head.
146    #[prost(oneof = "execute_task_request::ProducerHead", tags = "6, 7, 8")]
147    pub producer_head: ::core::option::Option<execute_task_request::ProducerHead>,
148}
149/// Nested message and enum types in `ExecuteTaskRequest`.
150pub mod execute_task_request {
151    /// The head node the requested task should have. Depending on the network boundary executing
152    /// the task, the head node should be prepared differently, for example:
153    ///
154    /// * A RepartitionExecHead implies a RepartitionExec at the head of the task.
155    /// * A BroadcastExecHead implies a BroadcastExec at the head of the task.
156    /// * A NoneHead does not need any specific head.
157    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
158    pub enum ProducerHead {
159        /// The boundary executing the task is a NetworkCoalesceExec.
160        #[prost(message, tag = "6")]
161        None(super::NoneHead),
162        /// The boundary executing the task is a NetworkBroadcastExec.
163        #[prost(message, tag = "7")]
164        Broadcast(super::BroadcastExecHead),
165        /// The boundary executing the task is a NetworkShuffleExec.
166        #[prost(message, tag = "8")]
167        Repartition(super::RepartitionExecHead),
168    }
169}
170/// Head needed by a NetworkCoalesceExec.
171#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
172pub struct NoneHead {}
173/// Head needed by a NetworkBroadcastExec.
174#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
175pub struct BroadcastExecHead {
176    /// The amount of output partitions
177    #[prost(uint64, tag = "1")]
178    pub output_partitions: u64,
179}
180/// Head needed by a NetworkShuffleExec.
181#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
182pub struct RepartitionExecHead {
183    /// `Partitioning` message from datafusion.proto
184    #[prost(bytes = "vec", tag = "1")]
185    pub partitioning: ::prost::alloc::vec::Vec<u8>,
186}
187/// A key that uniquely identifies a task in a query.
188#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
189pub struct TaskKey {
190    /// Our query id.
191    #[prost(bytes = "vec", tag = "1")]
192    pub query_id: ::prost::alloc::vec::Vec<u8>,
193    /// Our stage id.
194    #[prost(uint64, tag = "2")]
195    pub stage_id: u64,
196    /// The task number within the stage.
197    #[prost(uint64, tag = "3")]
198    pub task_number: u64,
199}
200/// FlightAppMetadata represents all types of app_metadata which we use in the distributed execution.
201#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
202pub struct FlightAppMetadata {
203    #[prost(uint64, tag = "1")]
204    pub partition: u64,
205    /// Unix timestamp in nanoseconds at which this message was created.
206    #[prost(uint64, tag = "2")]
207    pub created_timestamp_unix_nanos: u64,
208}
209/// A Label mirrors datafusion::physical_plan::metrics::Label.
210#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
211pub struct Label {
212    #[prost(string, tag = "1")]
213    pub name: ::prost::alloc::string::String,
214    #[prost(string, tag = "2")]
215    pub value: ::prost::alloc::string::String,
216}
217/// A Metric is a protobuf mirror of datafusion::physical_plan::metrics::Metric.
218#[derive(Clone, PartialEq, ::prost::Message)]
219pub struct Metric {
220    #[prost(message, repeated, tag = "1")]
221    pub labels: ::prost::alloc::vec::Vec<Label>,
222    #[prost(uint64, optional, tag = "2")]
223    pub partition: ::core::option::Option<u64>,
224    #[prost(
225        oneof = "metric::Value",
226        tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34"
227    )]
228    pub value: ::core::option::Option<metric::Value>,
229}
230/// Nested message and enum types in `Metric`.
231pub mod metric {
232    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
233    pub enum Value {
234        #[prost(message, tag = "10")]
235        OutputRows(super::OutputRows),
236        #[prost(message, tag = "11")]
237        ElapsedCompute(super::ElapsedCompute),
238        #[prost(message, tag = "12")]
239        SpillCount(super::SpillCount),
240        #[prost(message, tag = "13")]
241        SpilledBytes(super::SpilledBytes),
242        #[prost(message, tag = "14")]
243        SpilledRows(super::SpilledRows),
244        #[prost(message, tag = "15")]
245        CurrentMemoryUsage(super::CurrentMemoryUsage),
246        #[prost(message, tag = "16")]
247        Count(super::NamedCount),
248        #[prost(message, tag = "17")]
249        Gauge(super::NamedGauge),
250        #[prost(message, tag = "18")]
251        Time(super::NamedTime),
252        #[prost(message, tag = "19")]
253        StartTimestamp(super::StartTimestamp),
254        #[prost(message, tag = "20")]
255        EndTimestamp(super::EndTimestamp),
256        #[prost(message, tag = "21")]
257        OutputBytes(super::OutputBytes),
258        #[prost(message, tag = "22")]
259        OutputBatches(super::OutputBatches),
260        #[prost(message, tag = "23")]
261        PruningMetrics(super::NamedPruningMetrics),
262        #[prost(message, tag = "24")]
263        Ratio(super::NamedRatio),
264        #[prost(message, tag = "25")]
265        CustomMinLatency(super::MinLatency),
266        #[prost(message, tag = "26")]
267        CustomMaxLatency(super::MaxLatency),
268        #[prost(message, tag = "27")]
269        CustomAvgLatency(super::AvgLatency),
270        #[prost(message, tag = "28")]
271        CustomFirstLatency(super::FirstLatency),
272        #[prost(message, tag = "29")]
273        CustomBytesCount(super::BytesCount),
274        #[prost(message, tag = "30")]
275        CustomP50Latency(super::PercentileLatency),
276        #[prost(message, tag = "31")]
277        CustomP75Latency(super::PercentileLatency),
278        #[prost(message, tag = "32")]
279        CustomP95Latency(super::PercentileLatency),
280        #[prost(message, tag = "33")]
281        CustomP99Latency(super::PercentileLatency),
282        #[prost(message, tag = "34")]
283        CustomMaxGauge(super::MaxGauge),
284    }
285}
286/// A MetricsSet is a protobuf mirror of datafusion::physical_plan::metrics::MetricsSet. It represents
287/// a collection of metrics for one ExecutionPlan node.
288#[derive(Clone, PartialEq, ::prost::Message)]
289pub struct MetricsSet {
290    #[prost(message, repeated, tag = "1")]
291    pub metrics: ::prost::alloc::vec::Vec<Metric>,
292}
293#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
294pub struct OutputRows {
295    #[prost(uint64, tag = "1")]
296    pub value: u64,
297}
298#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
299pub struct ElapsedCompute {
300    #[prost(uint64, tag = "1")]
301    pub value: u64,
302}
303#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
304pub struct SpillCount {
305    #[prost(uint64, tag = "1")]
306    pub value: u64,
307}
308#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
309pub struct SpilledBytes {
310    #[prost(uint64, tag = "1")]
311    pub value: u64,
312}
313#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
314pub struct SpilledRows {
315    #[prost(uint64, tag = "1")]
316    pub value: u64,
317}
318#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
319pub struct CurrentMemoryUsage {
320    #[prost(uint64, tag = "1")]
321    pub value: u64,
322}
323#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
324pub struct NamedCount {
325    #[prost(string, tag = "1")]
326    pub name: ::prost::alloc::string::String,
327    #[prost(uint64, tag = "2")]
328    pub value: u64,
329}
330#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
331pub struct NamedGauge {
332    #[prost(string, tag = "1")]
333    pub name: ::prost::alloc::string::String,
334    #[prost(uint64, tag = "2")]
335    pub value: u64,
336}
337#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
338pub struct NamedTime {
339    #[prost(string, tag = "1")]
340    pub name: ::prost::alloc::string::String,
341    #[prost(uint64, tag = "2")]
342    pub value: u64,
343}
344#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
345pub struct StartTimestamp {
346    #[prost(int64, optional, tag = "1")]
347    pub value: ::core::option::Option<i64>,
348}
349#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
350pub struct EndTimestamp {
351    #[prost(int64, optional, tag = "1")]
352    pub value: ::core::option::Option<i64>,
353}
354#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
355pub struct OutputBytes {
356    #[prost(uint64, tag = "1")]
357    pub value: u64,
358}
359#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
360pub struct OutputBatches {
361    #[prost(uint64, tag = "1")]
362    pub value: u64,
363}
364#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
365pub struct NamedPruningMetrics {
366    #[prost(string, tag = "1")]
367    pub name: ::prost::alloc::string::String,
368    #[prost(uint64, tag = "2")]
369    pub pruned: u64,
370    #[prost(uint64, tag = "3")]
371    pub matched: u64,
372}
373#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
374pub struct NamedRatio {
375    #[prost(string, tag = "1")]
376    pub name: ::prost::alloc::string::String,
377    #[prost(uint64, tag = "2")]
378    pub part: u64,
379    #[prost(uint64, tag = "3")]
380    pub total: u64,
381}
382#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
383pub struct BytesCount {
384    #[prost(string, tag = "1")]
385    pub name: ::prost::alloc::string::String,
386    #[prost(uint64, tag = "2")]
387    pub value: u64,
388}
389#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
390pub struct MinLatency {
391    #[prost(string, tag = "1")]
392    pub name: ::prost::alloc::string::String,
393    #[prost(uint64, tag = "2")]
394    pub value: u64,
395}
396#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
397pub struct MaxLatency {
398    #[prost(string, tag = "1")]
399    pub name: ::prost::alloc::string::String,
400    #[prost(uint64, tag = "2")]
401    pub value: u64,
402}
403#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
404pub struct AvgLatency {
405    #[prost(string, tag = "1")]
406    pub name: ::prost::alloc::string::String,
407    #[prost(uint64, tag = "2")]
408    pub nanos_sum: u64,
409    #[prost(uint64, tag = "3")]
410    pub count: u64,
411}
412#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
413pub struct FirstLatency {
414    #[prost(string, tag = "1")]
415    pub name: ::prost::alloc::string::String,
416    #[prost(uint64, tag = "2")]
417    pub value: u64,
418}
419#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
420pub struct PercentileLatency {
421    #[prost(string, tag = "1")]
422    pub name: ::prost::alloc::string::String,
423    #[prost(bytes = "vec", tag = "4")]
424    pub sketch_bytes: ::prost::alloc::vec::Vec<u8>,
425}
426#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
427pub struct MaxGauge {
428    #[prost(string, tag = "1")]
429    pub name: ::prost::alloc::string::String,
430    #[prost(uint64, tag = "2")]
431    pub value: u64,
432}
433/// Generated client implementations.
434pub mod worker_service_client {
435    #![allow(
436        unused_variables,
437        dead_code,
438        missing_docs,
439        clippy::wildcard_imports,
440        clippy::let_unit_value
441    )]
442    use tonic::codegen::http::Uri;
443    use tonic::codegen::*;
444    #[derive(Debug, Clone)]
445    pub struct WorkerServiceClient<T> {
446        inner: tonic::client::Grpc<T>,
447    }
448    impl WorkerServiceClient<tonic::transport::Channel> {
449        /// Attempt to create a new client by connecting to a given endpoint.
450        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
451        where
452            D: TryInto<tonic::transport::Endpoint>,
453            D::Error: Into<StdError>,
454        {
455            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
456            Ok(Self::new(conn))
457        }
458    }
459    impl<T> WorkerServiceClient<T>
460    where
461        T: tonic::client::GrpcService<tonic::body::Body>,
462        T::Error: Into<StdError>,
463        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
464        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
465    {
466        pub fn new(inner: T) -> Self {
467            let inner = tonic::client::Grpc::new(inner);
468            Self { inner }
469        }
470        pub fn with_origin(inner: T, origin: Uri) -> Self {
471            let inner = tonic::client::Grpc::with_origin(inner, origin);
472            Self { inner }
473        }
474        pub fn with_interceptor<F>(
475            inner: T,
476            interceptor: F,
477        ) -> WorkerServiceClient<InterceptedService<T, F>>
478        where
479            F: tonic::service::Interceptor,
480            T::ResponseBody: Default,
481            T: tonic::codegen::Service<
482                    http::Request<tonic::body::Body>,
483                    Response = http::Response<
484                        <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
485                    >,
486                >,
487            <T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error:
488                Into<StdError> + std::marker::Send + std::marker::Sync,
489        {
490            WorkerServiceClient::new(InterceptedService::new(inner, interceptor))
491        }
492        /// Compress requests with the given encoding.
493        ///
494        /// This requires the server to support it otherwise it might respond with an
495        /// error.
496        #[must_use]
497        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
498            self.inner = self.inner.send_compressed(encoding);
499            self
500        }
501        /// Enable decompressing responses.
502        #[must_use]
503        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
504            self.inner = self.inner.accept_compressed(encoding);
505            self
506        }
507        /// Limits the maximum size of a decoded message.
508        ///
509        /// Default: `4MB`
510        #[must_use]
511        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
512            self.inner = self.inner.max_decoding_message_size(limit);
513            self
514        }
515        /// Limits the maximum size of an encoded message.
516        ///
517        /// Default: `usize::MAX`
518        #[must_use]
519        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
520            self.inner = self.inner.max_encoding_message_size(limit);
521            self
522        }
523        /// Establishes a bidirectional message stream between a coordinator and a worker, over which messages
524        /// will be exchanged at any time during a query's lifetime. It's expected to be one coordinator channel
525        /// per task.
526        pub async fn coordinator_channel(
527            &mut self,
528            request: impl tonic::IntoStreamingRequest<Message = super::CoordinatorToWorkerMsg>,
529        ) -> std::result::Result<
530            tonic::Response<tonic::codec::Streaming<super::WorkerToCoordinatorMsg>>,
531            tonic::Status,
532        > {
533            self.inner.ready().await.map_err(|e| {
534                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
535            })?;
536            let codec = tonic_prost::ProstCodec::default();
537            let path =
538                http::uri::PathAndQuery::from_static("/worker.WorkerService/CoordinatorChannel");
539            let mut req = request.into_streaming_request();
540            req.extensions_mut().insert(GrpcMethod::new(
541                "worker.WorkerService",
542                "CoordinatorChannel",
543            ));
544            self.inner.streaming(req, path, codec).await
545        }
546        /// Executes the requested partition range of a subplan previously sent by the coordinator channel.
547        pub async fn execute_task(
548            &mut self,
549            request: impl tonic::IntoRequest<super::ExecuteTaskRequest>,
550        ) -> std::result::Result<
551            tonic::Response<tonic::codec::Streaming<::arrow_flight::FlightData>>,
552            tonic::Status,
553        > {
554            self.inner.ready().await.map_err(|e| {
555                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
556            })?;
557            let codec = tonic_prost::ProstCodec::default();
558            let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/ExecuteTask");
559            let mut req = request.into_request();
560            req.extensions_mut()
561                .insert(GrpcMethod::new("worker.WorkerService", "ExecuteTask"));
562            self.inner.server_streaming(req, path, codec).await
563        }
564        /// Returns metadata about a worker. Currently only used for worker versioning.
565        pub async fn get_worker_info(
566            &mut self,
567            request: impl tonic::IntoRequest<super::GetWorkerInfoRequest>,
568        ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>
569        {
570            self.inner.ready().await.map_err(|e| {
571                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
572            })?;
573            let codec = tonic_prost::ProstCodec::default();
574            let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/GetWorkerInfo");
575            let mut req = request.into_request();
576            req.extensions_mut()
577                .insert(GrpcMethod::new("worker.WorkerService", "GetWorkerInfo"));
578            self.inner.unary(req, path, codec).await
579        }
580    }
581}
582/// Generated server implementations.
583pub mod worker_service_server {
584    #![allow(
585        unused_variables,
586        dead_code,
587        missing_docs,
588        clippy::wildcard_imports,
589        clippy::let_unit_value
590    )]
591    use tonic::codegen::*;
592    /// Generated trait containing gRPC methods that should be implemented for use with WorkerServiceServer.
593    #[async_trait]
594    pub trait WorkerService: std::marker::Send + std::marker::Sync + 'static {
595        /// Server streaming response type for the CoordinatorChannel method.
596        type CoordinatorChannelStream: tonic::codegen::tokio_stream::Stream<
597                Item = std::result::Result<super::WorkerToCoordinatorMsg, tonic::Status>,
598            > + std::marker::Send
599            + 'static;
600        /// Establishes a bidirectional message stream between a coordinator and a worker, over which messages
601        /// will be exchanged at any time during a query's lifetime. It's expected to be one coordinator channel
602        /// per task.
603        async fn coordinator_channel(
604            &self,
605            request: tonic::Request<tonic::Streaming<super::CoordinatorToWorkerMsg>>,
606        ) -> std::result::Result<tonic::Response<Self::CoordinatorChannelStream>, tonic::Status>;
607        /// Server streaming response type for the ExecuteTask method.
608        type ExecuteTaskStream: tonic::codegen::tokio_stream::Stream<
609                Item = std::result::Result<::arrow_flight::FlightData, tonic::Status>,
610            > + std::marker::Send
611            + 'static;
612        /// Executes the requested partition range of a subplan previously sent by the coordinator channel.
613        async fn execute_task(
614            &self,
615            request: tonic::Request<super::ExecuteTaskRequest>,
616        ) -> std::result::Result<tonic::Response<Self::ExecuteTaskStream>, tonic::Status>;
617        /// Returns metadata about a worker. Currently only used for worker versioning.
618        async fn get_worker_info(
619            &self,
620            request: tonic::Request<super::GetWorkerInfoRequest>,
621        ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>;
622    }
623    #[derive(Debug)]
624    pub struct WorkerServiceServer<T> {
625        inner: Arc<T>,
626        accept_compression_encodings: EnabledCompressionEncodings,
627        send_compression_encodings: EnabledCompressionEncodings,
628        max_decoding_message_size: Option<usize>,
629        max_encoding_message_size: Option<usize>,
630    }
631    impl<T> WorkerServiceServer<T> {
632        pub fn new(inner: T) -> Self {
633            Self::from_arc(Arc::new(inner))
634        }
635        pub fn from_arc(inner: Arc<T>) -> Self {
636            Self {
637                inner,
638                accept_compression_encodings: Default::default(),
639                send_compression_encodings: Default::default(),
640                max_decoding_message_size: None,
641                max_encoding_message_size: None,
642            }
643        }
644        pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
645        where
646            F: tonic::service::Interceptor,
647        {
648            InterceptedService::new(Self::new(inner), interceptor)
649        }
650        /// Enable decompressing requests with the given encoding.
651        #[must_use]
652        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
653            self.accept_compression_encodings.enable(encoding);
654            self
655        }
656        /// Compress responses with the given encoding, if the client supports it.
657        #[must_use]
658        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
659            self.send_compression_encodings.enable(encoding);
660            self
661        }
662        /// Limits the maximum size of a decoded message.
663        ///
664        /// Default: `4MB`
665        #[must_use]
666        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
667            self.max_decoding_message_size = Some(limit);
668            self
669        }
670        /// Limits the maximum size of an encoded message.
671        ///
672        /// Default: `usize::MAX`
673        #[must_use]
674        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
675            self.max_encoding_message_size = Some(limit);
676            self
677        }
678    }
679    impl<T, B> tonic::codegen::Service<http::Request<B>> for WorkerServiceServer<T>
680    where
681        T: WorkerService,
682        B: Body + std::marker::Send + 'static,
683        B::Error: Into<StdError> + std::marker::Send + 'static,
684    {
685        type Response = http::Response<tonic::body::Body>;
686        type Error = std::convert::Infallible;
687        type Future = BoxFuture<Self::Response, Self::Error>;
688        fn poll_ready(
689            &mut self,
690            _cx: &mut Context<'_>,
691        ) -> Poll<std::result::Result<(), Self::Error>> {
692            Poll::Ready(Ok(()))
693        }
694        fn call(&mut self, req: http::Request<B>) -> Self::Future {
695            match req.uri().path() {
696                "/worker.WorkerService/CoordinatorChannel" => {
697                    #[allow(non_camel_case_types)]
698                    struct CoordinatorChannelSvc<T: WorkerService>(pub Arc<T>);
699                    impl<T: WorkerService>
700                        tonic::server::StreamingService<super::CoordinatorToWorkerMsg>
701                        for CoordinatorChannelSvc<T>
702                    {
703                        type Response = super::WorkerToCoordinatorMsg;
704                        type ResponseStream = T::CoordinatorChannelStream;
705                        type Future =
706                            BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
707                        fn call(
708                            &mut self,
709                            request: tonic::Request<
710                                tonic::Streaming<super::CoordinatorToWorkerMsg>,
711                            >,
712                        ) -> Self::Future {
713                            let inner = Arc::clone(&self.0);
714                            let fut = async move {
715                                <T as WorkerService>::coordinator_channel(&inner, request).await
716                            };
717                            Box::pin(fut)
718                        }
719                    }
720                    let accept_compression_encodings = self.accept_compression_encodings;
721                    let send_compression_encodings = self.send_compression_encodings;
722                    let max_decoding_message_size = self.max_decoding_message_size;
723                    let max_encoding_message_size = self.max_encoding_message_size;
724                    let inner = self.inner.clone();
725                    let fut = async move {
726                        let method = CoordinatorChannelSvc(inner);
727                        let codec = tonic_prost::ProstCodec::default();
728                        let mut grpc = tonic::server::Grpc::new(codec)
729                            .apply_compression_config(
730                                accept_compression_encodings,
731                                send_compression_encodings,
732                            )
733                            .apply_max_message_size_config(
734                                max_decoding_message_size,
735                                max_encoding_message_size,
736                            );
737                        let res = grpc.streaming(method, req).await;
738                        Ok(res)
739                    };
740                    Box::pin(fut)
741                }
742                "/worker.WorkerService/ExecuteTask" => {
743                    #[allow(non_camel_case_types)]
744                    struct ExecuteTaskSvc<T: WorkerService>(pub Arc<T>);
745                    impl<T: WorkerService>
746                        tonic::server::ServerStreamingService<super::ExecuteTaskRequest>
747                        for ExecuteTaskSvc<T>
748                    {
749                        type Response = ::arrow_flight::FlightData;
750                        type ResponseStream = T::ExecuteTaskStream;
751                        type Future =
752                            BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
753                        fn call(
754                            &mut self,
755                            request: tonic::Request<super::ExecuteTaskRequest>,
756                        ) -> Self::Future {
757                            let inner = Arc::clone(&self.0);
758                            let fut = async move {
759                                <T as WorkerService>::execute_task(&inner, request).await
760                            };
761                            Box::pin(fut)
762                        }
763                    }
764                    let accept_compression_encodings = self.accept_compression_encodings;
765                    let send_compression_encodings = self.send_compression_encodings;
766                    let max_decoding_message_size = self.max_decoding_message_size;
767                    let max_encoding_message_size = self.max_encoding_message_size;
768                    let inner = self.inner.clone();
769                    let fut = async move {
770                        let method = ExecuteTaskSvc(inner);
771                        let codec = tonic_prost::ProstCodec::default();
772                        let mut grpc = tonic::server::Grpc::new(codec)
773                            .apply_compression_config(
774                                accept_compression_encodings,
775                                send_compression_encodings,
776                            )
777                            .apply_max_message_size_config(
778                                max_decoding_message_size,
779                                max_encoding_message_size,
780                            );
781                        let res = grpc.server_streaming(method, req).await;
782                        Ok(res)
783                    };
784                    Box::pin(fut)
785                }
786                "/worker.WorkerService/GetWorkerInfo" => {
787                    #[allow(non_camel_case_types)]
788                    struct GetWorkerInfoSvc<T: WorkerService>(pub Arc<T>);
789                    impl<T: WorkerService> tonic::server::UnaryService<super::GetWorkerInfoRequest>
790                        for GetWorkerInfoSvc<T>
791                    {
792                        type Response = super::GetWorkerInfoResponse;
793                        type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
794                        fn call(
795                            &mut self,
796                            request: tonic::Request<super::GetWorkerInfoRequest>,
797                        ) -> Self::Future {
798                            let inner = Arc::clone(&self.0);
799                            let fut = async move {
800                                <T as WorkerService>::get_worker_info(&inner, request).await
801                            };
802                            Box::pin(fut)
803                        }
804                    }
805                    let accept_compression_encodings = self.accept_compression_encodings;
806                    let send_compression_encodings = self.send_compression_encodings;
807                    let max_decoding_message_size = self.max_decoding_message_size;
808                    let max_encoding_message_size = self.max_encoding_message_size;
809                    let inner = self.inner.clone();
810                    let fut = async move {
811                        let method = GetWorkerInfoSvc(inner);
812                        let codec = tonic_prost::ProstCodec::default();
813                        let mut grpc = tonic::server::Grpc::new(codec)
814                            .apply_compression_config(
815                                accept_compression_encodings,
816                                send_compression_encodings,
817                            )
818                            .apply_max_message_size_config(
819                                max_decoding_message_size,
820                                max_encoding_message_size,
821                            );
822                        let res = grpc.unary(method, req).await;
823                        Ok(res)
824                    };
825                    Box::pin(fut)
826                }
827                _ => Box::pin(async move {
828                    let mut response = http::Response::new(tonic::body::Body::default());
829                    let headers = response.headers_mut();
830                    headers.insert(
831                        tonic::Status::GRPC_STATUS,
832                        (tonic::Code::Unimplemented as i32).into(),
833                    );
834                    headers.insert(
835                        http::header::CONTENT_TYPE,
836                        tonic::metadata::GRPC_CONTENT_TYPE,
837                    );
838                    Ok(response)
839                }),
840            }
841        }
842    }
843    impl<T> Clone for WorkerServiceServer<T> {
844        fn clone(&self) -> Self {
845            let inner = self.inner.clone();
846            Self {
847                inner,
848                accept_compression_encodings: self.accept_compression_encodings,
849                send_compression_encodings: self.send_compression_encodings,
850                max_decoding_message_size: self.max_decoding_message_size,
851                max_encoding_message_size: self.max_encoding_message_size,
852            }
853        }
854    }
855    /// Generated gRPC service name
856    pub const SERVICE_NAME: &str = "worker.WorkerService";
857    impl<T> tonic::server::NamedService for WorkerServiceServer<T> {
858        const NAME: &'static str = SERVICE_NAME;
859    }
860}