1#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
3pub struct CoordinatorToWorkerMsg {
4 #[prost(oneof = "coordinator_to_worker_msg::Inner", tags = "1")]
5 pub inner: ::core::option::Option<coordinator_to_worker_msg::Inner>,
6}
7pub mod coordinator_to_worker_msg {
9 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
10 pub enum Inner {
11 #[prost(message, tag = "1")]
14 SetPlanRequest(super::SetPlanRequest),
15 }
16}
17#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
19pub struct WorkerToCoordinatorMsg {}
20#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
21pub struct GetWorkerInfoRequest {}
22#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
23pub struct GetWorkerInfoResponse {
24 #[prost(string, tag = "1")]
25 pub version: ::prost::alloc::string::String,
26}
27#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
28pub struct SetPlanRequest {
29 #[prost(message, optional, tag = "1")]
31 pub task_key: ::core::option::Option<TaskKey>,
32 #[prost(uint64, tag = "2")]
34 pub task_count: u64,
35 #[prost(bytes = "vec", tag = "3")]
37 pub plan_proto: ::prost::alloc::vec::Vec<u8>,
38}
39#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
40pub struct ExecuteTaskRequest {
41 #[prost(message, optional, tag = "1")]
43 pub task_key: ::core::option::Option<TaskKey>,
44 #[prost(uint64, tag = "2")]
46 pub target_partition_start: u64,
47 #[prost(uint64, tag = "3")]
49 pub target_partition_end: u64,
50}
51#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
53pub struct TaskKey {
54 #[prost(bytes = "vec", tag = "1")]
56 pub query_id: ::prost::alloc::vec::Vec<u8>,
57 #[prost(uint64, tag = "2")]
59 pub stage_id: u64,
60 #[prost(uint64, tag = "3")]
62 pub task_number: u64,
63}
64#[derive(Clone, PartialEq, ::prost::Message)]
66pub struct FlightAppMetadata {
67 #[prost(uint64, tag = "1")]
68 pub partition: u64,
69 #[prost(uint64, tag = "2")]
71 pub created_timestamp_unix_nanos: u64,
72 #[prost(oneof = "flight_app_metadata::Content", tags = "10")]
74 pub content: ::core::option::Option<flight_app_metadata::Content>,
75}
76pub mod flight_app_metadata {
78 #[derive(Clone, PartialEq, ::prost::Oneof)]
80 pub enum Content {
81 #[prost(message, tag = "10")]
82 MetricsCollection(super::MetricsCollection),
83 }
84}
85#[derive(Clone, PartialEq, ::prost::Message)]
88pub struct MetricsCollection {
89 #[prost(message, repeated, tag = "1")]
90 pub tasks: ::prost::alloc::vec::Vec<TaskMetrics>,
91}
92#[derive(Clone, PartialEq, ::prost::Message)]
94pub struct TaskMetrics {
95 #[prost(message, optional, tag = "1")]
98 pub task_key: ::core::option::Option<TaskKey>,
99 #[prost(message, repeated, tag = "2")]
102 pub metrics: ::prost::alloc::vec::Vec<MetricsSet>,
103}
104#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
106pub struct Label {
107 #[prost(string, tag = "1")]
108 pub name: ::prost::alloc::string::String,
109 #[prost(string, tag = "2")]
110 pub value: ::prost::alloc::string::String,
111}
112#[derive(Clone, PartialEq, ::prost::Message)]
114pub struct Metric {
115 #[prost(message, repeated, tag = "1")]
116 pub labels: ::prost::alloc::vec::Vec<Label>,
117 #[prost(uint64, optional, tag = "2")]
118 pub partition: ::core::option::Option<u64>,
119 #[prost(
120 oneof = "metric::Value",
121 tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33"
122 )]
123 pub value: ::core::option::Option<metric::Value>,
124}
125pub mod metric {
127 #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
128 pub enum Value {
129 #[prost(message, tag = "10")]
130 OutputRows(super::OutputRows),
131 #[prost(message, tag = "11")]
132 ElapsedCompute(super::ElapsedCompute),
133 #[prost(message, tag = "12")]
134 SpillCount(super::SpillCount),
135 #[prost(message, tag = "13")]
136 SpilledBytes(super::SpilledBytes),
137 #[prost(message, tag = "14")]
138 SpilledRows(super::SpilledRows),
139 #[prost(message, tag = "15")]
140 CurrentMemoryUsage(super::CurrentMemoryUsage),
141 #[prost(message, tag = "16")]
142 Count(super::NamedCount),
143 #[prost(message, tag = "17")]
144 Gauge(super::NamedGauge),
145 #[prost(message, tag = "18")]
146 Time(super::NamedTime),
147 #[prost(message, tag = "19")]
148 StartTimestamp(super::StartTimestamp),
149 #[prost(message, tag = "20")]
150 EndTimestamp(super::EndTimestamp),
151 #[prost(message, tag = "21")]
152 OutputBytes(super::OutputBytes),
153 #[prost(message, tag = "22")]
154 OutputBatches(super::OutputBatches),
155 #[prost(message, tag = "23")]
156 PruningMetrics(super::NamedPruningMetrics),
157 #[prost(message, tag = "24")]
158 Ratio(super::NamedRatio),
159 #[prost(message, tag = "25")]
160 CustomMinLatency(super::MinLatency),
161 #[prost(message, tag = "26")]
162 CustomMaxLatency(super::MaxLatency),
163 #[prost(message, tag = "27")]
164 CustomAvgLatency(super::AvgLatency),
165 #[prost(message, tag = "28")]
166 CustomFirstLatency(super::FirstLatency),
167 #[prost(message, tag = "29")]
168 CustomBytesCount(super::BytesCount),
169 #[prost(message, tag = "30")]
170 CustomP50Latency(super::PercentileLatency),
171 #[prost(message, tag = "31")]
172 CustomP75Latency(super::PercentileLatency),
173 #[prost(message, tag = "32")]
174 CustomP95Latency(super::PercentileLatency),
175 #[prost(message, tag = "33")]
176 CustomP99Latency(super::PercentileLatency),
177 }
178}
179#[derive(Clone, PartialEq, ::prost::Message)]
182pub struct MetricsSet {
183 #[prost(message, repeated, tag = "1")]
184 pub metrics: ::prost::alloc::vec::Vec<Metric>,
185}
186#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
187pub struct OutputRows {
188 #[prost(uint64, tag = "1")]
189 pub value: u64,
190}
191#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
192pub struct ElapsedCompute {
193 #[prost(uint64, tag = "1")]
194 pub value: u64,
195}
196#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
197pub struct SpillCount {
198 #[prost(uint64, tag = "1")]
199 pub value: u64,
200}
201#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
202pub struct SpilledBytes {
203 #[prost(uint64, tag = "1")]
204 pub value: u64,
205}
206#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
207pub struct SpilledRows {
208 #[prost(uint64, tag = "1")]
209 pub value: u64,
210}
211#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
212pub struct CurrentMemoryUsage {
213 #[prost(uint64, tag = "1")]
214 pub value: u64,
215}
216#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
217pub struct NamedCount {
218 #[prost(string, tag = "1")]
219 pub name: ::prost::alloc::string::String,
220 #[prost(uint64, tag = "2")]
221 pub value: u64,
222}
223#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
224pub struct NamedGauge {
225 #[prost(string, tag = "1")]
226 pub name: ::prost::alloc::string::String,
227 #[prost(uint64, tag = "2")]
228 pub value: u64,
229}
230#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
231pub struct NamedTime {
232 #[prost(string, tag = "1")]
233 pub name: ::prost::alloc::string::String,
234 #[prost(uint64, tag = "2")]
235 pub value: u64,
236}
237#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
238pub struct StartTimestamp {
239 #[prost(int64, optional, tag = "1")]
240 pub value: ::core::option::Option<i64>,
241}
242#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
243pub struct EndTimestamp {
244 #[prost(int64, optional, tag = "1")]
245 pub value: ::core::option::Option<i64>,
246}
247#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
248pub struct OutputBytes {
249 #[prost(uint64, tag = "1")]
250 pub value: u64,
251}
252#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
253pub struct OutputBatches {
254 #[prost(uint64, tag = "1")]
255 pub value: u64,
256}
257#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
258pub struct NamedPruningMetrics {
259 #[prost(string, tag = "1")]
260 pub name: ::prost::alloc::string::String,
261 #[prost(uint64, tag = "2")]
262 pub pruned: u64,
263 #[prost(uint64, tag = "3")]
264 pub matched: u64,
265}
266#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
267pub struct NamedRatio {
268 #[prost(string, tag = "1")]
269 pub name: ::prost::alloc::string::String,
270 #[prost(uint64, tag = "2")]
271 pub part: u64,
272 #[prost(uint64, tag = "3")]
273 pub total: u64,
274}
275#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
276pub struct BytesCount {
277 #[prost(string, tag = "1")]
278 pub name: ::prost::alloc::string::String,
279 #[prost(uint64, tag = "2")]
280 pub value: u64,
281}
282#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
283pub struct MinLatency {
284 #[prost(string, tag = "1")]
285 pub name: ::prost::alloc::string::String,
286 #[prost(uint64, tag = "2")]
287 pub value: u64,
288}
289#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
290pub struct MaxLatency {
291 #[prost(string, tag = "1")]
292 pub name: ::prost::alloc::string::String,
293 #[prost(uint64, tag = "2")]
294 pub value: u64,
295}
296#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
297pub struct AvgLatency {
298 #[prost(string, tag = "1")]
299 pub name: ::prost::alloc::string::String,
300 #[prost(uint64, tag = "2")]
301 pub nanos_sum: u64,
302 #[prost(uint64, tag = "3")]
303 pub count: u64,
304}
305#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
306pub struct FirstLatency {
307 #[prost(string, tag = "1")]
308 pub name: ::prost::alloc::string::String,
309 #[prost(uint64, tag = "2")]
310 pub value: u64,
311}
312#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
313pub struct PercentileLatency {
314 #[prost(string, tag = "1")]
315 pub name: ::prost::alloc::string::String,
316 #[prost(bytes = "vec", tag = "4")]
317 pub sketch_bytes: ::prost::alloc::vec::Vec<u8>,
318}
319pub mod worker_service_client {
321 #![allow(
322 unused_variables,
323 dead_code,
324 missing_docs,
325 clippy::wildcard_imports,
326 clippy::let_unit_value
327 )]
328 use tonic::codegen::http::Uri;
329 use tonic::codegen::*;
330 #[derive(Debug, Clone)]
331 pub struct WorkerServiceClient<T> {
332 inner: tonic::client::Grpc<T>,
333 }
334 impl WorkerServiceClient<tonic::transport::Channel> {
335 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
337 where
338 D: TryInto<tonic::transport::Endpoint>,
339 D::Error: Into<StdError>,
340 {
341 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
342 Ok(Self::new(conn))
343 }
344 }
345 impl<T> WorkerServiceClient<T>
346 where
347 T: tonic::client::GrpcService<tonic::body::Body>,
348 T::Error: Into<StdError>,
349 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
350 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
351 {
352 pub fn new(inner: T) -> Self {
353 let inner = tonic::client::Grpc::new(inner);
354 Self { inner }
355 }
356 pub fn with_origin(inner: T, origin: Uri) -> Self {
357 let inner = tonic::client::Grpc::with_origin(inner, origin);
358 Self { inner }
359 }
360 pub fn with_interceptor<F>(
361 inner: T,
362 interceptor: F,
363 ) -> WorkerServiceClient<InterceptedService<T, F>>
364 where
365 F: tonic::service::Interceptor,
366 T::ResponseBody: Default,
367 T: tonic::codegen::Service<
368 http::Request<tonic::body::Body>,
369 Response = http::Response<
370 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
371 >,
372 >,
373 <T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error:
374 Into<StdError> + std::marker::Send + std::marker::Sync,
375 {
376 WorkerServiceClient::new(InterceptedService::new(inner, interceptor))
377 }
378 #[must_use]
383 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
384 self.inner = self.inner.send_compressed(encoding);
385 self
386 }
387 #[must_use]
389 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
390 self.inner = self.inner.accept_compressed(encoding);
391 self
392 }
393 #[must_use]
397 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
398 self.inner = self.inner.max_decoding_message_size(limit);
399 self
400 }
401 #[must_use]
405 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
406 self.inner = self.inner.max_encoding_message_size(limit);
407 self
408 }
409 pub async fn coordinator_channel(
413 &mut self,
414 request: impl tonic::IntoStreamingRequest<Message = super::CoordinatorToWorkerMsg>,
415 ) -> std::result::Result<
416 tonic::Response<tonic::codec::Streaming<super::WorkerToCoordinatorMsg>>,
417 tonic::Status,
418 > {
419 self.inner.ready().await.map_err(|e| {
420 tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
421 })?;
422 let codec = tonic_prost::ProstCodec::default();
423 let path =
424 http::uri::PathAndQuery::from_static("/worker.WorkerService/CoordinatorChannel");
425 let mut req = request.into_streaming_request();
426 req.extensions_mut().insert(GrpcMethod::new(
427 "worker.WorkerService",
428 "CoordinatorChannel",
429 ));
430 self.inner.streaming(req, path, codec).await
431 }
432 pub async fn execute_task(
434 &mut self,
435 request: impl tonic::IntoRequest<super::ExecuteTaskRequest>,
436 ) -> std::result::Result<
437 tonic::Response<tonic::codec::Streaming<::arrow_flight::FlightData>>,
438 tonic::Status,
439 > {
440 self.inner.ready().await.map_err(|e| {
441 tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
442 })?;
443 let codec = tonic_prost::ProstCodec::default();
444 let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/ExecuteTask");
445 let mut req = request.into_request();
446 req.extensions_mut()
447 .insert(GrpcMethod::new("worker.WorkerService", "ExecuteTask"));
448 self.inner.server_streaming(req, path, codec).await
449 }
450 pub async fn get_worker_info(
452 &mut self,
453 request: impl tonic::IntoRequest<super::GetWorkerInfoRequest>,
454 ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>
455 {
456 self.inner.ready().await.map_err(|e| {
457 tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
458 })?;
459 let codec = tonic_prost::ProstCodec::default();
460 let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/GetWorkerInfo");
461 let mut req = request.into_request();
462 req.extensions_mut()
463 .insert(GrpcMethod::new("worker.WorkerService", "GetWorkerInfo"));
464 self.inner.unary(req, path, codec).await
465 }
466 }
467}
468pub mod worker_service_server {
470 #![allow(
471 unused_variables,
472 dead_code,
473 missing_docs,
474 clippy::wildcard_imports,
475 clippy::let_unit_value
476 )]
477 use tonic::codegen::*;
478 #[async_trait]
480 pub trait WorkerService: std::marker::Send + std::marker::Sync + 'static {
481 type CoordinatorChannelStream: tonic::codegen::tokio_stream::Stream<
483 Item = std::result::Result<super::WorkerToCoordinatorMsg, tonic::Status>,
484 > + std::marker::Send
485 + 'static;
486 async fn coordinator_channel(
490 &self,
491 request: tonic::Request<tonic::Streaming<super::CoordinatorToWorkerMsg>>,
492 ) -> std::result::Result<tonic::Response<Self::CoordinatorChannelStream>, tonic::Status>;
493 type ExecuteTaskStream: tonic::codegen::tokio_stream::Stream<
495 Item = std::result::Result<::arrow_flight::FlightData, tonic::Status>,
496 > + std::marker::Send
497 + 'static;
498 async fn execute_task(
500 &self,
501 request: tonic::Request<super::ExecuteTaskRequest>,
502 ) -> std::result::Result<tonic::Response<Self::ExecuteTaskStream>, tonic::Status>;
503 async fn get_worker_info(
505 &self,
506 request: tonic::Request<super::GetWorkerInfoRequest>,
507 ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>;
508 }
509 #[derive(Debug)]
510 pub struct WorkerServiceServer<T> {
511 inner: Arc<T>,
512 accept_compression_encodings: EnabledCompressionEncodings,
513 send_compression_encodings: EnabledCompressionEncodings,
514 max_decoding_message_size: Option<usize>,
515 max_encoding_message_size: Option<usize>,
516 }
517 impl<T> WorkerServiceServer<T> {
518 pub fn new(inner: T) -> Self {
519 Self::from_arc(Arc::new(inner))
520 }
521 pub fn from_arc(inner: Arc<T>) -> Self {
522 Self {
523 inner,
524 accept_compression_encodings: Default::default(),
525 send_compression_encodings: Default::default(),
526 max_decoding_message_size: None,
527 max_encoding_message_size: None,
528 }
529 }
530 pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
531 where
532 F: tonic::service::Interceptor,
533 {
534 InterceptedService::new(Self::new(inner), interceptor)
535 }
536 #[must_use]
538 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
539 self.accept_compression_encodings.enable(encoding);
540 self
541 }
542 #[must_use]
544 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
545 self.send_compression_encodings.enable(encoding);
546 self
547 }
548 #[must_use]
552 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
553 self.max_decoding_message_size = Some(limit);
554 self
555 }
556 #[must_use]
560 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
561 self.max_encoding_message_size = Some(limit);
562 self
563 }
564 }
565 impl<T, B> tonic::codegen::Service<http::Request<B>> for WorkerServiceServer<T>
566 where
567 T: WorkerService,
568 B: Body + std::marker::Send + 'static,
569 B::Error: Into<StdError> + std::marker::Send + 'static,
570 {
571 type Response = http::Response<tonic::body::Body>;
572 type Error = std::convert::Infallible;
573 type Future = BoxFuture<Self::Response, Self::Error>;
574 fn poll_ready(
575 &mut self,
576 _cx: &mut Context<'_>,
577 ) -> Poll<std::result::Result<(), Self::Error>> {
578 Poll::Ready(Ok(()))
579 }
580 fn call(&mut self, req: http::Request<B>) -> Self::Future {
581 match req.uri().path() {
582 "/worker.WorkerService/CoordinatorChannel" => {
583 #[allow(non_camel_case_types)]
584 struct CoordinatorChannelSvc<T: WorkerService>(pub Arc<T>);
585 impl<T: WorkerService>
586 tonic::server::StreamingService<super::CoordinatorToWorkerMsg>
587 for CoordinatorChannelSvc<T>
588 {
589 type Response = super::WorkerToCoordinatorMsg;
590 type ResponseStream = T::CoordinatorChannelStream;
591 type Future =
592 BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
593 fn call(
594 &mut self,
595 request: tonic::Request<
596 tonic::Streaming<super::CoordinatorToWorkerMsg>,
597 >,
598 ) -> Self::Future {
599 let inner = Arc::clone(&self.0);
600 let fut = async move {
601 <T as WorkerService>::coordinator_channel(&inner, request).await
602 };
603 Box::pin(fut)
604 }
605 }
606 let accept_compression_encodings = self.accept_compression_encodings;
607 let send_compression_encodings = self.send_compression_encodings;
608 let max_decoding_message_size = self.max_decoding_message_size;
609 let max_encoding_message_size = self.max_encoding_message_size;
610 let inner = self.inner.clone();
611 let fut = async move {
612 let method = CoordinatorChannelSvc(inner);
613 let codec = tonic_prost::ProstCodec::default();
614 let mut grpc = tonic::server::Grpc::new(codec)
615 .apply_compression_config(
616 accept_compression_encodings,
617 send_compression_encodings,
618 )
619 .apply_max_message_size_config(
620 max_decoding_message_size,
621 max_encoding_message_size,
622 );
623 let res = grpc.streaming(method, req).await;
624 Ok(res)
625 };
626 Box::pin(fut)
627 }
628 "/worker.WorkerService/ExecuteTask" => {
629 #[allow(non_camel_case_types)]
630 struct ExecuteTaskSvc<T: WorkerService>(pub Arc<T>);
631 impl<T: WorkerService>
632 tonic::server::ServerStreamingService<super::ExecuteTaskRequest>
633 for ExecuteTaskSvc<T>
634 {
635 type Response = ::arrow_flight::FlightData;
636 type ResponseStream = T::ExecuteTaskStream;
637 type Future =
638 BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
639 fn call(
640 &mut self,
641 request: tonic::Request<super::ExecuteTaskRequest>,
642 ) -> Self::Future {
643 let inner = Arc::clone(&self.0);
644 let fut = async move {
645 <T as WorkerService>::execute_task(&inner, request).await
646 };
647 Box::pin(fut)
648 }
649 }
650 let accept_compression_encodings = self.accept_compression_encodings;
651 let send_compression_encodings = self.send_compression_encodings;
652 let max_decoding_message_size = self.max_decoding_message_size;
653 let max_encoding_message_size = self.max_encoding_message_size;
654 let inner = self.inner.clone();
655 let fut = async move {
656 let method = ExecuteTaskSvc(inner);
657 let codec = tonic_prost::ProstCodec::default();
658 let mut grpc = tonic::server::Grpc::new(codec)
659 .apply_compression_config(
660 accept_compression_encodings,
661 send_compression_encodings,
662 )
663 .apply_max_message_size_config(
664 max_decoding_message_size,
665 max_encoding_message_size,
666 );
667 let res = grpc.server_streaming(method, req).await;
668 Ok(res)
669 };
670 Box::pin(fut)
671 }
672 "/worker.WorkerService/GetWorkerInfo" => {
673 #[allow(non_camel_case_types)]
674 struct GetWorkerInfoSvc<T: WorkerService>(pub Arc<T>);
675 impl<T: WorkerService> tonic::server::UnaryService<super::GetWorkerInfoRequest>
676 for GetWorkerInfoSvc<T>
677 {
678 type Response = super::GetWorkerInfoResponse;
679 type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
680 fn call(
681 &mut self,
682 request: tonic::Request<super::GetWorkerInfoRequest>,
683 ) -> Self::Future {
684 let inner = Arc::clone(&self.0);
685 let fut = async move {
686 <T as WorkerService>::get_worker_info(&inner, request).await
687 };
688 Box::pin(fut)
689 }
690 }
691 let accept_compression_encodings = self.accept_compression_encodings;
692 let send_compression_encodings = self.send_compression_encodings;
693 let max_decoding_message_size = self.max_decoding_message_size;
694 let max_encoding_message_size = self.max_encoding_message_size;
695 let inner = self.inner.clone();
696 let fut = async move {
697 let method = GetWorkerInfoSvc(inner);
698 let codec = tonic_prost::ProstCodec::default();
699 let mut grpc = tonic::server::Grpc::new(codec)
700 .apply_compression_config(
701 accept_compression_encodings,
702 send_compression_encodings,
703 )
704 .apply_max_message_size_config(
705 max_decoding_message_size,
706 max_encoding_message_size,
707 );
708 let res = grpc.unary(method, req).await;
709 Ok(res)
710 };
711 Box::pin(fut)
712 }
713 _ => Box::pin(async move {
714 let mut response = http::Response::new(tonic::body::Body::default());
715 let headers = response.headers_mut();
716 headers.insert(
717 tonic::Status::GRPC_STATUS,
718 (tonic::Code::Unimplemented as i32).into(),
719 );
720 headers.insert(
721 http::header::CONTENT_TYPE,
722 tonic::metadata::GRPC_CONTENT_TYPE,
723 );
724 Ok(response)
725 }),
726 }
727 }
728 }
729 impl<T> Clone for WorkerServiceServer<T> {
730 fn clone(&self) -> Self {
731 let inner = self.inner.clone();
732 Self {
733 inner,
734 accept_compression_encodings: self.accept_compression_encodings,
735 send_compression_encodings: self.send_compression_encodings,
736 max_decoding_message_size: self.max_decoding_message_size,
737 max_encoding_message_size: self.max_encoding_message_size,
738 }
739 }
740 }
741 pub const SERVICE_NAME: &str = "worker.WorkerService";
743 impl<T> tonic::server::NamedService for WorkerServiceServer<T> {
744 const NAME: &'static str = SERVICE_NAME;
745 }
746}