Skip to main content

temporalio_client/
grpc.rs

1//! gRPC service traits for direct access to Temporal services.
2//!
3//! Most users should use the higher-level methods on [`Client`] or [`Connection`] instead.
4//! These traits are useful for advanced scenarios like custom interceptors, testing with mocks,
5//! or making raw gRPC calls not covered by the higher-level API.
6
7use crate::{
8    Client, Connection, LONG_POLL_TIMEOUT, RequestExt, SharedReplaceableClient,
9    TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
10    metrics::namespace_kv,
11    retry::make_future_retry,
12    worker::{ClientWorkerSet, Slot},
13};
14use dyn_clone::DynClone;
15use futures_util::{FutureExt, TryFutureExt, future::BoxFuture};
16use std::{any::Any, marker::PhantomData, sync::Arc};
17use temporalio_common::{
18    protos::{
19        grpc::health::v1::{health_client::HealthClient, *},
20        temporal::api::{
21            cloud::cloudservice::{v1 as cloudreq, v1::cloud_service_client::CloudServiceClient},
22            operatorservice::v1::{operator_service_client::OperatorServiceClient, *},
23            taskqueue::v1::TaskQueue,
24            testservice::v1::{test_service_client::TestServiceClient, *},
25            workflowservice::v1::{workflow_service_client::WorkflowServiceClient, *},
26        },
27    },
28    telemetry::metrics::MetricKeyValue,
29};
30use tonic::{
31    Request, Response, Status,
32    body::Body,
33    client::GrpcService,
34    metadata::{AsciiMetadataValue, KeyAndValueRef},
35};
36
37/// Something that has access to the raw grpc services
38pub(crate) trait RawClientProducer {
39    /// Returns information about workers associated with this client. Implementers outside of
40    /// core can safely return `None`.
41    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>>;
42
43    /// Return a workflow service client instance
44    fn workflow_client(&mut self) -> Box<dyn WorkflowService>;
45
46    /// Return a mutable ref to the operator service client instance
47    fn operator_client(&mut self) -> Box<dyn OperatorService>;
48
49    /// Return a mutable ref to the cloud service client instance
50    fn cloud_client(&mut self) -> Box<dyn CloudService>;
51
52    /// Return a mutable ref to the test service client instance
53    fn test_client(&mut self) -> Box<dyn TestService>;
54
55    /// Return a mutable ref to the health service client instance
56    fn health_client(&mut self) -> Box<dyn HealthService>;
57}
58
59/// Any client that can make gRPC calls. The default implementation simply invokes the passed-in
60/// function. Implementers may override this to provide things like retry behavior.
61#[async_trait::async_trait]
62pub(crate) trait RawGrpcCaller: Send + Sync + 'static {
63    /// Make a gRPC call. The default implementation simply invokes the provided function.
64    async fn call<F, Req, Resp>(
65        &mut self,
66        _call_name: &'static str,
67        mut callfn: F,
68        req: Request<Req>,
69    ) -> Result<Response<Resp>, Status>
70    where
71        Req: Clone + Unpin + Send + Sync + 'static,
72        Resp: Send + 'static,
73        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
74        F: Send + Sync + Unpin + 'static,
75    {
76        callfn(req).await
77    }
78}
79
80trait ErasedRawClient: Send + Sync + 'static {
81    fn erased_call(
82        &mut self,
83        call_name: &'static str,
84        op: &mut dyn ErasedCallOp,
85    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>>;
86}
87
88trait ErasedCallOp: Send {
89    fn invoke(
90        &mut self,
91        raw: &mut dyn ErasedRawClient,
92        call_name: &'static str,
93    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>>;
94}
95
96struct CallShim<F, Req, Resp> {
97    callfn: F,
98    seed_req: Option<Request<Req>>,
99    _resp: PhantomData<Resp>,
100}
101
102impl<F, Req, Resp> CallShim<F, Req, Resp> {
103    fn new(callfn: F, seed_req: Request<Req>) -> Self {
104        Self {
105            callfn,
106            seed_req: Some(seed_req),
107            _resp: PhantomData,
108        }
109    }
110}
111impl<F, Req, Resp> ErasedCallOp for CallShim<F, Req, Resp>
112where
113    Req: Clone + Unpin + Send + Sync + 'static,
114    Resp: Send + 'static,
115    F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
116    F: Send + Sync + Unpin + 'static,
117{
118    fn invoke(
119        &mut self,
120        _raw: &mut dyn ErasedRawClient,
121        _call_name: &'static str,
122    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>> {
123        (self.callfn)(
124            self.seed_req
125                .take()
126                .expect("CallShim must have request populated"),
127        )
128        .map(|res| res.map(|payload| payload.map(|t| Box::new(t) as Box<dyn Any + Send>)))
129        .boxed()
130    }
131}
132
133#[async_trait::async_trait]
134impl RawGrpcCaller for Connection {
135    async fn call<F, Req, Resp>(
136        &mut self,
137        call_name: &'static str,
138        mut callfn: F,
139        mut req: Request<Req>,
140    ) -> Result<Response<Resp>, Status>
141    where
142        Req: Clone + Unpin + Send + Sync + 'static,
143        Resp: Send + 'static,
144        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
145        F: Send + Sync + Unpin + 'static,
146    {
147        let info = self
148            .inner
149            .retry_options
150            .get_call_info(call_name, Some(&req));
151        req.extensions_mut().insert(info.call_type);
152        if info.call_type.is_long() {
153            req.set_default_timeout(LONG_POLL_TIMEOUT);
154        }
155
156        let fact = || {
157            let req_clone = req_cloner(&req);
158            callfn(req_clone)
159        };
160
161        let res = make_future_retry(info, fact);
162        res.map_err(|(e, _attempt)| e).map_ok(|x| x.0).await
163    }
164}
165
166/// Helper for cloning a tonic request as long as the inner message may be cloned.
167fn req_cloner<T: Clone>(cloneme: &Request<T>) -> Request<T> {
168    let msg = cloneme.get_ref().clone();
169    let mut new_req = Request::new(msg);
170    let new_met = new_req.metadata_mut();
171    for kv in cloneme.metadata().iter() {
172        match kv {
173            KeyAndValueRef::Ascii(k, v) => {
174                new_met.insert(k, v.clone());
175            }
176            KeyAndValueRef::Binary(k, v) => {
177                new_met.insert_bin(k, v.clone());
178            }
179        }
180    }
181    *new_req.extensions_mut() = cloneme.extensions().clone();
182    new_req
183}
184
185#[async_trait::async_trait]
186impl RawGrpcCaller for dyn ErasedRawClient {
187    async fn call<F, Req, Resp>(
188        &mut self,
189        call_name: &'static str,
190        callfn: F,
191        req: Request<Req>,
192    ) -> Result<Response<Resp>, Status>
193    where
194        Req: Clone + Unpin + Send + Sync + 'static,
195        Resp: Send + 'static,
196        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
197        F: Send + Sync + Unpin + 'static,
198    {
199        let mut shim = CallShim::new(callfn, req);
200        let erased_resp = ErasedRawClient::erased_call(self, call_name, &mut shim).await?;
201        Ok(erased_resp.map(|boxed| {
202            *boxed
203                .downcast()
204                .expect("RawGrpcCaller erased response type mismatch")
205        }))
206    }
207}
208
209impl<T> ErasedRawClient for T
210where
211    T: RawGrpcCaller + 'static,
212{
213    fn erased_call(
214        &mut self,
215        call_name: &'static str,
216        op: &mut dyn ErasedCallOp,
217    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>> {
218        let raw: &mut dyn ErasedRawClient = self;
219        op.invoke(raw, call_name)
220    }
221}
222
223impl RawClientProducer for Connection {
224    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
225        Some(self.workers())
226    }
227
228    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
229        self.inner.service.workflow_service()
230    }
231
232    fn operator_client(&mut self) -> Box<dyn OperatorService> {
233        self.inner.service.operator_service()
234    }
235
236    fn cloud_client(&mut self) -> Box<dyn CloudService> {
237        self.inner.service.cloud_service()
238    }
239
240    fn test_client(&mut self) -> Box<dyn TestService> {
241        self.inner.service.test_service()
242    }
243
244    fn health_client(&mut self) -> Box<dyn HealthService> {
245        self.inner.service.health_service()
246    }
247}
248
249impl RawClientProducer for TemporalServiceClient {
250    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
251        None
252    }
253
254    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
255        self.workflow_service()
256    }
257
258    fn operator_client(&mut self) -> Box<dyn OperatorService> {
259        self.operator_service()
260    }
261
262    fn cloud_client(&mut self) -> Box<dyn CloudService> {
263        self.cloud_service()
264    }
265
266    fn test_client(&mut self) -> Box<dyn TestService> {
267        self.test_service()
268    }
269
270    fn health_client(&mut self) -> Box<dyn HealthService> {
271        self.health_service()
272    }
273}
274
275impl RawGrpcCaller for TemporalServiceClient {}
276
277impl RawClientProducer for Client {
278    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
279        Some(self.connection.workers())
280    }
281
282    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
283        self.connection.workflow_client()
284    }
285
286    fn operator_client(&mut self) -> Box<dyn OperatorService> {
287        self.connection.operator_client()
288    }
289
290    fn cloud_client(&mut self) -> Box<dyn CloudService> {
291        self.connection.cloud_client()
292    }
293
294    fn test_client(&mut self) -> Box<dyn TestService> {
295        self.connection.test_client()
296    }
297
298    fn health_client(&mut self) -> Box<dyn HealthService> {
299        self.connection.health_client()
300    }
301}
302
303#[async_trait::async_trait]
304impl RawGrpcCaller for Client {
305    async fn call<F, Req, Resp>(
306        &mut self,
307        call_name: &'static str,
308        callfn: F,
309        req: Request<Req>,
310    ) -> Result<Response<Resp>, Status>
311    where
312        Req: Clone + Unpin + Send + Sync + 'static,
313        Resp: Send + 'static,
314        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
315        F: Send + Sync + Unpin + 'static,
316    {
317        self.connection.call(call_name, callfn, req).await
318    }
319}
320
321impl<RC> RawClientProducer for SharedReplaceableClient<RC>
322where
323    RC: RawClientProducer + Clone + Send + Sync + 'static,
324{
325    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
326        self.inner_cow().get_workers_info()
327    }
328    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
329        self.inner_mut_refreshed().workflow_client()
330    }
331
332    fn operator_client(&mut self) -> Box<dyn OperatorService> {
333        self.inner_mut_refreshed().operator_client()
334    }
335
336    fn cloud_client(&mut self) -> Box<dyn CloudService> {
337        self.inner_mut_refreshed().cloud_client()
338    }
339
340    fn test_client(&mut self) -> Box<dyn TestService> {
341        self.inner_mut_refreshed().test_client()
342    }
343
344    fn health_client(&mut self) -> Box<dyn HealthService> {
345        self.inner_mut_refreshed().health_client()
346    }
347}
348
349#[async_trait::async_trait]
350impl<RC> RawGrpcCaller for SharedReplaceableClient<RC>
351where
352    RC: RawGrpcCaller + Clone + Sync + 'static,
353{
354    async fn call<F, Req, Resp>(
355        &mut self,
356        call_name: &'static str,
357        callfn: F,
358        req: Request<Req>,
359    ) -> Result<Response<Resp>, Status>
360    where
361        Req: Clone + Unpin + Send + Sync + 'static,
362        Resp: Send + 'static,
363        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
364        F: Send + Sync + Unpin + 'static,
365    {
366        self.inner_mut_refreshed()
367            .call(call_name, callfn, req)
368            .await
369    }
370}
371
372#[derive(Clone, Debug)]
373pub(super) struct AttachMetricLabels {
374    pub(super) labels: Vec<MetricKeyValue>,
375    pub(super) normal_task_queue: Option<String>,
376    pub(super) sticky_task_queue: Option<String>,
377}
378impl AttachMetricLabels {
379    pub(super) fn new(kvs: impl Into<Vec<MetricKeyValue>>) -> Self {
380        Self {
381            labels: kvs.into(),
382            normal_task_queue: None,
383            sticky_task_queue: None,
384        }
385    }
386    pub(super) fn namespace(ns: impl Into<String>) -> Self {
387        AttachMetricLabels::new(vec![namespace_kv(ns.into())])
388    }
389    pub(super) fn task_q(&mut self, tq: Option<TaskQueue>) -> &mut Self {
390        if let Some(tq) = tq {
391            if !tq.normal_name.is_empty() {
392                self.sticky_task_queue = Some(tq.name);
393                self.normal_task_queue = Some(tq.normal_name);
394            } else {
395                self.normal_task_queue = Some(tq.name);
396            }
397        }
398        self
399    }
400    pub(super) fn task_q_str(&mut self, tq: impl Into<String>) -> &mut Self {
401        self.normal_task_queue = Some(tq.into());
402        self
403    }
404}
405
406/// A request extension that, when set, should make the [RetryClient] consider this call to be a
407/// [super::retry::CallType::UserLongPoll]
408#[derive(Copy, Clone, Debug)]
409pub(super) struct IsUserLongPoll;
410
411macro_rules! proxy_def {
412    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty, defaults) => {
413        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
414        fn $method(
415            &mut self,
416            _request: tonic::Request<$req>,
417        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
418            async { Ok(tonic::Response::new(<$resp>::default())) }.boxed()
419        }
420    };
421    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty) => {
422        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
423        fn $method(
424            &mut self,
425            _request: tonic::Request<$req>,
426        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>>;
427    };
428}
429
430/// Helps re-declare gRPC client methods
431///
432/// There are four forms:
433///
434/// * The first takes a closure that can modify the request. This is only called once, before the
435///   actual rpc call is made, and before determinations are made about the kind of call (long poll
436///   or not) and retry policy.
437/// * The second takes three closures. The first can modify the request like in the first form.
438///   The second can modify the request and return a value, and is called right before every call
439///   (including on retries). The third is called with the response to the call after it resolves.
440/// * The third and fourth are equivalents of the above that skip calling through the `call` method
441///   and are implemented directly on the generated gRPC clients (IE: the bottom of the stack).
442macro_rules! proxy_impl {
443    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty $(, $closure:expr)?) => {
444        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
445        fn $method(
446            &mut self,
447            #[allow(unused_mut)]
448            mut request: tonic::Request<$req>,
449        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
450            $( type_closure_arg(&mut request, $closure); )*
451            let mut self_clone = self.clone();
452            #[allow(unused_mut)]
453            let fact = move |mut req: tonic::Request<$req>| {
454                let mut c = self_clone.$client_meth();
455                async move { c.$method(req).await }.boxed()
456            };
457            self.call(stringify!($method), fact, request)
458        }
459    };
460    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty,
461     $closure_request:expr, $closure_before:expr, $closure_after:expr) => {
462        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
463        fn $method(
464            &mut self,
465            mut request: tonic::Request<$req>,
466        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
467            type_closure_arg(&mut request, $closure_request);
468            let workers_info = self.get_workers_info();
469            let mut self_clone = self.clone();
470            #[allow(unused_mut)]
471            let fact = move |mut req: tonic::Request<$req>| {
472                let data = type_closure_two_arg(&mut req, workers_info.clone(), $closure_before);
473                let mut c = self_clone.$client_meth();
474                async move {
475                    type_closure_two_arg(c.$method(req).await, data, $closure_after)
476                }.boxed()
477            };
478            self.call(stringify!($method), fact, request)
479        }
480    };
481    ($client_type:tt, $method:ident, $req:ty, $resp:ty $(, $closure:expr)?) => {
482        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
483        fn $method(
484            &mut self,
485            #[allow(unused_mut)]
486            mut request: tonic::Request<$req>,
487        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
488            $( type_closure_arg(&mut request, $closure); )*
489            async move { <$client_type<_>>::$method(self, request).await }.boxed()
490        }
491    };
492    ($client_type:tt, $method:ident, $req:ty, $resp:ty,
493     $closure_request:expr, $closure_before:expr, $closure_after:expr) => {
494        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
495        fn $method(
496            &mut self,
497            mut request: tonic::Request<$req>,
498        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
499            type_closure_arg(&mut request, $closure_request);
500            let data = type_closure_two_arg(&mut request, Option::<Arc<ClientWorkerSet>>::None,
501                                            $closure_before);
502            async move {
503                type_closure_two_arg(<$client_type<_>>::$method(self, request).await,
504                                     data, $closure_after)
505            }.boxed()
506        }
507    };
508}
509macro_rules! proxier_impl {
510    ($trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident;
511     [$( proxy_def!($($def_args:tt)*); )*];
512     $(($method:ident, $req:ty, $resp:ty
513       $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
514        #[cfg(test)]
515        const $impl_list_name: &'static [&'static str] = &[$(stringify!($method)),*];
516
517        #[doc = concat!("Trait version of [", stringify!($client_type), "]")]
518        pub trait $trait_name: Send + Sync + DynClone
519        {
520            $( proxy_def!($($def_args)*); )*
521        }
522        dyn_clone::clone_trait_object!($trait_name);
523
524        // `allow(deprecated)`: upstream proto deprecations (e.g. cloud-api
525        // AddNamespaceRegion) generate calls to deprecated tonic client
526        // methods inside the impls; we still wire them for back-compat.
527        #[allow(deprecated)]
528        impl<RC> $trait_name for RC
529        where
530            RC: RawGrpcCaller + RawClientProducer + Clone + Unpin,
531        {
532            $(
533                proxy_impl!($client_type, $client_meth, $method, $req, $resp
534                            $(,$closure $(,$closure_before, $closure_after)*)*);
535            )*
536        }
537
538        impl<T: Send + Sync + 'static> RawGrpcCaller for $client_type<T> {}
539
540        #[allow(deprecated)]
541        impl<T> $trait_name for $client_type<T>
542        where
543            T: GrpcService<Body> + Clone + Send + Sync + 'static,
544            T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
545            T::Error: Into<tonic::codegen::StdError>,
546            <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
547            <T as tonic::client::GrpcService<Body>>::Future: Send
548        {
549            $(
550                proxy_impl!($client_type, $method, $req, $resp
551                            $(,$closure $(,$closure_before, $closure_after)*)*);
552            )*
553        }
554    };
555}
556
557macro_rules! proxier {
558    ( $trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident;
559      $(($method:ident, $req:ty, $resp:ty
560         $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
561        proxier_impl!($trait_name; $impl_list_name; $client_type; $client_meth;
562                      [$(proxy_def!($client_type, $client_meth, $method, $req, $resp);)*];
563                      $(($method, $req, $resp $(, $closure $(, $closure_before, $closure_after)?)?);)*);
564    };
565    ( $trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident; defaults;
566      $(($method:ident, $req:ty, $resp:ty
567         $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
568        proxier_impl!($trait_name; $impl_list_name; $client_type; $client_meth;
569                      [$(proxy_def!($client_type, $client_meth, $method, $req, $resp, defaults);)*];
570                      $(($method, $req, $resp $(, $closure $(, $closure_before, $closure_after)?)?);)*);
571    };
572}
573
574macro_rules! namespaced_request {
575    ($req:ident) => {{
576        let ns_str = $req.get_ref().namespace.clone();
577        // Attach namespace header
578        $req.metadata_mut().insert(
579            TEMPORAL_NAMESPACE_HEADER_KEY,
580            ns_str.parse().unwrap_or_else(|e| {
581                warn!("Unable to parse namespace for header: {e:?}");
582                AsciiMetadataValue::from_static("")
583            }),
584        );
585        // Init metric labels
586        AttachMetricLabels::namespace(ns_str)
587    }};
588}
589
590// Nice little trick to avoid the callsite asking to type the closure parameter
591fn type_closure_arg<T, R>(arg: T, f: impl FnOnce(T) -> R) -> R {
592    f(arg)
593}
594
595fn type_closure_two_arg<T, R, S>(arg1: R, arg2: T, f: impl FnOnce(R, T) -> S) -> S {
596    f(arg1, arg2)
597}
598
599proxier! {
600    WorkflowService; ALL_IMPLEMENTED_WORKFLOW_SERVICE_RPCS; WorkflowServiceClient; workflow_client; defaults;
601    (
602        register_namespace,
603        RegisterNamespaceRequest,
604        RegisterNamespaceResponse,
605        |r| {
606            let labels = namespaced_request!(r);
607            r.extensions_mut().insert(labels);
608        }
609    );
610    (
611        describe_namespace,
612        DescribeNamespaceRequest,
613        DescribeNamespaceResponse,
614        |r| {
615            let labels = namespaced_request!(r);
616            r.extensions_mut().insert(labels);
617        }
618    );
619    (
620        list_namespaces,
621        ListNamespacesRequest,
622        ListNamespacesResponse
623    );
624    (
625        update_namespace,
626        UpdateNamespaceRequest,
627        UpdateNamespaceResponse,
628        |r| {
629            let labels = namespaced_request!(r);
630            r.extensions_mut().insert(labels);
631        }
632    );
633    (
634        deprecate_namespace,
635        DeprecateNamespaceRequest,
636        DeprecateNamespaceResponse,
637        |r| {
638            let labels = namespaced_request!(r);
639            r.extensions_mut().insert(labels);
640        }
641    );
642    (
643        start_workflow_execution,
644        StartWorkflowExecutionRequest,
645        StartWorkflowExecutionResponse,
646        |r| {
647            let mut labels = namespaced_request!(r);
648            labels.task_q(r.get_ref().task_queue.clone());
649            r.extensions_mut().insert(labels);
650        },
651        |r, workers| {
652            if let Some(workers) = workers {
653                let mut slot: Option<Box<dyn Slot + Send>> = None;
654                let req_mut = r.get_mut();
655                if req_mut.request_eager_execution {
656                    let namespace = req_mut.namespace.clone();
657                    let task_queue = req_mut.task_queue.as_ref()
658                                        .map(|tq| tq.name.clone()).unwrap_or_default();
659                    match workers.try_reserve_wft_slot(namespace, task_queue) {
660                        Some(reservation) => {
661                            // Populate eager_worker_deployment_options from the slot reservation
662                            if let Some(opts) = reservation.deployment_options {
663                                req_mut.eager_worker_deployment_options = Some(temporalio_common::protos::temporal::api::deployment::v1::WorkerDeploymentOptions {
664                                    deployment_name: opts.version.deployment_name,
665                                    build_id: opts.version.build_id,
666                                    worker_versioning_mode: if opts.use_worker_versioning {
667                                        temporalio_common::protos::temporal::api::enums::v1::WorkerVersioningMode::Versioned.into()
668                                    } else {
669                                        temporalio_common::protos::temporal::api::enums::v1::WorkerVersioningMode::Unversioned.into()
670                                    },
671                                });                            }
672                            slot = Some(reservation.slot);
673                        }
674                        None => req_mut.request_eager_execution = false
675                    }
676                }
677                slot
678            } else {
679                None
680            }
681        },
682        |resp, slot| {
683            if let Some(s) = slot
684                && let Ok(response) = resp.as_ref()
685                    && let Some(task) = response.get_ref().clone().eager_workflow_task
686                        && let Err(e) = s.schedule_wft(task) {
687                            // This is a latency issue, i.e., the client does not need to handle
688                            //  this error, because the WFT will be retried after a timeout.
689                            warn!(details = ?e, "Eager workflow task rejected by worker.");
690                        }
691            resp
692        }
693    );
694    (
695        get_workflow_execution_history,
696        GetWorkflowExecutionHistoryRequest,
697        GetWorkflowExecutionHistoryResponse,
698        |r| {
699            let labels = namespaced_request!(r);
700            r.extensions_mut().insert(labels);
701            if r.get_ref().wait_new_event {
702                r.extensions_mut().insert(IsUserLongPoll);
703            }
704        }
705    );
706    (
707        get_workflow_execution_history_reverse,
708        GetWorkflowExecutionHistoryReverseRequest,
709        GetWorkflowExecutionHistoryReverseResponse,
710        |r| {
711            let labels = namespaced_request!(r);
712            r.extensions_mut().insert(labels);
713        }
714    );
715    (
716        poll_workflow_task_queue,
717        PollWorkflowTaskQueueRequest,
718        PollWorkflowTaskQueueResponse,
719        |r| {
720            let mut labels = namespaced_request!(r);
721            labels.task_q(r.get_ref().task_queue.clone());
722            r.extensions_mut().insert(labels);
723        }
724    );
725    (
726        respond_workflow_task_completed,
727        RespondWorkflowTaskCompletedRequest,
728        RespondWorkflowTaskCompletedResponse,
729        |r| {
730            let labels = namespaced_request!(r);
731            r.extensions_mut().insert(labels);
732        }
733    );
734    (
735        respond_workflow_task_failed,
736        RespondWorkflowTaskFailedRequest,
737        RespondWorkflowTaskFailedResponse,
738        |r| {
739            let labels = namespaced_request!(r);
740            r.extensions_mut().insert(labels);
741        }
742    );
743    (
744        poll_activity_task_queue,
745        PollActivityTaskQueueRequest,
746        PollActivityTaskQueueResponse,
747        |r| {
748            let mut labels = namespaced_request!(r);
749            labels.task_q(r.get_ref().task_queue.clone());
750            r.extensions_mut().insert(labels);
751        }
752    );
753    (
754        record_activity_task_heartbeat,
755        RecordActivityTaskHeartbeatRequest,
756        RecordActivityTaskHeartbeatResponse,
757        |r| {
758            let labels = namespaced_request!(r);
759            r.extensions_mut().insert(labels);
760        }
761    );
762    (
763        record_activity_task_heartbeat_by_id,
764        RecordActivityTaskHeartbeatByIdRequest,
765        RecordActivityTaskHeartbeatByIdResponse,
766        |r| {
767            let labels = namespaced_request!(r);
768            r.extensions_mut().insert(labels);
769        }
770    );
771    (
772        respond_activity_task_completed,
773        RespondActivityTaskCompletedRequest,
774        RespondActivityTaskCompletedResponse,
775        |r| {
776            let labels = namespaced_request!(r);
777            r.extensions_mut().insert(labels);
778        }
779    );
780    (
781        respond_activity_task_completed_by_id,
782        RespondActivityTaskCompletedByIdRequest,
783        RespondActivityTaskCompletedByIdResponse,
784        |r| {
785            let labels = namespaced_request!(r);
786            r.extensions_mut().insert(labels);
787        }
788    );
789
790    (
791        respond_activity_task_failed,
792        RespondActivityTaskFailedRequest,
793        RespondActivityTaskFailedResponse,
794        |r| {
795            let labels = namespaced_request!(r);
796            r.extensions_mut().insert(labels);
797        }
798    );
799    (
800        respond_activity_task_failed_by_id,
801        RespondActivityTaskFailedByIdRequest,
802        RespondActivityTaskFailedByIdResponse,
803        |r| {
804            let labels = namespaced_request!(r);
805            r.extensions_mut().insert(labels);
806        }
807    );
808    (
809        respond_activity_task_canceled,
810        RespondActivityTaskCanceledRequest,
811        RespondActivityTaskCanceledResponse,
812        |r| {
813            let labels = namespaced_request!(r);
814            r.extensions_mut().insert(labels);
815        }
816    );
817    (
818        respond_activity_task_canceled_by_id,
819        RespondActivityTaskCanceledByIdRequest,
820        RespondActivityTaskCanceledByIdResponse,
821        |r| {
822            let labels = namespaced_request!(r);
823            r.extensions_mut().insert(labels);
824        }
825    );
826    (
827        request_cancel_workflow_execution,
828        RequestCancelWorkflowExecutionRequest,
829        RequestCancelWorkflowExecutionResponse,
830        |r| {
831            let labels = namespaced_request!(r);
832            r.extensions_mut().insert(labels);
833        }
834    );
835    (
836        signal_workflow_execution,
837        SignalWorkflowExecutionRequest,
838        SignalWorkflowExecutionResponse,
839        |r| {
840            let labels = namespaced_request!(r);
841            r.extensions_mut().insert(labels);
842        }
843    );
844    (
845        signal_with_start_workflow_execution,
846        SignalWithStartWorkflowExecutionRequest,
847        SignalWithStartWorkflowExecutionResponse,
848        |r| {
849            let mut labels = namespaced_request!(r);
850            labels.task_q(r.get_ref().task_queue.clone());
851            r.extensions_mut().insert(labels);
852        }
853    );
854    (
855        reset_workflow_execution,
856        ResetWorkflowExecutionRequest,
857        ResetWorkflowExecutionResponse,
858        |r| {
859            let labels = namespaced_request!(r);
860            r.extensions_mut().insert(labels);
861        }
862    );
863    (
864        terminate_workflow_execution,
865        TerminateWorkflowExecutionRequest,
866        TerminateWorkflowExecutionResponse,
867        |r| {
868            let labels = namespaced_request!(r);
869            r.extensions_mut().insert(labels);
870        }
871    );
872    (
873        delete_workflow_execution,
874        DeleteWorkflowExecutionRequest,
875        DeleteWorkflowExecutionResponse,
876        |r| {
877            let labels = namespaced_request!(r);
878            r.extensions_mut().insert(labels);
879        }
880    );
881    (
882        list_open_workflow_executions,
883        ListOpenWorkflowExecutionsRequest,
884        ListOpenWorkflowExecutionsResponse,
885        |r| {
886            let labels = namespaced_request!(r);
887            r.extensions_mut().insert(labels);
888        }
889    );
890    (
891        list_closed_workflow_executions,
892        ListClosedWorkflowExecutionsRequest,
893        ListClosedWorkflowExecutionsResponse,
894        |r| {
895            let labels = namespaced_request!(r);
896            r.extensions_mut().insert(labels);
897        }
898    );
899    (
900        list_workflow_executions,
901        ListWorkflowExecutionsRequest,
902        ListWorkflowExecutionsResponse,
903        |r| {
904            let labels = namespaced_request!(r);
905            r.extensions_mut().insert(labels);
906        }
907    );
908    (
909        list_archived_workflow_executions,
910        ListArchivedWorkflowExecutionsRequest,
911        ListArchivedWorkflowExecutionsResponse,
912        |r| {
913            let labels = namespaced_request!(r);
914            r.extensions_mut().insert(labels);
915        }
916    );
917    (
918        scan_workflow_executions,
919        ScanWorkflowExecutionsRequest,
920        ScanWorkflowExecutionsResponse,
921        |r| {
922            let labels = namespaced_request!(r);
923            r.extensions_mut().insert(labels);
924        }
925    );
926    (
927        count_workflow_executions,
928        CountWorkflowExecutionsRequest,
929        CountWorkflowExecutionsResponse,
930        |r| {
931            let labels = namespaced_request!(r);
932            r.extensions_mut().insert(labels);
933        }
934    );
935    (
936        create_workflow_rule,
937        CreateWorkflowRuleRequest,
938        CreateWorkflowRuleResponse,
939        |r| {
940            let labels = namespaced_request!(r);
941            r.extensions_mut().insert(labels);
942        }
943    );
944    (
945        describe_workflow_rule,
946        DescribeWorkflowRuleRequest,
947        DescribeWorkflowRuleResponse,
948        |r| {
949            let labels = namespaced_request!(r);
950            r.extensions_mut().insert(labels);
951        }
952    );
953    (
954        delete_workflow_rule,
955        DeleteWorkflowRuleRequest,
956        DeleteWorkflowRuleResponse,
957        |r| {
958            let labels = namespaced_request!(r);
959            r.extensions_mut().insert(labels);
960        }
961    );
962    (
963        list_workflow_rules,
964        ListWorkflowRulesRequest,
965        ListWorkflowRulesResponse,
966        |r| {
967            let labels = namespaced_request!(r);
968            r.extensions_mut().insert(labels);
969        }
970    );
971    (
972        trigger_workflow_rule,
973        TriggerWorkflowRuleRequest,
974        TriggerWorkflowRuleResponse,
975        |r| {
976            let labels = namespaced_request!(r);
977            r.extensions_mut().insert(labels);
978        }
979    );
980    (
981        get_search_attributes,
982        GetSearchAttributesRequest,
983        GetSearchAttributesResponse
984    );
985    (
986        respond_query_task_completed,
987        RespondQueryTaskCompletedRequest,
988        RespondQueryTaskCompletedResponse,
989        |r| {
990            let labels = namespaced_request!(r);
991            r.extensions_mut().insert(labels);
992        }
993    );
994    (
995        reset_sticky_task_queue,
996        ResetStickyTaskQueueRequest,
997        ResetStickyTaskQueueResponse,
998        |r| {
999            let labels = namespaced_request!(r);
1000            r.extensions_mut().insert(labels);
1001        }
1002    );
1003    (
1004        query_workflow,
1005        QueryWorkflowRequest,
1006        QueryWorkflowResponse,
1007        |r| {
1008            let labels = namespaced_request!(r);
1009            r.extensions_mut().insert(labels);
1010        }
1011    );
1012    (
1013        describe_workflow_execution,
1014        DescribeWorkflowExecutionRequest,
1015        DescribeWorkflowExecutionResponse,
1016        |r| {
1017            let labels = namespaced_request!(r);
1018            r.extensions_mut().insert(labels);
1019        }
1020    );
1021    (
1022        describe_task_queue,
1023        DescribeTaskQueueRequest,
1024        DescribeTaskQueueResponse,
1025        |r| {
1026            let mut labels = namespaced_request!(r);
1027            labels.task_q(r.get_ref().task_queue.clone());
1028            r.extensions_mut().insert(labels);
1029        }
1030    );
1031    (
1032        get_cluster_info,
1033        GetClusterInfoRequest,
1034        GetClusterInfoResponse
1035    );
1036    (
1037        get_system_info,
1038        GetSystemInfoRequest,
1039        GetSystemInfoResponse
1040    );
1041    (
1042        list_task_queue_partitions,
1043        ListTaskQueuePartitionsRequest,
1044        ListTaskQueuePartitionsResponse,
1045        |r| {
1046            let mut labels = namespaced_request!(r);
1047            labels.task_q(r.get_ref().task_queue.clone());
1048            r.extensions_mut().insert(labels);
1049        }
1050    );
1051    (
1052        create_schedule,
1053        CreateScheduleRequest,
1054        CreateScheduleResponse,
1055        |r| {
1056            let labels = namespaced_request!(r);
1057            r.extensions_mut().insert(labels);
1058        }
1059    );
1060    (
1061        describe_schedule,
1062        DescribeScheduleRequest,
1063        DescribeScheduleResponse,
1064        |r| {
1065            let labels = namespaced_request!(r);
1066            r.extensions_mut().insert(labels);
1067        }
1068    );
1069    (
1070        update_schedule,
1071        UpdateScheduleRequest,
1072        UpdateScheduleResponse,
1073        |r| {
1074            let labels = namespaced_request!(r);
1075            r.extensions_mut().insert(labels);
1076        }
1077    );
1078    (
1079        patch_schedule,
1080        PatchScheduleRequest,
1081        PatchScheduleResponse,
1082        |r| {
1083            let labels = namespaced_request!(r);
1084            r.extensions_mut().insert(labels);
1085        }
1086    );
1087    (
1088        list_schedule_matching_times,
1089        ListScheduleMatchingTimesRequest,
1090        ListScheduleMatchingTimesResponse,
1091        |r| {
1092            let labels = namespaced_request!(r);
1093            r.extensions_mut().insert(labels);
1094        }
1095    );
1096    (
1097        delete_schedule,
1098        DeleteScheduleRequest,
1099        DeleteScheduleResponse,
1100        |r| {
1101            let labels = namespaced_request!(r);
1102            r.extensions_mut().insert(labels);
1103        }
1104    );
1105    (
1106        list_schedules,
1107        ListSchedulesRequest,
1108        ListSchedulesResponse,
1109        |r| {
1110            let labels = namespaced_request!(r);
1111            r.extensions_mut().insert(labels);
1112        }
1113    );
1114    (
1115        count_schedules,
1116        CountSchedulesRequest,
1117        CountSchedulesResponse,
1118        |r| {
1119            let labels = namespaced_request!(r);
1120            r.extensions_mut().insert(labels);
1121        }
1122    );
1123    (
1124        update_worker_build_id_compatibility,
1125        UpdateWorkerBuildIdCompatibilityRequest,
1126        UpdateWorkerBuildIdCompatibilityResponse,
1127        |r| {
1128            let mut labels = namespaced_request!(r);
1129            labels.task_q_str(r.get_ref().task_queue.clone());
1130            r.extensions_mut().insert(labels);
1131        }
1132    );
1133    (
1134        get_worker_build_id_compatibility,
1135        GetWorkerBuildIdCompatibilityRequest,
1136        GetWorkerBuildIdCompatibilityResponse,
1137        |r| {
1138            let mut labels = namespaced_request!(r);
1139            labels.task_q_str(r.get_ref().task_queue.clone());
1140            r.extensions_mut().insert(labels);
1141        }
1142    );
1143    (
1144        get_worker_task_reachability,
1145        GetWorkerTaskReachabilityRequest,
1146        GetWorkerTaskReachabilityResponse,
1147        |r| {
1148            let labels = namespaced_request!(r);
1149            r.extensions_mut().insert(labels);
1150        }
1151    );
1152    (
1153        update_workflow_execution,
1154        UpdateWorkflowExecutionRequest,
1155        UpdateWorkflowExecutionResponse,
1156        |r| {
1157            let labels = namespaced_request!(r);
1158            let exts = r.extensions_mut();
1159            exts.insert(labels);
1160            exts.insert(IsUserLongPoll);
1161        }
1162    );
1163    (
1164        poll_workflow_execution_update,
1165        PollWorkflowExecutionUpdateRequest,
1166        PollWorkflowExecutionUpdateResponse,
1167        |r| {
1168            let labels = namespaced_request!(r);
1169            r.extensions_mut().insert(labels);
1170        }
1171    );
1172    (
1173        start_batch_operation,
1174        StartBatchOperationRequest,
1175        StartBatchOperationResponse,
1176        |r| {
1177            let labels = namespaced_request!(r);
1178            r.extensions_mut().insert(labels);
1179        }
1180    );
1181    (
1182        stop_batch_operation,
1183        StopBatchOperationRequest,
1184        StopBatchOperationResponse,
1185        |r| {
1186            let labels = namespaced_request!(r);
1187            r.extensions_mut().insert(labels);
1188        }
1189    );
1190    (
1191        describe_batch_operation,
1192        DescribeBatchOperationRequest,
1193        DescribeBatchOperationResponse,
1194        |r| {
1195            let labels = namespaced_request!(r);
1196            r.extensions_mut().insert(labels);
1197        }
1198    );
1199    (
1200        describe_deployment,
1201        DescribeDeploymentRequest,
1202        DescribeDeploymentResponse,
1203        |r| {
1204            let labels = namespaced_request!(r);
1205            r.extensions_mut().insert(labels);
1206        }
1207    );
1208    (
1209        list_batch_operations,
1210        ListBatchOperationsRequest,
1211        ListBatchOperationsResponse,
1212        |r| {
1213            let labels = namespaced_request!(r);
1214            r.extensions_mut().insert(labels);
1215        }
1216    );
1217    (
1218        list_deployments,
1219        ListDeploymentsRequest,
1220        ListDeploymentsResponse,
1221        |r| {
1222            let labels = namespaced_request!(r);
1223            r.extensions_mut().insert(labels);
1224        }
1225    );
1226    (
1227        execute_multi_operation,
1228        ExecuteMultiOperationRequest,
1229        ExecuteMultiOperationResponse,
1230        |r| {
1231            let labels = namespaced_request!(r);
1232            r.extensions_mut().insert(labels);
1233        }
1234    );
1235    (
1236        get_current_deployment,
1237        GetCurrentDeploymentRequest,
1238        GetCurrentDeploymentResponse,
1239        |r| {
1240            let labels = namespaced_request!(r);
1241            r.extensions_mut().insert(labels);
1242        }
1243    );
1244    (
1245        get_deployment_reachability,
1246        GetDeploymentReachabilityRequest,
1247        GetDeploymentReachabilityResponse,
1248        |r| {
1249            let labels = namespaced_request!(r);
1250            r.extensions_mut().insert(labels);
1251        }
1252    );
1253    (
1254        get_worker_versioning_rules,
1255        GetWorkerVersioningRulesRequest,
1256        GetWorkerVersioningRulesResponse,
1257        |r| {
1258            let mut labels = namespaced_request!(r);
1259            labels.task_q_str(&r.get_ref().task_queue);
1260            r.extensions_mut().insert(labels);
1261        }
1262    );
1263    (
1264        update_worker_versioning_rules,
1265        UpdateWorkerVersioningRulesRequest,
1266        UpdateWorkerVersioningRulesResponse,
1267        |r| {
1268            let mut labels = namespaced_request!(r);
1269            labels.task_q_str(&r.get_ref().task_queue);
1270            r.extensions_mut().insert(labels);
1271        }
1272    );
1273    (
1274        poll_nexus_task_queue,
1275        PollNexusTaskQueueRequest,
1276        PollNexusTaskQueueResponse,
1277        |r| {
1278            let mut labels = namespaced_request!(r);
1279            labels.task_q(r.get_ref().task_queue.clone());
1280            r.extensions_mut().insert(labels);
1281        }
1282    );
1283    (
1284        respond_nexus_task_completed,
1285        RespondNexusTaskCompletedRequest,
1286        RespondNexusTaskCompletedResponse,
1287        |r| {
1288            let labels = namespaced_request!(r);
1289            r.extensions_mut().insert(labels);
1290        }
1291    );
1292    (
1293        respond_nexus_task_failed,
1294        RespondNexusTaskFailedRequest,
1295        RespondNexusTaskFailedResponse,
1296        |r| {
1297            let labels = namespaced_request!(r);
1298            r.extensions_mut().insert(labels);
1299        }
1300    );
1301    (
1302        set_current_deployment,
1303        SetCurrentDeploymentRequest,
1304        SetCurrentDeploymentResponse,
1305        |r| {
1306            let labels = namespaced_request!(r);
1307            r.extensions_mut().insert(labels);
1308        }
1309    );
1310    (
1311        shutdown_worker,
1312        ShutdownWorkerRequest,
1313        ShutdownWorkerResponse,
1314        |r| {
1315            let labels = namespaced_request!(r);
1316            r.extensions_mut().insert(labels);
1317        }
1318    );
1319    (
1320        update_activity_options,
1321        UpdateActivityOptionsRequest,
1322        UpdateActivityOptionsResponse,
1323        |r| {
1324            let labels = namespaced_request!(r);
1325            r.extensions_mut().insert(labels);
1326        }
1327    );
1328    (
1329        pause_activity,
1330        PauseActivityRequest,
1331        PauseActivityResponse,
1332        |r| {
1333            let labels = namespaced_request!(r);
1334            r.extensions_mut().insert(labels);
1335        }
1336    );
1337    (
1338        unpause_activity,
1339        UnpauseActivityRequest,
1340        UnpauseActivityResponse,
1341        |r| {
1342            let labels = namespaced_request!(r);
1343            r.extensions_mut().insert(labels);
1344        }
1345    );
1346    (
1347        update_workflow_execution_options,
1348        UpdateWorkflowExecutionOptionsRequest,
1349        UpdateWorkflowExecutionOptionsResponse,
1350        |r| {
1351            let labels = namespaced_request!(r);
1352            r.extensions_mut().insert(labels);
1353        }
1354    );
1355    (
1356        reset_activity,
1357        ResetActivityRequest,
1358        ResetActivityResponse,
1359        |r| {
1360            let labels = namespaced_request!(r);
1361            r.extensions_mut().insert(labels);
1362        }
1363    );
1364    (
1365        delete_worker_deployment,
1366        DeleteWorkerDeploymentRequest,
1367        DeleteWorkerDeploymentResponse,
1368        |r| {
1369            let labels = namespaced_request!(r);
1370            r.extensions_mut().insert(labels);
1371        }
1372    );
1373    (
1374        delete_worker_deployment_version,
1375        DeleteWorkerDeploymentVersionRequest,
1376        DeleteWorkerDeploymentVersionResponse,
1377        |r| {
1378            let labels = namespaced_request!(r);
1379            r.extensions_mut().insert(labels);
1380        }
1381    );
1382    (
1383        describe_worker_deployment,
1384        DescribeWorkerDeploymentRequest,
1385        DescribeWorkerDeploymentResponse,
1386        |r| {
1387            let labels = namespaced_request!(r);
1388            r.extensions_mut().insert(labels);
1389        }
1390    );
1391    (
1392        describe_worker_deployment_version,
1393        DescribeWorkerDeploymentVersionRequest,
1394        DescribeWorkerDeploymentVersionResponse,
1395        |r| {
1396            let labels = namespaced_request!(r);
1397            r.extensions_mut().insert(labels);
1398        }
1399    );
1400    (
1401        list_worker_deployments,
1402        ListWorkerDeploymentsRequest,
1403        ListWorkerDeploymentsResponse,
1404        |r| {
1405            let labels = namespaced_request!(r);
1406            r.extensions_mut().insert(labels);
1407        }
1408    );
1409    (
1410        set_worker_deployment_current_version,
1411        SetWorkerDeploymentCurrentVersionRequest,
1412        SetWorkerDeploymentCurrentVersionResponse,
1413        |r| {
1414            let labels = namespaced_request!(r);
1415            r.extensions_mut().insert(labels);
1416        }
1417    );
1418    (
1419        set_worker_deployment_ramping_version,
1420        SetWorkerDeploymentRampingVersionRequest,
1421        SetWorkerDeploymentRampingVersionResponse,
1422        |r| {
1423            let labels = namespaced_request!(r);
1424            r.extensions_mut().insert(labels);
1425        }
1426    );
1427    (
1428        update_worker_deployment_version_metadata,
1429        UpdateWorkerDeploymentVersionMetadataRequest,
1430        UpdateWorkerDeploymentVersionMetadataResponse,
1431        |r| {
1432            let labels = namespaced_request!(r);
1433            r.extensions_mut().insert(labels);
1434        }
1435    );
1436    (
1437        list_workers,
1438        ListWorkersRequest,
1439        ListWorkersResponse,
1440        |r| {
1441            let labels = namespaced_request!(r);
1442            r.extensions_mut().insert(labels);
1443        }
1444    );
1445    (
1446        record_worker_heartbeat,
1447        RecordWorkerHeartbeatRequest,
1448        RecordWorkerHeartbeatResponse,
1449        |r| {
1450            let labels = namespaced_request!(r);
1451            r.extensions_mut().insert(labels);
1452        }
1453    );
1454    (
1455        update_task_queue_config,
1456        UpdateTaskQueueConfigRequest,
1457        UpdateTaskQueueConfigResponse,
1458        |r| {
1459            let mut labels = namespaced_request!(r);
1460            labels.task_q_str(r.get_ref().task_queue.clone());
1461            r.extensions_mut().insert(labels);
1462        }
1463    );
1464    (
1465        fetch_worker_config,
1466        FetchWorkerConfigRequest,
1467        FetchWorkerConfigResponse,
1468        |r| {
1469            let labels = namespaced_request!(r);
1470            r.extensions_mut().insert(labels);
1471        }
1472    );
1473    (
1474        update_worker_config,
1475        UpdateWorkerConfigRequest,
1476        UpdateWorkerConfigResponse,
1477        |r| {
1478            let labels = namespaced_request!(r);
1479            r.extensions_mut().insert(labels);
1480        }
1481    );
1482    (
1483        describe_worker,
1484        DescribeWorkerRequest,
1485        DescribeWorkerResponse,
1486        |r| {
1487            let labels = namespaced_request!(r);
1488            r.extensions_mut().insert(labels);
1489        }
1490    );
1491    (
1492        set_worker_deployment_manager,
1493        SetWorkerDeploymentManagerRequest,
1494        SetWorkerDeploymentManagerResponse,
1495        |r| {
1496            let labels = namespaced_request!(r);
1497            r.extensions_mut().insert(labels);
1498        }
1499    );
1500    (
1501        pause_workflow_execution,
1502        PauseWorkflowExecutionRequest,
1503        PauseWorkflowExecutionResponse,
1504        |r| {
1505            let labels = namespaced_request!(r);
1506            r.extensions_mut().insert(labels);
1507        }
1508    );
1509    (
1510        unpause_workflow_execution,
1511        UnpauseWorkflowExecutionRequest,
1512        UnpauseWorkflowExecutionResponse,
1513        |r| {
1514            let labels = namespaced_request!(r);
1515            r.extensions_mut().insert(labels);
1516        }
1517    );
1518    (
1519        start_activity_execution,
1520        StartActivityExecutionRequest,
1521        StartActivityExecutionResponse,
1522        |r| {
1523            let labels = namespaced_request!(r);
1524            r.extensions_mut().insert(labels);
1525        }
1526    );
1527    (
1528        describe_activity_execution,
1529        DescribeActivityExecutionRequest,
1530        DescribeActivityExecutionResponse,
1531        |r| {
1532            let labels = namespaced_request!(r);
1533            r.extensions_mut().insert(labels);
1534        }
1535    );
1536    (
1537        poll_activity_execution,
1538        PollActivityExecutionRequest,
1539        PollActivityExecutionResponse,
1540        |r| {
1541            let labels = namespaced_request!(r);
1542            r.extensions_mut().insert(labels);
1543        }
1544    );
1545    (
1546        list_activity_executions,
1547        ListActivityExecutionsRequest,
1548        ListActivityExecutionsResponse,
1549        |r| {
1550            let labels = namespaced_request!(r);
1551            r.extensions_mut().insert(labels);
1552        }
1553    );
1554    (
1555        count_activity_executions,
1556        CountActivityExecutionsRequest,
1557        CountActivityExecutionsResponse,
1558        |r| {
1559            let labels = namespaced_request!(r);
1560            r.extensions_mut().insert(labels);
1561        }
1562    );
1563    (
1564        request_cancel_activity_execution,
1565        RequestCancelActivityExecutionRequest,
1566        RequestCancelActivityExecutionResponse,
1567        |r| {
1568            let labels = namespaced_request!(r);
1569            r.extensions_mut().insert(labels);
1570        }
1571    );
1572    (
1573        terminate_activity_execution,
1574        TerminateActivityExecutionRequest,
1575        TerminateActivityExecutionResponse,
1576        |r| {
1577            let labels = namespaced_request!(r);
1578            r.extensions_mut().insert(labels);
1579        }
1580    );
1581    (
1582        delete_activity_execution,
1583        DeleteActivityExecutionRequest,
1584        DeleteActivityExecutionResponse,
1585        |r| {
1586            let labels = namespaced_request!(r);
1587            r.extensions_mut().insert(labels);
1588        }
1589    );
1590    (
1591        count_nexus_operation_executions,
1592        CountNexusOperationExecutionsRequest,
1593        CountNexusOperationExecutionsResponse,
1594        |r| {
1595            let labels = namespaced_request!(r);
1596            r.extensions_mut().insert(labels);
1597        }
1598    );
1599    (
1600        create_worker_deployment,
1601        CreateWorkerDeploymentRequest,
1602        CreateWorkerDeploymentResponse,
1603        |r| {
1604            let labels = namespaced_request!(r);
1605            r.extensions_mut().insert(labels);
1606        }
1607    );
1608    (
1609        create_worker_deployment_version,
1610        CreateWorkerDeploymentVersionRequest,
1611        CreateWorkerDeploymentVersionResponse,
1612        |r| {
1613            let labels = namespaced_request!(r);
1614            r.extensions_mut().insert(labels);
1615        }
1616    );
1617    (
1618        delete_nexus_operation_execution,
1619        DeleteNexusOperationExecutionRequest,
1620        DeleteNexusOperationExecutionResponse,
1621        |r| {
1622            let labels = namespaced_request!(r);
1623            r.extensions_mut().insert(labels);
1624        }
1625    );
1626    (
1627        describe_nexus_operation_execution,
1628        DescribeNexusOperationExecutionRequest,
1629        DescribeNexusOperationExecutionResponse,
1630        |r| {
1631            let labels = namespaced_request!(r);
1632            r.extensions_mut().insert(labels);
1633        }
1634    );
1635    (
1636        list_nexus_operation_executions,
1637        ListNexusOperationExecutionsRequest,
1638        ListNexusOperationExecutionsResponse,
1639        |r| {
1640            let labels = namespaced_request!(r);
1641            r.extensions_mut().insert(labels);
1642        }
1643    );
1644    (
1645        poll_nexus_operation_execution,
1646        PollNexusOperationExecutionRequest,
1647        PollNexusOperationExecutionResponse,
1648        |r| {
1649            let labels = namespaced_request!(r);
1650            r.extensions_mut().insert(labels);
1651        }
1652    );
1653    (
1654        request_cancel_nexus_operation_execution,
1655        RequestCancelNexusOperationExecutionRequest,
1656        RequestCancelNexusOperationExecutionResponse,
1657        |r| {
1658            let labels = namespaced_request!(r);
1659            r.extensions_mut().insert(labels);
1660        }
1661    );
1662    (
1663        start_nexus_operation_execution,
1664        StartNexusOperationExecutionRequest,
1665        StartNexusOperationExecutionResponse,
1666        |r| {
1667            let labels = namespaced_request!(r);
1668            r.extensions_mut().insert(labels);
1669        }
1670    );
1671    (
1672        terminate_nexus_operation_execution,
1673        TerminateNexusOperationExecutionRequest,
1674        TerminateNexusOperationExecutionResponse,
1675        |r| {
1676            let labels = namespaced_request!(r);
1677            r.extensions_mut().insert(labels);
1678        }
1679    );
1680    (
1681        update_worker_deployment_version_compute_config,
1682        UpdateWorkerDeploymentVersionComputeConfigRequest,
1683        UpdateWorkerDeploymentVersionComputeConfigResponse,
1684        |r| {
1685            let labels = namespaced_request!(r);
1686            r.extensions_mut().insert(labels);
1687        }
1688    );
1689    (
1690        validate_worker_deployment_version_compute_config,
1691        ValidateWorkerDeploymentVersionComputeConfigRequest,
1692        ValidateWorkerDeploymentVersionComputeConfigResponse,
1693        |r| {
1694            let labels = namespaced_request!(r);
1695            r.extensions_mut().insert(labels);
1696        }
1697    );
1698}
1699
1700proxier! {
1701    OperatorService; ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS; OperatorServiceClient; operator_client; defaults;
1702    (add_search_attributes, AddSearchAttributesRequest, AddSearchAttributesResponse);
1703    (remove_search_attributes, RemoveSearchAttributesRequest, RemoveSearchAttributesResponse);
1704    (list_search_attributes, ListSearchAttributesRequest, ListSearchAttributesResponse);
1705    (delete_namespace, DeleteNamespaceRequest, DeleteNamespaceResponse,
1706        |r| {
1707            let labels = namespaced_request!(r);
1708            r.extensions_mut().insert(labels);
1709        }
1710    );
1711    (add_or_update_remote_cluster, AddOrUpdateRemoteClusterRequest, AddOrUpdateRemoteClusterResponse);
1712    (remove_remote_cluster, RemoveRemoteClusterRequest, RemoveRemoteClusterResponse);
1713    (list_clusters, ListClustersRequest, ListClustersResponse);
1714    (get_nexus_endpoint, GetNexusEndpointRequest, GetNexusEndpointResponse);
1715    (create_nexus_endpoint, CreateNexusEndpointRequest, CreateNexusEndpointResponse);
1716    (update_nexus_endpoint, UpdateNexusEndpointRequest, UpdateNexusEndpointResponse);
1717    (delete_nexus_endpoint, DeleteNexusEndpointRequest, DeleteNexusEndpointResponse);
1718    (list_nexus_endpoints, ListNexusEndpointsRequest, ListNexusEndpointsResponse);
1719}
1720
1721proxier! {
1722    CloudService; ALL_IMPLEMENTED_CLOUD_SERVICE_RPCS; CloudServiceClient; cloud_client; defaults;
1723    (get_users, cloudreq::GetUsersRequest, cloudreq::GetUsersResponse);
1724    (get_user, cloudreq::GetUserRequest, cloudreq::GetUserResponse);
1725    (create_user, cloudreq::CreateUserRequest, cloudreq::CreateUserResponse);
1726    (update_user, cloudreq::UpdateUserRequest, cloudreq::UpdateUserResponse);
1727    (delete_user, cloudreq::DeleteUserRequest, cloudreq::DeleteUserResponse);
1728    (set_user_namespace_access, cloudreq::SetUserNamespaceAccessRequest, cloudreq::SetUserNamespaceAccessResponse);
1729    (get_async_operation, cloudreq::GetAsyncOperationRequest, cloudreq::GetAsyncOperationResponse);
1730    (create_namespace, cloudreq::CreateNamespaceRequest, cloudreq::CreateNamespaceResponse);
1731    (get_namespaces, cloudreq::GetNamespacesRequest, cloudreq::GetNamespacesResponse);
1732    (get_namespace, cloudreq::GetNamespaceRequest, cloudreq::GetNamespaceResponse,
1733        |r| {
1734            let labels = namespaced_request!(r);
1735            r.extensions_mut().insert(labels);
1736        }
1737    );
1738    (update_namespace, cloudreq::UpdateNamespaceRequest, cloudreq::UpdateNamespaceResponse,
1739        |r| {
1740            let labels = namespaced_request!(r);
1741            r.extensions_mut().insert(labels);
1742        }
1743    );
1744    (rename_custom_search_attribute, cloudreq::RenameCustomSearchAttributeRequest, cloudreq::RenameCustomSearchAttributeResponse);
1745    (delete_namespace, cloudreq::DeleteNamespaceRequest, cloudreq::DeleteNamespaceResponse,
1746        |r| {
1747            let labels = namespaced_request!(r);
1748            r.extensions_mut().insert(labels);
1749        }
1750    );
1751    (failover_namespace_region, cloudreq::FailoverNamespaceRegionRequest, cloudreq::FailoverNamespaceRegionResponse);
1752    (add_namespace_region, cloudreq::AddNamespaceRegionRequest, cloudreq::AddNamespaceRegionResponse);
1753    (delete_namespace_region, cloudreq::DeleteNamespaceRegionRequest, cloudreq::DeleteNamespaceRegionResponse);
1754    (get_regions, cloudreq::GetRegionsRequest, cloudreq::GetRegionsResponse);
1755    (get_region, cloudreq::GetRegionRequest, cloudreq::GetRegionResponse);
1756    (get_api_keys, cloudreq::GetApiKeysRequest, cloudreq::GetApiKeysResponse);
1757    (get_api_key, cloudreq::GetApiKeyRequest, cloudreq::GetApiKeyResponse);
1758    (create_api_key, cloudreq::CreateApiKeyRequest, cloudreq::CreateApiKeyResponse);
1759    (update_api_key, cloudreq::UpdateApiKeyRequest, cloudreq::UpdateApiKeyResponse);
1760    (delete_api_key, cloudreq::DeleteApiKeyRequest, cloudreq::DeleteApiKeyResponse);
1761    (get_nexus_endpoints, cloudreq::GetNexusEndpointsRequest, cloudreq::GetNexusEndpointsResponse);
1762    (get_nexus_endpoint, cloudreq::GetNexusEndpointRequest, cloudreq::GetNexusEndpointResponse);
1763    (create_nexus_endpoint, cloudreq::CreateNexusEndpointRequest, cloudreq::CreateNexusEndpointResponse);
1764    (update_nexus_endpoint, cloudreq::UpdateNexusEndpointRequest, cloudreq::UpdateNexusEndpointResponse);
1765    (delete_nexus_endpoint, cloudreq::DeleteNexusEndpointRequest, cloudreq::DeleteNexusEndpointResponse);
1766    (get_user_groups, cloudreq::GetUserGroupsRequest, cloudreq::GetUserGroupsResponse);
1767    (get_user_group, cloudreq::GetUserGroupRequest, cloudreq::GetUserGroupResponse);
1768    (create_user_group, cloudreq::CreateUserGroupRequest, cloudreq::CreateUserGroupResponse);
1769    (update_user_group, cloudreq::UpdateUserGroupRequest, cloudreq::UpdateUserGroupResponse);
1770    (delete_user_group, cloudreq::DeleteUserGroupRequest, cloudreq::DeleteUserGroupResponse);
1771    (add_user_group_member, cloudreq::AddUserGroupMemberRequest, cloudreq::AddUserGroupMemberResponse);
1772    (remove_user_group_member, cloudreq::RemoveUserGroupMemberRequest, cloudreq::RemoveUserGroupMemberResponse);
1773    (get_user_group_members, cloudreq::GetUserGroupMembersRequest, cloudreq::GetUserGroupMembersResponse);
1774    (set_user_group_namespace_access, cloudreq::SetUserGroupNamespaceAccessRequest, cloudreq::SetUserGroupNamespaceAccessResponse);
1775    (create_service_account, cloudreq::CreateServiceAccountRequest, cloudreq::CreateServiceAccountResponse);
1776    (get_service_account, cloudreq::GetServiceAccountRequest, cloudreq::GetServiceAccountResponse);
1777    (get_service_accounts, cloudreq::GetServiceAccountsRequest, cloudreq::GetServiceAccountsResponse);
1778    (update_service_account, cloudreq::UpdateServiceAccountRequest, cloudreq::UpdateServiceAccountResponse);
1779    (delete_service_account, cloudreq::DeleteServiceAccountRequest, cloudreq::DeleteServiceAccountResponse);
1780    (get_usage, cloudreq::GetUsageRequest, cloudreq::GetUsageResponse);
1781    (get_account, cloudreq::GetAccountRequest, cloudreq::GetAccountResponse);
1782    (update_account, cloudreq::UpdateAccountRequest, cloudreq::UpdateAccountResponse);
1783    (create_namespace_export_sink, cloudreq::CreateNamespaceExportSinkRequest, cloudreq::CreateNamespaceExportSinkResponse);
1784    (get_namespace_export_sink, cloudreq::GetNamespaceExportSinkRequest, cloudreq::GetNamespaceExportSinkResponse);
1785    (get_namespace_export_sinks, cloudreq::GetNamespaceExportSinksRequest, cloudreq::GetNamespaceExportSinksResponse);
1786    (update_namespace_export_sink, cloudreq::UpdateNamespaceExportSinkRequest, cloudreq::UpdateNamespaceExportSinkResponse);
1787    (delete_namespace_export_sink, cloudreq::DeleteNamespaceExportSinkRequest, cloudreq::DeleteNamespaceExportSinkResponse);
1788    (validate_namespace_export_sink, cloudreq::ValidateNamespaceExportSinkRequest, cloudreq::ValidateNamespaceExportSinkResponse);
1789    (update_namespace_tags, cloudreq::UpdateNamespaceTagsRequest, cloudreq::UpdateNamespaceTagsResponse);
1790    (create_connectivity_rule, cloudreq::CreateConnectivityRuleRequest, cloudreq::CreateConnectivityRuleResponse);
1791    (get_connectivity_rule, cloudreq::GetConnectivityRuleRequest, cloudreq::GetConnectivityRuleResponse);
1792    (get_connectivity_rules, cloudreq::GetConnectivityRulesRequest, cloudreq::GetConnectivityRulesResponse);
1793    (delete_connectivity_rule, cloudreq::DeleteConnectivityRuleRequest, cloudreq::DeleteConnectivityRuleResponse);
1794    (set_service_account_namespace_access, cloudreq::SetServiceAccountNamespaceAccessRequest, cloudreq::SetServiceAccountNamespaceAccessResponse);
1795    (validate_account_audit_log_sink, cloudreq::ValidateAccountAuditLogSinkRequest, cloudreq::ValidateAccountAuditLogSinkResponse);
1796    (get_current_identity, cloudreq::GetCurrentIdentityRequest, cloudreq::GetCurrentIdentityResponse);
1797    (get_audit_logs, cloudreq::GetAuditLogsRequest, cloudreq::GetAuditLogsResponse);
1798    (create_account_audit_log_sink, cloudreq::CreateAccountAuditLogSinkRequest, cloudreq::CreateAccountAuditLogSinkResponse);
1799    (get_account_audit_log_sink, cloudreq::GetAccountAuditLogSinkRequest, cloudreq::GetAccountAuditLogSinkResponse);
1800    (get_account_audit_log_sinks, cloudreq::GetAccountAuditLogSinksRequest, cloudreq::GetAccountAuditLogSinksResponse);
1801    (update_account_audit_log_sink, cloudreq::UpdateAccountAuditLogSinkRequest, cloudreq::UpdateAccountAuditLogSinkResponse);
1802    (delete_account_audit_log_sink, cloudreq::DeleteAccountAuditLogSinkRequest, cloudreq::DeleteAccountAuditLogSinkResponse);
1803    (get_namespace_capacity_info, cloudreq::GetNamespaceCapacityInfoRequest, cloudreq::GetNamespaceCapacityInfoResponse);
1804    (create_billing_report, cloudreq::CreateBillingReportRequest, cloudreq::CreateBillingReportResponse);
1805    (get_billing_report, cloudreq::GetBillingReportRequest, cloudreq::GetBillingReportResponse);
1806    (get_custom_roles, cloudreq::GetCustomRolesRequest, cloudreq::GetCustomRolesResponse);
1807    (get_custom_role, cloudreq::GetCustomRoleRequest, cloudreq::GetCustomRoleResponse);
1808    (create_custom_role, cloudreq::CreateCustomRoleRequest, cloudreq::CreateCustomRoleResponse);
1809    (update_custom_role, cloudreq::UpdateCustomRoleRequest, cloudreq::UpdateCustomRoleResponse);
1810    (delete_custom_role, cloudreq::DeleteCustomRoleRequest, cloudreq::DeleteCustomRoleResponse);
1811}
1812
1813proxier! {
1814    TestService; ALL_IMPLEMENTED_TEST_SERVICE_RPCS; TestServiceClient; test_client; defaults;
1815    (lock_time_skipping, LockTimeSkippingRequest, LockTimeSkippingResponse);
1816    (unlock_time_skipping, UnlockTimeSkippingRequest, UnlockTimeSkippingResponse);
1817    (sleep, SleepRequest, SleepResponse);
1818    (sleep_until, SleepUntilRequest, SleepResponse);
1819    (unlock_time_skipping_with_sleep, SleepRequest, SleepResponse);
1820    (get_current_time, (), GetCurrentTimeResponse);
1821}
1822
1823proxier! {
1824    HealthService; ALL_IMPLEMENTED_HEALTH_SERVICE_RPCS; HealthClient; health_client;
1825    (check, HealthCheckRequest, HealthCheckResponse);
1826    (watch, HealthCheckRequest, tonic::codec::Streaming<HealthCheckResponse>);
1827}
1828
1829#[cfg(test)]
1830mod tests {
1831    use super::*;
1832    use crate::{ClientOptions, ConnectionOptions};
1833    use std::collections::HashSet;
1834    use temporalio_common::{
1835        protos::temporal::api::{
1836            operatorservice::v1::DeleteNamespaceRequest, workflowservice::v1::ListNamespacesRequest,
1837        },
1838        worker::WorkerTaskTypes,
1839    };
1840    use tonic::IntoRequest;
1841    use url::Url;
1842    use uuid::Uuid;
1843
1844    // Just to help make sure some stuff compiles. Not run.
1845    #[allow(dead_code)]
1846    async fn raw_client_retry_compiles() {
1847        let opts = ConnectionOptions::new(Url::parse("http://localhost:7233").unwrap())
1848            .client_name("test")
1849            .client_version("0.0.0")
1850            .build();
1851        let connection = Connection::connect(opts).await.unwrap();
1852        let mut client = Client::new(connection, ClientOptions::new("default").build()).unwrap();
1853
1854        let list_ns_req = ListNamespacesRequest::default();
1855        let wf_client = client.workflow_client();
1856        let fact = move |req| {
1857            let mut c = wf_client.clone();
1858            async move { c.list_namespaces(req).await }.boxed()
1859        };
1860        client
1861            .call("whatever", fact, Request::new(list_ns_req.clone()))
1862            .await
1863            .unwrap();
1864
1865        // Operator svc method
1866        let op_del_ns_req = DeleteNamespaceRequest::default();
1867        let op_client = client.operator_client();
1868        let fact = move |req| {
1869            let mut c = op_client.clone();
1870            async move { c.delete_namespace(req).await }.boxed()
1871        };
1872        client
1873            .call("whatever", fact, Request::new(op_del_ns_req.clone()))
1874            .await
1875            .unwrap();
1876
1877        // Cloud svc method
1878        let cloud_del_ns_req = cloudreq::DeleteNamespaceRequest::default();
1879        let cloud_client = client.cloud_client();
1880        let fact = move |req| {
1881            let mut c = cloud_client.clone();
1882            async move { c.delete_namespace(req).await }.boxed()
1883        };
1884        client
1885            .call("whatever", fact, Request::new(cloud_del_ns_req.clone()))
1886            .await
1887            .unwrap();
1888
1889        // Verify calling through traits works
1890        client
1891            .list_namespaces(list_ns_req.into_request())
1892            .await
1893            .unwrap();
1894        // Have to disambiguate operator and cloud service
1895        OperatorService::delete_namespace(&mut client, op_del_ns_req.into_request())
1896            .await
1897            .unwrap();
1898        CloudService::delete_namespace(&mut client, cloud_del_ns_req.into_request())
1899            .await
1900            .unwrap();
1901        client.get_current_time(().into_request()).await.unwrap();
1902        client
1903            .check(HealthCheckRequest::default().into_request())
1904            .await
1905            .unwrap();
1906    }
1907
1908    fn verify_methods(proto_def_str: &str, impl_list: &[&str]) {
1909        let methods: Vec<_> = proto_def_str
1910            .lines()
1911            .map(|l| l.trim())
1912            .filter(|l| l.starts_with("rpc"))
1913            .map(|l| {
1914                let stripped = l.strip_prefix("rpc ").unwrap();
1915                stripped[..stripped.find('(').unwrap()].trim()
1916            })
1917            .collect();
1918        let no_underscores: HashSet<_> = impl_list.iter().map(|x| x.replace('_', "")).collect();
1919        let mut not_implemented = vec![];
1920        for method in methods {
1921            if !no_underscores.contains(&method.to_lowercase()) {
1922                not_implemented.push(method);
1923            }
1924        }
1925        if !not_implemented.is_empty() {
1926            panic!(
1927                "The following RPC methods are not implemented by raw client: {not_implemented:?}"
1928            );
1929        }
1930    }
1931    #[test]
1932    fn verify_all_workflow_service_methods_implemented() {
1933        // This is less work than trying to hook into the codegen process
1934        let proto_def = include_str!(
1935            "../../common/protos/api_upstream/temporal/api/workflowservice/v1/service.proto"
1936        );
1937        verify_methods(proto_def, ALL_IMPLEMENTED_WORKFLOW_SERVICE_RPCS);
1938    }
1939
1940    #[test]
1941    fn verify_all_operator_service_methods_implemented() {
1942        let proto_def = include_str!(
1943            "../../common/protos/api_upstream/temporal/api/operatorservice/v1/service.proto"
1944        );
1945        verify_methods(proto_def, ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS);
1946    }
1947
1948    #[test]
1949    fn verify_all_cloud_service_methods_implemented() {
1950        let proto_def = include_str!(
1951            "../../common/protos/api_cloud_upstream/temporal/api/cloud/cloudservice/v1/service.proto"
1952        );
1953        verify_methods(proto_def, ALL_IMPLEMENTED_CLOUD_SERVICE_RPCS);
1954    }
1955
1956    #[test]
1957    fn verify_all_test_service_methods_implemented() {
1958        let proto_def = include_str!(
1959            "../../common/protos/testsrv_upstream/temporal/api/testservice/v1/service.proto"
1960        );
1961        verify_methods(proto_def, ALL_IMPLEMENTED_TEST_SERVICE_RPCS);
1962    }
1963
1964    #[test]
1965    fn verify_all_health_service_methods_implemented() {
1966        let proto_def = include_str!("../../common/protos/grpc/health/v1/health.proto");
1967        verify_methods(proto_def, ALL_IMPLEMENTED_HEALTH_SERVICE_RPCS);
1968    }
1969
1970    #[tokio::test]
1971    async fn can_mock_services() {
1972        #[derive(Clone)]
1973        struct MyFakeServices {}
1974        impl RawGrpcCaller for MyFakeServices {}
1975        impl WorkflowService for MyFakeServices {
1976            fn list_namespaces(
1977                &mut self,
1978                _request: Request<ListNamespacesRequest>,
1979            ) -> BoxFuture<'_, Result<Response<ListNamespacesResponse>, Status>> {
1980                async {
1981                    Ok(Response::new(ListNamespacesResponse {
1982                        namespaces: vec![DescribeNamespaceResponse {
1983                            failover_version: 12345,
1984                            ..Default::default()
1985                        }],
1986                        ..Default::default()
1987                    }))
1988                }
1989                .boxed()
1990            }
1991        }
1992        impl OperatorService for MyFakeServices {}
1993        impl CloudService for MyFakeServices {}
1994        impl TestService for MyFakeServices {}
1995        // Health service isn't possible to create a default impl for.
1996        impl HealthService for MyFakeServices {
1997            fn check(
1998                &mut self,
1999                _request: tonic::Request<HealthCheckRequest>,
2000            ) -> BoxFuture<'_, Result<tonic::Response<HealthCheckResponse>, tonic::Status>>
2001            {
2002                todo!()
2003            }
2004            fn watch(
2005                &mut self,
2006                _request: tonic::Request<HealthCheckRequest>,
2007            ) -> BoxFuture<
2008                '_,
2009                Result<
2010                    tonic::Response<tonic::codec::Streaming<HealthCheckResponse>>,
2011                    tonic::Status,
2012                >,
2013            > {
2014                todo!()
2015            }
2016        }
2017        let mut mocked_client = TemporalServiceClient::from_services(
2018            Box::new(MyFakeServices {}),
2019            Box::new(MyFakeServices {}),
2020            Box::new(MyFakeServices {}),
2021            Box::new(MyFakeServices {}),
2022            Box::new(MyFakeServices {}),
2023        );
2024        let r = mocked_client
2025            .list_namespaces(ListNamespacesRequest::default().into_request())
2026            .await
2027            .unwrap();
2028        assert_eq!(r.into_inner().namespaces[0].failover_version, 12345);
2029    }
2030
2031    #[rstest::rstest]
2032    #[case::with_versioning(true)]
2033    #[case::without_versioning(false)]
2034    #[tokio::test]
2035    async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) {
2036        use crate::worker::{MockClientWorker, MockSlot};
2037        use temporalio_common::{
2038            protos::temporal::api::enums::v1::WorkerVersioningMode,
2039            worker::{WorkerDeploymentOptions, WorkerDeploymentVersion},
2040        };
2041
2042        let expected_mode = if use_worker_versioning {
2043            WorkerVersioningMode::Versioned
2044        } else {
2045            WorkerVersioningMode::Unversioned
2046        };
2047
2048        #[derive(Clone)]
2049        struct MyFakeServices {
2050            client_worker_set: Arc<ClientWorkerSet>,
2051            expected_mode: WorkerVersioningMode,
2052        }
2053        impl RawGrpcCaller for MyFakeServices {}
2054        impl RawClientProducer for MyFakeServices {
2055            fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
2056                Some(self.client_worker_set.clone())
2057            }
2058            fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
2059                Box::new(MyFakeWfClient {
2060                    expected_mode: self.expected_mode,
2061                })
2062            }
2063            fn operator_client(&mut self) -> Box<dyn OperatorService> {
2064                unimplemented!()
2065            }
2066            fn cloud_client(&mut self) -> Box<dyn CloudService> {
2067                unimplemented!()
2068            }
2069            fn test_client(&mut self) -> Box<dyn TestService> {
2070                unimplemented!()
2071            }
2072            fn health_client(&mut self) -> Box<dyn HealthService> {
2073                unimplemented!()
2074            }
2075        }
2076
2077        let deployment_opts = WorkerDeploymentOptions {
2078            version: WorkerDeploymentVersion {
2079                deployment_name: "test-deployment".to_string(),
2080                build_id: "test-build-123".to_string(),
2081            },
2082            use_worker_versioning,
2083            default_versioning_behavior: None,
2084        };
2085
2086        let mut mock_provider = MockClientWorker::new();
2087        mock_provider
2088            .expect_namespace()
2089            .return_const("test-namespace".to_string());
2090        mock_provider
2091            .expect_task_queue()
2092            .return_const("test-task-queue".to_string());
2093        let mut mock_slot = MockSlot::new();
2094        mock_slot.expect_schedule_wft().returning(|_| Ok(()));
2095        mock_provider
2096            .expect_try_reserve_wft_slot()
2097            .return_once(|| Some(Box::new(mock_slot)));
2098        mock_provider
2099            .expect_deployment_options()
2100            .return_const(Some(deployment_opts.clone()));
2101        mock_provider.expect_heartbeat_enabled().return_const(false);
2102        let uuid = Uuid::new_v4();
2103        mock_provider
2104            .expect_worker_instance_key()
2105            .return_const(uuid);
2106        mock_provider
2107            .expect_worker_task_types()
2108            .return_const(WorkerTaskTypes {
2109                enable_workflows: true,
2110                enable_local_activities: true,
2111                enable_remote_activities: true,
2112                enable_nexus: true,
2113            });
2114
2115        let client_worker_set = Arc::new(ClientWorkerSet::new());
2116        client_worker_set
2117            .register_worker(Arc::new(mock_provider), true)
2118            .unwrap();
2119
2120        #[derive(Clone)]
2121        struct MyFakeWfClient {
2122            expected_mode: WorkerVersioningMode,
2123        }
2124        impl WorkflowService for MyFakeWfClient {
2125            fn start_workflow_execution(
2126                &mut self,
2127                request: tonic::Request<StartWorkflowExecutionRequest>,
2128            ) -> BoxFuture<'_, Result<tonic::Response<StartWorkflowExecutionResponse>, tonic::Status>>
2129            {
2130                let req = request.into_inner();
2131                let expected_mode = self.expected_mode;
2132
2133                assert!(
2134                    req.eager_worker_deployment_options.is_some(),
2135                    "eager_worker_deployment_options should be populated"
2136                );
2137
2138                let opts = req.eager_worker_deployment_options.as_ref().unwrap();
2139                assert_eq!(opts.deployment_name, "test-deployment");
2140                assert_eq!(opts.build_id, "test-build-123");
2141                assert_eq!(opts.worker_versioning_mode, expected_mode as i32);
2142
2143                async { Ok(Response::new(StartWorkflowExecutionResponse::default())) }.boxed()
2144            }
2145        }
2146
2147        let mut mfs = MyFakeServices {
2148            client_worker_set,
2149            expected_mode,
2150        };
2151
2152        // Create a request with eager execution enabled
2153        let req = StartWorkflowExecutionRequest {
2154            namespace: "test-namespace".to_string(),
2155            workflow_id: "test-wf-id".to_string(),
2156            workflow_type: Some(
2157                temporalio_common::protos::temporal::api::common::v1::WorkflowType {
2158                    name: "test-workflow".to_string(),
2159                },
2160            ),
2161            task_queue: Some(TaskQueue {
2162                name: "test-task-queue".to_string(),
2163                kind: 0,
2164                normal_name: String::new(),
2165            }),
2166            request_eager_execution: true,
2167            ..Default::default()
2168        };
2169
2170        mfs.start_workflow_execution(req.into_request())
2171            .await
2172            .unwrap();
2173    }
2174
2175    /// Tests that Connection's RawClientProducer impl correctly provides worker info
2176    /// so that eager workflow start can reserve a slot and dispatch the WFT.
2177    #[tokio::test]
2178    async fn connection_eager_start_dispatches_wft() {
2179        use crate::{
2180            ConnectionOptions,
2181            callback_based::{CallbackBasedGrpcService, GrpcSuccessResponse},
2182            worker::{MockClientWorker, MockSlot},
2183        };
2184        use prost::Message;
2185        use std::sync::atomic::{AtomicBool, Ordering};
2186        use temporalio_common::protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
2187
2188        let dispatched = Arc::new(AtomicBool::new(false));
2189        let dispatched_clone = dispatched.clone();
2190
2191        // Create a callback-based service that returns an eager_workflow_task in the response
2192        let service_override = CallbackBasedGrpcService {
2193            callback: Arc::new(|_req| {
2194                Box::pin(async {
2195                    let resp = StartWorkflowExecutionResponse {
2196                        run_id: "test-run-id".to_string(),
2197                        eager_workflow_task: Some(PollWorkflowTaskQueueResponse {
2198                            task_token: vec![1, 2, 3],
2199                            ..Default::default()
2200                        }),
2201                        ..Default::default()
2202                    };
2203                    let proto = resp.encode_to_vec();
2204                    Ok(GrpcSuccessResponse {
2205                        headers: Default::default(),
2206                        proto,
2207                    })
2208                })
2209            }),
2210        };
2211
2212        let opts = ConnectionOptions::new(url::Url::parse("http://localhost:7233").unwrap())
2213            .skip_get_system_info(true)
2214            .service_override(service_override)
2215            .dns_load_balancing(None)
2216            .build();
2217        let mut connection = crate::Connection::connect(opts).await.unwrap();
2218
2219        // Register a mock worker on the connection's worker set
2220        let mut mock_worker = MockClientWorker::new();
2221        mock_worker
2222            .expect_namespace()
2223            .return_const("default".to_string());
2224        mock_worker
2225            .expect_task_queue()
2226            .return_const("test-tq".to_string());
2227        mock_worker
2228            .expect_deployment_options()
2229            .return_const(None::<temporalio_common::worker::WorkerDeploymentOptions>);
2230        mock_worker.expect_heartbeat_enabled().return_const(false);
2231        let uuid = Uuid::new_v4();
2232        mock_worker.expect_worker_instance_key().return_const(uuid);
2233        mock_worker
2234            .expect_worker_task_types()
2235            .return_const(WorkerTaskTypes {
2236                enable_workflows: true,
2237                enable_local_activities: false,
2238                enable_remote_activities: false,
2239                enable_nexus: false,
2240            });
2241
2242        let mut mock_slot = MockSlot::new();
2243        mock_slot.expect_schedule_wft().returning(move |_| {
2244            dispatched_clone.store(true, Ordering::SeqCst);
2245            Ok(())
2246        });
2247        mock_worker
2248            .expect_try_reserve_wft_slot()
2249            .return_once(|| Some(Box::new(mock_slot)));
2250
2251        connection
2252            .workers()
2253            .register_worker(Arc::new(mock_worker), true)
2254            .unwrap();
2255
2256        // Make an eager start_workflow_execution call through Connection
2257        let req = StartWorkflowExecutionRequest {
2258            namespace: "default".to_string(),
2259            workflow_id: "test-wf".to_string(),
2260            workflow_type: Some(
2261                temporalio_common::protos::temporal::api::common::v1::WorkflowType {
2262                    name: "test-workflow".to_string(),
2263                },
2264            ),
2265            task_queue: Some(TaskQueue {
2266                name: "test-tq".to_string(),
2267                kind: 0,
2268                normal_name: String::new(),
2269            }),
2270            request_eager_execution: true,
2271            ..Default::default()
2272        };
2273
2274        connection
2275            .start_workflow_execution(req.into_request())
2276            .await
2277            .unwrap();
2278
2279        assert!(
2280            dispatched.load(Ordering::SeqCst),
2281            "Eager workflow task should have been dispatched to the worker"
2282        );
2283    }
2284}