Skip to main content

temporalio_client/
grpc.rs

1//! gRPC service traits for direct access to Temporal services.
2//!
3//! Most users should use the higher-level methods on [`Client`] or [`Connection`] instead.
4//! These traits are useful for advanced scenarios like custom interceptors, testing with mocks,
5//! or making raw gRPC calls not covered by the higher-level API.
6
7use crate::{
8    Client, Connection, LONG_POLL_TIMEOUT, RequestExt, SharedReplaceableClient,
9    TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
10    metrics::namespace_kv,
11    retry::make_future_retry,
12    worker::{ClientWorkerSet, Slot},
13};
14use dyn_clone::DynClone;
15use futures_util::{FutureExt, TryFutureExt, future::BoxFuture};
16use std::{any::Any, marker::PhantomData, sync::Arc};
17use temporalio_common::{
18    protos::{
19        grpc::health::v1::{health_client::HealthClient, *},
20        temporal::api::{
21            cloud::cloudservice::{v1 as cloudreq, v1::cloud_service_client::CloudServiceClient},
22            operatorservice::v1::{operator_service_client::OperatorServiceClient, *},
23            taskqueue::v1::TaskQueue,
24            testservice::v1::{test_service_client::TestServiceClient, *},
25            workflowservice::v1::{workflow_service_client::WorkflowServiceClient, *},
26        },
27    },
28    telemetry::metrics::MetricKeyValue,
29};
30use tonic::{
31    Request, Response, Status,
32    body::Body,
33    client::GrpcService,
34    metadata::{AsciiMetadataValue, KeyAndValueRef},
35};
36
/// Something that has access to the raw gRPC services.
///
/// Every `*_client` accessor returns an owned, boxed trait object for the corresponding
/// service rather than a reference into `self`.
pub(crate) trait RawClientProducer {
    /// Returns information about workers associated with this client. Implementers outside of
    /// core can safely return `None`.
    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>>;

    /// Return a boxed workflow service client instance
    fn workflow_client(&mut self) -> Box<dyn WorkflowService>;

    /// Return a boxed operator service client instance
    fn operator_client(&mut self) -> Box<dyn OperatorService>;

    /// Return a boxed cloud service client instance
    fn cloud_client(&mut self) -> Box<dyn CloudService>;

    /// Return a boxed test service client instance
    fn test_client(&mut self) -> Box<dyn TestService>;

    /// Return a boxed health service client instance
    fn health_client(&mut self) -> Box<dyn HealthService>;
}
58
/// Any client that can make gRPC calls. The default implementation simply invokes the passed-in
/// function. Implementers may override this to provide things like retry behavior.
#[async_trait::async_trait]
pub(crate) trait RawGrpcCaller: Send + Sync + 'static {
    /// Make a gRPC call. The default implementation simply invokes the provided function.
    ///
    /// * `_call_name` - name of the RPC being made. Ignored by the default implementation.
    /// * `callfn` - given a request, produces the future that performs the call. It is `FnMut`
    ///   so that an overriding implementation may invoke it more than once; the default
    ///   implementation invokes it exactly once.
    /// * `req` - the request handed to `callfn`.
    ///
    /// Returns whatever the future produced by `callfn` resolves to.
    async fn call<F, Req, Resp>(
        &mut self,
        _call_name: &'static str,
        mut callfn: F,
        req: Request<Req>,
    ) -> Result<Response<Resp>, Status>
    where
        Req: Clone + Unpin + Send + Sync + 'static,
        Resp: Send + 'static,
        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
        F: Send + Sync + Unpin + 'static,
    {
        callfn(req).await
    }
}
79
/// Non-generic counterpart of [`RawGrpcCaller`]: instead of being generic over the request and
/// response types, a call is described by a type-erased [`ErasedCallOp`] and the response
/// payload comes back boxed as `dyn Any + Send`.
trait ErasedRawClient: Send + Sync + 'static {
    /// Run `op` for the RPC named `call_name`, returning a future that resolves to the response
    /// with its payload type erased (or the failing [`Status`]).
    fn erased_call(
        &mut self,
        call_name: &'static str,
        op: &mut dyn ErasedCallOp,
    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>>;
}
87
/// A single gRPC call whose request and response types have been erased, so that it can be
/// passed to [`ErasedRawClient::erased_call`] as a trait object.
trait ErasedCallOp: Send {
    /// Start the call, returning a future that resolves to the response with its payload boxed
    /// as `dyn Any + Send`.
    ///
    /// * `raw` - the client the operation is being run against.
    /// * `call_name` - name of the RPC being made.
    fn invoke(
        &mut self,
        raw: &mut dyn ErasedRawClient,
        call_name: &'static str,
    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>>;
}
95
/// Bundles a typed call function with the request it should be given, so the pair can be driven
/// through the type-erased [`ErasedCallOp`] interface.
struct CallShim<F, Req, Resp> {
    /// The function that actually performs the call.
    callfn: F,
    /// The request to pass to `callfn`. Stored as an `Option` so it can be moved out through a
    /// `&mut self` borrow.
    seed_req: Option<Request<Req>>,
    /// Records the response type without storing a value of it.
    _resp: PhantomData<Resp>,
}

impl<F, Req, Resp> CallShim<F, Req, Resp> {
    /// Create a shim holding `callfn` and the request it will be invoked with.
    fn new(callfn: F, seed_req: Request<Req>) -> Self {
        Self {
            callfn,
            seed_req: Some(seed_req),
            _resp: PhantomData,
        }
    }
}
111impl<F, Req, Resp> ErasedCallOp for CallShim<F, Req, Resp>
112where
113    Req: Clone + Unpin + Send + Sync + 'static,
114    Resp: Send + 'static,
115    F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
116    F: Send + Sync + Unpin + 'static,
117{
118    fn invoke(
119        &mut self,
120        _raw: &mut dyn ErasedRawClient,
121        _call_name: &'static str,
122    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>> {
123        (self.callfn)(
124            self.seed_req
125                .take()
126                .expect("CallShim must have request populated"),
127        )
128        .map(|res| res.map(|payload| payload.map(|t| Box::new(t) as Box<dyn Any + Send>)))
129        .boxed()
130    }
131}
132
/// [`RawGrpcCaller`] for [`Connection`]: tags the request with its call type, applies the long
/// poll default timeout where appropriate, and runs the call through the retry machinery.
#[async_trait::async_trait]
impl RawGrpcCaller for Connection {
    async fn call<F, Req, Resp>(
        &mut self,
        call_name: &'static str,
        mut callfn: F,
        mut req: Request<Req>,
    ) -> Result<Response<Resp>, Status>
    where
        Req: Clone + Unpin + Send + Sync + 'static,
        Resp: Send + 'static,
        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
        F: Send + Sync + Unpin + 'static,
    {
        // Ask the connection's retry options how this particular call should be treated.
        let info = self
            .inner
            .retry_options
            .get_call_info(call_name, Some(&req));
        // Record the call type on the request itself so it travels with every attempt.
        req.extensions_mut().insert(info.call_type);
        if info.call_type.is_long() {
            req.set_default_timeout(LONG_POLL_TIMEOUT);
        }

        // Every invocation of the factory gets its own copy of the request, since `callfn`
        // consumes the request it is given.
        let fact = || {
            let req_clone = req_cloner(&req);
            callfn(req_clone)
        };

        let res = make_future_retry(info, fact);
        // Keep only the error itself on failure, and the first element of the success tuple.
        res.map_err(|(e, _attempt)| e).map_ok(|x| x.0).await
    }
}
165
166/// Helper for cloning a tonic request as long as the inner message may be cloned.
167fn req_cloner<T: Clone>(cloneme: &Request<T>) -> Request<T> {
168    let msg = cloneme.get_ref().clone();
169    let mut new_req = Request::new(msg);
170    let new_met = new_req.metadata_mut();
171    for kv in cloneme.metadata().iter() {
172        match kv {
173            KeyAndValueRef::Ascii(k, v) => {
174                new_met.insert(k, v.clone());
175            }
176            KeyAndValueRef::Binary(k, v) => {
177                new_met.insert_bin(k, v.clone());
178            }
179        }
180    }
181    *new_req.extensions_mut() = cloneme.extensions().clone();
182    new_req
183}
184
185#[async_trait::async_trait]
186impl RawGrpcCaller for dyn ErasedRawClient {
187    async fn call<F, Req, Resp>(
188        &mut self,
189        call_name: &'static str,
190        callfn: F,
191        req: Request<Req>,
192    ) -> Result<Response<Resp>, Status>
193    where
194        Req: Clone + Unpin + Send + Sync + 'static,
195        Resp: Send + 'static,
196        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
197        F: Send + Sync + Unpin + 'static,
198    {
199        let mut shim = CallShim::new(callfn, req);
200        let erased_resp = ErasedRawClient::erased_call(self, call_name, &mut shim).await?;
201        Ok(erased_resp.map(|boxed| {
202            *boxed
203                .downcast()
204                .expect("RawGrpcCaller erased response type mismatch")
205        }))
206    }
207}
208
209impl<T> ErasedRawClient for T
210where
211    T: RawGrpcCaller + 'static,
212{
213    fn erased_call(
214        &mut self,
215        call_name: &'static str,
216        op: &mut dyn ErasedCallOp,
217    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>> {
218        let raw: &mut dyn ErasedRawClient = self;
219        op.invoke(raw, call_name)
220    }
221}
222
/// Produces raw service clients from the connection's inner service.
impl RawClientProducer for Connection {
    /// Always `None` for this implementation.
    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
        None
    }

    /// Delegates to `workflow_service()` on the inner service.
    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
        self.inner.service.workflow_service()
    }

    /// Delegates to `operator_service()` on the inner service.
    fn operator_client(&mut self) -> Box<dyn OperatorService> {
        self.inner.service.operator_service()
    }

    /// Delegates to `cloud_service()` on the inner service.
    fn cloud_client(&mut self) -> Box<dyn CloudService> {
        self.inner.service.cloud_service()
    }

    /// Delegates to `test_service()` on the inner service.
    fn test_client(&mut self) -> Box<dyn TestService> {
        self.inner.service.test_service()
    }

    /// Delegates to `health_service()` on the inner service.
    fn health_client(&mut self) -> Box<dyn HealthService> {
        self.inner.service.health_service()
    }
}
248
/// Produces raw service clients by calling the matching `*_service()` accessor on the
/// [`TemporalServiceClient`] itself.
impl RawClientProducer for TemporalServiceClient {
    /// Always `None` for this implementation.
    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
        None
    }

