Skip to main content

datafusion_distributed/worker/generated/
worker.rs

1// This file is @generated by prost-build.
2#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
3pub struct CoordinatorToWorkerMsg {
4    #[prost(oneof = "coordinator_to_worker_msg::Inner", tags = "1")]
5    pub inner: ::core::option::Option<coordinator_to_worker_msg::Inner>,
6}
7/// Nested message and enum types in `CoordinatorToWorkerMsg`.
8pub mod coordinator_to_worker_msg {
9    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
10    pub enum Inner {
11        /// Sends a subplan to a worker so that a future ExecuteTask call can actually execute it.
12        /// The plan is identified by a TaskKey.
13        #[prost(message, tag = "1")]
14        SetPlanRequest(super::SetPlanRequest),
15    }
16}
17/// For now, there are no messages that can flow back from worker to coordinator.
18#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
19pub struct WorkerToCoordinatorMsg {}
20#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
21pub struct GetWorkerInfoRequest {}
22#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
23pub struct GetWorkerInfoResponse {
24    #[prost(string, tag = "1")]
25    pub version: ::prost::alloc::string::String,
26}
27#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
28pub struct SetPlanRequest {
29    /// The unique identifier of the task to which the subplan belongs to.
30    #[prost(message, optional, tag = "1")]
31    pub task_key: ::core::option::Option<TaskKey>,
32    /// The amount of tasks that share the same subplan. Necessary for building the DistributedTaskContext during execution.
33    #[prost(uint64, tag = "2")]
34    pub task_count: u64,
35    /// The serialized subplan the worker is expected to execute on an ExecuteTask gRPC call.
36    #[prost(bytes = "vec", tag = "3")]
37    pub plan_proto: ::prost::alloc::vec::Vec<u8>,
38}
39#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
40pub struct ExecuteTaskRequest {
41    /// The unique identifier of the task that is going to get executed.
42    #[prost(message, optional, tag = "1")]
43    pub task_key: ::core::option::Option<TaskKey>,
44    /// The start of the partition range of the specified task that is going to be executed.
45    #[prost(uint64, tag = "2")]
46    pub target_partition_start: u64,
47    /// The end of the partition range of the specified task that is going to be executed.
48    #[prost(uint64, tag = "3")]
49    pub target_partition_end: u64,
50}
51/// A key that uniquely identifies a task in a query.
52#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
53pub struct TaskKey {
54    /// Our query id.
55    #[prost(bytes = "vec", tag = "1")]
56    pub query_id: ::prost::alloc::vec::Vec<u8>,
57    /// Our stage id.
58    #[prost(uint64, tag = "2")]
59    pub stage_id: u64,
60    /// The task number within the stage.
61    #[prost(uint64, tag = "3")]
62    pub task_number: u64,
63}
64/// FlightAppMetadata represents all types of app_metadata which we use in the distributed execution.
65#[derive(Clone, PartialEq, ::prost::Message)]
66pub struct FlightAppMetadata {
67    #[prost(uint64, tag = "1")]
68    pub partition: u64,
69    /// Unix timestamp in nanoseconds at which this message was created.
70    #[prost(uint64, tag = "2")]
71    pub created_timestamp_unix_nanos: u64,
72    /// content should always be Some, but it is optional due to protobuf rules.
73    #[prost(oneof = "flight_app_metadata::Content", tags = "10")]
74    pub content: ::core::option::Option<flight_app_metadata::Content>,
75}
76/// Nested message and enum types in `FlightAppMetadata`.
77pub mod flight_app_metadata {
78    /// content should always be Some, but it is optional due to protobuf rules.
79    #[derive(Clone, PartialEq, ::prost::Oneof)]
80    pub enum Content {
81        #[prost(message, tag = "10")]
82        MetricsCollection(super::MetricsCollection),
83    }
84}
85/// A collection of metrics for a set of tasks in an ExecutionPlan. Each
86/// entry should have a distinct TaskKey.
87#[derive(Clone, PartialEq, ::prost::Message)]
88pub struct MetricsCollection {
89    #[prost(message, repeated, tag = "1")]
90    pub tasks: ::prost::alloc::vec::Vec<TaskMetrics>,
91}
92/// TaskMetrics represents the metrics for a single task.
93#[derive(Clone, PartialEq, ::prost::Message)]
94pub struct TaskMetrics {
95    /// task_key uniquely identifies this task.
96    /// This field is always present. It's marked optional due to protobuf rules.
97    #[prost(message, optional, tag = "1")]
98    pub task_key: ::core::option::Option<TaskKey>,
99    /// metrics\[i\] is the set of metrics for plan node i where plan nodes are
100    /// in pre-order traversal order.
101    #[prost(message, repeated, tag = "2")]
102    pub metrics: ::prost::alloc::vec::Vec<MetricsSet>,
103}
104/// A Label mirrors datafusion::physical_plan::metrics::Label.
105#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
106pub struct Label {
107    #[prost(string, tag = "1")]
108    pub name: ::prost::alloc::string::String,
109    #[prost(string, tag = "2")]
110    pub value: ::prost::alloc::string::String,
111}
112/// A Metric is a protobuf mirror of datafusion::physical_plan::metrics::Metric.
113#[derive(Clone, PartialEq, ::prost::Message)]
114pub struct Metric {
115    #[prost(message, repeated, tag = "1")]
116    pub labels: ::prost::alloc::vec::Vec<Label>,
117    #[prost(uint64, optional, tag = "2")]
118    pub partition: ::core::option::Option<u64>,
119    #[prost(
120        oneof = "metric::Value",
121        tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33"
122    )]
123    pub value: ::core::option::Option<metric::Value>,
124}
125/// Nested message and enum types in `Metric`.
126pub mod metric {
127    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
128    pub enum Value {
129        #[prost(message, tag = "10")]
130        OutputRows(super::OutputRows),
131        #[prost(message, tag = "11")]
132        ElapsedCompute(super::ElapsedCompute),
133        #[prost(message, tag = "12")]
134        SpillCount(super::SpillCount),
135        #[prost(message, tag = "13")]
136        SpilledBytes(super::SpilledBytes),
137        #[prost(message, tag = "14")]
138        SpilledRows(super::SpilledRows),
139        #[prost(message, tag = "15")]
140        CurrentMemoryUsage(super::CurrentMemoryUsage),
141        #[prost(message, tag = "16")]
142        Count(super::NamedCount),
143        #[prost(message, tag = "17")]
144        Gauge(super::NamedGauge),
145        #[prost(message, tag = "18")]
146        Time(super::NamedTime),
147        #[prost(message, tag = "19")]
148        StartTimestamp(super::StartTimestamp),
149        #[prost(message, tag = "20")]
150        EndTimestamp(super::EndTimestamp),
151        #[prost(message, tag = "21")]
152        OutputBytes(super::OutputBytes),
153        #[prost(message, tag = "22")]
154        OutputBatches(super::OutputBatches),
155        #[prost(message, tag = "23")]
156        PruningMetrics(super::NamedPruningMetrics),
157        #[prost(message, tag = "24")]
158        Ratio(super::NamedRatio),
159        #[prost(message, tag = "25")]
160        CustomMinLatency(super::MinLatency),
161        #[prost(message, tag = "26")]
162        CustomMaxLatency(super::MaxLatency),
163        #[prost(message, tag = "27")]
164        CustomAvgLatency(super::AvgLatency),
165        #[prost(message, tag = "28")]
166        CustomFirstLatency(super::FirstLatency),
167        #[prost(message, tag = "29")]
168        CustomBytesCount(super::BytesCount),
169        #[prost(message, tag = "30")]
170        CustomP50Latency(super::PercentileLatency),
171        #[prost(message, tag = "31")]
172        CustomP75Latency(super::PercentileLatency),
173        #[prost(message, tag = "32")]
174        CustomP95Latency(super::PercentileLatency),
175        #[prost(message, tag = "33")]
176        CustomP99Latency(super::PercentileLatency),
177    }
178}
179/// A MetricsSet is a protobuf mirror of datafusion::physical_plan::metrics::MetricsSet. It represents
180/// a collection of metrics for one ExecutionPlan node.
181#[derive(Clone, PartialEq, ::prost::Message)]
182pub struct MetricsSet {
183    #[prost(message, repeated, tag = "1")]
184    pub metrics: ::prost::alloc::vec::Vec<Metric>,
185}
186#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
187pub struct OutputRows {
188    #[prost(uint64, tag = "1")]
189    pub value: u64,
190}
191#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
192pub struct ElapsedCompute {
193    #[prost(uint64, tag = "1")]
194    pub value: u64,
195}
196#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
197pub struct SpillCount {
198    #[prost(uint64, tag = "1")]
199    pub value: u64,
200}
201#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
202pub struct SpilledBytes {
203    #[prost(uint64, tag = "1")]
204    pub value: u64,
205}
206#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
207pub struct SpilledRows {
208    #[prost(uint64, tag = "1")]
209    pub value: u64,
210}
211#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
212pub struct CurrentMemoryUsage {
213    #[prost(uint64, tag = "1")]
214    pub value: u64,
215}
216#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
217pub struct NamedCount {
218    #[prost(string, tag = "1")]
219    pub name: ::prost::alloc::string::String,
220    #[prost(uint64, tag = "2")]
221    pub value: u64,
222}
223#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
224pub struct NamedGauge {
225    #[prost(string, tag = "1")]
226    pub name: ::prost::alloc::string::String,
227    #[prost(uint64, tag = "2")]
228    pub value: u64,
229}
230#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
231pub struct NamedTime {
232    #[prost(string, tag = "1")]
233    pub name: ::prost::alloc::string::String,
234    #[prost(uint64, tag = "2")]
235    pub value: u64,
236}
237#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
238pub struct StartTimestamp {
239    #[prost(int64, optional, tag = "1")]
240    pub value: ::core::option::Option<i64>,
241}
242#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
243pub struct EndTimestamp {
244    #[prost(int64, optional, tag = "1")]
245    pub value: ::core::option::Option<i64>,
246}
247#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
248pub struct OutputBytes {
249    #[prost(uint64, tag = "1")]
250    pub value: u64,
251}
252#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
253pub struct OutputBatches {
254    #[prost(uint64, tag = "1")]
255    pub value: u64,
256}
257#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
258pub struct NamedPruningMetrics {
259    #[prost(string, tag = "1")]
260    pub name: ::prost::alloc::string::String,
261    #[prost(uint64, tag = "2")]
262    pub pruned: u64,
263    #[prost(uint64, tag = "3")]
264    pub matched: u64,
265}
266#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
267pub struct NamedRatio {
268    #[prost(string, tag = "1")]
269    pub name: ::prost::alloc::string::String,
270    #[prost(uint64, tag = "2")]
271    pub part: u64,
272    #[prost(uint64, tag = "3")]
273    pub total: u64,
274}
275#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
276pub struct BytesCount {
277    #[prost(string, tag = "1")]
278    pub name: ::prost::alloc::string::String,
279    #[prost(uint64, tag = "2")]
280    pub value: u64,
281}
282#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
283pub struct MinLatency {
284    #[prost(string, tag = "1")]
285    pub name: ::prost::alloc::string::String,
286    #[prost(uint64, tag = "2")]
287    pub value: u64,
288}
289#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
290pub struct MaxLatency {
291    #[prost(string, tag = "1")]
292    pub name: ::prost::alloc::string::String,
293    #[prost(uint64, tag = "2")]
294    pub value: u64,
295}
296#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
297pub struct AvgLatency {
298    #[prost(string, tag = "1")]
299    pub name: ::prost::alloc::string::String,
300    #[prost(uint64, tag = "2")]
301    pub nanos_sum: u64,
302    #[prost(uint64, tag = "3")]
303    pub count: u64,
304}
305#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
306pub struct FirstLatency {
307    #[prost(string, tag = "1")]
308    pub name: ::prost::alloc::string::String,
309    #[prost(uint64, tag = "2")]
310    pub value: u64,
311}
312#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
313pub struct PercentileLatency {
314    #[prost(string, tag = "1")]
315    pub name: ::prost::alloc::string::String,
316    #[prost(bytes = "vec", tag = "4")]
317    pub sketch_bytes: ::prost::alloc::vec::Vec<u8>,
318}
319/// Generated client implementations.
320pub mod worker_service_client {
321    #![allow(
322        unused_variables,
323        dead_code,
324        missing_docs,
325        clippy::wildcard_imports,
326        clippy::let_unit_value
327    )]
328    use tonic::codegen::http::Uri;
329    use tonic::codegen::*;
330    #[derive(Debug, Clone)]
331    pub struct WorkerServiceClient<T> {
332        inner: tonic::client::Grpc<T>,
333    }
334    impl WorkerServiceClient<tonic::transport::Channel> {
335        /// Attempt to create a new client by connecting to a given endpoint.
336        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
337        where
338            D: TryInto<tonic::transport::Endpoint>,
339            D::Error: Into<StdError>,
340        {
341            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
342            Ok(Self::new(conn))
343        }
344    }
345    impl<T> WorkerServiceClient<T>
346    where
347        T: tonic::client::GrpcService<tonic::body::Body>,
348        T::Error: Into<StdError>,
349        T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
350        <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
351    {
352        pub fn new(inner: T) -> Self {
353            let inner = tonic::client::Grpc::new(inner);
354            Self { inner }
355        }
356        pub fn with_origin(inner: T, origin: Uri) -> Self {
357            let inner = tonic::client::Grpc::with_origin(inner, origin);
358            Self { inner }
359        }
360        pub fn with_interceptor<F>(
361            inner: T,
362            interceptor: F,
363        ) -> WorkerServiceClient<InterceptedService<T, F>>
364        where
365            F: tonic::service::Interceptor,
366            T::ResponseBody: Default,
367            T: tonic::codegen::Service<
368                    http::Request<tonic::body::Body>,
369                    Response = http::Response<
370                        <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
371                    >,
372                >,
373            <T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error:
374                Into<StdError> + std::marker::Send + std::marker::Sync,
375        {
376            WorkerServiceClient::new(InterceptedService::new(inner, interceptor))
377        }
378        /// Compress requests with the given encoding.
379        ///
380        /// This requires the server to support it otherwise it might respond with an
381        /// error.
382        #[must_use]
383        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
384            self.inner = self.inner.send_compressed(encoding);
385            self
386        }
387        /// Enable decompressing responses.
388        #[must_use]
389        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
390            self.inner = self.inner.accept_compressed(encoding);
391            self
392        }
393        /// Limits the maximum size of a decoded message.
394        ///
395        /// Default: `4MB`
396        #[must_use]
397        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
398            self.inner = self.inner.max_decoding_message_size(limit);
399            self
400        }
401        /// Limits the maximum size of an encoded message.
402        ///
403        /// Default: `usize::MAX`
404        #[must_use]
405        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
406            self.inner = self.inner.max_encoding_message_size(limit);
407            self
408        }
409        /// Establishes a bidirectional message stream between a coordinator and a worker, over which messages
410        /// will be exchanged at any time during a query's lifetime. It's expected to be one coordinator channel
411        /// per task.
412        pub async fn coordinator_channel(
413            &mut self,
414            request: impl tonic::IntoStreamingRequest<Message = super::CoordinatorToWorkerMsg>,
415        ) -> std::result::Result<
416            tonic::Response<tonic::codec::Streaming<super::WorkerToCoordinatorMsg>>,
417            tonic::Status,
418        > {
419            self.inner.ready().await.map_err(|e| {
420                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
421            })?;
422            let codec = tonic_prost::ProstCodec::default();
423            let path =
424                http::uri::PathAndQuery::from_static("/worker.WorkerService/CoordinatorChannel");
425            let mut req = request.into_streaming_request();
426            req.extensions_mut().insert(GrpcMethod::new(
427                "worker.WorkerService",
428                "CoordinatorChannel",
429            ));
430            self.inner.streaming(req, path, codec).await
431        }
432        /// Executes the requested partition range of a subplan previously sent by the coordinator channel.
433        pub async fn execute_task(
434            &mut self,
435            request: impl tonic::IntoRequest<super::ExecuteTaskRequest>,
436        ) -> std::result::Result<
437            tonic::Response<tonic::codec::Streaming<::arrow_flight::FlightData>>,
438            tonic::Status,
439        > {
440            self.inner.ready().await.map_err(|e| {
441                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
442            })?;
443            let codec = tonic_prost::ProstCodec::default();
444            let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/ExecuteTask");
445            let mut req = request.into_request();
446            req.extensions_mut()
447                .insert(GrpcMethod::new("worker.WorkerService", "ExecuteTask"));
448            self.inner.server_streaming(req, path, codec).await
449        }
450        /// Returns metadata about a worker. Currently only used for worker versioning.
451        pub async fn get_worker_info(
452            &mut self,
453            request: impl tonic::IntoRequest<super::GetWorkerInfoRequest>,
454        ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>
455        {
456            self.inner.ready().await.map_err(|e| {
457                tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
458            })?;
459            let codec = tonic_prost::ProstCodec::default();
460            let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/GetWorkerInfo");
461            let mut req = request.into_request();
462            req.extensions_mut()
463                .insert(GrpcMethod::new("worker.WorkerService", "GetWorkerInfo"));
464            self.inner.unary(req, path, codec).await
465        }
466    }
467}
468/// Generated server implementations.
469pub mod worker_service_server {
470    #![allow(
471        unused_variables,
472        dead_code,
473        missing_docs,
474        clippy::wildcard_imports,
475        clippy::let_unit_value
476    )]
477    use tonic::codegen::*;
478    /// Generated trait containing gRPC methods that should be implemented for use with WorkerServiceServer.
479    #[async_trait]
480    pub trait WorkerService: std::marker::Send + std::marker::Sync + 'static {
481        /// Server streaming response type for the CoordinatorChannel method.
482        type CoordinatorChannelStream: tonic::codegen::tokio_stream::Stream<
483                Item = std::result::Result<super::WorkerToCoordinatorMsg, tonic::Status>,
484            > + std::marker::Send
485            + 'static;
486        /// Establishes a bidirectional message stream between a coordinator and a worker, over which messages
487        /// will be exchanged at any time during a query's lifetime. It's expected to be one coordinator channel
488        /// per task.
489        async fn coordinator_channel(
490            &self,
491            request: tonic::Request<tonic::Streaming<super::CoordinatorToWorkerMsg>>,
492        ) -> std::result::Result<tonic::Response<Self::CoordinatorChannelStream>, tonic::Status>;
493        /// Server streaming response type for the ExecuteTask method.
494        type ExecuteTaskStream: tonic::codegen::tokio_stream::Stream<
495                Item = std::result::Result<::arrow_flight::FlightData, tonic::Status>,
496            > + std::marker::Send
497            + 'static;
498        /// Executes the requested partition range of a subplan previously sent by the coordinator channel.
499        async fn execute_task(
500            &self,
501            request: tonic::Request<super::ExecuteTaskRequest>,
502        ) -> std::result::Result<tonic::Response<Self::ExecuteTaskStream>, tonic::Status>;
503        /// Returns metadata about a worker. Currently only used for worker versioning.
504        async fn get_worker_info(
505            &self,
506            request: tonic::Request<super::GetWorkerInfoRequest>,
507        ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>;
508    }
509    #[derive(Debug)]
510    pub struct WorkerServiceServer<T> {
511        inner: Arc<T>,
512        accept_compression_encodings: EnabledCompressionEncodings,
513        send_compression_encodings: EnabledCompressionEncodings,
514        max_decoding_message_size: Option<usize>,
515        max_encoding_message_size: Option<usize>,
516    }
517    impl<T> WorkerServiceServer<T> {
518        pub fn new(inner: T) -> Self {
519            Self::from_arc(Arc::new(inner))
520        }
521        pub fn from_arc(inner: Arc<T>) -> Self {
522            Self {
523                inner,
524                accept_compression_encodings: Default::default(),
525                send_compression_encodings: Default::default(),
526                max_decoding_message_size: None,
527                max_encoding_message_size: None,
528            }
529        }
530        pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
531        where
532            F: tonic::service::Interceptor,
533        {
534            InterceptedService::new(Self::new(inner), interceptor)
535        }
536        /// Enable decompressing requests with the given encoding.
537        #[must_use]
538        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
539            self.accept_compression_encodings.enable(encoding);
540            self
541        }
542        /// Compress responses with the given encoding, if the client supports it.
543        #[must_use]
544        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
545            self.send_compression_encodings.enable(encoding);
546            self
547        }
548        /// Limits the maximum size of a decoded message.
549        ///
550        /// Default: `4MB`
551        #[must_use]
552        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
553            self.max_decoding_message_size = Some(limit);
554            self
555        }
556        /// Limits the maximum size of an encoded message.
557        ///
558        /// Default: `usize::MAX`
559        #[must_use]
560        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
561            self.max_encoding_message_size = Some(limit);
562            self
563        }
564    }
565    impl<T, B> tonic::codegen::Service<http::Request<B>> for WorkerServiceServer<T>
566    where
567        T: WorkerService,
568        B: Body + std::marker::Send + 'static,
569        B::Error: Into<StdError> + std::marker::Send + 'static,
570    {
571        type Response = http::Response<tonic::body::Body>;
572        type Error = std::convert::Infallible;
573        type Future = BoxFuture<Self::Response, Self::Error>;
574        fn poll_ready(
575            &mut self,
576            _cx: &mut Context<'_>,
577        ) -> Poll<std::result::Result<(), Self::Error>> {
578            Poll::Ready(Ok(()))
579        }
580        fn call(&mut self, req: http::Request<B>) -> Self::Future {
581            match req.uri().path() {
582                "/worker.WorkerService/CoordinatorChannel" => {
583                    #[allow(non_camel_case_types)]
584                    struct CoordinatorChannelSvc<T: WorkerService>(pub Arc<T>);
585                    impl<T: WorkerService>
586                        tonic::server::StreamingService<super::CoordinatorToWorkerMsg>
587                        for CoordinatorChannelSvc<T>
588                    {
589                        type Response = super::WorkerToCoordinatorMsg;
590                        type ResponseStream = T::CoordinatorChannelStream;
591                        type Future =
592                            BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
593                        fn call(
594                            &mut self,
595                            request: tonic::Request<
596                                tonic::Streaming<super::CoordinatorToWorkerMsg>,
597                            >,
598                        ) -> Self::Future {
599                            let inner = Arc::clone(&self.0);
600                            let fut = async move {
601                                <T as WorkerService>::coordinator_channel(&inner, request).await
602                            };
603                            Box::pin(fut)
604                        }
605                    }
606                    let accept_compression_encodings = self.accept_compression_encodings;
607                    let send_compression_encodings = self.send_compression_encodings;
608                    let max_decoding_message_size = self.max_decoding_message_size;
609                    let max_encoding_message_size = self.max_encoding_message_size;
610                    let inner = self.inner.clone();
611                    let fut = async move {
612                        let method = CoordinatorChannelSvc(inner);
613                        let codec = tonic_prost::ProstCodec::default();
614                        let mut grpc = tonic::server::Grpc::new(codec)
615                            .apply_compression_config(
616                                accept_compression_encodings,
617                                send_compression_encodings,
618                            )
619                            .apply_max_message_size_config(
620                                max_decoding_message_size,
621                                max_encoding_message_size,
622                            );
623                        let res = grpc.streaming(method, req).await;
624                        Ok(res)
625                    };
626                    Box::pin(fut)
627                }
628                "/worker.WorkerService/ExecuteTask" => {
629                    #[allow(non_camel_case_types)]
630                    struct ExecuteTaskSvc<T: WorkerService>(pub Arc<T>);
631                    impl<T: WorkerService>
632                        tonic::server::ServerStreamingService<super::ExecuteTaskRequest>
633                        for ExecuteTaskSvc<T>
634                    {
635                        type Response = ::arrow_flight::FlightData;
636                        type ResponseStream = T::ExecuteTaskStream;
637                        type Future =
638                            BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
639                        fn call(
640                            &mut self,
641                            request: tonic::Request<super::ExecuteTaskRequest>,
642                        ) -> Self::Future {
643                            let inner = Arc::clone(&self.0);
644                            let fut = async move {
645                                <T as WorkerService>::execute_task(&inner, request).await
646                            };
647                            Box::pin(fut)
648                        }
649                    }
650                    let accept_compression_encodings = self.accept_compression_encodings;
651                    let send_compression_encodings = self.send_compression_encodings;
652                    let max_decoding_message_size = self.max_decoding_message_size;
653                    let max_encoding_message_size = self.max_encoding_message_size;
654                    let inner = self.inner.clone();
655                    let fut = async move {
656                        let method = ExecuteTaskSvc(inner);
657                        let codec = tonic_prost::ProstCodec::default();
658                        let mut grpc = tonic::server::Grpc::new(codec)
659                            .apply_compression_config(
660                                accept_compression_encodings,
661                                send_compression_encodings,
662                            )
663                            .apply_max_message_size_config(
664                                max_decoding_message_size,
665                                max_encoding_message_size,
666                            );
667                        let res = grpc.server_streaming(method, req).await;
668                        Ok(res)
669                    };
670                    Box::pin(fut)
671                }
672                "/worker.WorkerService/GetWorkerInfo" => {
673                    #[allow(non_camel_case_types)]
674                    struct GetWorkerInfoSvc<T: WorkerService>(pub Arc<T>);
675                    impl<T: WorkerService> tonic::server::UnaryService<super::GetWorkerInfoRequest>
676                        for GetWorkerInfoSvc<T>
677                    {
678                        type Response = super::GetWorkerInfoResponse;
679                        type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
680                        fn call(
681                            &mut self,
682                            request: tonic::Request<super::GetWorkerInfoRequest>,
683                        ) -> Self::Future {
684                            let inner = Arc::clone(&self.0);
685                            let fut = async move {
686                                <T as WorkerService>::get_worker_info(&inner, request).await
687                            };
688                            Box::pin(fut)
689                        }
690                    }
691                    let accept_compression_encodings = self.accept_compression_encodings;
692                    let send_compression_encodings = self.send_compression_encodings;
693                    let max_decoding_message_size = self.max_decoding_message_size;
694                    let max_encoding_message_size = self.max_encoding_message_size;
695                    let inner = self.inner.clone();
696                    let fut = async move {
697                        let method = GetWorkerInfoSvc(inner);
698                        let codec = tonic_prost::ProstCodec::default();
699                        let mut grpc = tonic::server::Grpc::new(codec)
700                            .apply_compression_config(
701                                accept_compression_encodings,
702                                send_compression_encodings,
703                            )
704                            .apply_max_message_size_config(
705                                max_decoding_message_size,
706                                max_encoding_message_size,
707                            );
708                        let res = grpc.unary(method, req).await;
709                        Ok(res)
710                    };
711                    Box::pin(fut)
712                }
713                _ => Box::pin(async move {
714                    let mut response = http::Response::new(tonic::body::Body::default());
715                    let headers = response.headers_mut();
716                    headers.insert(
717                        tonic::Status::GRPC_STATUS,
718                        (tonic::Code::Unimplemented as i32).into(),
719                    );
720                    headers.insert(
721                        http::header::CONTENT_TYPE,
722                        tonic::metadata::GRPC_CONTENT_TYPE,
723                    );
724                    Ok(response)
725                }),
726            }
727        }
728    }
729    impl<T> Clone for WorkerServiceServer<T> {
730        fn clone(&self) -> Self {
731            let inner = self.inner.clone();
732            Self {
733                inner,
734                accept_compression_encodings: self.accept_compression_encodings,
735                send_compression_encodings: self.send_compression_encodings,
736                max_decoding_message_size: self.max_decoding_message_size,
737                max_encoding_message_size: self.max_encoding_message_size,
738            }
739        }
740    }
741    /// Generated gRPC service name
742    pub const SERVICE_NAME: &str = "worker.WorkerService";
743    impl<T> tonic::server::NamedService for WorkerServiceServer<T> {
744        const NAME: &'static str = SERVICE_NAME;
745    }
746}