/// Envelope for one message sent on the request side of the
/// `CoordinatorChannel` streaming RPC.
///
/// At most one payload variant is set; the variants (tags 1 and 2) are listed
/// in [`coordinator_to_worker_msg::Inner`].
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CoordinatorToWorkerMsg {
    /// The payload carried by this message, or `None` when no variant is set.
    #[prost(oneof = "coordinator_to_worker_msg::Inner", tags = "1, 2")]
    pub inner: ::core::option::Option<coordinator_to_worker_msg::Inner>,
}
/// Nested types for [`CoordinatorToWorkerMsg`].
pub mod coordinator_to_worker_msg {
    /// Payload variants of a [`CoordinatorToWorkerMsg`](super::CoordinatorToWorkerMsg).
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Inner {
        /// Carries a `SetPlanRequest` (field tag 1).
        #[prost(message, tag = "1")]
        SetPlanRequest(super::SetPlanRequest),
        /// Carries a `WorkUnitBatch` (field tag 2).
        #[prost(message, tag = "2")]
        WorkUnitBatch(super::WorkUnitBatch),
    }
}
/// Envelope for one message received on the response side of the
/// `CoordinatorChannel` streaming RPC.
///
/// At most one payload variant is set; currently the only variant (tag 1) is
/// listed in [`worker_to_coordinator_msg::Inner`].
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct WorkerToCoordinatorMsg {
    /// The payload carried by this message, or `None` when no variant is set.
    #[prost(oneof = "worker_to_coordinator_msg::Inner", tags = "1")]
    pub inner: ::core::option::Option<worker_to_coordinator_msg::Inner>,
}
/// Nested types for [`WorkerToCoordinatorMsg`].
pub mod worker_to_coordinator_msg {
    /// Payload variants of a [`WorkerToCoordinatorMsg`](super::WorkerToCoordinatorMsg).
    #[derive(Clone, PartialEq, ::prost::Oneof)]
    pub enum Inner {
        /// Carries a `TaskMetrics` message (field tag 1).
        #[prost(message, tag = "1")]
        TaskMetrics(super::TaskMetrics),
    }
}
/// Metrics payload sent inside a [`WorkerToCoordinatorMsg`].
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskMetrics {
    /// Repeated metric sets (tag 1).
    ///
    /// NOTE(review): the name suggests one [`MetricsSet`] per plan node in
    /// pre-order traversal — confirm against the `.proto` definition.
    #[prost(message, repeated, tag = "1")]
    pub pre_order_plan_metrics: ::prost::alloc::vec::Vec<MetricsSet>,
    /// Optional single metric set (tag 2); presumably task-level metrics that
    /// are not tied to an individual plan node — TODO confirm.
    #[prost(message, optional, tag = "2")]
    pub task_metrics: ::core::option::Option<MetricsSet>,
}
/// Request message of the `GetWorkerInfo` unary RPC; it has no fields.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct GetWorkerInfoRequest {}
/// Response message of the `GetWorkerInfo` unary RPC.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct GetWorkerInfoResponse {
    /// Version string (tag 1). Its format is not defined in this file;
    /// presumably the worker's build/crate version — TODO confirm.
    #[prost(string, tag = "1")]
    pub version: ::prost::alloc::string::String,
}
/// One of the payloads a [`CoordinatorToWorkerMsg`] can carry (oneof tag 1).
///
/// NOTE(review): field meanings below are inferred from names only; confirm
/// them against the `.proto` definition this file was generated from.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetPlanRequest {
    /// Optional [`TaskKey`] (tag 1); presumably identifies the task the plan
    /// belongs to.
    #[prost(message, optional, tag = "1")]
    pub task_key: ::core::option::Option<TaskKey>,
    /// Unsigned count (tag 2); presumably the number of tasks in the stage.
    #[prost(uint64, tag = "2")]
    pub task_count: u64,
    /// Opaque bytes (tag 3); presumably a serialized plan — the encoding is
    /// not visible here.
    #[prost(bytes = "vec", tag = "3")]
    pub plan_proto: ::prost::alloc::vec::Vec<u8>,
    /// Repeated [`set_plan_request::WorkUnitFeedDeclaration`] entries (tag 4).
    #[prost(message, repeated, tag = "4")]
    pub work_unit_feed_declarations:
        ::prost::alloc::vec::Vec<set_plan_request::WorkUnitFeedDeclaration>,
    /// String (tag 5); presumably the URL of the worker this request targets.
    #[prost(string, tag = "5")]
    pub target_worker_url: ::prost::alloc::string::String,
    /// Unsigned integer (tag 6); the `_ns` suffix suggests nanoseconds, but
    /// the epoch/clock is not visible here.
    #[prost(uint64, tag = "6")]
    pub query_start_time_ns: u64,
}
/// Nested types for [`SetPlanRequest`].
pub mod set_plan_request {
    /// Element type of `SetPlanRequest::work_unit_feed_declarations`.
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
    pub struct WorkUnitFeedDeclaration {
        /// Opaque identifier bytes (tag 1).
        ///
        /// NOTE(review): presumably matches `WorkUnit::id` — confirm.
        #[prost(bytes = "vec", tag = "1")]
        pub id: ::prost::alloc::vec::Vec<u8>,
        /// Unsigned integer (tag 2); presumably the number of partitions the
        /// feed has — TODO confirm.
        #[prost(uint64, tag = "2")]
        pub partitions: u64,
    }
}
/// One of the payloads a [`CoordinatorToWorkerMsg`] can carry (oneof tag 2):
/// a list of [`WorkUnit`] messages.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct WorkUnitBatch {
    /// The work units in this batch (tag 1); may be empty.
    #[prost(message, repeated, tag = "1")]
    pub batch: ::prost::alloc::vec::Vec<WorkUnit>,
}
/// Element of a [`WorkUnitBatch`].
///
/// NOTE(review): field meanings are inferred from names only; which party
/// fills in each timestamp is not visible in this file.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct WorkUnit {
    /// Opaque identifier bytes (tag 1); presumably the id of the feed this
    /// unit belongs to — TODO confirm.
    #[prost(bytes = "vec", tag = "1")]
    pub id: ::prost::alloc::vec::Vec<u8>,
    /// Unsigned partition number (tag 2).
    #[prost(uint64, tag = "2")]
    pub partition: u64,
    /// Opaque payload bytes (tag 3); encoding not visible here.
    #[prost(bytes = "vec", tag = "3")]
    pub body: ::prost::alloc::vec::Vec<u8>,
    /// Per its name, a Unix timestamp in nanoseconds for creation (tag 4).
    #[prost(uint64, tag = "4")]
    pub created_timestamp_unix_nanos: u64,
    /// Per its name, a Unix timestamp in nanoseconds for sending (tag 5).
    #[prost(uint64, tag = "5")]
    pub sent_timestamp_unix_nanos: u64,
    /// Per its name, a Unix timestamp in nanoseconds for receipt (tag 6).
    #[prost(uint64, tag = "6")]
    pub received_timestamp_unix_nanos: u64,
    /// Per its name, a Unix timestamp in nanoseconds for processing (tag 7).
    #[prost(uint64, tag = "7")]
    pub processed_timestamp_unix_nanos: u64,
}
/// Request message of the `ExecuteTask` server-streaming RPC.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ExecuteTaskRequest {
    /// Optional [`TaskKey`] (tag 1); presumably identifies the task to run.
    #[prost(message, optional, tag = "1")]
    pub task_key: ::core::option::Option<TaskKey>,
    /// Start of the requested partition range (tag 2).
    #[prost(uint64, tag = "2")]
    pub target_partition_start: u64,
    /// End of the requested partition range (tag 3).
    ///
    /// NOTE(review): whether this bound is inclusive or exclusive is not
    /// visible in this file — confirm against the `.proto`/callers.
    #[prost(uint64, tag = "3")]
    pub target_partition_end: u64,
    /// At most one of the `execute_task_request::ProducerHead` variants
    /// (tags 6, 7 and 8); `None` (the `Option`) when no variant is set.
    #[prost(oneof = "execute_task_request::ProducerHead", tags = "6, 7, 8")]
    pub producer_head: ::core::option::Option<execute_task_request::ProducerHead>,
}
/// Nested types for [`ExecuteTaskRequest`].
pub mod execute_task_request {
    /// Variants of `ExecuteTaskRequest::producer_head`.
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum ProducerHead {
        /// Carries an empty `NoneHead` message (tag 6). This is an explicit
        /// variant and is distinct from the surrounding `Option` being `None`.
        #[prost(message, tag = "6")]
        None(super::NoneHead),
        /// Carries a `BroadcastExecHead` message (tag 7).
        #[prost(message, tag = "7")]
        Broadcast(super::BroadcastExecHead),
        /// Carries a `RepartitionExecHead` message (tag 8).
        #[prost(message, tag = "8")]
        Repartition(super::RepartitionExecHead),
    }
}
/// Field-less message used as the payload of
/// `execute_task_request::ProducerHead::None`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct NoneHead {}
/// Payload of `execute_task_request::ProducerHead::Broadcast`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct BroadcastExecHead {
    /// Unsigned integer (tag 1); presumably the number of output partitions
    /// — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub output_partitions: u64,
}
/// Payload of `execute_task_request::ProducerHead::Repartition`.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct RepartitionExecHead {
    /// Opaque bytes (tag 1); presumably a serialized partitioning
    /// description — the encoding is not visible here, confirm.
    #[prost(bytes = "vec", tag = "1")]
    pub partitioning: ::prost::alloc::vec::Vec<u8>,
}
/// Key made of a query id, a stage id and a task number; referenced by
/// [`SetPlanRequest`] and [`ExecuteTaskRequest`].
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct TaskKey {
    /// Opaque query identifier bytes (tag 1); encoding not visible here.
    #[prost(bytes = "vec", tag = "1")]
    pub query_id: ::prost::alloc::vec::Vec<u8>,
    /// Stage identifier (tag 2).
    #[prost(uint64, tag = "2")]
    pub stage_id: u64,
    /// Task number (tag 3); presumably the task's index within the stage —
    /// TODO confirm.
    #[prost(uint64, tag = "3")]
    pub task_number: u64,
}
/// Small metadata message with a partition number and a creation timestamp.
///
/// NOTE(review): presumably serialized into the `app_metadata` of Arrow
/// Flight messages streamed by `ExecuteTask` — nothing in this file uses the
/// type, so confirm against callers.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct FlightAppMetadata {
    /// Unsigned partition number (tag 1).
    #[prost(uint64, tag = "1")]
    pub partition: u64,
    /// Per its name, a Unix timestamp in nanoseconds for creation (tag 2).
    #[prost(uint64, tag = "2")]
    pub created_timestamp_unix_nanos: u64,
}
/// A name/value string pair attached to a [`Metric`].
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct Label {
    /// Label name (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Label value (tag 2).
    #[prost(string, tag = "2")]
    pub value: ::prost::alloc::string::String,
}
/// A single metric: zero or more labels, an optional partition, and at most
/// one typed value from [`metric::Value`].
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Metric {
    /// Labels attached to this metric (tag 1); may be empty.
    #[prost(message, repeated, tag = "1")]
    pub labels: ::prost::alloc::vec::Vec<Label>,
    /// Optional partition number (tag 2); `None` when the field is absent.
    #[prost(uint64, optional, tag = "2")]
    pub partition: ::core::option::Option<u64>,
    /// The metric's value; the oneof occupies field tags 10 through 34.
    #[prost(
        oneof = "metric::Value",
        tags = "10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34"
    )]
    pub value: ::core::option::Option<metric::Value>,
}
/// Nested types for [`Metric`].
pub mod metric {
    /// Typed value variants of a [`Metric`](super::Metric), one per field tag
    /// in the range 10..=34.
    ///
    /// The four percentile variants (tags 30-33) share the same payload type,
    /// `PercentileLatency`, and are distinguished only by their tag.
    #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)]
    pub enum Value {
        /// `OutputRows` payload (tag 10).
        #[prost(message, tag = "10")]
        OutputRows(super::OutputRows),
        /// `ElapsedCompute` payload (tag 11).
        #[prost(message, tag = "11")]
        ElapsedCompute(super::ElapsedCompute),
        /// `SpillCount` payload (tag 12).
        #[prost(message, tag = "12")]
        SpillCount(super::SpillCount),
        /// `SpilledBytes` payload (tag 13).
        #[prost(message, tag = "13")]
        SpilledBytes(super::SpilledBytes),
        /// `SpilledRows` payload (tag 14).
        #[prost(message, tag = "14")]
        SpilledRows(super::SpilledRows),
        /// `CurrentMemoryUsage` payload (tag 15).
        #[prost(message, tag = "15")]
        CurrentMemoryUsage(super::CurrentMemoryUsage),
        /// `NamedCount` payload (tag 16).
        #[prost(message, tag = "16")]
        Count(super::NamedCount),
        /// `NamedGauge` payload (tag 17).
        #[prost(message, tag = "17")]
        Gauge(super::NamedGauge),
        /// `NamedTime` payload (tag 18).
        #[prost(message, tag = "18")]
        Time(super::NamedTime),
        /// `StartTimestamp` payload (tag 19).
        #[prost(message, tag = "19")]
        StartTimestamp(super::StartTimestamp),
        /// `EndTimestamp` payload (tag 20).
        #[prost(message, tag = "20")]
        EndTimestamp(super::EndTimestamp),
        /// `OutputBytes` payload (tag 21).
        #[prost(message, tag = "21")]
        OutputBytes(super::OutputBytes),
        /// `OutputBatches` payload (tag 22).
        #[prost(message, tag = "22")]
        OutputBatches(super::OutputBatches),
        /// `NamedPruningMetrics` payload (tag 23).
        #[prost(message, tag = "23")]
        PruningMetrics(super::NamedPruningMetrics),
        /// `NamedRatio` payload (tag 24).
        #[prost(message, tag = "24")]
        Ratio(super::NamedRatio),
        /// `MinLatency` payload (tag 25).
        #[prost(message, tag = "25")]
        CustomMinLatency(super::MinLatency),
        /// `MaxLatency` payload (tag 26).
        #[prost(message, tag = "26")]
        CustomMaxLatency(super::MaxLatency),
        /// `AvgLatency` payload (tag 27).
        #[prost(message, tag = "27")]
        CustomAvgLatency(super::AvgLatency),
        /// `FirstLatency` payload (tag 28).
        #[prost(message, tag = "28")]
        CustomFirstLatency(super::FirstLatency),
        /// `BytesCount` payload (tag 29).
        #[prost(message, tag = "29")]
        CustomBytesCount(super::BytesCount),
        /// `PercentileLatency` payload (tag 30).
        #[prost(message, tag = "30")]
        CustomP50Latency(super::PercentileLatency),
        /// `PercentileLatency` payload (tag 31).
        #[prost(message, tag = "31")]
        CustomP75Latency(super::PercentileLatency),
        /// `PercentileLatency` payload (tag 32).
        #[prost(message, tag = "32")]
        CustomP95Latency(super::PercentileLatency),
        /// `PercentileLatency` payload (tag 33).
        #[prost(message, tag = "33")]
        CustomP99Latency(super::PercentileLatency),
        /// `MaxGauge` payload (tag 34).
        #[prost(message, tag = "34")]
        CustomMaxGauge(super::MaxGauge),
    }
}
/// A list of [`Metric`] messages; used by the fields of [`TaskMetrics`].
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct MetricsSet {
    /// The metrics in this set (tag 1); may be empty.
    #[prost(message, repeated, tag = "1")]
    pub metrics: ::prost::alloc::vec::Vec<Metric>,
}
/// Payload of `metric::Value::OutputRows`: a single unsigned counter.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct OutputRows {
    /// Counter value (tag 1); presumably a row count — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::ElapsedCompute`: a single unsigned value.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ElapsedCompute {
    /// Elapsed amount (tag 1). The unit is not visible in this file;
    /// presumably nanoseconds — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::SpillCount`: a single unsigned counter.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SpillCount {
    /// Counter value (tag 1); presumably the number of spills — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::SpilledBytes`: a single unsigned counter.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SpilledBytes {
    /// Counter value (tag 1); presumably a byte count — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::SpilledRows`: a single unsigned counter.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct SpilledRows {
    /// Counter value (tag 1); presumably a row count — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::CurrentMemoryUsage`: a single unsigned value.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct CurrentMemoryUsage {
    /// Usage value (tag 1). The unit is not visible in this file;
    /// presumably bytes — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::Count`: a named unsigned counter.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct NamedCount {
    /// Name of the counter (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Counter value (tag 2).
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
/// Payload of `metric::Value::Gauge`: a named unsigned gauge value.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct NamedGauge {
    /// Name of the gauge (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Gauge value (tag 2).
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
/// Payload of `metric::Value::Time`: a named unsigned time value.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct NamedTime {
    /// Name of the timer (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Time value (tag 2). The unit is not visible in this file; presumably
    /// nanoseconds — TODO confirm.
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
/// Payload of `metric::Value::StartTimestamp`: an optional signed timestamp.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct StartTimestamp {
    /// Timestamp value (tag 1); `None` when the field is absent. Epoch and
    /// unit are not visible in this file — TODO confirm.
    #[prost(int64, optional, tag = "1")]
    pub value: ::core::option::Option<i64>,
}
/// Payload of `metric::Value::EndTimestamp`: an optional signed timestamp.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct EndTimestamp {
    /// Timestamp value (tag 1); `None` when the field is absent. Epoch and
    /// unit are not visible in this file — TODO confirm.
    #[prost(int64, optional, tag = "1")]
    pub value: ::core::option::Option<i64>,
}
/// Payload of `metric::Value::OutputBytes`: a single unsigned counter.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct OutputBytes {
    /// Counter value (tag 1); presumably a byte count — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::OutputBatches`: a single unsigned counter.
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
pub struct OutputBatches {
    /// Counter value (tag 1); presumably a batch count — TODO confirm.
    #[prost(uint64, tag = "1")]
    pub value: u64,
}
/// Payload of `metric::Value::PruningMetrics`: a name plus two unsigned
/// counters.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct NamedPruningMetrics {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Counter (tag 2); presumably the number of items pruned — TODO confirm.
    #[prost(uint64, tag = "2")]
    pub pruned: u64,
    /// Counter (tag 3); presumably the number of items that matched (were
    /// kept) — TODO confirm.
    #[prost(uint64, tag = "3")]
    pub matched: u64,
}
/// Payload of `metric::Value::Ratio`: a name plus two unsigned integers.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct NamedRatio {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Unsigned integer (tag 2); presumably the ratio's numerator — TODO
    /// confirm.
    #[prost(uint64, tag = "2")]
    pub part: u64,
    /// Unsigned integer (tag 3); presumably the ratio's denominator — TODO
    /// confirm. Nothing in this file guards against it being zero.
    #[prost(uint64, tag = "3")]
    pub total: u64,
}
/// Payload of `metric::Value::CustomBytesCount`: a named unsigned counter.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct BytesCount {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Counter value (tag 2); presumably a byte count — TODO confirm.
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
/// Payload of `metric::Value::CustomMinLatency`: a named unsigned value.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct MinLatency {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Latency value (tag 2). The unit is not visible in this file;
    /// presumably nanoseconds — TODO confirm.
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
/// Payload of `metric::Value::CustomMaxLatency`: a named unsigned value.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct MaxLatency {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Latency value (tag 2). The unit is not visible in this file;
    /// presumably nanoseconds — TODO confirm.
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
/// Payload of `metric::Value::CustomAvgLatency`: a name plus a sum and a
/// count.
///
/// NOTE(review): the average is presumably derived as `nanos_sum / count` by
/// the consumer; no such computation appears in this file — confirm.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AvgLatency {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Accumulated sum (tag 2); per its name, in nanoseconds.
    #[prost(uint64, tag = "2")]
    pub nanos_sum: u64,
    /// Number of samples contributing to the sum (tag 3) — presumably; may
    /// be zero.
    #[prost(uint64, tag = "3")]
    pub count: u64,
}
/// Payload of `metric::Value::CustomFirstLatency`: a named unsigned value.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct FirstLatency {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Latency value (tag 2). The unit is not visible in this file;
    /// presumably nanoseconds — TODO confirm.
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
/// Shared payload of the four percentile variants of `metric::Value`
/// (`CustomP50Latency`, `CustomP75Latency`, `CustomP95Latency`,
/// `CustomP99Latency`).
///
/// The message itself does not record which percentile it represents; that
/// is determined by the oneof tag it is wrapped in.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct PercentileLatency {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Opaque bytes (tag 4); presumably a serialized quantile sketch — the
    /// sketch format is not visible here, confirm.
    ///
    /// Note the tag is 4, not 2: tags 2 and 3 are unused by this struct
    /// (possibly reserved in the `.proto` — TODO confirm).
    #[prost(bytes = "vec", tag = "4")]
    pub sketch_bytes: ::prost::alloc::vec::Vec<u8>,
}
/// Payload of `metric::Value::CustomMaxGauge`: a named unsigned value.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct MaxGauge {
    /// Name of the metric (tag 1).
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
    /// Gauge value (tag 2); presumably the maximum observed — TODO confirm.
    #[prost(uint64, tag = "2")]
    pub value: u64,
}
433pub mod worker_service_client {
435 #![allow(
436 unused_variables,
437 dead_code,
438 missing_docs,
439 clippy::wildcard_imports,
440 clippy::let_unit_value
441 )]
442 use tonic::codegen::http::Uri;
443 use tonic::codegen::*;
444 #[derive(Debug, Clone)]
445 pub struct WorkerServiceClient<T> {
446 inner: tonic::client::Grpc<T>,
447 }
448 impl WorkerServiceClient<tonic::transport::Channel> {
449 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
451 where
452 D: TryInto<tonic::transport::Endpoint>,
453 D::Error: Into<StdError>,
454 {
455 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
456 Ok(Self::new(conn))
457 }
458 }
459 impl<T> WorkerServiceClient<T>
460 where
461 T: tonic::client::GrpcService<tonic::body::Body>,
462 T::Error: Into<StdError>,
463 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
464 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
465 {
466 pub fn new(inner: T) -> Self {
467 let inner = tonic::client::Grpc::new(inner);
468 Self { inner }
469 }
470 pub fn with_origin(inner: T, origin: Uri) -> Self {
471 let inner = tonic::client::Grpc::with_origin(inner, origin);
472 Self { inner }
473 }
474 pub fn with_interceptor<F>(
475 inner: T,
476 interceptor: F,
477 ) -> WorkerServiceClient<InterceptedService<T, F>>
478 where
479 F: tonic::service::Interceptor,
480 T::ResponseBody: Default,
481 T: tonic::codegen::Service<
482 http::Request<tonic::body::Body>,
483 Response = http::Response<
484 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
485 >,
486 >,
487 <T as tonic::codegen::Service<http::Request<tonic::body::Body>>>::Error:
488 Into<StdError> + std::marker::Send + std::marker::Sync,
489 {
490 WorkerServiceClient::new(InterceptedService::new(inner, interceptor))
491 }
492 #[must_use]
497 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
498 self.inner = self.inner.send_compressed(encoding);
499 self
500 }
501 #[must_use]
503 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
504 self.inner = self.inner.accept_compressed(encoding);
505 self
506 }
507 #[must_use]
511 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
512 self.inner = self.inner.max_decoding_message_size(limit);
513 self
514 }
515 #[must_use]
519 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
520 self.inner = self.inner.max_encoding_message_size(limit);
521 self
522 }
523 pub async fn coordinator_channel(
527 &mut self,
528 request: impl tonic::IntoStreamingRequest<Message = super::CoordinatorToWorkerMsg>,
529 ) -> std::result::Result<
530 tonic::Response<tonic::codec::Streaming<super::WorkerToCoordinatorMsg>>,
531 tonic::Status,
532 > {
533 self.inner.ready().await.map_err(|e| {
534 tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
535 })?;
536 let codec = tonic_prost::ProstCodec::default();
537 let path =
538 http::uri::PathAndQuery::from_static("/worker.WorkerService/CoordinatorChannel");
539 let mut req = request.into_streaming_request();
540 req.extensions_mut().insert(GrpcMethod::new(
541 "worker.WorkerService",
542 "CoordinatorChannel",
543 ));
544 self.inner.streaming(req, path, codec).await
545 }
546 pub async fn execute_task(
548 &mut self,
549 request: impl tonic::IntoRequest<super::ExecuteTaskRequest>,
550 ) -> std::result::Result<
551 tonic::Response<tonic::codec::Streaming<::arrow_flight::FlightData>>,
552 tonic::Status,
553 > {
554 self.inner.ready().await.map_err(|e| {
555 tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
556 })?;
557 let codec = tonic_prost::ProstCodec::default();
558 let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/ExecuteTask");
559 let mut req = request.into_request();
560 req.extensions_mut()
561 .insert(GrpcMethod::new("worker.WorkerService", "ExecuteTask"));
562 self.inner.server_streaming(req, path, codec).await
563 }
564 pub async fn get_worker_info(
566 &mut self,
567 request: impl tonic::IntoRequest<super::GetWorkerInfoRequest>,
568 ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>
569 {
570 self.inner.ready().await.map_err(|e| {
571 tonic::Status::unknown(format!("Service was not ready: {}", e.into()))
572 })?;
573 let codec = tonic_prost::ProstCodec::default();
574 let path = http::uri::PathAndQuery::from_static("/worker.WorkerService/GetWorkerInfo");
575 let mut req = request.into_request();
576 req.extensions_mut()
577 .insert(GrpcMethod::new("worker.WorkerService", "GetWorkerInfo"));
578 self.inner.unary(req, path, codec).await
579 }
580 }
581}
/// Server-side bindings for the `worker.WorkerService` gRPC service: the
/// [`WorkerService`](worker_service_server::WorkerService) trait to implement
/// and the `WorkerServiceServer` adapter that routes HTTP requests to it.
pub mod worker_service_server {
    // Lints relaxed for this module: it glob-imports `tonic::codegen` and is
    // not documented to the crate's usual standard.
    #![allow(
        unused_variables,
        dead_code,
        missing_docs,
        clippy::wildcard_imports,
        clippy::let_unit_value
    )]
    use tonic::codegen::*;
    /// Handler trait for `worker.WorkerService`; implement it and wrap the
    /// implementation in [`WorkerServiceServer`].
    #[async_trait]
    pub trait WorkerService: std::marker::Send + std::marker::Sync + 'static {
        /// Response stream type returned by [`Self::coordinator_channel`].
        type CoordinatorChannelStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<super::WorkerToCoordinatorMsg, tonic::Status>,
            > + std::marker::Send
            + 'static;
        /// Handles the bidirectional-streaming `CoordinatorChannel` RPC:
        /// receives a stream of `CoordinatorToWorkerMsg` and returns a stream
        /// of `WorkerToCoordinatorMsg`.
        async fn coordinator_channel(
            &self,
            request: tonic::Request<tonic::Streaming<super::CoordinatorToWorkerMsg>>,
        ) -> std::result::Result<tonic::Response<Self::CoordinatorChannelStream>, tonic::Status>;
        /// Response stream type returned by [`Self::execute_task`].
        type ExecuteTaskStream: tonic::codegen::tokio_stream::Stream<
                Item = std::result::Result<::arrow_flight::FlightData, tonic::Status>,
            > + std::marker::Send
            + 'static;
        /// Handles the server-streaming `ExecuteTask` RPC: receives one
        /// `ExecuteTaskRequest` and returns a stream of
        /// `arrow_flight::FlightData`.
        async fn execute_task(
            &self,
            request: tonic::Request<super::ExecuteTaskRequest>,
        ) -> std::result::Result<tonic::Response<Self::ExecuteTaskStream>, tonic::Status>;
        /// Handles the unary `GetWorkerInfo` RPC.
        async fn get_worker_info(
            &self,
            request: tonic::Request<super::GetWorkerInfoRequest>,
        ) -> std::result::Result<tonic::Response<super::GetWorkerInfoResponse>, tonic::Status>;
    }
    /// Adapter that exposes a [`WorkerService`] implementation as a
    /// `tonic::codegen::Service` over HTTP requests.
    ///
    /// Holds the handler behind an `Arc` together with the compression and
    /// message-size settings applied to every call.
    #[derive(Debug)]
    pub struct WorkerServiceServer<T> {
        inner: Arc<T>,
        accept_compression_encodings: EnabledCompressionEncodings,
        send_compression_encodings: EnabledCompressionEncodings,
        max_decoding_message_size: Option<usize>,
        max_encoding_message_size: Option<usize>,
    }
    impl<T> WorkerServiceServer<T> {
        /// Wraps `inner` in a new `Arc` and builds a server with default
        /// settings (see [`Self::from_arc`]).
        pub fn new(inner: T) -> Self {
            Self::from_arc(Arc::new(inner))
        }
        /// Builds a server around an already shared handler, with default
        /// compression settings and no explicit message-size limits.
        pub fn from_arc(inner: Arc<T>) -> Self {
            Self {
                inner,
                accept_compression_encodings: Default::default(),
                send_compression_encodings: Default::default(),
                max_decoding_message_size: None,
                max_encoding_message_size: None,
            }
        }
        /// Builds a server with [`Self::new`] and wraps it in an
        /// `InterceptedService` driven by `interceptor`.
        pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
        where
            F: tonic::service::Interceptor,
        {
            InterceptedService::new(Self::new(inner), interceptor)
        }
        /// Enables `encoding` in the set of accepted request compressions.
        #[must_use]
        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.accept_compression_encodings.enable(encoding);
            self
        }
        /// Enables `encoding` in the set of response compressions.
        #[must_use]
        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.send_compression_encodings.enable(encoding);
            self
        }
        /// Sets the decoding message-size limit passed to each call's
        /// `apply_max_message_size_config`.
        #[must_use]
        pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
            self.max_decoding_message_size = Some(limit);
            self
        }
        /// Sets the encoding message-size limit passed to each call's
        /// `apply_max_message_size_config`.
        #[must_use]
        pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
            self.max_encoding_message_size = Some(limit);
            self
        }
    }
    impl<T, B> tonic::codegen::Service<http::Request<B>> for WorkerServiceServer<T>
    where
        T: WorkerService,
        B: Body + std::marker::Send + 'static,
        B::Error: Into<StdError> + std::marker::Send + 'static,
    {
        type Response = http::Response<tonic::body::Body>;
        type Error = std::convert::Infallible;
        type Future = BoxFuture<Self::Response, Self::Error>;
        /// Always reports ready; this adapter applies no backpressure itself.
        fn poll_ready(
            &mut self,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }
        /// Routes the request by URI path to the matching RPC handler.
        ///
        /// Each known path builds a small per-call shim struct around the
        /// shared handler and drives it through `tonic::server::Grpc`. Any
        /// other path yields a response whose `grpc-status` header is
        /// `Unimplemented`. The returned future never resolves to `Err`
        /// (`Error` is `Infallible`).
        fn call(&mut self, req: http::Request<B>) -> Self::Future {
            match req.uri().path() {
                "/worker.WorkerService/CoordinatorChannel" => {
                    // Shim adapting the handler's `coordinator_channel` to
                    // tonic's bidirectional `StreamingService` interface.
                    #[allow(non_camel_case_types)]
                    struct CoordinatorChannelSvc<T: WorkerService>(pub Arc<T>);
                    impl<T: WorkerService>
                        tonic::server::StreamingService<super::CoordinatorToWorkerMsg>
                        for CoordinatorChannelSvc<T>
                    {
                        type Response = super::WorkerToCoordinatorMsg;
                        type ResponseStream = T::CoordinatorChannelStream;
                        type Future =
                            BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
                        fn call(
                            &mut self,
                            request: tonic::Request<
                                tonic::Streaming<super::CoordinatorToWorkerMsg>,
                            >,
                        ) -> Self::Future {
                            // Clone the Arc so the boxed future owns its
                            // handler reference.
                            let inner = Arc::clone(&self.0);
                            let fut = async move {
                                <T as WorkerService>::coordinator_channel(&inner, request).await
                            };
                            Box::pin(fut)
                        }
                    }
                    // Copy the settings into locals so the `async move` block
                    // below owns them instead of borrowing `self`.
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let max_decoding_message_size = self.max_decoding_message_size;
                    let max_encoding_message_size = self.max_encoding_message_size;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let method = CoordinatorChannelSvc(inner);
                        let codec = tonic_prost::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            )
                            .apply_max_message_size_config(
                                max_decoding_message_size,
                                max_encoding_message_size,
                            );
                        let res = grpc.streaming(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                "/worker.WorkerService/ExecuteTask" => {
                    // Shim adapting the handler's `execute_task` to tonic's
                    // `ServerStreamingService` interface.
                    #[allow(non_camel_case_types)]
                    struct ExecuteTaskSvc<T: WorkerService>(pub Arc<T>);
                    impl<T: WorkerService>
                        tonic::server::ServerStreamingService<super::ExecuteTaskRequest>
                        for ExecuteTaskSvc<T>
                    {
                        type Response = ::arrow_flight::FlightData;
                        type ResponseStream = T::ExecuteTaskStream;
                        type Future =
                            BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
                        fn call(
                            &mut self,
                            request: tonic::Request<super::ExecuteTaskRequest>,
                        ) -> Self::Future {
                            // Clone the Arc so the boxed future owns its
                            // handler reference.
                            let inner = Arc::clone(&self.0);
                            let fut = async move {
                                <T as WorkerService>::execute_task(&inner, request).await
                            };
                            Box::pin(fut)
                        }
                    }
                    // Copy the settings into locals so the `async move` block
                    // below owns them instead of borrowing `self`.
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let max_decoding_message_size = self.max_decoding_message_size;
                    let max_encoding_message_size = self.max_encoding_message_size;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let method = ExecuteTaskSvc(inner);
                        let codec = tonic_prost::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            )
                            .apply_max_message_size_config(
                                max_decoding_message_size,
                                max_encoding_message_size,
                            );
                        let res = grpc.server_streaming(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                "/worker.WorkerService/GetWorkerInfo" => {
                    // Shim adapting the handler's `get_worker_info` to
                    // tonic's `UnaryService` interface.
                    #[allow(non_camel_case_types)]
                    struct GetWorkerInfoSvc<T: WorkerService>(pub Arc<T>);
                    impl<T: WorkerService> tonic::server::UnaryService<super::GetWorkerInfoRequest>
                        for GetWorkerInfoSvc<T>
                    {
                        type Response = super::GetWorkerInfoResponse;
                        type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
                        fn call(
                            &mut self,
                            request: tonic::Request<super::GetWorkerInfoRequest>,
                        ) -> Self::Future {
                            // Clone the Arc so the boxed future owns its
                            // handler reference.
                            let inner = Arc::clone(&self.0);
                            let fut = async move {
                                <T as WorkerService>::get_worker_info(&inner, request).await
                            };
                            Box::pin(fut)
                        }
                    }
                    // Copy the settings into locals so the `async move` block
                    // below owns them instead of borrowing `self`.
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let max_decoding_message_size = self.max_decoding_message_size;
                    let max_encoding_message_size = self.max_encoding_message_size;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let method = GetWorkerInfoSvc(inner);
                        let codec = tonic_prost::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            )
                            .apply_max_message_size_config(
                                max_decoding_message_size,
                                max_encoding_message_size,
                            );
                        let res = grpc.unary(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                // Unknown path: reply with an empty body, the gRPC content
                // type, and a `grpc-status` of `Unimplemented`.
                _ => Box::pin(async move {
                    let mut response = http::Response::new(tonic::body::Body::default());
                    let headers = response.headers_mut();
                    headers.insert(
                        tonic::Status::GRPC_STATUS,
                        (tonic::Code::Unimplemented as i32).into(),
                    );
                    headers.insert(
                        http::header::CONTENT_TYPE,
                        tonic::metadata::GRPC_CONTENT_TYPE,
                    );
                    Ok(response)
                }),
            }
        }
    }
    // Implemented by hand (not derived) so that cloning does not require
    // `T: Clone`; only the `Arc` and the plain settings are duplicated.
    impl<T> Clone for WorkerServiceServer<T> {
        fn clone(&self) -> Self {
            let inner = self.inner.clone();
            Self {
                inner,
                accept_compression_encodings: self.accept_compression_encodings,
                send_compression_encodings: self.send_compression_encodings,
                max_decoding_message_size: self.max_decoding_message_size,
                max_encoding_message_size: self.max_encoding_message_size,
            }
        }
    }
    /// Fully-qualified gRPC service name; the prefix of every method path
    /// matched in `call`.
    pub const SERVICE_NAME: &str = "worker.WorkerService";
    impl<T> tonic::server::NamedService for WorkerServiceServer<T> {
        const NAME: &'static str = SERVICE_NAME;
    }
}