zeebe_rs/client.rs
1use crate::{
2 decision::EvaluateDecisionRequest,
3 incident::ResolveIncidentRequest,
4 job::{
5 complete::CompleteJobRequest, fail::FailJobRequest,
6 update_retries::UpdateJobRetriesRequest, update_timeout::UpdateJobTimeoutRequest,
7 },
8 message::PublishMessageRequest,
9 oauth::{AuthType, OAuthConfig, OAuthInterceptor},
10 process_instance::{
11 cancel::CancelProcessInstanceRequest, create::CreateProcessInstanceRequest,
12 migrate::MigrateProcessInstanceRequest, modify::ModifyProcessInstanceRequest,
13 },
14 proto::gateway_client::GatewayClient,
15 resource::{DeleteResourceRequest, DeployResourceError, DeployResourceRequest},
16 set_variables::SetVariablesRequest,
17 signal::BroadcastSignalRequest,
18 throw_error::ThrowErrorRequest,
19 topology::TopologyRequest,
20 worker::WorkerBuilder,
21};
22use std::{path::Path, time::Duration};
23use thiserror::Error;
24use tonic::{
25 codegen::InterceptedService,
26 transport::{Certificate, Channel, ClientTlsConfig},
27};
28
29/// Represents the different types of errors that can occur in the client.
30///
31/// The `ClientError` enum encapsulates various error types, providing a unified way to handle errors
32/// that arise during client operations, including networking issues, JSON processing errors, and
33/// resource-related errors. Each variant includes contextual information to help diagnose and resolve
34/// issues effectively.
35///
36/// # Variants
37///
38/// - `RequestFailed`
39/// Represents an error originating from a gRPC request. This variant wraps the `tonic::Status`
40/// error type, which provides details about the request failure.
41/// - Source: `tonic::Status`
42///
43/// - `JsonError`
44/// Represents a general JSON parsing or serialization error. This variant wraps the `serde_json::Error`
45/// type, which is returned when JSON data cannot be processed correctly.
46/// - Source: `serde_json::Error`
47///
48/// - `ResourceError`
49/// Represents an error related to resource deployment. This variant wraps the `DeployResourceError`
50/// type, which provides context about specific resource-related issues.
51/// - Source: `DeployResourceError`
52///
53/// - `DeserializationFailed`
54/// Indicates that deserialization of a JSON string failed. This variant provides additional
55/// context by including the original value that caused the error, as well as the underlying
56/// `serde_json::Error`.
57/// - Fields:
58/// - `value`: The JSON string that failed to deserialize.
59/// - `source`: The underlying `serde_json::Error`.
60///
61/// - `SerializationFailed`
62/// Indicates that serialization of data into JSON failed. This variant includes the underlying
63/// `serde_json::Error` to provide details about the failure.
64/// - Fields:
65/// - `source`: The underlying `serde_json::Error`.
66#[derive(Error, Debug)]
67pub enum ClientError {
68 #[error(transparent)]
69 RequestFailed(#[from] tonic::Status),
70
71 #[error(transparent)]
72 JsonError(#[from] serde_json::Error),
73
74 #[error(transparent)]
75 ResourceError(#[from] DeployResourceError),
76
77 #[error("deserialize failed on {value:?}")]
78 DeserializationFailed {
79 value: String,
80 source: serde_json::Error,
81 },
82
83 #[error("serialize failed")]
84 SerializationFailed { source: serde_json::Error },
85}
86
87/// Represents errors that can occur while building a `Client`.
88///
89/// The `ClientBuilderError` enum provides variants for different types of errors
90/// that can occur during the client building process, such as loading certificates,
91/// transport errors, HTTP errors, and URI parsing errors.
92#[derive(Error, Debug)]
93pub enum ClientBuilderError {
94 #[error("failed to load certificate")]
95 Certificate(#[from] std::io::Error),
96
97 #[error(transparent)]
98 Transport(#[from] tonic::transport::Error),
99
100 #[error(transparent)]
101 Http(#[from] tonic::codegen::http::Error),
102
103 #[error("unable to parse URI")]
104 InvalidUri(#[from] tonic::codegen::http::uri::InvalidUri),
105}
106
107#[derive(Default, Clone)]
108pub struct Initial;
109
110#[derive(Default, Clone)]
111pub struct WithAddress;
112
113pub trait ClientBuilderState {}
114impl ClientBuilderState for Initial {}
115impl ClientBuilderState for WithAddress {}
116
117/// A builder for configuring and creating a `Client`.
118///
119/// The `ClientBuilder` allows you to configure various aspects of the client,
120/// such as the endpoint, TLS settings, timeouts, and OAuth configuration.
121#[derive(Debug, Clone)]
122pub struct ClientBuilder<S: ClientBuilderState> {
123 endpoint: Option<String>,
124 tls: Option<ClientTlsConfig>,
125 timeout: Option<Duration>,
126 keep_alive: Option<Duration>,
127 auth_timeout: Option<Duration>,
128 oauth_config: Option<OAuthConfig>,
129 _state: std::marker::PhantomData<S>,
130}
131
132impl<S: ClientBuilderState + Default> Default for ClientBuilder<S> {
133 fn default() -> Self {
134 Self {
135 endpoint: Default::default(),
136 tls: Default::default(),
137 timeout: Default::default(),
138 auth_timeout: Default::default(),
139 keep_alive: Default::default(),
140 oauth_config: Default::default(),
141 _state: std::marker::PhantomData,
142 }
143 }
144}
145
146impl<S: ClientBuilderState> ClientBuilder<S> {
147 fn transition<NewState: ClientBuilderState>(self) -> ClientBuilder<NewState> {
148 ClientBuilder {
149 endpoint: self.endpoint,
150 tls: self.tls,
151 timeout: self.timeout,
152 auth_timeout: self.auth_timeout,
153 keep_alive: self.keep_alive,
154 oauth_config: self.oauth_config,
155 _state: std::marker::PhantomData,
156 }
157 }
158}
159
160impl ClientBuilder<Initial> {
161 fn set_endpoint(&mut self, zeebe_address: &str, port: u16) {
162 self.endpoint = Some(format!("{}:{}", zeebe_address, port));
163 }
164
165 /// Sets the endpoint for the Zeebe client.
166 ///
167 /// # Arguments
168 ///
169 /// * `zeebe_address` - A string slice that holds the address of the Zeebe broker.
170 /// * `port` - A 16-bit unsigned integer that holds the port number of the Zeebe broker.
171 ///
172 /// # Returns
173 ///
174 /// A `ClientBuilder<WithAddress>` instance with the Zeebe endpoint set.
175 pub fn with_address(mut self, zeebe_address: &str, port: u16) -> ClientBuilder<WithAddress> {
176 self.set_endpoint(zeebe_address, port);
177 self.transition()
178 }
179}
180
181impl ClientBuilder<WithAddress> {
182 /// Configures OAuth authentication for the client.
183 ///
184 /// # Arguments
185 ///
186 /// * `client_id` - The client ID for OAuth authentication.
187 /// * `client_secret` - The client secret for OAuth authentication.
188 /// * `auth_url` - The URL for the OAuth authentication server.
189 /// * `audience` - The audience for the OAuth token.
190 /// * `auth_timeout` - The timeout duration for the OAuth authentication process.
191 /// * `auth_type` - If OAuth credentials should use request body or basic auth, defaults to request body
192 ///
193 /// # Returns
194 ///
195 /// A `ClientBuilder<WithAddress>` instance with OAuth configuration set.
196 pub fn with_oauth(
197 mut self,
198 client_id: String,
199 client_secret: String,
200 auth_url: String,
201 audience: String,
202 auth_timeout: Duration,
203 auth_type: Option<AuthType>,
204 ) -> Self {
205 self.oauth_config = Some(OAuthConfig::new(
206 client_id,
207 client_secret,
208 auth_url,
209 audience,
210 auth_type,
211 ));
212 self.auth_timeout = Some(auth_timeout);
213
214 self
215 }
216
217 /// Configures TLS for the client root certificates required for Camunda Cloud
218 ///
219 /// # Returns
220 ///
221 /// A `ClientBuilder<WithAddress>` instance with tls configuration set.
222 pub fn with_cloud_tls(mut self) -> Self {
223 self.tls = Some(ClientTlsConfig::new().with_enabled_roots());
224 self
225 }
226
227 /// Configures TLS for the client using a PEM file.
228 ///
229 /// # Arguments
230 ///
231 /// * `pem` - The path to the PEM file containing the TLS certificate.
232 ///
233 /// # Returns
234 ///
235 /// A `Result` containing either a `ClientBuilder<WithAddress>` instance with TLS configuration set,
236 /// or a `ClientBuilderError` if reading the PEM file fails.
237 pub fn with_tls(mut self, pem: &Path) -> Result<Self, ClientBuilderError> {
238 let cert = std::fs::read_to_string(pem)?;
239 self.tls = Some(ClientTlsConfig::new().ca_certificate(Certificate::from_pem(&cert)));
240
241 Ok(self)
242 }
243
244 /// Builds the gRPC channel for the client.
245 ///
246 /// # Returns
247 ///
248 /// A `Result` containing either a `Channel` instance or a `ClientBuilderError` if the channel
249 /// could not be created.
250 async fn build_channel(&self) -> Result<Channel, ClientBuilderError> {
251 let endpoint = self
252 .endpoint
253 .as_ref()
254 .expect("Only transition to buildable if endpoint is set")
255 .to_owned();
256 let mut channel = Channel::from_shared(endpoint)?;
257
258 if let Some(ref tls) = self.tls {
259 channel = channel.tls_config(tls.clone())?;
260 }
261
262 if let Some(timeout) = self.timeout {
263 channel = channel.timeout(timeout);
264 }
265
266 if let Some(keep_alive) = self.keep_alive {
267 channel = channel.keep_alive_timeout(keep_alive);
268 }
269
270 Ok(channel.connect().await?)
271 }
272
273 /// Builds the client with the configured settings.
274 ///
275 /// # Returns
276 ///
277 /// A `Result` containing either a `Client` instance or a `ClientBuilderError` if the client
278 /// could not be built.
279 pub async fn build(self) -> Result<Client, ClientBuilderError> {
280 let channel = self.build_channel().await?;
281
282 let auth_interceptor = if let Some(cfg) = self.oauth_config {
283 OAuthInterceptor::new(
284 cfg,
285 self.auth_timeout
286 .expect("Only build oauth provider if auth timeout is set"),
287 )
288 } else {
289 OAuthInterceptor::default()
290 };
291 let gateway_client = GatewayClient::with_interceptor(channel, auth_interceptor.clone());
292 Ok(Client {
293 gateway_client,
294 auth_interceptor,
295 })
296 }
297
298 /// Sets the timeout duration for the client.
299 ///
300 /// # Arguments
301 ///
302 /// * `timeout` - The timeout duration.
303 ///
304 /// # Returns
305 ///
306 /// A `ClientBuilder<WithAddress>` instance with the timeout configuration set.
307 pub fn with_timeout(mut self, timeout: Duration) -> Self {
308 self.timeout = Some(timeout);
309 self
310 }
311
312 /// Sets the keep-alive duration for the client.
313 ///
314 /// # Arguments
315 ///
316 /// * `keep_alive` - The keep-alive duration.
317 ///
318 /// # Returns
319 ///
320 /// A `ClientBuilder<WithAddress>` instance with the keep-alive configuration set.
321 pub fn with_keep_alive(mut self, keep_alive: Duration) -> Self {
322 self.keep_alive = Some(keep_alive);
323 self
324 }
325}
326
327/// A client for interacting with the Zeebe cluster.
328///
329/// The `Client` struct provides methods to create various requests and operations
330/// on the Zeebe cluster, such as deploying resources, managing process instances,
331/// handling jobs, and more.
332///
333/// # Examples
334///
335/// ```ignore
336/// #[tokio::main]
337/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
338/// let client = zeebe_rs::Client::builder()
339/// .with_address("http://localhost", 26500)
340/// .build()
341/// .await?;
342///
343/// let topology = client.topology().send().await;
344///
345/// Ok(())
346/// }
347/// ```
348/// # Notes
349///
350/// Each method returns a request builder that can be further configured and then sent
351/// to the Zeebe cluster. The requests are asynchronous and return futures that need to
352/// be awaited.
353#[derive(Clone, Debug)]
354pub struct Client {
355 pub(crate) gateway_client: GatewayClient<InterceptedService<Channel, OAuthInterceptor>>,
356 pub(crate) auth_interceptor: OAuthInterceptor,
357}
358
359impl Client {
360 /// Creates a new `ClientBuilder` instance for configuring and building a `Client`.
361 ///
362 /// The `ClientBuilder` allows you to set various configurations such as the endpoint,
363 /// TLS settings, timeouts, and OAuth configuration before building the `Client`.
364 ///
365 /// # Examples
366 ///
367 /// ```ignore
368 /// #[tokio::main]
369 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
370 /// let client = zeebe_rs::Client::builder()
371 /// .with_address("http://localhost", 26500)
372 /// .build()
373 /// .await?;
374 ///
375 /// let topology = client.topology().send().await;
376 ///
377 /// Ok(())
378 /// }
379 /// ```
380 pub fn builder() -> ClientBuilder<Initial> {
381 ClientBuilder::default()
382 }
383
384 /// Waits for the first OAuth token to be fetched before returning.
385 /// Returns instantly if OAuth is not enabled.
386 /// # Examples
387 ///
388 /// ```ignore
389 /// #[tokio::main]
390 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
391 /// let client = zeebe_rs::Client::builder()
392 /// // Configure client with OAuth...
393 /// .build()
394 /// .await?;
395 ///
396 /// // Await first OAuth token before proceeding
397 /// let _ = client.auth_initialized().await;
398 ///
399 /// // Fetch topology after acquiring OAuth token
400 /// let topology = client.topology().send().await;
401 ///
402 /// Ok(())
403 ///}
404 /// ```
405 pub async fn auth_initialized(&self) {
406 self.auth_interceptor.auth_initialized().await;
407 }
408
409 /// Creates a `TopologyRequest` to build a request for fetching the toplogy
410 /// of the Zeebe cluster.
411 ///
412 /// # Examples
413 ///
414 /// ```ignore
415 /// let topology = client.topology().send().await;
416 /// ```
417 pub fn topology(&self) -> TopologyRequest {
418 TopologyRequest::new(self.clone())
419 }
420
421 /// Creates a `DeployResourceRequest` to build a request for deploying a
422 /// resource to Zeebe.
423 ///
424 /// # Examples
425 /// ```ignore
426 /// let result = client
427 /// .deploy_resource()
428 /// .with_resource_file(PathBuf::from("./examples/resources/hello_world.bpmn"))
429 /// .read_resource_files()?
430 /// .send()
431 /// .await?;
432 /// ```
433 pub fn deploy_resource(&self) -> DeployResourceRequest<crate::resource::Initial> {
434 DeployResourceRequest::<crate::resource::Initial>::new(self.clone())
435 }
436
437 /// Creates a `DeleteResourceRequest` to build a request for deleting a
438 /// deployed resource in Zeebe.
439 ///
440 /// # Examples
441 ///
442 /// ```ignore
443 /// let response = client
444 /// .delete_resource()
445 /// .with_resource_key(12345)
446 /// .send()
447 /// .await?;
448 /// ```
449 pub fn delete_resource(&self) -> DeleteResourceRequest<crate::resource::Initial> {
450 DeleteResourceRequest::<crate::resource::Initial>::new(self.clone())
451 }
452
453 /// Creates a `CreateProcessInstanceRequest` to build a request for creating
454 /// a process instance in Zeebe.
455 /// # Examples
456 ///
457 /// ```ignore
458 /// // Create a process instance with a BPMN process ID and no input variables
459 /// client
460 /// .create_process_instance()
461 /// .with_bpmn_process_id(String::from("order-process"))
462 /// .without_input()
463 /// .send()
464 /// .await?;
465 ///
466 /// // Create a process instance with a process definition key and input variables
467 /// client
468 /// .create_process_instance()
469 /// .with_process_definition_key(12345)
470 /// .with_variables(json!({"orderId": 123}))
471 /// .unwrap()
472 /// .send()
473 /// .await?;
474 /// ```
475 pub fn create_process_instance(
476 &self,
477 ) -> CreateProcessInstanceRequest<crate::process_instance::create::Initial> {
478 CreateProcessInstanceRequest::<crate::process_instance::create::Initial>::new(self.clone())
479 }
480
481 /// Creates a `CancelProcessInstanceRequest` to cancel an active
482 /// process instance in Zeebe.
483 ///
484 /// # Examples
485 ///
486 /// ```ignore
487 /// client
488 /// .cancel_process_instance()
489 /// .with_process_instance_key(123456)
490 /// .send()
491 /// .await?;
492 /// ```
493 ///
494 pub fn cancel_process_instance(
495 &self,
496 ) -> CancelProcessInstanceRequest<crate::process_instance::cancel::Initial> {
497 CancelProcessInstanceRequest::<crate::process_instance::cancel::Initial>::new(self.clone())
498 }
499
500 /// Creates a `MigrateProcessInstanceRequest` to migrate a running process
501 /// instance in Zeebe.
502 ///
503 /// # Examples
504 /// ```ignore
505 /// client
506 /// .migrate_process_instance()
507 /// .with_process_instance_key(12356)
508 /// .without_migration_plan()
509 /// .send()
510 /// .await?;
511 /// ```
512 pub fn migrate_process_instance(
513 &self,
514 ) -> MigrateProcessInstanceRequest<crate::process_instance::migrate::Initial> {
515 MigrateProcessInstanceRequest::<crate::process_instance::migrate::Initial>::new(
516 self.clone(),
517 )
518 }
519
520 /// Creates a `ModifyProcessInstanceRequest` to modify a running process
521 /// instance in Zeebe.
522 ///
523 /// # Examples
524 /// ```ignore
525 /// client
526 /// .modify_process_instance()
527 /// .with_process_instance_key(12345)
528 /// .with_activate_instruction("element_id".to_string(), 67890)
529 /// .with_variable_instruction("scope_id".to_string(), serde_json::json!({"key": "value"}))?
530 /// .build()
531 /// .with_terminate_instruction(54321)
532 /// .with_operation_reference(98765)
533 /// .send()
534 /// .await?;
535 /// ```
536 pub fn modify_process_instance(
537 &self,
538 ) -> ModifyProcessInstanceRequest<crate::process_instance::modify::Initial> {
539 ModifyProcessInstanceRequest::<crate::process_instance::modify::Initial>::new(self.clone())
540 }
541
542 /// Creates a `SetVariablesRequest` to update variables for a particular scope.
543 ///
544 /// # Examples
545 /// ```ignore
546 ///
547 /// #[derive(Serialize)]
548 /// struct Foo {
549 /// bar: String
550 /// }
551 ///
552 /// client
553 /// .set_variables()
554 /// .with_element_instance_key(123456)
555 /// .with_variable(Foo {bar: String::from("foobar")})
556 /// .send()
557 /// .await?;
558 /// ```
559 pub fn set_variables(&self) -> SetVariablesRequest<crate::set_variables::Initial> {
560 SetVariablesRequest::<crate::set_variables::Initial>::new(self.clone())
561 }
562
563 /// Creates a `PublishMessageRequest` to publish a message to a running
564 /// process instance in Zeebe.
565 ///
566 /// # Examples
567 /// ```ignore
568 /// // Publish a message without a correlation key
569 /// client
570 /// .publish_message()
571 /// .with_name(String::from("hello_world"))
572 /// .without_correlation_key()
573 /// .with_variables(HelloWorld {
574 /// hello: String::from("foo"),
575 /// })?
576 /// .send()
577 /// .await?;
578 ///
579 /// // Publish a message with a correlation key
580 /// client
581 /// .publish_message()
582 /// .with_name(String::from("hello_message"))
583 /// .with_correlation_key(String::from("foo"))
584 /// .send()
585 /// .await?;
586 /// ```
587 pub fn publish_message(&self) -> PublishMessageRequest<crate::message::Initial> {
588 PublishMessageRequest::<crate::message::Initial>::new(self.clone())
589 }
590
591 /// Creates a `BroadcastSignalRequest` to publish a signal to Zeebe.
592 ///
593 /// # Examples
594 ///
595 /// ```ignore
596 /// client
597 /// .broadcast_signal()
598 /// .with_signal_name(String::from("Hello_Signal"))
599 /// .send()
600 /// .await?;
601 /// ```
602 pub fn broadcast_signal(&self) -> BroadcastSignalRequest<crate::signal::Initial> {
603 BroadcastSignalRequest::<crate::signal::Initial>::new(self.clone())
604 }
605
606 /// Creates a `ResolveIncidentRequest` to resolve an active incident in
607 /// Zeebe.
608 ///
609 /// # Examples
610 ///
611 /// ```ignore
612 /// client
613 /// .resolve_incident()
614 /// .with_incident_key(123456)
615 /// .send()
616 /// .await?;
617 /// ```
618 pub fn resolve_incident(&self) -> ResolveIncidentRequest<crate::incident::Initial> {
619 ResolveIncidentRequest::<crate::incident::Initial>::new(self.clone())
620 }
621
622 /// Creates a `ThrowErrorRequest` to throw a business error.
623 ///
624 /// # Examples
625 /// ```ignore
626 /// client
627 /// .throw_error()
628 /// .with_job_key(123456)
629 /// .with_error_code(String::from("error_code"))
630 /// .send()
631 /// .await?;
632 /// ```
633 pub fn throw_error(&self) -> ThrowErrorRequest<crate::throw_error::Initial> {
634 ThrowErrorRequest::<crate::throw_error::Initial>::new(self.clone())
635 }
636
637 /// Creates a `EvaluateDecisionRequest` to request the evaluation of a DMN decision.
638 ///
639 /// # Examples
640 /// ```ignore
641 /// client
642 /// .evaluate_decision()
643 /// .with_decision_key(123456)
644 /// .with_decision_id(String::from("decision_id"))
645 /// .send()
646 /// .await?;
647 /// ```
648 pub fn evaluate_decision(&self) -> EvaluateDecisionRequest<crate::decision::Initial> {
649 EvaluateDecisionRequest::<crate::decision::Initial>::new(self.clone())
650 }
651
652 /// Creates a `CompleteJobRequest` to complete a job for a process instance
653 /// in Zeebe.
654 ///
655 /// # Examples
656 ///
657 /// ```ignore
658 /// client
659 /// .complete_job()
660 /// .with_job_key(123456)
661 /// .send()
662 /// .await?;
663 /// ```
664 pub fn complete_job(&self) -> CompleteJobRequest<crate::job::complete::Initial> {
665 CompleteJobRequest::<crate::job::complete::Initial>::new(self.clone())
666 }
667
668 /// Creates a `FailJobRequest` to fail a job for a process instance in Zeebe.
669 ///
670 /// # Examples
671 /// ```ignore
672 /// client
673 /// .fail_job()
674 /// .with_job_key(123456)
675 /// .send()
676 /// .await?;
677 /// ```
678 pub fn fail_job(&self) -> FailJobRequest<crate::job::fail::Initial> {
679 FailJobRequest::<crate::job::fail::Initial>::new(self.clone())
680 }
681
682 /// Creates a `UpdateJobTimeoutRequest` to update the timeout for a running
683 /// job in Zeebe.
684 ///
685 /// # Examples
686 /// ```ignore
687 /// client
688 /// .update_job_timeout()
689 /// .with_job_key(123456)
690 /// .with_timeout(Duration::from_secs(10))
691 /// .send()
692 /// .await?;
693 /// ```
694 pub fn update_job_timeout(
695 &self,
696 ) -> UpdateJobTimeoutRequest<crate::job::update_timeout::Initial> {
697 UpdateJobTimeoutRequest::<crate::job::update_timeout::Initial>::new(self.clone())
698 }
699
700 /// Creates a `UpdateJobRetriesRequest` that updates the number of retries
701 /// for a job in Zeebe.
702 ///
703 ///
704 /// # Examples
705 /// ```ignore
706 /// client
707 /// .update_job_retries()
708 /// .with_job_key(123456)
709 /// .with_retries(1)
710 /// .send()
711 /// .await?;
712 /// ```
713 pub fn update_job_retries(
714 &self,
715 ) -> UpdateJobRetriesRequest<crate::job::update_retries::Initial> {
716 UpdateJobRetriesRequest::<crate::job::update_retries::Initial>::new(self.clone())
717 }
718
719 /// Creates a `WorkerBuilder` to build a worker for processing Zeebe jobs.
720 ///
721 /// # Examples
722 ///
723 /// ```ignore
724 /// client
725 /// .worker()
726 /// .with_job_type(String::from("example-service"))
727 /// .with_job_timeout(Duration::from_secs(5 * 60))
728 /// .with_request_timeout(Duration::from_secs(10))
729 /// .with_max_jobs_to_activate(4)
730 /// .with_concurrency_limit(2)
731 /// .with_handler(|client, job| async move {
732 /// let _ = client.complete_job().with_job_key(job.key()).send().await;
733 /// })
734 /// .build()
735 /// .run()
736 /// .await;
737 /// ```
738 pub fn worker(&self) -> WorkerBuilder<crate::worker::Initial> {
739 WorkerBuilder::new(self.clone())
740 }
741}