    /// Delegates to `workflow_service()`.
    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
        self.workflow_service()
    }

    /// Delegates to `operator_service()`.
    fn operator_client(&mut self) -> Box<dyn OperatorService> {
        self.operator_service()
    }

    /// Delegates to `cloud_service()`.
    fn cloud_client(&mut self) -> Box<dyn CloudService> {
        self.cloud_service()
    }

    /// Delegates to `test_service()`.
    fn test_client(&mut self) -> Box<dyn TestService> {
        self.test_service()
    }

    /// Delegates to `health_service()`.
    fn health_client(&mut self) -> Box<dyn HealthService> {
        self.health_service()
    }
}
274
// Uses the trait's default `call`, which invokes the call function once with no extra handling.
impl RawGrpcCaller for TemporalServiceClient {}
276
/// Produces raw service clients by delegating to the client's underlying connection.
impl RawClientProducer for Client {
    /// Returns the worker set reported by the underlying connection.
    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
        Some(self.connection.workers())
    }

    /// Delegates to the connection's `workflow_client()`.
    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
        self.connection.workflow_client()
    }

    /// Delegates to the connection's `operator_client()`.
    fn operator_client(&mut self) -> Box<dyn OperatorService> {
        self.connection.operator_client()
    }

    /// Delegates to the connection's `cloud_client()`.
    fn cloud_client(&mut self) -> Box<dyn CloudService> {
        self.connection.cloud_client()
    }

    /// Delegates to the connection's `test_client()`.
    fn test_client(&mut self) -> Box<dyn TestService> {
        self.connection.test_client()
    }

    /// Delegates to the connection's `health_client()`.
    fn health_client(&mut self) -> Box<dyn HealthService> {
        self.connection.health_client()
    }
}
302
/// [`RawGrpcCaller`] for [`Client`]: every call is forwarded unchanged to the underlying
/// connection's `call`.
#[async_trait::async_trait]
impl RawGrpcCaller for Client {
    async fn call<F, Req, Resp>(
        &mut self,
        call_name: &'static str,
        callfn: F,
        req: Request<Req>,
    ) -> Result<Response<Resp>, Status>
    where
        Req: Clone + Unpin + Send + Sync + 'static,
        Resp: Send + 'static,
        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
        F: Send + Sync + Unpin + 'static,
    {
        self.connection.call(call_name, callfn, req).await
    }
}
320
/// Produces raw service clients by delegating to the wrapped client.
///
/// The read-only accessor goes through `inner_cow()`, while the `&mut self` accessors go
/// through `inner_mut_refreshed()`.
impl<RC> RawClientProducer for SharedReplaceableClient<RC>
where
    RC: RawClientProducer + Clone + Send + Sync + 'static,
{
    /// Delegates to the wrapped client's `get_workers_info()`.
    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
        self.inner_cow().get_workers_info()
    }
    /// Delegates to the wrapped client's `workflow_client()`.
    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
        self.inner_mut_refreshed().workflow_client()
    }

    /// Delegates to the wrapped client's `operator_client()`.
    fn operator_client(&mut self) -> Box<dyn OperatorService> {
        self.inner_mut_refreshed().operator_client()
    }

    /// Delegates to the wrapped client's `cloud_client()`.
    fn cloud_client(&mut self) -> Box<dyn CloudService> {
        self.inner_mut_refreshed().cloud_client()
    }

    /// Delegates to the wrapped client's `test_client()`.
    fn test_client(&mut self) -> Box<dyn TestService> {
        self.inner_mut_refreshed().test_client()
    }

    /// Delegates to the wrapped client's `health_client()`.
    fn health_client(&mut self) -> Box<dyn HealthService> {
        self.inner_mut_refreshed().health_client()
    }
}
348
/// [`RawGrpcCaller`] for [`SharedReplaceableClient`]: the call is forwarded to whatever
/// `inner_mut_refreshed()` returns at the time of the call.
#[async_trait::async_trait]
impl<RC> RawGrpcCaller for SharedReplaceableClient<RC>
where
    RC: RawGrpcCaller + Clone + Sync + 'static,
{
    async fn call<F, Req, Resp>(
        &mut self,
        call_name: &'static str,
        callfn: F,
        req: Request<Req>,
    ) -> Result<Response<Resp>, Status>
    where
        Req: Clone + Unpin + Send + Sync + 'static,
        Resp: Send + 'static,
        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
        F: Send + Sync + Unpin + 'static,
    {
        self.inner_mut_refreshed()
            .call(call_name, callfn, req)
            .await
    }
}
371
/// Metric labels attached to a request as an extension (the request closures in this file
/// insert it via `extensions_mut().insert(..)`).
#[derive(Clone, Debug)]
pub(super) struct AttachMetricLabels {
    /// The labels themselves.
    pub(super) labels: Vec<MetricKeyValue>,
    /// Name of the normal task queue the request targets, if one was recorded.
    pub(super) normal_task_queue: Option<String>,
    /// Name of the sticky task queue the request targets, if one was recorded.
    pub(super) sticky_task_queue: Option<String>,
}
378impl AttachMetricLabels {
379    pub(super) fn new(kvs: impl Into<Vec<MetricKeyValue>>) -> Self {
380        Self {
381            labels: kvs.into(),
382            normal_task_queue: None,
383            sticky_task_queue: None,
384        }
385    }
386    pub(super) fn namespace(ns: impl Into<String>) -> Self {
387        AttachMetricLabels::new(vec![namespace_kv(ns.into())])
388    }
389    pub(super) fn task_q(&mut self, tq: Option<TaskQueue>) -> &mut Self {
390        if let Some(tq) = tq {
391            if !tq.normal_name.is_empty() {
392                self.sticky_task_queue = Some(tq.name);
393                self.normal_task_queue = Some(tq.normal_name);
394            } else {
395                self.normal_task_queue = Some(tq.name);
396            }
397        }
398        self
399    }
400    pub(super) fn task_q_str(&mut self, tq: impl Into<String>) -> &mut Self {
401        self.normal_task_queue = Some(tq.into());
402        self
403    }
404}
405
/// A request extension that, when set, should make the [RetryClient] consider this call to be a
/// [super::retry::CallType::UserLongPoll].
///
/// This is a field-less marker: only its presence in the request extensions carries meaning.
// NOTE(review): the code that reads this marker is not in this part of the file — confirm
// where it is consumed.
#[derive(Copy, Clone, Debug)]
pub(super) struct IsUserLongPoll;
410
/// Emits the trait-side declaration of one gRPC method.
///
/// There are two forms:
///
/// * With a trailing `defaults` token, the method gets a default body that ignores the request
///   and resolves to a response wrapping `Default::default()` of the response type.
/// * Without it, the method is declared with no body, so implementers must provide it.
macro_rules! proxy_def {
    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty, defaults) => {
        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
        fn $method(
            &mut self,
            _request: tonic::Request<$req>,
        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
            async { Ok(tonic::Response::new(<$resp>::default())) }.boxed()
        }
    };
    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty) => {
        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
        fn $method(
            &mut self,
            _request: tonic::Request<$req>,
        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>>;
    };
}
429
/// Helps re-declare gRPC client methods
///
/// There are four forms:
///
/// * The first takes a closure that can modify the request. This is only called once, before the
///   actual rpc call is made, and before determinations are made about the kind of call (long poll
///   or not) and retry policy.
/// * The second takes three closures. The first can modify the request like in the first form.
///   The second can modify the request and return a value, and is called right before every call
///   (including on retries). The third is called with the response to the call after it resolves.
/// * The third and fourth are equivalents of the above that skip calling through the `call` method
///   and are implemented directly on the generated gRPC clients (IE: the bottom of the stack).
///
/// The first two forms take a `$client_meth` argument naming the accessor used to obtain the
/// service client; the last two do not.
macro_rules! proxy_impl {
    // Form 1: optional request closure, dispatched through `self.call`.
    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty $(, $closure:expr)?) => {
        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
        fn $method(
            &mut self,
            #[allow(unused_mut)]
            mut request: tonic::Request<$req>,
        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
            $( type_closure_arg(&mut request, $closure); )*
            // The factory owns its own clone of the client so it can be `'static`.
            let mut self_clone = self.clone();
            #[allow(unused_mut)]
            let fact = move |mut req: tonic::Request<$req>| {
                let mut c = self_clone.$client_meth();
                async move { c.$method(req).await }.boxed()
            };
            self.call(stringify!($method), fact, request)
        }
    };
    // Form 2: request / before-call / after-call closures, dispatched through `self.call`.
    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty,
     $closure_request:expr, $closure_before:expr, $closure_after:expr) => {
        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
        fn $method(
            &mut self,
            mut request: tonic::Request<$req>,
        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
            type_closure_arg(&mut request, $closure_request);
            let workers_info = self.get_workers_info();
            let mut self_clone = self.clone();
            #[allow(unused_mut)]
            let fact = move |mut req: tonic::Request<$req>| {
                // Runs on every invocation of the factory; its result is handed to the
                // after-call closure together with the call's outcome.
                let data = type_closure_two_arg(&mut req, workers_info.clone(), $closure_before);
                let mut c = self_clone.$client_meth();
                async move {
                    type_closure_two_arg(c.$method(req).await, data, $closure_after)
                }.boxed()
            };
            self.call(stringify!($method), fact, request)
        }
    };
    // Form 3: optional request closure, calling the generated client method directly.
    ($client_type:tt, $method:ident, $req:ty, $resp:ty $(, $closure:expr)?) => {
        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
        fn $method(
            &mut self,
            #[allow(unused_mut)]
            mut request: tonic::Request<$req>,
        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
            $( type_closure_arg(&mut request, $closure); )*
            async move { <$client_type<_>>::$method(self, request).await }.boxed()
        }
    };
    // Form 4: three closures, calling the generated client method directly. No worker set is
    // available at this level, so the before-call closure always receives `None`.
    ($client_type:tt, $method:ident, $req:ty, $resp:ty,
     $closure_request:expr, $closure_before:expr, $closure_after:expr) => {
        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
        fn $method(
            &mut self,
            mut request: tonic::Request<$req>,
        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
            type_closure_arg(&mut request, $closure_request);
            let data = type_closure_two_arg(&mut request, Option::<Arc<ClientWorkerSet>>::None,
                                            $closure_before);
            async move {
                type_closure_two_arg(<$client_type<_>>::$method(self, request).await,
                                     data, $closure_after)
            }.boxed()
        }
    };
}
/// Generates everything for one service from a list of method definitions:
///
/// * a test-only constant listing the names of all the methods,
/// * the service trait itself (its items come from the bracketed `proxy_def!` list), made
///   clonable as a trait object,
/// * a blanket implementation of the trait for any
///   `RawGrpcCaller + RawClientProducer + Clone + Unpin` type,
/// * an empty `RawGrpcCaller` implementation for the generated client type, and
/// * an implementation of the trait directly on the generated client type.
macro_rules! proxier_impl {
    ($trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident;
     [$( proxy_def!($($def_args:tt)*); )*];
     $(($method:ident, $req:ty, $resp:ty
       $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
        #[cfg(test)]
        const $impl_list_name: &'static [&'static str] = &[$(stringify!($method)),*];

        #[doc = concat!("Trait version of [", stringify!($client_type), "]")]
        pub trait $trait_name: Send + Sync + DynClone
        {
            $( proxy_def!($($def_args)*); )*
        }
        dyn_clone::clone_trait_object!($trait_name);

        // Implementation that goes through `call` (and therefore whatever the caller layers on).
        impl<RC> $trait_name for RC
        where
            RC: RawGrpcCaller + RawClientProducer + Clone + Unpin,
        {
            $(
                proxy_impl!($client_type, $client_meth, $method, $req, $resp
                            $(,$closure $(,$closure_before, $closure_after)*)*);
            )*
        }

        impl<T: Send + Sync + 'static> RawGrpcCaller for $client_type<T> {}

        // Implementation directly on the generated client, which performs the RPC itself.
        impl<T> $trait_name for $client_type<T>
        where
            T: GrpcService<Body> + Clone + Send + Sync + 'static,
            T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
            T::Error: Into<tonic::codegen::StdError>,
            <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
            <T as tonic::client::GrpcService<Body>>::Future: Send
        {
            $(
                proxy_impl!($client_type, $method, $req, $resp
                            $(,$closure $(,$closure_before, $closure_after)*)*);
            )*
        }
    };
}
551
/// Entry point for declaring a service: takes the trait name, the name of the test-only method
/// list, the generated client type, the accessor used to obtain that client, and then one
/// parenthesized entry per RPC method.
///
/// There are two forms:
///
/// * Without `defaults;`, the trait methods are declared without bodies.
/// * With `defaults;` after the accessor name, the trait methods get default bodies.
///
/// Both forms expand to `proxier_impl!`, differing only in the `proxy_def!` list they pass.
macro_rules! proxier {
    ( $trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident;
      $(($method:ident, $req:ty, $resp:ty
         $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
        proxier_impl!($trait_name; $impl_list_name; $client_type; $client_meth;
                      [$(proxy_def!($client_type, $client_meth, $method, $req, $resp);)*];
                      $(($method, $req, $resp $(, $closure $(, $closure_before, $closure_after)?)?);)*);
    };
    ( $trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident; defaults;
      $(($method:ident, $req:ty, $resp:ty
         $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
        proxier_impl!($trait_name; $impl_list_name; $client_type; $client_meth;
                      [$(proxy_def!($client_type, $client_meth, $method, $req, $resp, defaults);)*];
                      $(($method, $req, $resp $(, $closure $(, $closure_before, $closure_after)?)?);)*);
    };
}
568
/// For a request whose message has a `namespace` field: copies that namespace into the request
/// metadata under `TEMPORAL_NAMESPACE_HEADER_KEY`, and evaluates to an [`AttachMetricLabels`]
/// holding the namespace label.
///
/// If the namespace cannot be parsed as a metadata value, a warning is logged and an empty
/// header value is attached instead.
macro_rules! namespaced_request {
    ($req:ident) => {{
        let ns_str = $req.get_ref().namespace.clone();
        // Attach namespace header
        $req.metadata_mut().insert(
            TEMPORAL_NAMESPACE_HEADER_KEY,
            ns_str.parse().unwrap_or_else(|e| {
                warn!("Unable to parse namespace for header: {e:?}");
                AsciiMetadataValue::from_static("")
            }),
        );
        // Init metric labels
        AttachMetricLabels::namespace(ns_str)
    }};
}
584
// Nice little trick to avoid the callsite asking to type the closure parameter: because the
// closure is passed alongside its argument, the parameter type is inferred from `arg`.
//
// Calls `f` with `arg` and returns its result.
fn type_closure_arg<T, R>(arg: T, f: impl FnOnce(T) -> R) -> R {
    f(arg)
}
589
// Two-argument variant of the closure-typing helper: calls `f` with `arg1` and `arg2` (in that
// order) and returns its result, letting both closure parameter types be inferred.
fn type_closure_two_arg<T, R, S>(arg1: R, arg2: T, f: impl FnOnce(R, T) -> S) -> S {
    f(arg1, arg2)
}
593
594proxier! {
595    WorkflowService; ALL_IMPLEMENTED_WORKFLOW_SERVICE_RPCS; WorkflowServiceClient; workflow_client; defaults;
596    (
597        register_namespace,
598        RegisterNamespaceRequest,
599        RegisterNamespaceResponse,
600        |r| {
601            let labels = namespaced_request!(r);
602            r.extensions_mut().insert(labels);
603        }
604    );
605    (
606        describe_namespace,
607        DescribeNamespaceRequest,
608        DescribeNamespaceResponse,
609        |r| {
610            let labels = namespaced_request!(r);
611            r.extensions_mut().insert(labels);
612        }
613    );
614    (
615        list_namespaces,
616        ListNamespacesRequest,
617        ListNamespacesResponse
618    );
619    (
620        update_namespace,
621        UpdateNamespaceRequest,
622        UpdateNamespaceResponse,
623        |r| {
624            let labels = namespaced_request!(r);
625            r.extensions_mut().insert(labels);
626        }
627    );
628    (
629        deprecate_namespace,
630        DeprecateNamespaceRequest,
631        DeprecateNamespaceResponse,
632        |r| {
633            let labels = namespaced_request!(r);
634            r.extensions_mut().insert(labels);
635        }
636    );
637    (
638        start_workflow_execution,
639        StartWorkflowExecutionRequest,
640        StartWorkflowExecutionResponse,
641        |r| {
642            let mut labels = namespaced_request!(r);
643            labels.task_q(r.get_ref().task_queue.clone());
644            r.extensions_mut().insert(labels);
645        },
646        |r, workers| {
647            if let Some(workers) = workers {
648                let mut slot: Option<Box<dyn Slot + Send>> = None;
649                let req_mut = r.get_mut();
650                if req_mut.request_eager_execution {
651                    let namespace = req_mut.namespace.clone();
652                    let task_queue = req_mut.task_queue.as_ref()
653                                        .map(|tq| tq.name.clone()).unwrap_or_default();
654                    match workers.try_reserve_wft_slot(namespace, task_queue) {
655                        Some(reservation) => {
656                            // Populate eager_worker_deployment_options from the slot reservation
657                            if let Some(opts) = reservation.deployment_options {
658                                req_mut.eager_worker_deployment_options = Some(temporalio_common::protos::temporal::api::deployment::v1::WorkerDeploymentOptions {
659                                    deployment_name: opts.version.deployment_name,
660                                    build_id: opts.version.build_id,
661                                    worker_versioning_mode: if opts.use_worker_versioning {
662                                        temporalio_common::protos::temporal::api::enums::v1::WorkerVersioningMode::Versioned.into()
663                                    } else {
664                                        temporalio_common::protos::temporal::api::enums::v1::WorkerVersioningMode::Unversioned.into()
665                                    },
666                                });                            }
667                            slot = Some(reservation.slot);
668                        }
669                        None => req_mut.request_eager_execution = false
670                    }
671                }
672                slot
673            } else {
674                None
675            }
676        },
677        |resp, slot| {
678            if let Some(s) = slot
679                && let Ok(response) = resp.as_ref()
680                    && let Some(task) = response.get_ref().clone().eager_workflow_task
681                        && let Err(e) = s.schedule_wft(task) {
682                            // This is a latency issue, i.e., the client does not need to handle
683                            //  this error, because the WFT will be retried after a timeout.
684                            warn!(details = ?e, "Eager workflow task rejected by worker.");
685                        }
686            resp
687        }
688    );
689    (
690        get_workflow_execution_history,
691        GetWorkflowExecutionHistoryRequest,
692        GetWorkflowExecutionHistoryResponse,
693        |r| {
694            let labels = namespaced_request!(r);
695            r.extensions_mut().insert(labels);
696            if r.get_ref().wait_new_event {
697                r.extensions_mut().insert(IsUserLongPoll);
698            }
699        }
700    );
701    (
702        get_workflow_execution_history_reverse,
703        GetWorkflowExecutionHistoryReverseRequest,
704        GetWorkflowExecutionHistoryReverseResponse,
705        |r| {
706            let labels = namespaced_request!(r);
707            r.extensions_mut().insert(labels);
708        }
709    );
710    (
711        poll_workflow_task_queue,
712        PollWorkflowTaskQueueRequest,
713        PollWorkflowTaskQueueResponse,
714        |r| {
715            let mut labels = namespaced_request!(r);
716            labels.task_q(r.get_ref().task_queue.clone());
717            r.extensions_mut().insert(labels);
718        }
719    );
720    (
721        respond_workflow_task_completed,
722        RespondWorkflowTaskCompletedRequest,
723        RespondWorkflowTaskCompletedResponse,
724        |r| {
725            let labels = namespaced_request!(r);
726            r.extensions_mut().insert(labels);
727        }
728    );
729    (
730        respond_workflow_task_failed,
731        RespondWorkflowTaskFailedRequest,
732        RespondWorkflowTaskFailedResponse,
733        |r| {
734            let labels = namespaced_request!(r);
735            r.extensions_mut().insert(labels);
736        }
737    );
738    (
739        poll_activity_task_queue,
740        PollActivityTaskQueueRequest,
741        PollActivityTaskQueueResponse,
742        |r| {
743            let mut labels = namespaced_request!(r);
744            labels.task_q(r.get_ref().task_queue.clone());
745            r.extensions_mut().insert(labels);
746        }
747    );
748    (
749        record_activity_task_heartbeat,
750        RecordActivityTaskHeartbeatRequest,
751        RecordActivityTaskHeartbeatResponse,
752        |r| {
753            let labels = namespaced_request!(r);
754            r.extensions_mut().insert(labels);
755        }
756    );
757    (
758        record_activity_task_heartbeat_by_id,
759        RecordActivityTaskHeartbeatByIdRequest,
760        RecordActivityTaskHeartbeatByIdResponse,
761        |r| {
762            let labels = namespaced_request!(r);
763            r.extensions_mut().insert(labels);
764        }
765    );
766    (
767        respond_activity_task_completed,
768        RespondActivityTaskCompletedRequest,
769        RespondActivityTaskCompletedResponse,
770        |r| {
771            let labels = namespaced_request!(r);
772            r.extensions_mut().insert(labels);
773        }
774    );
775    (
776        respond_activity_task_completed_by_id,
777        RespondActivityTaskCompletedByIdRequest,
778        RespondActivityTaskCompletedByIdResponse,
779        |r| {
780            let labels = namespaced_request!(r);
781            r.extensions_mut().insert(labels);
782        }
783    );
784
785    (
786        respond_activity_task_failed,
787        RespondActivityTaskFailedRequest,
788        RespondActivityTaskFailedResponse,
789        |r| {
790            let labels = namespaced_request!(r);
791            r.extensions_mut().insert(labels);
792        }
793    );
794    (
795        respond_activity_task_failed_by_id,
796        RespondActivityTaskFailedByIdRequest,
797        RespondActivityTaskFailedByIdResponse,
798        |r| {
799            let labels = namespaced_request!(r);
800            r.extensions_mut().insert(labels);
801        }
802    );
803    (
804        respond_activity_task_canceled,
805        RespondActivityTaskCanceledRequest,
806        RespondActivityTaskCanceledResponse,
807        |r| {
808            let labels = namespaced_request!(r);
809            r.extensions_mut().insert(labels);
810        }
811    );
812    (
813        respond_activity_task_canceled_by_id,
814        RespondActivityTaskCanceledByIdRequest,
815        RespondActivityTaskCanceledByIdResponse,
816        |r| {
817            let labels = namespaced_request!(r);
818            r.extensions_mut().insert(labels);
819        }
820    );
821    (
822        request_cancel_workflow_execution,
823        RequestCancelWorkflowExecutionRequest,
824        RequestCancelWorkflowExecutionResponse,
825        |r| {
826            let labels = namespaced_request!(r);
827            r.extensions_mut().insert(labels);
828        }
829    );
830    (
831        signal_workflow_execution,
832        SignalWorkflowExecutionRequest,
833        SignalWorkflowExecutionResponse,
834        |r| {
835            let labels = namespaced_request!(r);
836            r.extensions_mut().insert(labels);
837        }
838    );
839    (
840        signal_with_start_workflow_execution,
841        SignalWithStartWorkflowExecutionRequest,
842        SignalWithStartWorkflowExecutionResponse,
843        |r| {
844            let mut labels = namespaced_request!(r);
845            labels.task_q(r.get_ref().task_queue.clone());
846            r.extensions_mut().insert(labels);
847        }
848    );
849    (
850        reset_workflow_execution,
851        ResetWorkflowExecutionRequest,
852        ResetWorkflowExecutionResponse,
853        |r| {
854            let labels = namespaced_request!(r);
855            r.extensions_mut().insert(labels);
856        }
857    );
858    (
859        terminate_workflow_execution,
860        TerminateWorkflowExecutionRequest,
861        TerminateWorkflowExecutionResponse,
862        |r| {
863            let labels = namespaced_request!(r);
864            r.extensions_mut().insert(labels);
865        }
866    );
867    (
868        delete_workflow_execution,
869        DeleteWorkflowExecutionRequest,
870        DeleteWorkflowExecutionResponse,
871        |r| {
872            let labels = namespaced_request!(r);
873            r.extensions_mut().insert(labels);
874        }
875    );
876    (
877        list_open_workflow_executions,
878        ListOpenWorkflowExecutionsRequest,
879        ListOpenWorkflowExecutionsResponse,
880        |r| {
881            let labels = namespaced_request!(r);
882            r.extensions_mut().insert(labels);
883        }
884    );
885    (
886        list_closed_workflow_executions,
887        ListClosedWorkflowExecutionsRequest,
888        ListClosedWorkflowExecutionsResponse,
889        |r| {
890            let labels = namespaced_request!(r);
891            r.extensions_mut().insert(labels);
892        }
893    );
894    (
895        list_workflow_executions,
896        ListWorkflowExecutionsRequest,
897        ListWorkflowExecutionsResponse,
898        |r| {
899            let labels = namespaced_request!(r);
900            r.extensions_mut().insert(labels);
901        }
902    );
903    (
904        list_archived_workflow_executions,
905        ListArchivedWorkflowExecutionsRequest,
906        ListArchivedWorkflowExecutionsResponse,
907        |r| {
908            let labels = namespaced_request!(r);
909            r.extensions_mut().insert(labels);
910        }
911    );
912    (
913        scan_workflow_executions,
914        ScanWorkflowExecutionsRequest,
915        ScanWorkflowExecutionsResponse,
916        |r| {
917            let labels = namespaced_request!(r);
918            r.extensions_mut().insert(labels);
919        }
920    );
921    (
922        count_workflow_executions,
923        CountWorkflowExecutionsRequest,
924        CountWorkflowExecutionsResponse,
925        |r| {
926            let labels = namespaced_request!(r);
927            r.extensions_mut().insert(labels);
928        }
929    );
930    (
931        create_workflow_rule,
932        CreateWorkflowRuleRequest,
933        CreateWorkflowRuleResponse,
934        |r| {
935            let labels = namespaced_request!(r);
936            r.extensions_mut().insert(labels);
937        }
938    );
939    (
940        describe_workflow_rule,
941        DescribeWorkflowRuleRequest,
942        DescribeWorkflowRuleResponse,
943        |r| {
944            let labels = namespaced_request!(r);
945            r.extensions_mut().insert(labels);
946        }
947    );
948    (
949        delete_workflow_rule,
950        DeleteWorkflowRuleRequest,
951        DeleteWorkflowRuleResponse,
952        |r| {
953            let labels = namespaced_request!(r);
954            r.extensions_mut().insert(labels);
955        }
956    );
957    (
958        list_workflow_rules,
959        ListWorkflowRulesRequest,
960        ListWorkflowRulesResponse,
961        |r| {
962            let labels = namespaced_request!(r);
963            r.extensions_mut().insert(labels);
964        }
965    );
966    (
967        trigger_workflow_rule,
968        TriggerWorkflowRuleRequest,
969        TriggerWorkflowRuleResponse,
970        |r| {
971            let labels = namespaced_request!(r);
972            r.extensions_mut().insert(labels);
973        }
974    );
975    (
976        get_search_attributes,
977        GetSearchAttributesRequest,
978        GetSearchAttributesResponse
979    );
980    (
981        respond_query_task_completed,
982        RespondQueryTaskCompletedRequest,
983        RespondQueryTaskCompletedResponse,
984        |r| {
985            let labels = namespaced_request!(r);
986            r.extensions_mut().insert(labels);
987        }
988    );
989    (
990        reset_sticky_task_queue,
991        ResetStickyTaskQueueRequest,
992        ResetStickyTaskQueueResponse,
993        |r| {
994            let labels = namespaced_request!(r);
995            r.extensions_mut().insert(labels);
996        }
997    );
998    (
999        query_workflow,
1000        QueryWorkflowRequest,
1001        QueryWorkflowResponse,
1002        |r| {
1003            let labels = namespaced_request!(r);
1004            r.extensions_mut().insert(labels);
1005        }
1006    );
1007    (
1008        describe_workflow_execution,
1009        DescribeWorkflowExecutionRequest,
1010        DescribeWorkflowExecutionResponse,
1011        |r| {
1012            let labels = namespaced_request!(r);
1013            r.extensions_mut().insert(labels);
1014        }
1015    );
1016    (
1017        describe_task_queue,
1018        DescribeTaskQueueRequest,
1019        DescribeTaskQueueResponse,
1020        |r| {
1021            let mut labels = namespaced_request!(r);
1022            labels.task_q(r.get_ref().task_queue.clone());
1023            r.extensions_mut().insert(labels);
1024        }
1025    );
1026    (
1027        get_cluster_info,
1028        GetClusterInfoRequest,
1029        GetClusterInfoResponse
1030    );
1031    (
1032        get_system_info,
1033        GetSystemInfoRequest,
1034        GetSystemInfoResponse
1035    );
1036    (
1037        list_task_queue_partitions,
1038        ListTaskQueuePartitionsRequest,
1039        ListTaskQueuePartitionsResponse,
1040        |r| {
1041            let mut labels = namespaced_request!(r);
1042            labels.task_q(r.get_ref().task_queue.clone());
1043            r.extensions_mut().insert(labels);
1044        }
1045    );
1046    (
1047        create_schedule,
1048        CreateScheduleRequest,
1049        CreateScheduleResponse,
1050        |r| {
1051            let labels = namespaced_request!(r);
1052            r.extensions_mut().insert(labels);
1053        }
1054    );
1055    (
1056        describe_schedule,
1057        DescribeScheduleRequest,
1058        DescribeScheduleResponse,
1059        |r| {
1060            let labels = namespaced_request!(r);
1061            r.extensions_mut().insert(labels);
1062        }
1063    );
1064    (
1065        update_schedule,
1066        UpdateScheduleRequest,
1067        UpdateScheduleResponse,
1068        |r| {
1069            let labels = namespaced_request!(r);
1070            r.extensions_mut().insert(labels);
1071        }
1072    );
1073    (
1074        patch_schedule,
1075        PatchScheduleRequest,
1076        PatchScheduleResponse,
1077        |r| {
1078            let labels = namespaced_request!(r);
1079            r.extensions_mut().insert(labels);
1080        }
1081    );
1082    (
1083        list_schedule_matching_times,
1084        ListScheduleMatchingTimesRequest,
1085        ListScheduleMatchingTimesResponse,
1086        |r| {
1087            let labels = namespaced_request!(r);
1088            r.extensions_mut().insert(labels);
1089        }
1090    );
1091    (
1092        delete_schedule,
1093        DeleteScheduleRequest,
1094        DeleteScheduleResponse,
1095        |r| {
1096            let labels = namespaced_request!(r);
1097            r.extensions_mut().insert(labels);
1098        }
1099    );
1100    (
1101        list_schedules,
1102        ListSchedulesRequest,
1103        ListSchedulesResponse,
1104        |r| {
1105            let labels = namespaced_request!(r);
1106            r.extensions_mut().insert(labels);
1107        }
1108    );
1109    (
1110        count_schedules,
1111        CountSchedulesRequest,
1112        CountSchedulesResponse,
1113        |r| {
1114            let labels = namespaced_request!(r);
1115            r.extensions_mut().insert(labels);
1116        }
1117    );
1118    (
1119        update_worker_build_id_compatibility,
1120        UpdateWorkerBuildIdCompatibilityRequest,
1121        UpdateWorkerBuildIdCompatibilityResponse,
1122        |r| {
1123            let mut labels = namespaced_request!(r);
1124            labels.task_q_str(r.get_ref().task_queue.clone());
1125            r.extensions_mut().insert(labels);
1126        }
1127    );
1128    (
1129        get_worker_build_id_compatibility,
1130        GetWorkerBuildIdCompatibilityRequest,
1131        GetWorkerBuildIdCompatibilityResponse,
1132        |r| {
1133            let mut labels = namespaced_request!(r);
1134            labels.task_q_str(r.get_ref().task_queue.clone());
1135            r.extensions_mut().insert(labels);
1136        }
1137    );
1138    (
1139        get_worker_task_reachability,
1140        GetWorkerTaskReachabilityRequest,
1141        GetWorkerTaskReachabilityResponse,
1142        |r| {
1143            let labels = namespaced_request!(r);
1144            r.extensions_mut().insert(labels);
1145        }
1146    );
1147    (
1148        update_workflow_execution,
1149        UpdateWorkflowExecutionRequest,
1150        UpdateWorkflowExecutionResponse,
1151        |r| {
1152            let labels = namespaced_request!(r);
1153            let exts = r.extensions_mut();
1154            exts.insert(labels);
1155            exts.insert(IsUserLongPoll);
1156        }
1157    );
1158    (
1159        poll_workflow_execution_update,
1160        PollWorkflowExecutionUpdateRequest,
1161        PollWorkflowExecutionUpdateResponse,
1162        |r| {
1163            let labels = namespaced_request!(r);
1164            r.extensions_mut().insert(labels);
1165        }
1166    );
1167    (
1168        start_batch_operation,
1169        StartBatchOperationRequest,
1170        StartBatchOperationResponse,
1171        |r| {
1172            let labels = namespaced_request!(r);
1173            r.extensions_mut().insert(labels);
1174        }
1175    );
1176    (
1177        stop_batch_operation,
1178        StopBatchOperationRequest,
1179        StopBatchOperationResponse,
1180        |r| {
1181            let labels = namespaced_request!(r);
1182            r.extensions_mut().insert(labels);
1183        }
1184    );
1185    (
1186        describe_batch_operation,
1187        DescribeBatchOperationRequest,
1188        DescribeBatchOperationResponse,
1189        |r| {
1190            let labels = namespaced_request!(r);
1191            r.extensions_mut().insert(labels);
1192        }
1193    );
1194    (
1195        describe_deployment,
1196        DescribeDeploymentRequest,
1197        DescribeDeploymentResponse,
1198        |r| {
1199            let labels = namespaced_request!(r);
1200            r.extensions_mut().insert(labels);
1201        }
1202    );
1203    (
1204        list_batch_operations,
1205        ListBatchOperationsRequest,
1206        ListBatchOperationsResponse,
1207        |r| {
1208            let labels = namespaced_request!(r);
1209            r.extensions_mut().insert(labels);
1210        }
1211    );
1212    (
1213        list_deployments,
1214        ListDeploymentsRequest,
1215        ListDeploymentsResponse,
1216        |r| {
1217            let labels = namespaced_request!(r);
1218            r.extensions_mut().insert(labels);
1219        }
1220    );
1221    (
1222        execute_multi_operation,
1223        ExecuteMultiOperationRequest,
1224        ExecuteMultiOperationResponse,
1225        |r| {
1226            let labels = namespaced_request!(r);
1227            r.extensions_mut().insert(labels);
1228        }
1229    );
1230    (
1231        get_current_deployment,
1232        GetCurrentDeploymentRequest,
1233        GetCurrentDeploymentResponse,
1234        |r| {
1235            let labels = namespaced_request!(r);
1236            r.extensions_mut().insert(labels);
1237        }
1238    );
1239    (
1240        get_deployment_reachability,
1241        GetDeploymentReachabilityRequest,
1242        GetDeploymentReachabilityResponse,
1243        |r| {
1244            let labels = namespaced_request!(r);
1245            r.extensions_mut().insert(labels);
1246        }
1247    );
1248    (
1249        get_worker_versioning_rules,
1250        GetWorkerVersioningRulesRequest,
1251        GetWorkerVersioningRulesResponse,
1252        |r| {
1253            let mut labels = namespaced_request!(r);
1254            labels.task_q_str(&r.get_ref().task_queue);
1255            r.extensions_mut().insert(labels);
1256        }
1257    );
1258    (
1259        update_worker_versioning_rules,
1260        UpdateWorkerVersioningRulesRequest,
1261        UpdateWorkerVersioningRulesResponse,
1262        |r| {
1263            let mut labels = namespaced_request!(r);
1264            labels.task_q_str(&r.get_ref().task_queue);
1265            r.extensions_mut().insert(labels);
1266        }
1267    );
1268    (
1269        poll_nexus_task_queue,
1270        PollNexusTaskQueueRequest,
1271        PollNexusTaskQueueResponse,
1272        |r| {
1273            let mut labels = namespaced_request!(r);
1274            labels.task_q(r.get_ref().task_queue.clone());
1275            r.extensions_mut().insert(labels);
1276        }
1277    );
1278    (
1279        respond_nexus_task_completed,
1280        RespondNexusTaskCompletedRequest,
1281        RespondNexusTaskCompletedResponse,
1282        |r| {
1283            let labels = namespaced_request!(r);
1284            r.extensions_mut().insert(labels);
1285        }
1286    );
1287    (
1288        respond_nexus_task_failed,
1289        RespondNexusTaskFailedRequest,
1290        RespondNexusTaskFailedResponse,
1291        |r| {
1292            let labels = namespaced_request!(r);
1293            r.extensions_mut().insert(labels);
1294        }
1295    );
1296    (
1297        set_current_deployment,
1298        SetCurrentDeploymentRequest,
1299        SetCurrentDeploymentResponse,
1300        |r| {
1301            let labels = namespaced_request!(r);
1302            r.extensions_mut().insert(labels);
1303        }
1304    );
1305    (
1306        shutdown_worker,
1307        ShutdownWorkerRequest,
1308        ShutdownWorkerResponse,
1309        |r| {
1310            let labels = namespaced_request!(r);
1311            r.extensions_mut().insert(labels);
1312        }
1313    );
1314    (
1315        update_activity_options,
1316        UpdateActivityOptionsRequest,
1317        UpdateActivityOptionsResponse,
1318        |r| {
1319            let labels = namespaced_request!(r);
1320            r.extensions_mut().insert(labels);
1321        }
1322    );
1323    (
1324        pause_activity,
1325        PauseActivityRequest,
1326        PauseActivityResponse,
1327        |r| {
1328            let labels = namespaced_request!(r);
1329            r.extensions_mut().insert(labels);
1330        }
1331    );
1332    (
1333        unpause_activity,
1334        UnpauseActivityRequest,
1335        UnpauseActivityResponse,
1336        |r| {
1337            let labels = namespaced_request!(r);
1338            r.extensions_mut().insert(labels);
1339        }
1340    );
1341    (
1342        update_workflow_execution_options,
1343        UpdateWorkflowExecutionOptionsRequest,
1344        UpdateWorkflowExecutionOptionsResponse,
1345        |r| {
1346            let labels = namespaced_request!(r);
1347            r.extensions_mut().insert(labels);
1348        }
1349    );
1350    (
1351        reset_activity,
1352        ResetActivityRequest,
1353        ResetActivityResponse,
1354        |r| {
1355            let labels = namespaced_request!(r);
1356            r.extensions_mut().insert(labels);
1357        }
1358    );
1359    (
1360        delete_worker_deployment,
1361        DeleteWorkerDeploymentRequest,
1362        DeleteWorkerDeploymentResponse,
1363        |r| {
1364            let labels = namespaced_request!(r);
1365            r.extensions_mut().insert(labels);
1366        }
1367    );
1368    (
1369        delete_worker_deployment_version,
1370        DeleteWorkerDeploymentVersionRequest,
1371        DeleteWorkerDeploymentVersionResponse,
1372        |r| {
1373            let labels = namespaced_request!(r);
1374            r.extensions_mut().insert(labels);
1375        }
1376    );
1377    (
1378        describe_worker_deployment,
1379        DescribeWorkerDeploymentRequest,
1380        DescribeWorkerDeploymentResponse,
1381        |r| {
1382            let labels = namespaced_request!(r);
1383            r.extensions_mut().insert(labels);
1384        }
1385    );
1386    (
1387        describe_worker_deployment_version,
1388        DescribeWorkerDeploymentVersionRequest,
1389        DescribeWorkerDeploymentVersionResponse,
1390        |r| {
1391            let labels = namespaced_request!(r);
1392            r.extensions_mut().insert(labels);
1393        }
1394    );
1395    (
1396        list_worker_deployments,
1397        ListWorkerDeploymentsRequest,
1398        ListWorkerDeploymentsResponse,
1399        |r| {
1400            let labels = namespaced_request!(r);
1401            r.extensions_mut().insert(labels);
1402        }
1403    );
1404    (
1405        set_worker_deployment_current_version,
1406        SetWorkerDeploymentCurrentVersionRequest,
1407        SetWorkerDeploymentCurrentVersionResponse,
1408        |r| {
1409            let labels = namespaced_request!(r);
1410            r.extensions_mut().insert(labels);
1411        }
1412    );
1413    (
1414        set_worker_deployment_ramping_version,
1415        SetWorkerDeploymentRampingVersionRequest,
1416        SetWorkerDeploymentRampingVersionResponse,
1417        |r| {
1418            let labels = namespaced_request!(r);
1419            r.extensions_mut().insert(labels);
1420        }
1421    );
1422    (
1423        update_worker_deployment_version_metadata,
1424        UpdateWorkerDeploymentVersionMetadataRequest,
1425        UpdateWorkerDeploymentVersionMetadataResponse,
1426        |r| {
1427            let labels = namespaced_request!(r);
1428            r.extensions_mut().insert(labels);
1429        }
1430    );
1431    (
1432        list_workers,
1433        ListWorkersRequest,
1434        ListWorkersResponse,
1435        |r| {
1436            let labels = namespaced_request!(r);
1437            r.extensions_mut().insert(labels);
1438        }
1439    );
1440    (
1441        record_worker_heartbeat,
1442        RecordWorkerHeartbeatRequest,
1443        RecordWorkerHeartbeatResponse,
1444        |r| {
1445            let labels = namespaced_request!(r);
1446            r.extensions_mut().insert(labels);
1447        }
1448    );
1449    (
1450        update_task_queue_config,
1451        UpdateTaskQueueConfigRequest,
1452        UpdateTaskQueueConfigResponse,
1453        |r| {
1454            let mut labels = namespaced_request!(r);
1455            labels.task_q_str(r.get_ref().task_queue.clone());
1456            r.extensions_mut().insert(labels);
1457        }
1458    );
1459    (
1460        fetch_worker_config,
1461        FetchWorkerConfigRequest,
1462        FetchWorkerConfigResponse,
1463        |r| {
1464            let labels = namespaced_request!(r);
1465            r.extensions_mut().insert(labels);
1466        }
1467    );
1468    (
1469        update_worker_config,
1470        UpdateWorkerConfigRequest,
1471        UpdateWorkerConfigResponse,
1472        |r| {
1473            let labels = namespaced_request!(r);
1474            r.extensions_mut().insert(labels);
1475        }
1476    );
1477    (
1478        describe_worker,
1479        DescribeWorkerRequest,
1480        DescribeWorkerResponse,
1481        |r| {
1482            let labels = namespaced_request!(r);
1483            r.extensions_mut().insert(labels);
1484        }
1485    );
1486    (
1487        set_worker_deployment_manager,
1488        SetWorkerDeploymentManagerRequest,
1489        SetWorkerDeploymentManagerResponse,
1490        |r| {
1491            let labels = namespaced_request!(r);
1492            r.extensions_mut().insert(labels);
1493        }
1494    );
1495    (
1496        pause_workflow_execution,
1497        PauseWorkflowExecutionRequest,
1498        PauseWorkflowExecutionResponse,
1499        |r| {
1500            let labels = namespaced_request!(r);
1501            r.extensions_mut().insert(labels);
1502        }
1503    );
1504    (
1505        unpause_workflow_execution,
1506        UnpauseWorkflowExecutionRequest,
1507        UnpauseWorkflowExecutionResponse,
1508        |r| {
1509            let labels = namespaced_request!(r);
1510            r.extensions_mut().insert(labels);
1511        }
1512    );
1513    (
1514        start_activity_execution,
1515        StartActivityExecutionRequest,
1516        StartActivityExecutionResponse,
1517        |r| {
1518            let labels = namespaced_request!(r);
1519            r.extensions_mut().insert(labels);
1520        }
1521    );
1522    (
1523        describe_activity_execution,
1524        DescribeActivityExecutionRequest,
1525        DescribeActivityExecutionResponse,
1526        |r| {
1527            let labels = namespaced_request!(r);
1528            r.extensions_mut().insert(labels);
1529        }
1530    );
1531    (
1532        poll_activity_execution,
1533        PollActivityExecutionRequest,
1534        PollActivityExecutionResponse,
1535        |r| {
1536            let labels = namespaced_request!(r);
1537            r.extensions_mut().insert(labels);
1538        }
1539    );
1540    (
1541        list_activity_executions,
1542        ListActivityExecutionsRequest,
1543        ListActivityExecutionsResponse,
1544        |r| {
1545            let labels = namespaced_request!(r);
1546            r.extensions_mut().insert(labels);
1547        }
1548    );
1549    (
1550        count_activity_executions,
1551        CountActivityExecutionsRequest,
1552        CountActivityExecutionsResponse,
1553        |r| {
1554            let labels = namespaced_request!(r);
1555            r.extensions_mut().insert(labels);
1556        }
1557    );
1558    (
1559        request_cancel_activity_execution,
1560        RequestCancelActivityExecutionRequest,
1561        RequestCancelActivityExecutionResponse,
1562        |r| {
1563            let labels = namespaced_request!(r);
1564            r.extensions_mut().insert(labels);
1565        }
1566    );
1567    (
1568        terminate_activity_execution,
1569        TerminateActivityExecutionRequest,
1570        TerminateActivityExecutionResponse,
1571        |r| {
1572            let labels = namespaced_request!(r);
1573            r.extensions_mut().insert(labels);
1574        }
1575    );
1576    (
1577        delete_activity_execution,
1578        DeleteActivityExecutionRequest,
1579        DeleteActivityExecutionResponse,
1580        |r| {
1581            let labels = namespaced_request!(r);
1582            r.extensions_mut().insert(labels);
1583        }
1584    );
1585}
1586
// Declares the `OperatorService` RPC list through the `proxier!` macro (defined elsewhere in
// this file). The header line passes, in order: the service name, the identifier
// `ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS`, the `OperatorServiceClient` type imported at the top
// of the file, the identifier `operator_client`, and the `defaults` keyword.
// Each following entry has the shape `(rpc_method, RequestType, ResponseType)`, optionally
// followed by a `|r| { .. }` closure that receives the request.
// NOTE(review): presumably `ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS` ends up listing the method
// names below and `operator_client` names the accessor used to obtain the client — confirm
// against the `proxier!` definition.
1587proxier! {
1588    OperatorService; ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS; OperatorServiceClient; operator_client; defaults;
1589    (add_search_attributes, AddSearchAttributesRequest, AddSearchAttributesResponse);
1590    (remove_search_attributes, RemoveSearchAttributesRequest, RemoveSearchAttributesResponse);
1591    (list_search_attributes, ListSearchAttributesRequest, ListSearchAttributesResponse);
    // The only entry in this list with a request closure: it stores the value produced by
    // `namespaced_request!(r)` in the request's extensions.
1592    (delete_namespace, DeleteNamespaceRequest, DeleteNamespaceResponse,
1593        |r| {
1594            let labels = namespaced_request!(r);
1595            r.extensions_mut().insert(labels);
1596        }
1597    );
1598    (add_or_update_remote_cluster, AddOrUpdateRemoteClusterRequest, AddOrUpdateRemoteClusterResponse);
1599    (remove_remote_cluster, RemoveRemoteClusterRequest, RemoveRemoteClusterResponse);
1600    (list_clusters, ListClustersRequest, ListClustersResponse);
1601    (get_nexus_endpoint, GetNexusEndpointRequest, GetNexusEndpointResponse);
1602    (create_nexus_endpoint, CreateNexusEndpointRequest, CreateNexusEndpointResponse);
1603    (update_nexus_endpoint, UpdateNexusEndpointRequest, UpdateNexusEndpointResponse);
1604    (delete_nexus_endpoint, DeleteNexusEndpointRequest, DeleteNexusEndpointResponse);
1605    (list_nexus_endpoints, ListNexusEndpointsRequest, ListNexusEndpointsResponse);
1606}
1607
// Declares the `CloudService` RPC list through the `proxier!` macro (defined elsewhere in this
// file). The header line passes, in order: the service name, the identifier
// `ALL_IMPLEMENTED_CLOUD_SERVICE_RPCS`, the `CloudServiceClient` type, the identifier
// `cloud_client`, and the `defaults` keyword.
// Each entry has the shape `(rpc_method, RequestType, ResponseType)`, optionally followed by a
// `|r| { .. }` closure that receives the request.
// All request/response types are spelled through the `cloudreq` alias
// (`cloud::cloudservice::v1`, see the imports at the top of the file).
// NOTE(review): presumably the alias keeps these apart from same-named types that are
// glob-imported from other services (e.g. `DeleteNamespaceRequest`) — confirm.
1608proxier! {
1609    CloudService; ALL_IMPLEMENTED_CLOUD_SERVICE_RPCS; CloudServiceClient; cloud_client; defaults;
1610    (get_users, cloudreq::GetUsersRequest, cloudreq::GetUsersResponse);
1611    (get_user, cloudreq::GetUserRequest, cloudreq::GetUserResponse);
1612    (create_user, cloudreq::CreateUserRequest, cloudreq::CreateUserResponse);
1613    (update_user, cloudreq::UpdateUserRequest, cloudreq::UpdateUserResponse);
1614    (delete_user, cloudreq::DeleteUserRequest, cloudreq::DeleteUserResponse);
1615    (set_user_namespace_access, cloudreq::SetUserNamespaceAccessRequest, cloudreq::SetUserNamespaceAccessResponse);
1616    (get_async_operation, cloudreq::GetAsyncOperationRequest, cloudreq::GetAsyncOperationResponse);
1617    (create_namespace, cloudreq::CreateNamespaceRequest, cloudreq::CreateNamespaceResponse);
1618    (get_namespaces, cloudreq::GetNamespacesRequest, cloudreq::GetNamespacesResponse);
    // `get_namespace`, `update_namespace` and `delete_namespace` are the only entries in this
    // list with a request closure; each stores the value produced by `namespaced_request!(r)`
    // in the request's extensions.
1619    (get_namespace, cloudreq::GetNamespaceRequest, cloudreq::GetNamespaceResponse,
1620        |r| {
1621            let labels = namespaced_request!(r);
1622            r.extensions_mut().insert(labels);
1623        }
1624    );
1625    (update_namespace, cloudreq::UpdateNamespaceRequest, cloudreq::UpdateNamespaceResponse,
1626        |r| {
1627            let labels = namespaced_request!(r);
1628            r.extensions_mut().insert(labels);
1629        }
1630    );
1631    (rename_custom_search_attribute, cloudreq::RenameCustomSearchAttributeRequest, cloudreq::RenameCustomSearchAttributeResponse);
1632    (delete_namespace, cloudreq::DeleteNamespaceRequest, cloudreq::DeleteNamespaceResponse,
1633        |r| {
1634            let labels = namespaced_request!(r);
1635            r.extensions_mut().insert(labels);
1636        }
1637    );
1638    (failover_namespace_region, cloudreq::FailoverNamespaceRegionRequest, cloudreq::FailoverNamespaceRegionResponse);
1639    (add_namespace_region, cloudreq::AddNamespaceRegionRequest, cloudreq::AddNamespaceRegionResponse);
1640    (delete_namespace_region, cloudreq::DeleteNamespaceRegionRequest, cloudreq::DeleteNamespaceRegionResponse);
1641    (get_regions, cloudreq::GetRegionsRequest, cloudreq::GetRegionsResponse);
1642    (get_region, cloudreq::GetRegionRequest, cloudreq::GetRegionResponse);
1643    (get_api_keys, cloudreq::GetApiKeysRequest, cloudreq::GetApiKeysResponse);
1644    (get_api_key, cloudreq::GetApiKeyRequest, cloudreq::GetApiKeyResponse);
1645    (create_api_key, cloudreq::CreateApiKeyRequest, cloudreq::CreateApiKeyResponse);
1646    (update_api_key, cloudreq::UpdateApiKeyRequest, cloudreq::UpdateApiKeyResponse);
1647    (delete_api_key, cloudreq::DeleteApiKeyRequest, cloudreq::DeleteApiKeyResponse);
1648    (get_nexus_endpoints, cloudreq::GetNexusEndpointsRequest, cloudreq::GetNexusEndpointsResponse);
1649    (get_nexus_endpoint, cloudreq::GetNexusEndpointRequest, cloudreq::GetNexusEndpointResponse);
1650    (create_nexus_endpoint, cloudreq::CreateNexusEndpointRequest, cloudreq::CreateNexusEndpointResponse);
1651    (update_nexus_endpoint, cloudreq::UpdateNexusEndpointRequest, cloudreq::UpdateNexusEndpointResponse);
1652    (delete_nexus_endpoint, cloudreq::DeleteNexusEndpointRequest, cloudreq::DeleteNexusEndpointResponse);
1653    (get_user_groups, cloudreq::GetUserGroupsRequest, cloudreq::GetUserGroupsResponse);
1654    (get_user_group, cloudreq::GetUserGroupRequest, cloudreq::GetUserGroupResponse);
1655    (create_user_group, cloudreq::CreateUserGroupRequest, cloudreq::CreateUserGroupResponse);
1656    (update_user_group, cloudreq::UpdateUserGroupRequest, cloudreq::UpdateUserGroupResponse);
1657    (delete_user_group, cloudreq::DeleteUserGroupRequest, cloudreq::DeleteUserGroupResponse);
1658    (add_user_group_member, cloudreq::AddUserGroupMemberRequest, cloudreq::AddUserGroupMemberResponse);
1659    (remove_user_group_member, cloudreq::RemoveUserGroupMemberRequest, cloudreq::RemoveUserGroupMemberResponse);
1660    (get_user_group_members, cloudreq::GetUserGroupMembersRequest, cloudreq::GetUserGroupMembersResponse);
1661    (set_user_group_namespace_access, cloudreq::SetUserGroupNamespaceAccessRequest, cloudreq::SetUserGroupNamespaceAccessResponse);
1662    (create_service_account, cloudreq::CreateServiceAccountRequest, cloudreq::CreateServiceAccountResponse);
1663    (get_service_account, cloudreq::GetServiceAccountRequest, cloudreq::GetServiceAccountResponse);
1664    (get_service_accounts, cloudreq::GetServiceAccountsRequest, cloudreq::GetServiceAccountsResponse);
1665    (update_service_account, cloudreq::UpdateServiceAccountRequest, cloudreq::UpdateServiceAccountResponse);
1666    (delete_service_account, cloudreq::DeleteServiceAccountRequest, cloudreq::DeleteServiceAccountResponse);
1667    (get_usage, cloudreq::GetUsageRequest, cloudreq::GetUsageResponse);
1668    (get_account, cloudreq::GetAccountRequest, cloudreq::GetAccountResponse);
1669    (update_account, cloudreq::UpdateAccountRequest, cloudreq::UpdateAccountResponse);
1670    (create_namespace_export_sink, cloudreq::CreateNamespaceExportSinkRequest, cloudreq::CreateNamespaceExportSinkResponse);
1671    (get_namespace_export_sink, cloudreq::GetNamespaceExportSinkRequest, cloudreq::GetNamespaceExportSinkResponse);
1672    (get_namespace_export_sinks, cloudreq::GetNamespaceExportSinksRequest, cloudreq::GetNamespaceExportSinksResponse);
1673    (update_namespace_export_sink, cloudreq::UpdateNamespaceExportSinkRequest, cloudreq::UpdateNamespaceExportSinkResponse);
1674    (delete_namespace_export_sink, cloudreq::DeleteNamespaceExportSinkRequest, cloudreq::DeleteNamespaceExportSinkResponse);
1675    (validate_namespace_export_sink, cloudreq::ValidateNamespaceExportSinkRequest, cloudreq::ValidateNamespaceExportSinkResponse);
1676    (update_namespace_tags, cloudreq::UpdateNamespaceTagsRequest, cloudreq::UpdateNamespaceTagsResponse);
1677    (create_connectivity_rule, cloudreq::CreateConnectivityRuleRequest, cloudreq::CreateConnectivityRuleResponse);
1678    (get_connectivity_rule, cloudreq::GetConnectivityRuleRequest, cloudreq::GetConnectivityRuleResponse);
1679    (get_connectivity_rules, cloudreq::GetConnectivityRulesRequest, cloudreq::GetConnectivityRulesResponse);
1680    (delete_connectivity_rule, cloudreq::DeleteConnectivityRuleRequest, cloudreq::DeleteConnectivityRuleResponse);
1681    (set_service_account_namespace_access, cloudreq::SetServiceAccountNamespaceAccessRequest, cloudreq::SetServiceAccountNamespaceAccessResponse);
1682    (validate_account_audit_log_sink, cloudreq::ValidateAccountAuditLogSinkRequest, cloudreq::ValidateAccountAuditLogSinkResponse);
1683}
1684
// Declares the `TestService` RPC list through the `proxier!` macro (defined elsewhere in this
// file). The header line passes, in order: the service name, the identifier
// `ALL_IMPLEMENTED_TEST_SERVICE_RPCS`, the `TestServiceClient` type, the identifier
// `test_client`, and the `defaults` keyword.
// Each entry has the shape `(rpc_method, RequestType, ResponseType)`; none of the entries in
// this list carries a request closure.
1685proxier! {
1686    TestService; ALL_IMPLEMENTED_TEST_SERVICE_RPCS; TestServiceClient; test_client; defaults;
1687    (lock_time_skipping, LockTimeSkippingRequest, LockTimeSkippingResponse);
1688    (unlock_time_skipping, UnlockTimeSkippingRequest, UnlockTimeSkippingResponse);
1689    (sleep, SleepRequest, SleepResponse);
    // The next two entries reuse `SleepResponse` (and, for the second, `SleepRequest`) rather
    // than having dedicated message types of their own.
1690    (sleep_until, SleepUntilRequest, SleepResponse);
1691    (unlock_time_skipping_with_sleep, SleepRequest, SleepResponse);
    // The request type here is the unit type `()`.
1692    (get_current_time, (), GetCurrentTimeResponse);
1693}
1694
// Declares the `HealthService` RPC list through the `proxier!` macro (defined elsewhere in this
// file). The header line passes, in order: the service name, the identifier
// `ALL_IMPLEMENTED_HEALTH_SERVICE_RPCS`, the `HealthClient` type, and the identifier
// `health_client`. Unlike the Operator/Cloud/Test invocations in this file, the header does not
// end with `defaults;`.
// NOTE(review): what omitting `defaults` changes is decided by the `proxier!` definition —
// confirm there.
1695proxier! {
1696    HealthService; ALL_IMPLEMENTED_HEALTH_SERVICE_RPCS; HealthClient; health_client;
1697    (check, HealthCheckRequest, HealthCheckResponse);
    // `watch` declares a `tonic::codec::Streaming<HealthCheckResponse>` response type instead of
    // a plain `HealthCheckResponse`.
1698    (watch, HealthCheckRequest, tonic::codec::Streaming<HealthCheckResponse>);
1699}
1700
1701#[cfg(test)]
1702mod tests {
1703    use super::*;
1704    use crate::{ClientOptions, ConnectionOptions};
1705    use std::collections::HashSet;
1706    use temporalio_common::{
1707        protos::temporal::api::{
1708            operatorservice::v1::DeleteNamespaceRequest, workflowservice::v1::ListNamespacesRequest,
1709        },
1710        worker::WorkerTaskTypes,
1711    };
1712    use tonic::IntoRequest;
1713    use url::Url;
1714    use uuid::Uuid;
1715
1716    // Just to help make sure some stuff compiles. Not run.
1717    #[allow(dead_code)]
1718    async fn raw_client_retry_compiles() {
1719        let opts = ConnectionOptions::new(Url::parse("http://localhost:7233").unwrap())
1720            .client_name("test")
1721            .client_version("0.0.0")
1722            .build();
1723        let connection = Connection::connect(opts).await.unwrap();
1724        let mut client = Client::new(connection, ClientOptions::new("default").build()).unwrap();
1725
1726        let list_ns_req = ListNamespacesRequest::default();
1727        let wf_client = client.workflow_client();
1728        let fact = move |req| {
1729            let mut c = wf_client.clone();
1730            async move { c.list_namespaces(req).await }.boxed()
1731        };
1732        client
1733            .call("whatever", fact, Request::new(list_ns_req.clone()))
1734            .await
1735            .unwrap();
1736
1737        // Operator svc method
1738        let op_del_ns_req = DeleteNamespaceRequest::default();
1739        let op_client = client.operator_client();
1740        let fact = move |req| {
1741            let mut c = op_client.clone();
1742            async move { c.delete_namespace(req).await }.boxed()
1743        };
1744        client
1745            .call("whatever", fact, Request::new(op_del_ns_req.clone()))
1746            .await
1747            .unwrap();
1748
1749        // Cloud svc method
1750        let cloud_del_ns_req = cloudreq::DeleteNamespaceRequest::default();
1751        let cloud_client = client.cloud_client();
1752        let fact = move |req| {
1753            let mut c = cloud_client.clone();
1754            async move { c.delete_namespace(req).await }.boxed()
1755        };
1756        client
1757            .call("whatever", fact, Request::new(cloud_del_ns_req.clone()))
1758            .await
1759            .unwrap();
1760
1761        // Verify calling through traits works
1762        client
1763            .list_namespaces(list_ns_req.into_request())
1764            .await
1765            .unwrap();
1766        // Have to disambiguate operator and cloud service
1767        OperatorService::delete_namespace(&mut client, op_del_ns_req.into_request())
1768            .await
1769            .unwrap();
1770        CloudService::delete_namespace(&mut client, cloud_del_ns_req.into_request())
1771            .await
1772            .unwrap();
1773        client.get_current_time(().into_request()).await.unwrap();
1774        client
1775            .check(HealthCheckRequest::default().into_request())
1776            .await
1777            .unwrap();
1778    }
1779
1780    fn verify_methods(proto_def_str: &str, impl_list: &[&str]) {
1781        let methods: Vec<_> = proto_def_str
1782            .lines()
1783            .map(|l| l.trim())
1784            .filter(|l| l.starts_with("rpc"))
1785            .map(|l| {
1786                let stripped = l.strip_prefix("rpc ").unwrap();
1787                stripped[..stripped.find('(').unwrap()].trim()
1788            })
1789            .collect();
1790        let no_underscores: HashSet<_> = impl_list.iter().map(|x| x.replace('_', "")).collect();
1791        let mut not_implemented = vec![];
1792        for method in methods {
1793            if !no_underscores.contains(&method.to_lowercase()) {
1794                not_implemented.push(method);
1795            }
1796        }
1797        if !not_implemented.is_empty() {
1798            panic!(
1799                "The following RPC methods are not implemented by raw client: {not_implemented:?}"
1800            );
1801        }
1802    }
1803    #[test]
1804    fn verify_all_workflow_service_methods_implemented() {
1805        // This is less work than trying to hook into the codegen process
1806        let proto_def = include_str!(
1807            "../../common/protos/api_upstream/temporal/api/workflowservice/v1/service.proto"
1808        );
1809        verify_methods(proto_def, ALL_IMPLEMENTED_WORKFLOW_SERVICE_RPCS);
1810    }
1811
1812    #[test]
1813    fn verify_all_operator_service_methods_implemented() {
1814        let proto_def = include_str!(
1815            "../../common/protos/api_upstream/temporal/api/operatorservice/v1/service.proto"
1816        );
1817        verify_methods(proto_def, ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS);
1818    }
1819
1820    #[test]
1821    fn verify_all_cloud_service_methods_implemented() {
1822        let proto_def = include_str!(
1823            "../../common/protos/api_cloud_upstream/temporal/api/cloud/cloudservice/v1/service.proto"
1824        );
1825        verify_methods(proto_def, ALL_IMPLEMENTED_CLOUD_SERVICE_RPCS);
1826    }
1827
1828    #[test]
1829    fn verify_all_test_service_methods_implemented() {
1830        let proto_def = include_str!(
1831            "../../common/protos/testsrv_upstream/temporal/api/testservice/v1/service.proto"
1832        );
1833        verify_methods(proto_def, ALL_IMPLEMENTED_TEST_SERVICE_RPCS);
1834    }
1835
1836    #[test]
1837    fn verify_all_health_service_methods_implemented() {
1838        let proto_def = include_str!("../../common/protos/grpc/health/v1/health.proto");
1839        verify_methods(proto_def, ALL_IMPLEMENTED_HEALTH_SERVICE_RPCS);
1840    }
1841
1842    #[tokio::test]
1843    async fn can_mock_services() {
1844        #[derive(Clone)]
1845        struct MyFakeServices {}
1846        impl RawGrpcCaller for MyFakeServices {}
1847        impl WorkflowService for MyFakeServices {
1848            fn list_namespaces(
1849                &mut self,
1850                _request: Request<ListNamespacesRequest>,
1851            ) -> BoxFuture<'_, Result<Response<ListNamespacesResponse>, Status>> {
1852                async {
1853                    Ok(Response::new(ListNamespacesResponse {
1854                        namespaces: vec![DescribeNamespaceResponse {
1855                            failover_version: 12345,
1856                            ..Default::default()
1857                        }],
1858                        ..Default::default()
1859                    }))
1860                }
1861                .boxed()
1862            }
1863        }
1864        impl OperatorService for MyFakeServices {}
1865        impl CloudService for MyFakeServices {}
1866        impl TestService for MyFakeServices {}
1867        // Health service isn't possible to create a default impl for.
1868        impl HealthService for MyFakeServices {
1869            fn check(
1870                &mut self,
1871                _request: tonic::Request<HealthCheckRequest>,
1872            ) -> BoxFuture<'_, Result<tonic::Response<HealthCheckResponse>, tonic::Status>>
1873            {
1874                todo!()
1875            }
1876            fn watch(
1877                &mut self,
1878                _request: tonic::Request<HealthCheckRequest>,
1879            ) -> BoxFuture<
1880                '_,
1881                Result<
1882                    tonic::Response<tonic::codec::Streaming<HealthCheckResponse>>,
1883                    tonic::Status,
1884                >,
1885            > {
1886                todo!()
1887            }
1888        }
1889        let mut mocked_client = TemporalServiceClient::from_services(
1890            Box::new(MyFakeServices {}),
1891            Box::new(MyFakeServices {}),
1892            Box::new(MyFakeServices {}),
1893            Box::new(MyFakeServices {}),
1894            Box::new(MyFakeServices {}),
1895        );
1896        let r = mocked_client
1897            .list_namespaces(ListNamespacesRequest::default().into_request())
1898            .await
1899            .unwrap();
1900        assert_eq!(r.into_inner().namespaces[0].failover_version, 12345);
1901    }
1902
1903    #[rstest::rstest]
1904    #[case::with_versioning(true)]
1905    #[case::without_versioning(false)]
1906    #[tokio::test]
1907    async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) {
1908        use crate::worker::{MockClientWorker, MockSlot};
1909        use temporalio_common::{
1910            protos::temporal::api::enums::v1::WorkerVersioningMode,
1911            worker::{WorkerDeploymentOptions, WorkerDeploymentVersion},
1912        };
1913
1914        let expected_mode = if use_worker_versioning {
1915            WorkerVersioningMode::Versioned
1916        } else {
1917            WorkerVersioningMode::Unversioned
1918        };
1919
1920        #[derive(Clone)]
1921        struct MyFakeServices {
1922            client_worker_set: Arc<ClientWorkerSet>,
1923            expected_mode: WorkerVersioningMode,
1924        }
1925        impl RawGrpcCaller for MyFakeServices {}
1926        impl RawClientProducer for MyFakeServices {
1927            fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
1928                Some(self.client_worker_set.clone())
1929            }
1930            fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
1931                Box::new(MyFakeWfClient {
1932                    expected_mode: self.expected_mode,
1933                })
1934            }
1935            fn operator_client(&mut self) -> Box<dyn OperatorService> {
1936                unimplemented!()
1937            }
1938            fn cloud_client(&mut self) -> Box<dyn CloudService> {
1939                unimplemented!()
1940            }
1941            fn test_client(&mut self) -> Box<dyn TestService> {
1942                unimplemented!()
1943            }
1944            fn health_client(&mut self) -> Box<dyn HealthService> {
1945                unimplemented!()
1946            }
1947        }
1948
1949        let deployment_opts = WorkerDeploymentOptions {
1950            version: WorkerDeploymentVersion {
1951                deployment_name: "test-deployment".to_string(),
1952                build_id: "test-build-123".to_string(),
1953            },
1954            use_worker_versioning,
1955            default_versioning_behavior: None,
1956        };
1957
1958        let mut mock_provider = MockClientWorker::new();
1959        mock_provider
1960            .expect_namespace()
1961            .return_const("test-namespace".to_string());
1962        mock_provider
1963            .expect_task_queue()
1964            .return_const("test-task-queue".to_string());
1965        let mut mock_slot = MockSlot::new();
1966        mock_slot.expect_schedule_wft().returning(|_| Ok(()));
1967        mock_provider
1968            .expect_try_reserve_wft_slot()
1969            .return_once(|| Some(Box::new(mock_slot)));
1970        mock_provider
1971            .expect_deployment_options()
1972            .return_const(Some(deployment_opts.clone()));
1973        mock_provider.expect_heartbeat_enabled().return_const(false);
1974        let uuid = Uuid::new_v4();
1975        mock_provider
1976            .expect_worker_instance_key()
1977            .return_const(uuid);
1978        mock_provider
1979            .expect_worker_task_types()
1980            .return_const(WorkerTaskTypes {
1981                enable_workflows: true,
1982                enable_local_activities: true,
1983                enable_remote_activities: true,
1984                enable_nexus: true,
1985            });
1986
1987        let client_worker_set = Arc::new(ClientWorkerSet::new());
1988        client_worker_set
1989            .register_worker(Arc::new(mock_provider), true)
1990            .unwrap();
1991
1992        #[derive(Clone)]
1993        struct MyFakeWfClient {
1994            expected_mode: WorkerVersioningMode,
1995        }
1996        impl WorkflowService for MyFakeWfClient {
1997            fn start_workflow_execution(
1998                &mut self,
1999                request: tonic::Request<StartWorkflowExecutionRequest>,
2000            ) -> BoxFuture<'_, Result<tonic::Response<StartWorkflowExecutionResponse>, tonic::Status>>
2001            {
2002                let req = request.into_inner();
2003                let expected_mode = self.expected_mode;
2004
2005                assert!(
2006                    req.eager_worker_deployment_options.is_some(),
2007                    "eager_worker_deployment_options should be populated"
2008                );
2009
2010                let opts = req.eager_worker_deployment_options.as_ref().unwrap();
2011                assert_eq!(opts.deployment_name, "test-deployment");
2012                assert_eq!(opts.build_id, "test-build-123");
2013                assert_eq!(opts.worker_versioning_mode, expected_mode as i32);
2014
2015                async { Ok(Response::new(StartWorkflowExecutionResponse::default())) }.boxed()
2016            }
2017        }
2018
2019        let mut mfs = MyFakeServices {
2020            client_worker_set,
2021            expected_mode,
2022        };
2023
2024        // Create a request with eager execution enabled
2025        let req = StartWorkflowExecutionRequest {
2026            namespace: "test-namespace".to_string(),
2027            workflow_id: "test-wf-id".to_string(),
2028            workflow_type: Some(
2029                temporalio_common::protos::temporal::api::common::v1::WorkflowType {
2030                    name: "test-workflow".to_string(),
2031                },
2032            ),
2033            task_queue: Some(TaskQueue {
2034                name: "test-task-queue".to_string(),
2035                kind: 0,
2036                normal_name: String::new(),
2037            }),
2038            request_eager_execution: true,
2039            ..Default::default()
2040        };
2041
2042        mfs.start_workflow_execution(req.into_request())
2043            .await
2044            .unwrap();
2045    }
2046}