Skip to main content

temporalio_client/
grpc.rs

1//! gRPC service traits for direct access to Temporal services.
2//!
3//! Most users should use the higher-level methods on [`Client`] or [`Connection`] instead.
4//! These traits are useful for advanced scenarios like custom interceptors, testing with mocks,
5//! or making raw gRPC calls not covered by the higher-level API.
6
7use crate::{
8    Client, Connection, LONG_POLL_TIMEOUT, RequestExt, SharedReplaceableClient,
9    TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
10    metrics::namespace_kv,
11    retry::make_future_retry,
12    worker::{ClientWorkerSet, Slot},
13};
14use dyn_clone::DynClone;
15use futures_util::{FutureExt, TryFutureExt, future::BoxFuture};
16use std::{any::Any, marker::PhantomData, sync::Arc};
17use temporalio_common::{
18    protos::{
19        grpc::health::v1::{health_client::HealthClient, *},
20        temporal::api::{
21            cloud::cloudservice::{v1 as cloudreq, v1::cloud_service_client::CloudServiceClient},
22            operatorservice::v1::{operator_service_client::OperatorServiceClient, *},
23            taskqueue::v1::TaskQueue,
24            testservice::v1::{test_service_client::TestServiceClient, *},
25            workflowservice::v1::{workflow_service_client::WorkflowServiceClient, *},
26        },
27    },
28    telemetry::metrics::MetricKeyValue,
29};
30use tonic::{
31    Request, Response, Status,
32    body::Body,
33    client::GrpcService,
34    metadata::{AsciiMetadataValue, KeyAndValueRef},
35};
36
37/// Something that has access to the raw grpc services
38pub(crate) trait RawClientProducer {
39    /// Returns information about workers associated with this client. Implementers outside of
40    /// core can safely return `None`.
41    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>>;
42
43    /// Return a workflow service client instance
44    fn workflow_client(&mut self) -> Box<dyn WorkflowService>;
45
46    /// Return a mutable ref to the operator service client instance
47    fn operator_client(&mut self) -> Box<dyn OperatorService>;
48
49    /// Return a mutable ref to the cloud service client instance
50    fn cloud_client(&mut self) -> Box<dyn CloudService>;
51
52    /// Return a mutable ref to the test service client instance
53    fn test_client(&mut self) -> Box<dyn TestService>;
54
55    /// Return a mutable ref to the health service client instance
56    fn health_client(&mut self) -> Box<dyn HealthService>;
57}
58
59/// Any client that can make gRPC calls. The default implementation simply invokes the passed-in
60/// function. Implementers may override this to provide things like retry behavior.
61#[async_trait::async_trait]
62pub(crate) trait RawGrpcCaller: Send + Sync + 'static {
63    /// Make a gRPC call. The default implementation simply invokes the provided function.
64    async fn call<F, Req, Resp>(
65        &mut self,
66        _call_name: &'static str,
67        mut callfn: F,
68        req: Request<Req>,
69    ) -> Result<Response<Resp>, Status>
70    where
71        Req: Clone + Unpin + Send + Sync + 'static,
72        Resp: Send + 'static,
73        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
74        F: Send + Sync + Unpin + 'static,
75    {
76        callfn(req).await
77    }
78}
79
80trait ErasedRawClient: Send + Sync + 'static {
81    fn erased_call(
82        &mut self,
83        call_name: &'static str,
84        op: &mut dyn ErasedCallOp,
85    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>>;
86}
87
88trait ErasedCallOp: Send {
89    fn invoke(
90        &mut self,
91        raw: &mut dyn ErasedRawClient,
92        call_name: &'static str,
93    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>>;
94}
95
96struct CallShim<F, Req, Resp> {
97    callfn: F,
98    seed_req: Option<Request<Req>>,
99    _resp: PhantomData<Resp>,
100}
101
102impl<F, Req, Resp> CallShim<F, Req, Resp> {
103    fn new(callfn: F, seed_req: Request<Req>) -> Self {
104        Self {
105            callfn,
106            seed_req: Some(seed_req),
107            _resp: PhantomData,
108        }
109    }
110}
111impl<F, Req, Resp> ErasedCallOp for CallShim<F, Req, Resp>
112where
113    Req: Clone + Unpin + Send + Sync + 'static,
114    Resp: Send + 'static,
115    F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
116    F: Send + Sync + Unpin + 'static,
117{
118    fn invoke(
119        &mut self,
120        _raw: &mut dyn ErasedRawClient,
121        _call_name: &'static str,
122    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>> {
123        (self.callfn)(
124            self.seed_req
125                .take()
126                .expect("CallShim must have request populated"),
127        )
128        .map(|res| res.map(|payload| payload.map(|t| Box::new(t) as Box<dyn Any + Send>)))
129        .boxed()
130    }
131}
132
133#[async_trait::async_trait]
134impl RawGrpcCaller for Connection {
135    async fn call<F, Req, Resp>(
136        &mut self,
137        call_name: &'static str,
138        mut callfn: F,
139        mut req: Request<Req>,
140    ) -> Result<Response<Resp>, Status>
141    where
142        Req: Clone + Unpin + Send + Sync + 'static,
143        Resp: Send + 'static,
144        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
145        F: Send + Sync + Unpin + 'static,
146    {
147        let info = self
148            .inner
149            .retry_options
150            .get_call_info(call_name, Some(&req));
151        req.extensions_mut().insert(info.call_type);
152        if info.call_type.is_long() {
153            req.set_default_timeout(LONG_POLL_TIMEOUT);
154        }
155
156        let fact = || {
157            let req_clone = req_cloner(&req);
158            callfn(req_clone)
159        };
160
161        let res = make_future_retry(info, fact);
162        res.map_err(|(e, _attempt)| e).map_ok(|x| x.0).await
163    }
164}
165
166/// Helper for cloning a tonic request as long as the inner message may be cloned.
167fn req_cloner<T: Clone>(cloneme: &Request<T>) -> Request<T> {
168    let msg = cloneme.get_ref().clone();
169    let mut new_req = Request::new(msg);
170    let new_met = new_req.metadata_mut();
171    for kv in cloneme.metadata().iter() {
172        match kv {
173            KeyAndValueRef::Ascii(k, v) => {
174                new_met.insert(k, v.clone());
175            }
176            KeyAndValueRef::Binary(k, v) => {
177                new_met.insert_bin(k, v.clone());
178            }
179        }
180    }
181    *new_req.extensions_mut() = cloneme.extensions().clone();
182    new_req
183}
184
185#[async_trait::async_trait]
186impl RawGrpcCaller for dyn ErasedRawClient {
187    async fn call<F, Req, Resp>(
188        &mut self,
189        call_name: &'static str,
190        callfn: F,
191        req: Request<Req>,
192    ) -> Result<Response<Resp>, Status>
193    where
194        Req: Clone + Unpin + Send + Sync + 'static,
195        Resp: Send + 'static,
196        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
197        F: Send + Sync + Unpin + 'static,
198    {
199        let mut shim = CallShim::new(callfn, req);
200        let erased_resp = ErasedRawClient::erased_call(self, call_name, &mut shim).await?;
201        Ok(erased_resp.map(|boxed| {
202            *boxed
203                .downcast()
204                .expect("RawGrpcCaller erased response type mismatch")
205        }))
206    }
207}
208
209impl<T> ErasedRawClient for T
210where
211    T: RawGrpcCaller + 'static,
212{
213    fn erased_call(
214        &mut self,
215        call_name: &'static str,
216        op: &mut dyn ErasedCallOp,
217    ) -> BoxFuture<'static, Result<Response<Box<dyn Any + Send>>, Status>> {
218        let raw: &mut dyn ErasedRawClient = self;
219        op.invoke(raw, call_name)
220    }
221}
222
223impl RawClientProducer for Connection {
224    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
225        Some(self.workers())
226    }
227
228    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
229        self.inner.service.workflow_service()
230    }
231
232    fn operator_client(&mut self) -> Box<dyn OperatorService> {
233        self.inner.service.operator_service()
234    }
235
236    fn cloud_client(&mut self) -> Box<dyn CloudService> {
237        self.inner.service.cloud_service()
238    }
239
240    fn test_client(&mut self) -> Box<dyn TestService> {
241        self.inner.service.test_service()
242    }
243
244    fn health_client(&mut self) -> Box<dyn HealthService> {
245        self.inner.service.health_service()
246    }
247}
248
249impl RawClientProducer for TemporalServiceClient {
250    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
251        None
252    }
253
254    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
255        self.workflow_service()
256    }
257
258    fn operator_client(&mut self) -> Box<dyn OperatorService> {
259        self.operator_service()
260    }
261
262    fn cloud_client(&mut self) -> Box<dyn CloudService> {
263        self.cloud_service()
264    }
265
266    fn test_client(&mut self) -> Box<dyn TestService> {
267        self.test_service()
268    }
269
270    fn health_client(&mut self) -> Box<dyn HealthService> {
271        self.health_service()
272    }
273}
274
275impl RawGrpcCaller for TemporalServiceClient {}
276
277impl RawClientProducer for Client {
278    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
279        Some(self.connection.workers())
280    }
281
282    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
283        self.connection.workflow_client()
284    }
285
286    fn operator_client(&mut self) -> Box<dyn OperatorService> {
287        self.connection.operator_client()
288    }
289
290    fn cloud_client(&mut self) -> Box<dyn CloudService> {
291        self.connection.cloud_client()
292    }
293
294    fn test_client(&mut self) -> Box<dyn TestService> {
295        self.connection.test_client()
296    }
297
298    fn health_client(&mut self) -> Box<dyn HealthService> {
299        self.connection.health_client()
300    }
301}
302
303#[async_trait::async_trait]
304impl RawGrpcCaller for Client {
305    async fn call<F, Req, Resp>(
306        &mut self,
307        call_name: &'static str,
308        callfn: F,
309        req: Request<Req>,
310    ) -> Result<Response<Resp>, Status>
311    where
312        Req: Clone + Unpin + Send + Sync + 'static,
313        Resp: Send + 'static,
314        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
315        F: Send + Sync + Unpin + 'static,
316    {
317        self.connection.call(call_name, callfn, req).await
318    }
319}
320
321impl<RC> RawClientProducer for SharedReplaceableClient<RC>
322where
323    RC: RawClientProducer + Clone + Send + Sync + 'static,
324{
325    fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
326        self.inner_cow().get_workers_info()
327    }
328    fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
329        self.inner_mut_refreshed().workflow_client()
330    }
331
332    fn operator_client(&mut self) -> Box<dyn OperatorService> {
333        self.inner_mut_refreshed().operator_client()
334    }
335
336    fn cloud_client(&mut self) -> Box<dyn CloudService> {
337        self.inner_mut_refreshed().cloud_client()
338    }
339
340    fn test_client(&mut self) -> Box<dyn TestService> {
341        self.inner_mut_refreshed().test_client()
342    }
343
344    fn health_client(&mut self) -> Box<dyn HealthService> {
345        self.inner_mut_refreshed().health_client()
346    }
347}
348
349#[async_trait::async_trait]
350impl<RC> RawGrpcCaller for SharedReplaceableClient<RC>
351where
352    RC: RawGrpcCaller + Clone + Sync + 'static,
353{
354    async fn call<F, Req, Resp>(
355        &mut self,
356        call_name: &'static str,
357        callfn: F,
358        req: Request<Req>,
359    ) -> Result<Response<Resp>, Status>
360    where
361        Req: Clone + Unpin + Send + Sync + 'static,
362        Resp: Send + 'static,
363        F: FnMut(Request<Req>) -> BoxFuture<'static, Result<Response<Resp>, Status>>,
364        F: Send + Sync + Unpin + 'static,
365    {
366        self.inner_mut_refreshed()
367            .call(call_name, callfn, req)
368            .await
369    }
370}
371
372#[derive(Clone, Debug)]
373pub(super) struct AttachMetricLabels {
374    pub(super) labels: Vec<MetricKeyValue>,
375    pub(super) normal_task_queue: Option<String>,
376    pub(super) sticky_task_queue: Option<String>,
377}
378impl AttachMetricLabels {
379    pub(super) fn new(kvs: impl Into<Vec<MetricKeyValue>>) -> Self {
380        Self {
381            labels: kvs.into(),
382            normal_task_queue: None,
383            sticky_task_queue: None,
384        }
385    }
386    pub(super) fn namespace(ns: impl Into<String>) -> Self {
387        AttachMetricLabels::new(vec![namespace_kv(ns.into())])
388    }
389    pub(super) fn task_q(&mut self, tq: Option<TaskQueue>) -> &mut Self {
390        if let Some(tq) = tq {
391            if !tq.normal_name.is_empty() {
392                self.sticky_task_queue = Some(tq.name);
393                self.normal_task_queue = Some(tq.normal_name);
394            } else {
395                self.normal_task_queue = Some(tq.name);
396            }
397        }
398        self
399    }
400    pub(super) fn task_q_str(&mut self, tq: impl Into<String>) -> &mut Self {
401        self.normal_task_queue = Some(tq.into());
402        self
403    }
404}
405
406/// A request extension that, when set, should make the [RetryClient] consider this call to be a
407/// [super::retry::CallType::UserLongPoll]
408#[derive(Copy, Clone, Debug)]
409pub(super) struct IsUserLongPoll;
410
411macro_rules! proxy_def {
412    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty, defaults) => {
413        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
414        fn $method(
415            &mut self,
416            _request: tonic::Request<$req>,
417        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
418            async { Ok(tonic::Response::new(<$resp>::default())) }.boxed()
419        }
420    };
421    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty) => {
422        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
423        fn $method(
424            &mut self,
425            _request: tonic::Request<$req>,
426        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>>;
427    };
428}
429
430/// Helps re-declare gRPC client methods
431///
432/// There are four forms:
433///
434/// * The first takes a closure that can modify the request. This is only called once, before the
435///   actual rpc call is made, and before determinations are made about the kind of call (long poll
436///   or not) and retry policy.
437/// * The second takes three closures. The first can modify the request like in the first form.
438///   The second can modify the request and return a value, and is called right before every call
439///   (including on retries). The third is called with the response to the call after it resolves.
440/// * The third and fourth are equivalents of the above that skip calling through the `call` method
441///   and are implemented directly on the generated gRPC clients (IE: the bottom of the stack).
442macro_rules! proxy_impl {
443    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty $(, $closure:expr)?) => {
444        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
445        fn $method(
446            &mut self,
447            #[allow(unused_mut)]
448            mut request: tonic::Request<$req>,
449        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
450            $( type_closure_arg(&mut request, $closure); )*
451            let mut self_clone = self.clone();
452            #[allow(unused_mut)]
453            let fact = move |mut req: tonic::Request<$req>| {
454                let mut c = self_clone.$client_meth();
455                async move { c.$method(req).await }.boxed()
456            };
457            self.call(stringify!($method), fact, request)
458        }
459    };
460    ($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty,
461     $closure_request:expr, $closure_before:expr, $closure_after:expr) => {
462        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
463        fn $method(
464            &mut self,
465            mut request: tonic::Request<$req>,
466        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
467            type_closure_arg(&mut request, $closure_request);
468            let workers_info = self.get_workers_info();
469            let mut self_clone = self.clone();
470            #[allow(unused_mut)]
471            let fact = move |mut req: tonic::Request<$req>| {
472                let data = type_closure_two_arg(&mut req, workers_info.clone(), $closure_before);
473                let mut c = self_clone.$client_meth();
474                async move {
475                    type_closure_two_arg(c.$method(req).await, data, $closure_after)
476                }.boxed()
477            };
478            self.call(stringify!($method), fact, request)
479        }
480    };
481    ($client_type:tt, $method:ident, $req:ty, $resp:ty $(, $closure:expr)?) => {
482        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
483        fn $method(
484            &mut self,
485            #[allow(unused_mut)]
486            mut request: tonic::Request<$req>,
487        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
488            $( type_closure_arg(&mut request, $closure); )*
489            async move { <$client_type<_>>::$method(self, request).await }.boxed()
490        }
491    };
492    ($client_type:tt, $method:ident, $req:ty, $resp:ty,
493     $closure_request:expr, $closure_before:expr, $closure_after:expr) => {
494        #[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
495        fn $method(
496            &mut self,
497            mut request: tonic::Request<$req>,
498        ) -> BoxFuture<'_, Result<tonic::Response<$resp>, tonic::Status>> {
499            type_closure_arg(&mut request, $closure_request);
500            let data = type_closure_two_arg(&mut request, Option::<Arc<ClientWorkerSet>>::None,
501                                            $closure_before);
502            async move {
503                type_closure_two_arg(<$client_type<_>>::$method(self, request).await,
504                                     data, $closure_after)
505            }.boxed()
506        }
507    };
508}
509macro_rules! proxier_impl {
510    ($trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident;
511     [$( proxy_def!($($def_args:tt)*); )*];
512     $(($method:ident, $req:ty, $resp:ty
513       $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
514        #[cfg(test)]
515        const $impl_list_name: &'static [&'static str] = &[$(stringify!($method)),*];
516
517        #[doc = concat!("Trait version of [", stringify!($client_type), "]")]
518        pub trait $trait_name: Send + Sync + DynClone
519        {
520            $( proxy_def!($($def_args)*); )*
521        }
522        dyn_clone::clone_trait_object!($trait_name);
523
524        impl<RC> $trait_name for RC
525        where
526            RC: RawGrpcCaller + RawClientProducer + Clone + Unpin,
527        {
528            $(
529                proxy_impl!($client_type, $client_meth, $method, $req, $resp
530                            $(,$closure $(,$closure_before, $closure_after)*)*);
531            )*
532        }
533
534        impl<T: Send + Sync + 'static> RawGrpcCaller for $client_type<T> {}
535
536        impl<T> $trait_name for $client_type<T>
537        where
538            T: GrpcService<Body> + Clone + Send + Sync + 'static,
539            T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
540            T::Error: Into<tonic::codegen::StdError>,
541            <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
542            <T as tonic::client::GrpcService<Body>>::Future: Send
543        {
544            $(
545                proxy_impl!($client_type, $method, $req, $resp
546                            $(,$closure $(,$closure_before, $closure_after)*)*);
547            )*
548        }
549    };
550}
551
552macro_rules! proxier {
553    ( $trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident;
554      $(($method:ident, $req:ty, $resp:ty
555         $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
556        proxier_impl!($trait_name; $impl_list_name; $client_type; $client_meth;
557                      [$(proxy_def!($client_type, $client_meth, $method, $req, $resp);)*];
558                      $(($method, $req, $resp $(, $closure $(, $closure_before, $closure_after)?)?);)*);
559    };
560    ( $trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident; defaults;
561      $(($method:ident, $req:ty, $resp:ty
562         $(, $closure:expr $(, $closure_before:expr, $closure_after:expr)?)? );)* ) => {
563        proxier_impl!($trait_name; $impl_list_name; $client_type; $client_meth;
564                      [$(proxy_def!($client_type, $client_meth, $method, $req, $resp, defaults);)*];
565                      $(($method, $req, $resp $(, $closure $(, $closure_before, $closure_after)?)?);)*);
566    };
567}
568
569macro_rules! namespaced_request {
570    ($req:ident) => {{
571        let ns_str = $req.get_ref().namespace.clone();
572        // Attach namespace header
573        $req.metadata_mut().insert(
574            TEMPORAL_NAMESPACE_HEADER_KEY,
575            ns_str.parse().unwrap_or_else(|e| {
576                warn!("Unable to parse namespace for header: {e:?}");
577                AsciiMetadataValue::from_static("")
578            }),
579        );
580        // Init metric labels
581        AttachMetricLabels::namespace(ns_str)
582    }};
583}
584
585// Nice little trick to avoid the callsite asking to type the closure parameter
586fn type_closure_arg<T, R>(arg: T, f: impl FnOnce(T) -> R) -> R {
587    f(arg)
588}
589
590fn type_closure_two_arg<T, R, S>(arg1: R, arg2: T, f: impl FnOnce(R, T) -> S) -> S {
591    f(arg1, arg2)
592}
593
594proxier! {
595    WorkflowService; ALL_IMPLEMENTED_WORKFLOW_SERVICE_RPCS; WorkflowServiceClient; workflow_client; defaults;
596    (
597        register_namespace,
598        RegisterNamespaceRequest,
599        RegisterNamespaceResponse,
600        |r| {
601            let labels = namespaced_request!(r);
602            r.extensions_mut().insert(labels);
603        }
604    );
605    (
606        describe_namespace,
607        DescribeNamespaceRequest,
608        DescribeNamespaceResponse,
609        |r| {
610            let labels = namespaced_request!(r);
611            r.extensions_mut().insert(labels);
612        }
613    );
614    (
615        list_namespaces,
616        ListNamespacesRequest,
617        ListNamespacesResponse
618    );
619    (
620        update_namespace,
621        UpdateNamespaceRequest,
622        UpdateNamespaceResponse,
623        |r| {
624            let labels = namespaced_request!(r);
625            r.extensions_mut().insert(labels);
626        }
627    );
628    (
629        deprecate_namespace,
630        DeprecateNamespaceRequest,
631        DeprecateNamespaceResponse,
632        |r| {
633            let labels = namespaced_request!(r);
634            r.extensions_mut().insert(labels);
635        }
636    );
637    (
638        start_workflow_execution,
639        StartWorkflowExecutionRequest,
640        StartWorkflowExecutionResponse,
641        |r| {
642            let mut labels = namespaced_request!(r);
643            labels.task_q(r.get_ref().task_queue.clone());
644            r.extensions_mut().insert(labels);
645        },
646        |r, workers| {
647            if let Some(workers) = workers {
648                let mut slot: Option<Box<dyn Slot + Send>> = None;
649                let req_mut = r.get_mut();
650                if req_mut.request_eager_execution {
651                    let namespace = req_mut.namespace.clone();
652                    let task_queue = req_mut.task_queue.as_ref()
653                                        .map(|tq| tq.name.clone()).unwrap_or_default();
654                    match workers.try_reserve_wft_slot(namespace, task_queue) {
655                        Some(reservation) => {
656                            // Populate eager_worker_deployment_options from the slot reservation
657                            if let Some(opts) = reservation.deployment_options {
658                                req_mut.eager_worker_deployment_options = Some(temporalio_common::protos::temporal::api::deployment::v1::WorkerDeploymentOptions {
659                                    deployment_name: opts.version.deployment_name,
660                                    build_id: opts.version.build_id,
661                                    worker_versioning_mode: if opts.use_worker_versioning {
662                                        temporalio_common::protos::temporal::api::enums::v1::WorkerVersioningMode::Versioned.into()
663                                    } else {
664                                        temporalio_common::protos::temporal::api::enums::v1::WorkerVersioningMode::Unversioned.into()
665                                    },
666                                });                            }
667                            slot = Some(reservation.slot);
668                        }
669                        None => req_mut.request_eager_execution = false
670                    }
671                }
672                slot
673            } else {
674                None
675            }
676        },
677        |resp, slot| {
678            if let Some(s) = slot
679                && let Ok(response) = resp.as_ref()
680                    && let Some(task) = response.get_ref().clone().eager_workflow_task
681                        && let Err(e) = s.schedule_wft(task) {
682                            // This is a latency issue, i.e., the client does not need to handle
683                            //  this error, because the WFT will be retried after a timeout.
684                            warn!(details = ?e, "Eager workflow task rejected by worker.");
685                        }
686            resp
687        }
688    );
689    (
690        get_workflow_execution_history,
691        GetWorkflowExecutionHistoryRequest,
692        GetWorkflowExecutionHistoryResponse,
693        |r| {
694            let labels = namespaced_request!(r);
695            r.extensions_mut().insert(labels);
696            if r.get_ref().wait_new_event {
697                r.extensions_mut().insert(IsUserLongPoll);
698            }
699        }
700    );
701    (
702        get_workflow_execution_history_reverse,
703        GetWorkflowExecutionHistoryReverseRequest,
704        GetWorkflowExecutionHistoryReverseResponse,
705        |r| {
706            let labels = namespaced_request!(r);
707            r.extensions_mut().insert(labels);
708        }
709    );
710    (
711        poll_workflow_task_queue,
712        PollWorkflowTaskQueueRequest,
713        PollWorkflowTaskQueueResponse,
714        |r| {
715            let mut labels = namespaced_request!(r);
716            labels.task_q(r.get_ref().task_queue.clone());
717            r.extensions_mut().insert(labels);
718        }
719    );
720    (
721        respond_workflow_task_completed,
722        RespondWorkflowTaskCompletedRequest,
723        RespondWorkflowTaskCompletedResponse,
724        |r| {
725            let labels = namespaced_request!(r);
726            r.extensions_mut().insert(labels);
727        }
728    );
729    (
730        respond_workflow_task_failed,
731        RespondWorkflowTaskFailedRequest,
732        RespondWorkflowTaskFailedResponse,
733        |r| {
734            let labels = namespaced_request!(r);
735            r.extensions_mut().insert(labels);
736        }
737    );
738    (
739        poll_activity_task_queue,
740        PollActivityTaskQueueRequest,
741        PollActivityTaskQueueResponse,
742        |r| {
743            let mut labels = namespaced_request!(r);
744            labels.task_q(r.get_ref().task_queue.clone());
745            r.extensions_mut().insert(labels);
746        }
747    );
748    (
749        record_activity_task_heartbeat,
750        RecordActivityTaskHeartbeatRequest,
751        RecordActivityTaskHeartbeatResponse,
752        |r| {
753            let labels = namespaced_request!(r);
754            r.extensions_mut().insert(labels);
755        }
756    );
757    (
758        record_activity_task_heartbeat_by_id,
759        RecordActivityTaskHeartbeatByIdRequest,
760        RecordActivityTaskHeartbeatByIdResponse,
761        |r| {
762            let labels = namespaced_request!(r);
763            r.extensions_mut().insert(labels);
764        }
765    );
766    (
767        respond_activity_task_completed,
768        RespondActivityTaskCompletedRequest,
769        RespondActivityTaskCompletedResponse,
770        |r| {
771            let labels = namespaced_request!(r);
772            r.extensions_mut().insert(labels);
773        }
774    );
775    (
776        respond_activity_task_completed_by_id,
777        RespondActivityTaskCompletedByIdRequest,
778        RespondActivityTaskCompletedByIdResponse,
779        |r| {
780            let labels = namespaced_request!(r);
781            r.extensions_mut().insert(labels);
782        }
783    );
784
785    (
786        respond_activity_task_failed,
787        RespondActivityTaskFailedRequest,
788        RespondActivityTaskFailedResponse,
789        |r| {
790            let labels = namespaced_request!(r);
791            r.extensions_mut().insert(labels);
792        }
793    );
794    (
795        respond_activity_task_failed_by_id,
796        RespondActivityTaskFailedByIdRequest,
797        RespondActivityTaskFailedByIdResponse,
798        |r| {
799            let labels = namespaced_request!(r);
800            r.extensions_mut().insert(labels);
801        }
802    );
803    (
804        respond_activity_task_canceled,
805        RespondActivityTaskCanceledRequest,
806        RespondActivityTaskCanceledResponse,
807        |r| {
808            let labels = namespaced_request!(r);
809            r.extensions_mut().insert(labels);
810        }
811    );
812    (
813        respond_activity_task_canceled_by_id,
814        RespondActivityTaskCanceledByIdRequest,
815        RespondActivityTaskCanceledByIdResponse,
816        |r| {
817            let labels = namespaced_request!(r);
818            r.extensions_mut().insert(labels);
819        }
820    );
821    (
822        request_cancel_workflow_execution,
823        RequestCancelWorkflowExecutionRequest,
824        RequestCancelWorkflowExecutionResponse,
825        |r| {
826            let labels = namespaced_request!(r);
827            r.extensions_mut().insert(labels);
828        }
829    );
830    (
831        signal_workflow_execution,
832        SignalWorkflowExecutionRequest,
833        SignalWorkflowExecutionResponse,
834        |r| {
835            let labels = namespaced_request!(r);
836            r.extensions_mut().insert(labels);
837        }
838    );
839    (
840        signal_with_start_workflow_execution,
841        SignalWithStartWorkflowExecutionRequest,
842        SignalWithStartWorkflowExecutionResponse,
843        |r| {
844            let mut labels = namespaced_request!(r);
845            labels.task_q(r.get_ref().task_queue.clone());
846            r.extensions_mut().insert(labels);
847        }
848    );
849    (
850        reset_workflow_execution,
851        ResetWorkflowExecutionRequest,
852        ResetWorkflowExecutionResponse,
853        |r| {
854            let labels = namespaced_request!(r);
855            r.extensions_mut().insert(labels);
856        }
857    );
858    (
859        terminate_workflow_execution,
860        TerminateWorkflowExecutionRequest,
861        TerminateWorkflowExecutionResponse,
862        |r| {
863            let labels = namespaced_request!(r);
864            r.extensions_mut().insert(labels);
865        }
866    );
867    (
868        delete_workflow_execution,
869        DeleteWorkflowExecutionRequest,
870        DeleteWorkflowExecutionResponse,
871        |r| {
872            let labels = namespaced_request!(r);
873            r.extensions_mut().insert(labels);
874        }
875    );
876    (
877        list_open_workflow_executions,
878        ListOpenWorkflowExecutionsRequest,
879        ListOpenWorkflowExecutionsResponse,
880        |r| {
881            let labels = namespaced_request!(r);
882            r.extensions_mut().insert(labels);
883        }
884    );
885    (
886        list_closed_workflow_executions,
887        ListClosedWorkflowExecutionsRequest,
888        ListClosedWorkflowExecutionsResponse,
889        |r| {
890            let labels = namespaced_request!(r);
891            r.extensions_mut().insert(labels);
892        }
893    );
894    (
895        list_workflow_executions,
896        ListWorkflowExecutionsRequest,
897        ListWorkflowExecutionsResponse,
898        |r| {
899            let labels = namespaced_request!(r);
900            r.extensions_mut().insert(labels);
901        }
902    );
903    (
904        list_archived_workflow_executions,
905        ListArchivedWorkflowExecutionsRequest,
906        ListArchivedWorkflowExecutionsResponse,
907        |r| {
908            let labels = namespaced_request!(r);
909            r.extensions_mut().insert(labels);
910        }
911    );
912    (
913        scan_workflow_executions,
914        ScanWorkflowExecutionsRequest,
915        ScanWorkflowExecutionsResponse,
916        |r| {
917            let labels = namespaced_request!(r);
918            r.extensions_mut().insert(labels);
919        }
920    );
921    (
922        count_workflow_executions,
923        CountWorkflowExecutionsRequest,
924        CountWorkflowExecutionsResponse,
925        |r| {
926            let labels = namespaced_request!(r);
927            r.extensions_mut().insert(labels);
928        }
929    );
930    (
931        create_workflow_rule,
932        CreateWorkflowRuleRequest,
933        CreateWorkflowRuleResponse,
934        |r| {
935            let labels = namespaced_request!(r);
936            r.extensions_mut().insert(labels);
937        }
938    );
939    (
940        describe_workflow_rule,
941        DescribeWorkflowRuleRequest,
942        DescribeWorkflowRuleResponse,
943        |r| {
944            let labels = namespaced_request!(r);
945            r.extensions_mut().insert(labels);
946        }
947    );
948    (
949        delete_workflow_rule,
950        DeleteWorkflowRuleRequest,
951        DeleteWorkflowRuleResponse,
952        |r| {
953            let labels = namespaced_request!(r);
954            r.extensions_mut().insert(labels);
955        }
956    );
957    (
958        list_workflow_rules,
959        ListWorkflowRulesRequest,
960        ListWorkflowRulesResponse,
961        |r| {
962            let labels = namespaced_request!(r);
963            r.extensions_mut().insert(labels);
964        }
965    );
966    (
967        trigger_workflow_rule,
968        TriggerWorkflowRuleRequest,
969        TriggerWorkflowRuleResponse,
970        |r| {
971            let labels = namespaced_request!(r);
972            r.extensions_mut().insert(labels);
973        }
974    );
975    (
976        get_search_attributes,
977        GetSearchAttributesRequest,
978        GetSearchAttributesResponse
979    );
980    (
981        respond_query_task_completed,
982        RespondQueryTaskCompletedRequest,
983        RespondQueryTaskCompletedResponse,
984        |r| {
985            let labels = namespaced_request!(r);
986            r.extensions_mut().insert(labels);
987        }
988    );
989    (
990        reset_sticky_task_queue,
991        ResetStickyTaskQueueRequest,
992        ResetStickyTaskQueueResponse,
993        |r| {
994            let labels = namespaced_request!(r);
995            r.extensions_mut().insert(labels);
996        }
997    );
998    (
999        query_workflow,
1000        QueryWorkflowRequest,
1001        QueryWorkflowResponse,
1002        |r| {
1003            let labels = namespaced_request!(r);
1004            r.extensions_mut().insert(labels);
1005        }
1006    );
1007    (
1008        describe_workflow_execution,
1009        DescribeWorkflowExecutionRequest,
1010        DescribeWorkflowExecutionResponse,
1011        |r| {
1012            let labels = namespaced_request!(r);
1013            r.extensions_mut().insert(labels);
1014        }
1015    );
1016    (
1017        describe_task_queue,
1018        DescribeTaskQueueRequest,
1019        DescribeTaskQueueResponse,
1020        |r| {
1021            let mut labels = namespaced_request!(r);
1022            labels.task_q(r.get_ref().task_queue.clone());
1023            r.extensions_mut().insert(labels);
1024        }
1025    );
1026    (
1027        get_cluster_info,
1028        GetClusterInfoRequest,
1029        GetClusterInfoResponse
1030    );
1031    (
1032        get_system_info,
1033        GetSystemInfoRequest,
1034        GetSystemInfoResponse
1035    );
1036    (
1037        list_task_queue_partitions,
1038        ListTaskQueuePartitionsRequest,
1039        ListTaskQueuePartitionsResponse,
1040        |r| {
1041            let mut labels = namespaced_request!(r);
1042            labels.task_q(r.get_ref().task_queue.clone());
1043            r.extensions_mut().insert(labels);
1044        }
1045    );
1046    (
1047        create_schedule,
1048        CreateScheduleRequest,
1049        CreateScheduleResponse,
1050        |r| {
1051            let labels = namespaced_request!(r);
1052            r.extensions_mut().insert(labels);
1053        }
1054    );
1055    (
1056        describe_schedule,
1057        DescribeScheduleRequest,
1058        DescribeScheduleResponse,
1059        |r| {
1060            let labels = namespaced_request!(r);
1061            r.extensions_mut().insert(labels);
1062        }
1063    );
1064    (
1065        update_schedule,
1066        UpdateScheduleRequest,
1067        UpdateScheduleResponse,
1068        |r| {
1069            let labels = namespaced_request!(r);
1070            r.extensions_mut().insert(labels);
1071        }
1072    );
1073    (
1074        patch_schedule,
1075        PatchScheduleRequest,
1076        PatchScheduleResponse,
1077        |r| {
1078            let labels = namespaced_request!(r);
1079            r.extensions_mut().insert(labels);
1080        }
1081    );
1082    (
1083        list_schedule_matching_times,
1084        ListScheduleMatchingTimesRequest,
1085        ListScheduleMatchingTimesResponse,
1086        |r| {
1087            let labels = namespaced_request!(r);
1088            r.extensions_mut().insert(labels);
1089        }
1090    );
1091    (
1092        delete_schedule,
1093        DeleteScheduleRequest,
1094        DeleteScheduleResponse,
1095        |r| {
1096            let labels = namespaced_request!(r);
1097            r.extensions_mut().insert(labels);
1098        }
1099    );
1100    (
1101        list_schedules,
1102        ListSchedulesRequest,
1103        ListSchedulesResponse,
1104        |r| {
1105            let labels = namespaced_request!(r);
1106            r.extensions_mut().insert(labels);
1107        }
1108    );
1109    (
1110        count_schedules,
1111        CountSchedulesRequest,
1112        CountSchedulesResponse,
1113        |r| {
1114            let labels = namespaced_request!(r);
1115            r.extensions_mut().insert(labels);
1116        }
1117    );
1118    (
1119        update_worker_build_id_compatibility,
1120        UpdateWorkerBuildIdCompatibilityRequest,
1121        UpdateWorkerBuildIdCompatibilityResponse,
1122        |r| {
1123            let mut labels = namespaced_request!(r);
1124            labels.task_q_str(r.get_ref().task_queue.clone());
1125            r.extensions_mut().insert(labels);
1126        }
1127    );
1128    (
1129        get_worker_build_id_compatibility,
1130        GetWorkerBuildIdCompatibilityRequest,
1131        GetWorkerBuildIdCompatibilityResponse,
1132        |r| {
1133            let mut labels = namespaced_request!(r);
1134            labels.task_q_str(r.get_ref().task_queue.clone());
1135            r.extensions_mut().insert(labels);
1136        }
1137    );
1138    (
1139        get_worker_task_reachability,
1140        GetWorkerTaskReachabilityRequest,
1141        GetWorkerTaskReachabilityResponse,
1142        |r| {
1143            let labels = namespaced_request!(r);
1144            r.extensions_mut().insert(labels);
1145        }
1146    );
1147    (
1148        update_workflow_execution,
1149        UpdateWorkflowExecutionRequest,
1150        UpdateWorkflowExecutionResponse,
1151        |r| {
1152            let labels = namespaced_request!(r);
1153            let exts = r.extensions_mut();
1154            exts.insert(labels);
1155            exts.insert(IsUserLongPoll);
1156        }
1157    );
1158    (
1159        poll_workflow_execution_update,
1160        PollWorkflowExecutionUpdateRequest,
1161        PollWorkflowExecutionUpdateResponse,
1162        |r| {
1163            let labels = namespaced_request!(r);
1164            r.extensions_mut().insert(labels);
1165        }
1166    );
1167    (
1168        start_batch_operation,
1169        StartBatchOperationRequest,
1170        StartBatchOperationResponse,
1171        |r| {
1172            let labels = namespaced_request!(r);
1173            r.extensions_mut().insert(labels);
1174        }
1175    );
1176    (
1177        stop_batch_operation,
1178        StopBatchOperationRequest,
1179        StopBatchOperationResponse,
1180        |r| {
1181            let labels = namespaced_request!(r);
1182            r.extensions_mut().insert(labels);
1183        }
1184    );
1185    (
1186        describe_batch_operation,
1187        DescribeBatchOperationRequest,
1188        DescribeBatchOperationResponse,
1189        |r| {
1190            let labels = namespaced_request!(r);
1191            r.extensions_mut().insert(labels);
1192        }
1193    );
1194    (
1195        describe_deployment,
1196        DescribeDeploymentRequest,
1197        DescribeDeploymentResponse,
1198        |r| {
1199            let labels = namespaced_request!(r);
1200            r.extensions_mut().insert(labels);
1201        }
1202    );
1203    (
1204        list_batch_operations,
1205        ListBatchOperationsRequest,
1206        ListBatchOperationsResponse,
1207        |r| {
1208            let labels = namespaced_request!(r);
1209            r.extensions_mut().insert(labels);
1210        }
1211    );
1212    (
1213        list_deployments,
1214        ListDeploymentsRequest,
1215        ListDeploymentsResponse,
1216        |r| {
1217            let labels = namespaced_request!(r);
1218            r.extensions_mut().insert(labels);
1219        }
1220    );
1221    (
1222        execute_multi_operation,
1223        ExecuteMultiOperationRequest,
1224        ExecuteMultiOperationResponse,
1225        |r| {
1226            let labels = namespaced_request!(r);
1227            r.extensions_mut().insert(labels);
1228        }
1229    );
1230    (
1231        get_current_deployment,
1232        GetCurrentDeploymentRequest,
1233        GetCurrentDeploymentResponse,
1234        |r| {
1235            let labels = namespaced_request!(r);
1236            r.extensions_mut().insert(labels);
1237        }
1238    );
1239    (
1240        get_deployment_reachability,
1241        GetDeploymentReachabilityRequest,
1242        GetDeploymentReachabilityResponse,
1243        |r| {
1244            let labels = namespaced_request!(r);
1245            r.extensions_mut().insert(labels);
1246        }
1247    );
1248    (
1249        get_worker_versioning_rules,
1250        GetWorkerVersioningRulesRequest,
1251        GetWorkerVersioningRulesResponse,
1252        |r| {
1253            let mut labels = namespaced_request!(r);
1254            labels.task_q_str(&r.get_ref().task_queue);
1255            r.extensions_mut().insert(labels);
1256        }
1257    );
1258    (
1259        update_worker_versioning_rules,
1260        UpdateWorkerVersioningRulesRequest,
1261        UpdateWorkerVersioningRulesResponse,
1262        |r| {
1263            let mut labels = namespaced_request!(r);
1264            labels.task_q_str(&r.get_ref().task_queue);
1265            r.extensions_mut().insert(labels);
1266        }
1267    );
1268    (
1269        poll_nexus_task_queue,
1270        PollNexusTaskQueueRequest,
1271        PollNexusTaskQueueResponse,
1272        |r| {
1273            let mut labels = namespaced_request!(r);
1274            labels.task_q(r.get_ref().task_queue.clone());
1275            r.extensions_mut().insert(labels);
1276        }
1277    );
1278    (
1279        respond_nexus_task_completed,
1280        RespondNexusTaskCompletedRequest,
1281        RespondNexusTaskCompletedResponse,
1282        |r| {
1283            let labels = namespaced_request!(r);
1284            r.extensions_mut().insert(labels);
1285        }
1286    );
1287    (
1288        respond_nexus_task_failed,
1289        RespondNexusTaskFailedRequest,
1290        RespondNexusTaskFailedResponse,
1291        |r| {
1292            let labels = namespaced_request!(r);
1293            r.extensions_mut().insert(labels);
1294        }
1295    );
1296    (
1297        set_current_deployment,
1298        SetCurrentDeploymentRequest,
1299        SetCurrentDeploymentResponse,
1300        |r| {
1301            let labels = namespaced_request!(r);
1302            r.extensions_mut().insert(labels);
1303        }
1304    );
1305    (
1306        shutdown_worker,
1307        ShutdownWorkerRequest,
1308        ShutdownWorkerResponse,
1309        |r| {
1310            let labels = namespaced_request!(r);
1311            r.extensions_mut().insert(labels);
1312        }
1313    );
1314    (
1315        update_activity_options,
1316        UpdateActivityOptionsRequest,
1317        UpdateActivityOptionsResponse,
1318        |r| {
1319            let labels = namespaced_request!(r);
1320            r.extensions_mut().insert(labels);
1321        }
1322    );
1323    (
1324        pause_activity,
1325        PauseActivityRequest,
1326        PauseActivityResponse,
1327        |r| {
1328            let labels = namespaced_request!(r);
1329            r.extensions_mut().insert(labels);
1330        }
1331    );
1332    (
1333        unpause_activity,
1334        UnpauseActivityRequest,
1335        UnpauseActivityResponse,
1336        |r| {
1337            let labels = namespaced_request!(r);
1338            r.extensions_mut().insert(labels);
1339        }
1340    );
1341    (
1342        update_workflow_execution_options,
1343        UpdateWorkflowExecutionOptionsRequest,
1344        UpdateWorkflowExecutionOptionsResponse,
1345        |r| {
1346            let labels = namespaced_request!(r);
1347            r.extensions_mut().insert(labels);
1348        }
1349    );
1350    (
1351        reset_activity,
1352        ResetActivityRequest,
1353        ResetActivityResponse,
1354        |r| {
1355            let labels = namespaced_request!(r);
1356            r.extensions_mut().insert(labels);
1357        }
1358    );
1359    (
1360        delete_worker_deployment,
1361        DeleteWorkerDeploymentRequest,
1362        DeleteWorkerDeploymentResponse,
1363        |r| {
1364            let labels = namespaced_request!(r);
1365            r.extensions_mut().insert(labels);
1366        }
1367    );
1368    (
1369        delete_worker_deployment_version,
1370        DeleteWorkerDeploymentVersionRequest,
1371        DeleteWorkerDeploymentVersionResponse,
1372        |r| {
1373            let labels = namespaced_request!(r);
1374            r.extensions_mut().insert(labels);
1375        }
1376    );
1377    (
1378        describe_worker_deployment,
1379        DescribeWorkerDeploymentRequest,
1380        DescribeWorkerDeploymentResponse,
1381        |r| {
1382            let labels = namespaced_request!(r);
1383            r.extensions_mut().insert(labels);
1384        }
1385    );
1386    (
1387        describe_worker_deployment_version,
1388        DescribeWorkerDeploymentVersionRequest,
1389        DescribeWorkerDeploymentVersionResponse,
1390        |r| {
1391            let labels = namespaced_request!(r);
1392            r.extensions_mut().insert(labels);
1393        }
1394    );
1395    (
1396        list_worker_deployments,
1397        ListWorkerDeploymentsRequest,
1398        ListWorkerDeploymentsResponse,
1399        |r| {
1400            let labels = namespaced_request!(r);
1401            r.extensions_mut().insert(labels);
1402        }
1403    );
1404    (
1405        set_worker_deployment_current_version,
1406        SetWorkerDeploymentCurrentVersionRequest,
1407        SetWorkerDeploymentCurrentVersionResponse,
1408        |r| {
1409            let labels = namespaced_request!(r);
1410            r.extensions_mut().insert(labels);
1411        }
1412    );
1413    (
1414        set_worker_deployment_ramping_version,
1415        SetWorkerDeploymentRampingVersionRequest,
1416        SetWorkerDeploymentRampingVersionResponse,
1417        |r| {
1418            let labels = namespaced_request!(r);
1419            r.extensions_mut().insert(labels);
1420        }
1421    );
1422    (
1423        update_worker_deployment_version_metadata,
1424        UpdateWorkerDeploymentVersionMetadataRequest,
1425        UpdateWorkerDeploymentVersionMetadataResponse,
1426        |r| {
1427            let labels = namespaced_request!(r);
1428            r.extensions_mut().insert(labels);
1429        }
1430    );
1431    (
1432        list_workers,
1433        ListWorkersRequest,
1434        ListWorkersResponse,
1435        |r| {
1436            let labels = namespaced_request!(r);
1437            r.extensions_mut().insert(labels);
1438        }
1439    );
1440    (
1441        record_worker_heartbeat,
1442        RecordWorkerHeartbeatRequest,
1443        RecordWorkerHeartbeatResponse,
1444        |r| {
1445            let labels = namespaced_request!(r);
1446            r.extensions_mut().insert(labels);
1447        }
1448    );
1449    (
1450        update_task_queue_config,
1451        UpdateTaskQueueConfigRequest,
1452        UpdateTaskQueueConfigResponse,
1453        |r| {
1454            let mut labels = namespaced_request!(r);
1455            labels.task_q_str(r.get_ref().task_queue.clone());
1456            r.extensions_mut().insert(labels);
1457        }
1458    );
1459    (
1460        fetch_worker_config,
1461        FetchWorkerConfigRequest,
1462        FetchWorkerConfigResponse,
1463        |r| {
1464            let labels = namespaced_request!(r);
1465            r.extensions_mut().insert(labels);
1466        }
1467    );
1468    (
1469        update_worker_config,
1470        UpdateWorkerConfigRequest,
1471        UpdateWorkerConfigResponse,
1472        |r| {
1473            let labels = namespaced_request!(r);
1474            r.extensions_mut().insert(labels);
1475        }
1476    );
1477    (
1478        describe_worker,
1479        DescribeWorkerRequest,
1480        DescribeWorkerResponse,
1481        |r| {
1482            let labels = namespaced_request!(r);
1483            r.extensions_mut().insert(labels);
1484        }
1485    );
1486    (
1487        set_worker_deployment_manager,
1488        SetWorkerDeploymentManagerRequest,
1489        SetWorkerDeploymentManagerResponse,
1490        |r| {
1491            let labels = namespaced_request!(r);
1492            r.extensions_mut().insert(labels);
1493        }
1494    );
1495    (
1496        pause_workflow_execution,
1497        PauseWorkflowExecutionRequest,
1498        PauseWorkflowExecutionResponse,
1499        |r| {
1500            let labels = namespaced_request!(r);
1501            r.extensions_mut().insert(labels);
1502        }
1503    );
1504    (
1505        unpause_workflow_execution,
1506        UnpauseWorkflowExecutionRequest,
1507        UnpauseWorkflowExecutionResponse,
1508        |r| {
1509            let labels = namespaced_request!(r);
1510            r.extensions_mut().insert(labels);
1511        }
1512    );
1513    (
1514        start_activity_execution,
1515        StartActivityExecutionRequest,
1516        StartActivityExecutionResponse,
1517        |r| {
1518            let labels = namespaced_request!(r);
1519            r.extensions_mut().insert(labels);
1520        }
1521    );
1522    (
1523        describe_activity_execution,
1524        DescribeActivityExecutionRequest,
1525        DescribeActivityExecutionResponse,
1526        |r| {
1527            let labels = namespaced_request!(r);
1528            r.extensions_mut().insert(labels);
1529        }
1530    );
1531    (
1532        poll_activity_execution,
1533        PollActivityExecutionRequest,
1534        PollActivityExecutionResponse,
1535        |r| {
1536            let labels = namespaced_request!(r);
1537            r.extensions_mut().insert(labels);
1538        }
1539    );
1540    (
1541        list_activity_executions,
1542        ListActivityExecutionsRequest,
1543        ListActivityExecutionsResponse,
1544        |r| {
1545            let labels = namespaced_request!(r);
1546            r.extensions_mut().insert(labels);
1547        }
1548    );
1549    (
1550        count_activity_executions,
1551        CountActivityExecutionsRequest,
1552        CountActivityExecutionsResponse,
1553        |r| {
1554            let labels = namespaced_request!(r);
1555            r.extensions_mut().insert(labels);
1556        }
1557    );
1558    (
1559        request_cancel_activity_execution,
1560        RequestCancelActivityExecutionRequest,
1561        RequestCancelActivityExecutionResponse,
1562        |r| {
1563            let labels = namespaced_request!(r);
1564            r.extensions_mut().insert(labels);
1565        }
1566    );
1567    (
1568        terminate_activity_execution,
1569        TerminateActivityExecutionRequest,
1570        TerminateActivityExecutionResponse,
1571        |r| {
1572            let labels = namespaced_request!(r);
1573            r.extensions_mut().insert(labels);
1574        }
1575    );
1576    (
1577        delete_activity_execution,
1578        DeleteActivityExecutionRequest,
1579        DeleteActivityExecutionResponse,
1580        |r| {
1581            let labels = namespaced_request!(r);
1582            r.extensions_mut().insert(labels);
1583        }
1584    );
1585}
1586
1587proxier! {
1588    OperatorService; ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS; OperatorServiceClient; operator_client; defaults;
1589    (add_search_attributes, AddSearchAttributesRequest, AddSearchAttributesResponse);
1590    (remove_search_attributes, RemoveSearchAttributesRequest, RemoveSearchAttributesResponse);
1591    (list_search_attributes, ListSearchAttributesRequest, ListSearchAttributesResponse);
1592    (delete_namespace, DeleteNamespaceRequest, DeleteNamespaceResponse,
1593        |r| {
1594            let labels = namespaced_request!(r);
1595            r.extensions_mut().insert(labels);
1596        }
1597    );
1598    (add_or_update_remote_cluster, AddOrUpdateRemoteClusterRequest, AddOrUpdateRemoteClusterResponse);
1599    (remove_remote_cluster, RemoveRemoteClusterRequest, RemoveRemoteClusterResponse);
1600    (list_clusters, ListClustersRequest, ListClustersResponse);
1601    (get_nexus_endpoint, GetNexusEndpointRequest, GetNexusEndpointResponse);
1602    (create_nexus_endpoint, CreateNexusEndpointRequest, CreateNexusEndpointResponse);
1603    (update_nexus_endpoint, UpdateNexusEndpointRequest, UpdateNexusEndpointResponse);
1604    (delete_nexus_endpoint, DeleteNexusEndpointRequest, DeleteNexusEndpointResponse);
1605    (list_nexus_endpoints, ListNexusEndpointsRequest, ListNexusEndpointsResponse);
1606}
1607
1608proxier! {
1609    CloudService; ALL_IMPLEMENTED_CLOUD_SERVICE_RPCS; CloudServiceClient; cloud_client; defaults;
1610    (get_users, cloudreq::GetUsersRequest, cloudreq::GetUsersResponse);
1611    (get_user, cloudreq::GetUserRequest, cloudreq::GetUserResponse);
1612    (create_user, cloudreq::CreateUserRequest, cloudreq::CreateUserResponse);
1613    (update_user, cloudreq::UpdateUserRequest, cloudreq::UpdateUserResponse);
1614    (delete_user, cloudreq::DeleteUserRequest, cloudreq::DeleteUserResponse);
1615    (set_user_namespace_access, cloudreq::SetUserNamespaceAccessRequest, cloudreq::SetUserNamespaceAccessResponse);
1616    (get_async_operation, cloudreq::GetAsyncOperationRequest, cloudreq::GetAsyncOperationResponse);
1617    (create_namespace, cloudreq::CreateNamespaceRequest, cloudreq::CreateNamespaceResponse);
1618    (get_namespaces, cloudreq::GetNamespacesRequest, cloudreq::GetNamespacesResponse);
1619    (get_namespace, cloudreq::GetNamespaceRequest, cloudreq::GetNamespaceResponse,
1620        |r| {
1621            let labels = namespaced_request!(r);
1622            r.extensions_mut().insert(labels);
1623        }
1624    );
1625    (update_namespace, cloudreq::UpdateNamespaceRequest, cloudreq::UpdateNamespaceResponse,
1626        |r| {
1627            let labels = namespaced_request!(r);
1628            r.extensions_mut().insert(labels);
1629        }
1630    );
1631    (rename_custom_search_attribute, cloudreq::RenameCustomSearchAttributeRequest, cloudreq::RenameCustomSearchAttributeResponse);
1632    (delete_namespace, cloudreq::DeleteNamespaceRequest, cloudreq::DeleteNamespaceResponse,
1633        |r| {
1634            let labels = namespaced_request!(r);
1635            r.extensions_mut().insert(labels);
1636        }
1637    );
1638    (failover_namespace_region, cloudreq::FailoverNamespaceRegionRequest, cloudreq::FailoverNamespaceRegionResponse);
1639    (add_namespace_region, cloudreq::AddNamespaceRegionRequest, cloudreq::AddNamespaceRegionResponse);
1640    (delete_namespace_region, cloudreq::DeleteNamespaceRegionRequest, cloudreq::DeleteNamespaceRegionResponse);
1641    (get_regions, cloudreq::GetRegionsRequest, cloudreq::GetRegionsResponse);
1642    (get_region, cloudreq::GetRegionRequest, cloudreq::GetRegionResponse);
1643    (get_api_keys, cloudreq::GetApiKeysRequest, cloudreq::GetApiKeysResponse);
1644    (get_api_key, cloudreq::GetApiKeyRequest, cloudreq::GetApiKeyResponse);
1645    (create_api_key, cloudreq::CreateApiKeyRequest, cloudreq::CreateApiKeyResponse);
1646    (update_api_key, cloudreq::UpdateApiKeyRequest, cloudreq::UpdateApiKeyResponse);
1647    (delete_api_key, cloudreq::DeleteApiKeyRequest, cloudreq::DeleteApiKeyResponse);
1648    (get_nexus_endpoints, cloudreq::GetNexusEndpointsRequest, cloudreq::GetNexusEndpointsResponse);
1649    (get_nexus_endpoint, cloudreq::GetNexusEndpointRequest, cloudreq::GetNexusEndpointResponse);
1650    (create_nexus_endpoint, cloudreq::CreateNexusEndpointRequest, cloudreq::CreateNexusEndpointResponse);
1651    (update_nexus_endpoint, cloudreq::UpdateNexusEndpointRequest, cloudreq::UpdateNexusEndpointResponse);
1652    (delete_nexus_endpoint, cloudreq::DeleteNexusEndpointRequest, cloudreq::DeleteNexusEndpointResponse);
1653    (get_user_groups, cloudreq::GetUserGroupsRequest, cloudreq::GetUserGroupsResponse);
1654    (get_user_group, cloudreq::GetUserGroupRequest, cloudreq::GetUserGroupResponse);
1655    (create_user_group, cloudreq::CreateUserGroupRequest, cloudreq::CreateUserGroupResponse);
1656    (update_user_group, cloudreq::UpdateUserGroupRequest, cloudreq::UpdateUserGroupResponse);
1657    (delete_user_group, cloudreq::DeleteUserGroupRequest, cloudreq::DeleteUserGroupResponse);
1658    (add_user_group_member, cloudreq::AddUserGroupMemberRequest, cloudreq::AddUserGroupMemberResponse);
1659    (remove_user_group_member, cloudreq::RemoveUserGroupMemberRequest, cloudreq::RemoveUserGroupMemberResponse);
1660    (get_user_group_members, cloudreq::GetUserGroupMembersRequest, cloudreq::GetUserGroupMembersResponse);
1661    (set_user_group_namespace_access, cloudreq::SetUserGroupNamespaceAccessRequest, cloudreq::SetUserGroupNamespaceAccessResponse);
1662    (create_service_account, cloudreq::CreateServiceAccountRequest, cloudreq::CreateServiceAccountResponse);
1663    (get_service_account, cloudreq::GetServiceAccountRequest, cloudreq::GetServiceAccountResponse);
1664    (get_service_accounts, cloudreq::GetServiceAccountsRequest, cloudreq::GetServiceAccountsResponse);
1665    (update_service_account, cloudreq::UpdateServiceAccountRequest, cloudreq::UpdateServiceAccountResponse);
1666    (delete_service_account, cloudreq::DeleteServiceAccountRequest, cloudreq::DeleteServiceAccountResponse);
1667    (get_usage, cloudreq::GetUsageRequest, cloudreq::GetUsageResponse);
1668    (get_account, cloudreq::GetAccountRequest, cloudreq::GetAccountResponse);
1669    (update_account, cloudreq::UpdateAccountRequest, cloudreq::UpdateAccountResponse);
1670    (create_namespace_export_sink, cloudreq::CreateNamespaceExportSinkRequest, cloudreq::CreateNamespaceExportSinkResponse);
1671    (get_namespace_export_sink, cloudreq::GetNamespaceExportSinkRequest, cloudreq::GetNamespaceExportSinkResponse);
1672    (get_namespace_export_sinks, cloudreq::GetNamespaceExportSinksRequest, cloudreq::GetNamespaceExportSinksResponse);
1673    (update_namespace_export_sink, cloudreq::UpdateNamespaceExportSinkRequest, cloudreq::UpdateNamespaceExportSinkResponse);
1674    (delete_namespace_export_sink, cloudreq::DeleteNamespaceExportSinkRequest, cloudreq::DeleteNamespaceExportSinkResponse);
1675    (validate_namespace_export_sink, cloudreq::ValidateNamespaceExportSinkRequest, cloudreq::ValidateNamespaceExportSinkResponse);
1676    (update_namespace_tags, cloudreq::UpdateNamespaceTagsRequest, cloudreq::UpdateNamespaceTagsResponse);
1677    (create_connectivity_rule, cloudreq::CreateConnectivityRuleRequest, cloudreq::CreateConnectivityRuleResponse);
1678    (get_connectivity_rule, cloudreq::GetConnectivityRuleRequest, cloudreq::GetConnectivityRuleResponse);
1679    (get_connectivity_rules, cloudreq::GetConnectivityRulesRequest, cloudreq::GetConnectivityRulesResponse);
1680    (delete_connectivity_rule, cloudreq::DeleteConnectivityRuleRequest, cloudreq::DeleteConnectivityRuleResponse);
1681    (set_service_account_namespace_access, cloudreq::SetServiceAccountNamespaceAccessRequest, cloudreq::SetServiceAccountNamespaceAccessResponse);
1682    (validate_account_audit_log_sink, cloudreq::ValidateAccountAuditLogSinkRequest, cloudreq::ValidateAccountAuditLogSinkResponse);
1683    (get_current_identity, cloudreq::GetCurrentIdentityRequest, cloudreq::GetCurrentIdentityResponse);
1684    (get_audit_logs, cloudreq::GetAuditLogsRequest, cloudreq::GetAuditLogsResponse);
1685    (create_account_audit_log_sink, cloudreq::CreateAccountAuditLogSinkRequest, cloudreq::CreateAccountAuditLogSinkResponse);
1686    (get_account_audit_log_sink, cloudreq::GetAccountAuditLogSinkRequest, cloudreq::GetAccountAuditLogSinkResponse);
1687    (get_account_audit_log_sinks, cloudreq::GetAccountAuditLogSinksRequest, cloudreq::GetAccountAuditLogSinksResponse);
1688    (update_account_audit_log_sink, cloudreq::UpdateAccountAuditLogSinkRequest, cloudreq::UpdateAccountAuditLogSinkResponse);
1689    (delete_account_audit_log_sink, cloudreq::DeleteAccountAuditLogSinkRequest, cloudreq::DeleteAccountAuditLogSinkResponse);
1690    (get_namespace_capacity_info, cloudreq::GetNamespaceCapacityInfoRequest, cloudreq::GetNamespaceCapacityInfoResponse);
1691    (create_billing_report, cloudreq::CreateBillingReportRequest, cloudreq::CreateBillingReportResponse);
1692    (get_billing_report, cloudreq::GetBillingReportRequest, cloudreq::GetBillingReportResponse);
1693}
1694
1695proxier! {
1696    TestService; ALL_IMPLEMENTED_TEST_SERVICE_RPCS; TestServiceClient; test_client; defaults;
1697    (lock_time_skipping, LockTimeSkippingRequest, LockTimeSkippingResponse);
1698    (unlock_time_skipping, UnlockTimeSkippingRequest, UnlockTimeSkippingResponse);
1699    (sleep, SleepRequest, SleepResponse);
1700    (sleep_until, SleepUntilRequest, SleepResponse);
1701    (unlock_time_skipping_with_sleep, SleepRequest, SleepResponse);
1702    (get_current_time, (), GetCurrentTimeResponse);
1703}
1704
1705proxier! {
1706    HealthService; ALL_IMPLEMENTED_HEALTH_SERVICE_RPCS; HealthClient; health_client;
1707    (check, HealthCheckRequest, HealthCheckResponse);
1708    (watch, HealthCheckRequest, tonic::codec::Streaming<HealthCheckResponse>);
1709}
1710
1711#[cfg(test)]
1712mod tests {
1713    use super::*;
1714    use crate::{ClientOptions, ConnectionOptions};
1715    use std::collections::HashSet;
1716    use temporalio_common::{
1717        protos::temporal::api::{
1718            operatorservice::v1::DeleteNamespaceRequest, workflowservice::v1::ListNamespacesRequest,
1719        },
1720        worker::WorkerTaskTypes,
1721    };
1722    use tonic::IntoRequest;
1723    use url::Url;
1724    use uuid::Uuid;
1725
1726    // Just to help make sure some stuff compiles. Not run.
1727    #[allow(dead_code)]
1728    async fn raw_client_retry_compiles() {
1729        let opts = ConnectionOptions::new(Url::parse("http://localhost:7233").unwrap())
1730            .client_name("test")
1731            .client_version("0.0.0")
1732            .build();
1733        let connection = Connection::connect(opts).await.unwrap();
1734        let mut client = Client::new(connection, ClientOptions::new("default").build()).unwrap();
1735
1736        let list_ns_req = ListNamespacesRequest::default();
1737        let wf_client = client.workflow_client();
1738        let fact = move |req| {
1739            let mut c = wf_client.clone();
1740            async move { c.list_namespaces(req).await }.boxed()
1741        };
1742        client
1743            .call("whatever", fact, Request::new(list_ns_req.clone()))
1744            .await
1745            .unwrap();
1746
1747        // Operator svc method
1748        let op_del_ns_req = DeleteNamespaceRequest::default();
1749        let op_client = client.operator_client();
1750        let fact = move |req| {
1751            let mut c = op_client.clone();
1752            async move { c.delete_namespace(req).await }.boxed()
1753        };
1754        client
1755            .call("whatever", fact, Request::new(op_del_ns_req.clone()))
1756            .await
1757            .unwrap();
1758
1759        // Cloud svc method
1760        let cloud_del_ns_req = cloudreq::DeleteNamespaceRequest::default();
1761        let cloud_client = client.cloud_client();
1762        let fact = move |req| {
1763            let mut c = cloud_client.clone();
1764            async move { c.delete_namespace(req).await }.boxed()
1765        };
1766        client
1767            .call("whatever", fact, Request::new(cloud_del_ns_req.clone()))
1768            .await
1769            .unwrap();
1770
1771        // Verify calling through traits works
1772        client
1773            .list_namespaces(list_ns_req.into_request())
1774            .await
1775            .unwrap();
1776        // Have to disambiguate operator and cloud service
1777        OperatorService::delete_namespace(&mut client, op_del_ns_req.into_request())
1778            .await
1779            .unwrap();
1780        CloudService::delete_namespace(&mut client, cloud_del_ns_req.into_request())
1781            .await
1782            .unwrap();
1783        client.get_current_time(().into_request()).await.unwrap();
1784        client
1785            .check(HealthCheckRequest::default().into_request())
1786            .await
1787            .unwrap();
1788    }
1789
1790    fn verify_methods(proto_def_str: &str, impl_list: &[&str]) {
1791        let methods: Vec<_> = proto_def_str
1792            .lines()
1793            .map(|l| l.trim())
1794            .filter(|l| l.starts_with("rpc"))
1795            .map(|l| {
1796                let stripped = l.strip_prefix("rpc ").unwrap();
1797                stripped[..stripped.find('(').unwrap()].trim()
1798            })
1799            .collect();
1800        let no_underscores: HashSet<_> = impl_list.iter().map(|x| x.replace('_', "")).collect();
1801        let mut not_implemented = vec![];
1802        for method in methods {
1803            if !no_underscores.contains(&method.to_lowercase()) {
1804                not_implemented.push(method);
1805            }
1806        }
1807        if !not_implemented.is_empty() {
1808            panic!(
1809                "The following RPC methods are not implemented by raw client: {not_implemented:?}"
1810            );
1811        }
1812    }
1813    #[test]
1814    fn verify_all_workflow_service_methods_implemented() {
1815        // This is less work than trying to hook into the codegen process
1816        let proto_def = include_str!(
1817            "../../common/protos/api_upstream/temporal/api/workflowservice/v1/service.proto"
1818        );
1819        verify_methods(proto_def, ALL_IMPLEMENTED_WORKFLOW_SERVICE_RPCS);
1820    }
1821
1822    #[test]
1823    fn verify_all_operator_service_methods_implemented() {
1824        let proto_def = include_str!(
1825            "../../common/protos/api_upstream/temporal/api/operatorservice/v1/service.proto"
1826        );
1827        verify_methods(proto_def, ALL_IMPLEMENTED_OPERATOR_SERVICE_RPCS);
1828    }
1829
1830    #[test]
1831    fn verify_all_cloud_service_methods_implemented() {
1832        let proto_def = include_str!(
1833            "../../common/protos/api_cloud_upstream/temporal/api/cloud/cloudservice/v1/service.proto"
1834        );
1835        verify_methods(proto_def, ALL_IMPLEMENTED_CLOUD_SERVICE_RPCS);
1836    }
1837
1838    #[test]
1839    fn verify_all_test_service_methods_implemented() {
1840        let proto_def = include_str!(
1841            "../../common/protos/testsrv_upstream/temporal/api/testservice/v1/service.proto"
1842        );
1843        verify_methods(proto_def, ALL_IMPLEMENTED_TEST_SERVICE_RPCS);
1844    }
1845
1846    #[test]
1847    fn verify_all_health_service_methods_implemented() {
1848        let proto_def = include_str!("../../common/protos/grpc/health/v1/health.proto");
1849        verify_methods(proto_def, ALL_IMPLEMENTED_HEALTH_SERVICE_RPCS);
1850    }
1851
1852    #[tokio::test]
1853    async fn can_mock_services() {
1854        #[derive(Clone)]
1855        struct MyFakeServices {}
1856        impl RawGrpcCaller for MyFakeServices {}
1857        impl WorkflowService for MyFakeServices {
1858            fn list_namespaces(
1859                &mut self,
1860                _request: Request<ListNamespacesRequest>,
1861            ) -> BoxFuture<'_, Result<Response<ListNamespacesResponse>, Status>> {
1862                async {
1863                    Ok(Response::new(ListNamespacesResponse {
1864                        namespaces: vec![DescribeNamespaceResponse {
1865                            failover_version: 12345,
1866                            ..Default::default()
1867                        }],
1868                        ..Default::default()
1869                    }))
1870                }
1871                .boxed()
1872            }
1873        }
1874        impl OperatorService for MyFakeServices {}
1875        impl CloudService for MyFakeServices {}
1876        impl TestService for MyFakeServices {}
1877        // Health service isn't possible to create a default impl for.
1878        impl HealthService for MyFakeServices {
1879            fn check(
1880                &mut self,
1881                _request: tonic::Request<HealthCheckRequest>,
1882            ) -> BoxFuture<'_, Result<tonic::Response<HealthCheckResponse>, tonic::Status>>
1883            {
1884                todo!()
1885            }
1886            fn watch(
1887                &mut self,
1888                _request: tonic::Request<HealthCheckRequest>,
1889            ) -> BoxFuture<
1890                '_,
1891                Result<
1892                    tonic::Response<tonic::codec::Streaming<HealthCheckResponse>>,
1893                    tonic::Status,
1894                >,
1895            > {
1896                todo!()
1897            }
1898        }
1899        let mut mocked_client = TemporalServiceClient::from_services(
1900            Box::new(MyFakeServices {}),
1901            Box::new(MyFakeServices {}),
1902            Box::new(MyFakeServices {}),
1903            Box::new(MyFakeServices {}),
1904            Box::new(MyFakeServices {}),
1905        );
1906        let r = mocked_client
1907            .list_namespaces(ListNamespacesRequest::default().into_request())
1908            .await
1909            .unwrap();
1910        assert_eq!(r.into_inner().namespaces[0].failover_version, 12345);
1911    }
1912
1913    #[rstest::rstest]
1914    #[case::with_versioning(true)]
1915    #[case::without_versioning(false)]
1916    #[tokio::test]
1917    async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) {
1918        use crate::worker::{MockClientWorker, MockSlot};
1919        use temporalio_common::{
1920            protos::temporal::api::enums::v1::WorkerVersioningMode,
1921            worker::{WorkerDeploymentOptions, WorkerDeploymentVersion},
1922        };
1923
1924        let expected_mode = if use_worker_versioning {
1925            WorkerVersioningMode::Versioned
1926        } else {
1927            WorkerVersioningMode::Unversioned
1928        };
1929
1930        #[derive(Clone)]
1931        struct MyFakeServices {
1932            client_worker_set: Arc<ClientWorkerSet>,
1933            expected_mode: WorkerVersioningMode,
1934        }
1935        impl RawGrpcCaller for MyFakeServices {}
1936        impl RawClientProducer for MyFakeServices {
1937            fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
1938                Some(self.client_worker_set.clone())
1939            }
1940            fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
1941                Box::new(MyFakeWfClient {
1942                    expected_mode: self.expected_mode,
1943                })
1944            }
1945            fn operator_client(&mut self) -> Box<dyn OperatorService> {
1946                unimplemented!()
1947            }
1948            fn cloud_client(&mut self) -> Box<dyn CloudService> {
1949                unimplemented!()
1950            }
1951            fn test_client(&mut self) -> Box<dyn TestService> {
1952                unimplemented!()
1953            }
1954            fn health_client(&mut self) -> Box<dyn HealthService> {
1955                unimplemented!()
1956            }
1957        }
1958
1959        let deployment_opts = WorkerDeploymentOptions {
1960            version: WorkerDeploymentVersion {
1961                deployment_name: "test-deployment".to_string(),
1962                build_id: "test-build-123".to_string(),
1963            },
1964            use_worker_versioning,
1965            default_versioning_behavior: None,
1966        };
1967
1968        let mut mock_provider = MockClientWorker::new();
1969        mock_provider
1970            .expect_namespace()
1971            .return_const("test-namespace".to_string());
1972        mock_provider
1973            .expect_task_queue()
1974            .return_const("test-task-queue".to_string());
1975        let mut mock_slot = MockSlot::new();
1976        mock_slot.expect_schedule_wft().returning(|_| Ok(()));
1977        mock_provider
1978            .expect_try_reserve_wft_slot()
1979            .return_once(|| Some(Box::new(mock_slot)));
1980        mock_provider
1981            .expect_deployment_options()
1982            .return_const(Some(deployment_opts.clone()));
1983        mock_provider.expect_heartbeat_enabled().return_const(false);
1984        let uuid = Uuid::new_v4();
1985        mock_provider
1986            .expect_worker_instance_key()
1987            .return_const(uuid);
1988        mock_provider
1989            .expect_worker_task_types()
1990            .return_const(WorkerTaskTypes {
1991                enable_workflows: true,
1992                enable_local_activities: true,
1993                enable_remote_activities: true,
1994                enable_nexus: true,
1995            });
1996
1997        let client_worker_set = Arc::new(ClientWorkerSet::new());
1998        client_worker_set
1999            .register_worker(Arc::new(mock_provider), true)
2000            .unwrap();
2001
2002        #[derive(Clone)]
2003        struct MyFakeWfClient {
2004            expected_mode: WorkerVersioningMode,
2005        }
2006        impl WorkflowService for MyFakeWfClient {
2007            fn start_workflow_execution(
2008                &mut self,
2009                request: tonic::Request<StartWorkflowExecutionRequest>,
2010            ) -> BoxFuture<'_, Result<tonic::Response<StartWorkflowExecutionResponse>, tonic::Status>>
2011            {
2012                let req = request.into_inner();
2013                let expected_mode = self.expected_mode;
2014
2015                assert!(
2016                    req.eager_worker_deployment_options.is_some(),
2017                    "eager_worker_deployment_options should be populated"
2018                );
2019
2020                let opts = req.eager_worker_deployment_options.as_ref().unwrap();
2021                assert_eq!(opts.deployment_name, "test-deployment");
2022                assert_eq!(opts.build_id, "test-build-123");
2023                assert_eq!(opts.worker_versioning_mode, expected_mode as i32);
2024
2025                async { Ok(Response::new(StartWorkflowExecutionResponse::default())) }.boxed()
2026            }
2027        }
2028
2029        let mut mfs = MyFakeServices {
2030            client_worker_set,
2031            expected_mode,
2032        };
2033
2034        // Create a request with eager execution enabled
2035        let req = StartWorkflowExecutionRequest {
2036            namespace: "test-namespace".to_string(),
2037            workflow_id: "test-wf-id".to_string(),
2038            workflow_type: Some(
2039                temporalio_common::protos::temporal::api::common::v1::WorkflowType {
2040                    name: "test-workflow".to_string(),
2041                },
2042            ),
2043            task_queue: Some(TaskQueue {
2044                name: "test-task-queue".to_string(),
2045                kind: 0,
2046                normal_name: String::new(),
2047            }),
2048            request_eager_execution: true,
2049            ..Default::default()
2050        };
2051
2052        mfs.start_workflow_execution(req.into_request())
2053            .await
2054            .unwrap();
2055    }
2056
2057    /// Tests that Connection's RawClientProducer impl correctly provides worker info
2058    /// so that eager workflow start can reserve a slot and dispatch the WFT.
2059    #[tokio::test]
2060    async fn connection_eager_start_dispatches_wft() {
2061        use crate::{
2062            ConnectionOptions,
2063            callback_based::{CallbackBasedGrpcService, GrpcSuccessResponse},
2064            worker::{MockClientWorker, MockSlot},
2065        };
2066        use prost::Message;
2067        use std::sync::atomic::{AtomicBool, Ordering};
2068        use temporalio_common::protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
2069
2070        let dispatched = Arc::new(AtomicBool::new(false));
2071        let dispatched_clone = dispatched.clone();
2072
2073        // Create a callback-based service that returns an eager_workflow_task in the response
2074        let service_override = CallbackBasedGrpcService {
2075            callback: Arc::new(|_req| {
2076                Box::pin(async {
2077                    let resp = StartWorkflowExecutionResponse {
2078                        run_id: "test-run-id".to_string(),
2079                        eager_workflow_task: Some(PollWorkflowTaskQueueResponse {
2080                            task_token: vec![1, 2, 3],
2081                            ..Default::default()
2082                        }),
2083                        ..Default::default()
2084                    };
2085                    let proto = resp.encode_to_vec();
2086                    Ok(GrpcSuccessResponse {
2087                        headers: Default::default(),
2088                        proto,
2089                    })
2090                })
2091            }),
2092        };
2093
2094        let opts = ConnectionOptions::new(url::Url::parse("http://localhost:7233").unwrap())
2095            .skip_get_system_info(true)
2096            .service_override(service_override)
2097            .dns_load_balancing(None)
2098            .build();
2099        let mut connection = crate::Connection::connect(opts).await.unwrap();
2100
2101        // Register a mock worker on the connection's worker set
2102        let mut mock_worker = MockClientWorker::new();
2103        mock_worker
2104            .expect_namespace()
2105            .return_const("default".to_string());
2106        mock_worker
2107            .expect_task_queue()
2108            .return_const("test-tq".to_string());
2109        mock_worker
2110            .expect_deployment_options()
2111            .return_const(None::<temporalio_common::worker::WorkerDeploymentOptions>);
2112        mock_worker.expect_heartbeat_enabled().return_const(false);
2113        let uuid = Uuid::new_v4();
2114        mock_worker.expect_worker_instance_key().return_const(uuid);
2115        mock_worker
2116            .expect_worker_task_types()
2117            .return_const(WorkerTaskTypes {
2118                enable_workflows: true,
2119                enable_local_activities: false,
2120                enable_remote_activities: false,
2121                enable_nexus: false,
2122            });
2123
2124        let mut mock_slot = MockSlot::new();
2125        mock_slot.expect_schedule_wft().returning(move |_| {
2126            dispatched_clone.store(true, Ordering::SeqCst);
2127            Ok(())
2128        });
2129        mock_worker
2130            .expect_try_reserve_wft_slot()
2131            .return_once(|| Some(Box::new(mock_slot)));
2132
2133        connection
2134            .workers()
2135            .register_worker(Arc::new(mock_worker), true)
2136            .unwrap();
2137
2138        // Make an eager start_workflow_execution call through Connection
2139        let req = StartWorkflowExecutionRequest {
2140            namespace: "default".to_string(),
2141            workflow_id: "test-wf".to_string(),
2142            workflow_type: Some(
2143                temporalio_common::protos::temporal::api::common::v1::WorkflowType {
2144                    name: "test-workflow".to_string(),
2145                },
2146            ),
2147            task_queue: Some(TaskQueue {
2148                name: "test-tq".to_string(),
2149                kind: 0,
2150                normal_name: String::new(),
2151            }),
2152            request_eager_execution: true,
2153            ..Default::default()
2154        };
2155
2156        connection
2157            .start_workflow_execution(req.into_request())
2158            .await
2159            .unwrap();
2160
2161        assert!(
2162            dispatched.load(Ordering::SeqCst),
2163            "Eager workflow task should have been dispatched to the worker"
2164        );
2165    }
2166}