zeebe_rs/worker.rs
1use crate::{ActivatedJob, Client, client, proto};
2use serde::Serialize;
3use std::{
4 future::Future,
5 ops::{Deref, DerefMut},
6 sync::Arc,
7 time::Duration,
8};
9use thiserror::Error;
10use tokio::{
11 sync::{
12 Semaphore,
13 mpsc::{self, Receiver, Sender},
14 },
15 time::{Interval, interval, timeout},
16};
17use tracing::{debug, error, info};
18
19/// An enum representing possible errors that can occur during job processing.
20///
21/// This enum provides different error variants that can be returned by a worker
22/// when processing jobs, allowing for different types of failure handling.
23///
24/// # Type Parameters
25///
26/// * `T` - A serializable type that can be included with error data
27#[derive(Debug, Error)]
28pub enum WorkerError<T>
29where
30 T: Serialize + Send + 'static,
31{
32 #[error("fail job")]
33 FailJob(String),
34
35 #[error("fail job with data")]
36 FailJobWithData { error_message: String, data: T },
37
38 #[error("throw error")]
39 ThrowError {
40 error_code: String,
41 error_message: Option<String>,
42 },
43
44 #[error("")]
45 ThrowErrorWithData {
46 error_code: String,
47 error_message: Option<String>,
48 data: T,
49 },
50
51 #[error(transparent)]
52 ClientError(#[from] client::ClientError),
53}
54
/// Period of the worker's polling timer (15 seconds).
///
/// Each tick triggers a job-activation attempt when there is room for more jobs.
const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(15_000);
56
/// Newtype around state that is shared by every concurrent run of a worker's handler.
///
/// Handlers receive it as `Arc<SharedState<T>>` and can use it like a `T` because it
/// dereferences to the wrapped value. Since the `Arc` only hands out shared access,
/// wrap `T` in a lock (e.g. a mutex) when handlers need to mutate it.
///
/// # Type Parameters
///
/// * `T` - The type of the shared state.
#[derive(Debug)]
pub struct SharedState<T>(pub T);
67
68impl<T> Deref for SharedState<T> {
69 type Target = T;
70 fn deref(&self) -> &Self::Target {
71 &self.0
72 }
73}
74
75impl<T> DerefMut for SharedState<T> {
76 fn deref_mut(&mut self) -> &mut Self::Target {
77 &mut self.0
78 }
79}
80
81/// A trait for handling the output of job processing.
82///
83/// This trait defines how different output types should be handled after
84/// job processing is complete. It provides built-in implementations for
85/// common result types.
86///
87/// # Type Parameters
88///
89/// * `T` - The type of output produced by the job handler
90///
91/// # Examples
92///
93/// ```ignore
94/// impl WorkerOutputHandler for MyStruct {
95/// fn handle_result(self, client: Client, job: ActivatedJob) -> impl Future<Output = ()> + Send + 'static {
96/// if let Ok(req) = client
97/// .complete_job()
98/// .with_job_key(job.key())
99/// .with_variables(self)
100/// {
101/// let _ = req.send().await;
102/// }
103/// }
104/// ```
105pub trait WorkerOutputHandler {
106 fn handle_result(
107 self,
108 client: Client,
109 job: ActivatedJob,
110 ) -> impl Future<Output = ()> + Send + 'static;
111}
112
113impl WorkerOutputHandler for () {
114 fn handle_result(
115 self,
116 _client: Client,
117 _job: ActivatedJob,
118 ) -> impl Future<Output = ()> + Send + 'static {
119 std::future::ready(())
120 }
121}
122
123impl<Output, T> WorkerOutputHandler for Result<Output, WorkerError<T>>
124where
125 Output: Serialize + Send + 'static,
126 T: Serialize + Send + 'static,
127{
128 async fn handle_result(self, client: Client, job: ActivatedJob) {
129 match self {
130 Ok(value) => {
131 if let Ok(req) = client
132 .complete_job()
133 .with_job_key(job.key())
134 .with_variables(value)
135 {
136 let _ = req.send().await;
137 info!("Completed job {}", job.key());
138 }
139 }
140 Err(error) => match error {
141 WorkerError::FailJob(error_message) => {
142 let _ = client
143 .fail_job()
144 .with_job_key(job.key())
145 .with_retries(job.retries() - 1)
146 .with_error_message(error_message.clone())
147 .send()
148 .await;
149 error!("Failed job {} with error {}", job.key(), error_message);
150 }
151 WorkerError::FailJobWithData {
152 error_message,
153 data,
154 } => {
155 if let Ok(req) = client
156 .fail_job()
157 .with_job_key(job.key())
158 .with_retries(job.retries() - 1)
159 .with_error_message(error_message.clone())
160 .with_variables(data)
161 {
162 let _ = req.send().await;
163 error!("Failed job {} with error {}", job.key(), error_message);
164 }
165 }
166 WorkerError::ThrowError {
167 error_code,
168 error_message,
169 } => {
170 let mut builder = client
171 .throw_error()
172 .with_job_key(job.key())
173 .with_error_code(error_code.clone());
174
175 if let Some(error_message) = error_message.clone() {
176 builder = builder.with_error_message(error_message);
177 }
178
179 let _ = builder.send().await;
180 error!(
181 "Job {} threw error {} {}",
182 error_code,
183 error_message.unwrap_or(String::from("")),
184 job.key(),
185 );
186 }
187 WorkerError::ThrowErrorWithData {
188 error_code,
189 error_message,
190 data,
191 } => {
192 if let Ok(mut req) = client
193 .throw_error()
194 .with_job_key(job.key())
195 .with_error_code(error_code.clone())
196 .with_variables(data)
197 {
198 if let Some(error_message) = error_message.clone() {
199 req = req.with_error_message(error_message);
200 }
201 let _ = req.send().await;
202 error!(
203 "Job {} threw error {} {}",
204 error_code,
205 error_message.unwrap_or(String::from("")),
206 job.key(),
207 );
208 }
209 }
210 WorkerError::ClientError(client_error) => {
211 let _ = client
212 .fail_job()
213 .with_job_key(job.key())
214 .with_retries(job.retries() - 1)
215 .with_error_message(client_error.to_string())
216 .send()
217 .await;
218
219 error!("Failed job {} with error {}", job.key(), client_error);
220 }
221 },
222 }
223 }
224}
225
226pub trait JobHandler {
227 type Output: WorkerOutputHandler + Send + 'static;
228 fn execute(&self, client: Client, job: ActivatedJob) -> impl Future<Output = ()> + Send;
229}
230
231impl<F, Fut> JobHandler for F
232where
233 F: Fn(Client, ActivatedJob) -> Fut + Send + Sync + 'static,
234 Fut: Future + Send + 'static,
235 Fut::Output: WorkerOutputHandler + Send + 'static,
236{
237 type Output = Fut::Output;
238 async fn execute(&self, client: Client, job: ActivatedJob) {
239 let res = (self)(client.clone(), job.clone()).await;
240 res.handle_result(client, job).await;
241 }
242}
243
244impl<F, T, Fut> JobHandler for (F, Arc<SharedState<T>>)
245where
246 F: Fn(Client, ActivatedJob, Arc<SharedState<T>>) -> Fut + Send + Sync + 'static,
247 Fut: Future + Send + 'static,
248 Fut::Output: WorkerOutputHandler + Send + 'static,
249 T: Send + Sync + 'static,
250{
251 type Output = Fut::Output;
252 async fn execute(&self, client: Client, job: ActivatedJob) {
253 let state = self.1.clone();
254 let res = (self.0)(client.clone(), job.clone(), state).await;
255 res.handle_result(client, job).await;
256 }
257}
258
/// Marker trait implemented by every typestate of `WorkerBuilder`.
///
/// The current state decides which configuration method is available next, so the
/// required settings are supplied once and in a fixed order.
pub trait WorkerBuilderState {}

/// Freshly created builder; `with_request_timeout` comes next.
#[derive(Clone)]
pub struct Initial {}
impl WorkerBuilderState for Initial {}

/// Request timeout set; `with_job_timeout` comes next.
#[derive(Clone)]
pub struct WithRequestTimeout {}
impl WorkerBuilderState for WithRequestTimeout {}

/// Job timeout set; `with_max_jobs_to_activate` comes next.
#[derive(Clone)]
pub struct WithJobTimeout {}
impl WorkerBuilderState for WithJobTimeout {}

/// Activation limit set; `with_concurrency_limit` comes next.
#[derive(Clone)]
pub struct WithMaxJobs {}
impl WorkerBuilderState for WithMaxJobs {}

/// Concurrency limit set; `with_job_type` comes next.
#[derive(Clone)]
pub struct WithConcurrency {}
impl WorkerBuilderState for WithConcurrency {}

/// Job type set; either `with_handler` or `with_state` comes next.
#[derive(Clone)]
pub struct WithJobType {}
impl WorkerBuilderState for WithJobType {}

/// Shared state set; the state-aware `with_handler` comes next.
#[derive(Clone)]
pub struct WithState {}
impl WorkerBuilderState for WithState {}

/// Handler set; optional settings may be added before `build`.
#[derive(Clone)]
pub struct WithHandler {}
impl WorkerBuilderState for WithHandler {}
293
294#[derive(Clone)]
295/// `WorkerBuilder` is a builder pattern struct for constructing a `Worker` instance.
296///
297/// This builder uses the typestate pattern to ensure that all required parameters
298/// are set before a Worker can be constructed. The builder enforces proper
299/// configuration through its type system.
300///
301/// # Type Parameters
302///
303/// * `S` - The current state of the builder (enforces configuration order)
304///
305/// # Examples
306/// ```ignore
307/// struct ExampleSharedState {
308/// pub increment_me: u32,
309/// }
310///
311/// let state = Arc::new(SharedState(Mutex::new(ExampleSharedState {
312/// increment_me: 0,
313/// })));
314///
315/// // Client instantiation
316///
317/// client
318/// .worker()
319/// .with_job_timeout(Duration::from_secs(60))
320/// .with_request_timeout(Duration::from_secs(10))
321/// .with_max_jobs_to_activate(4)
322/// .with_concurrency_limit(2)
323/// .with_job_type(String::from("demo-service"))
324/// .with_state(state)
325/// .with_handler(|client, job, state| async move {
326/// let mut lock = state.lock().await;
327/// lock.increment_me += 1;
328/// let _ = client.complete_job().with_job_key(job.key()).send().await;
329/// })
330/// .build()
331/// ```
332pub struct WorkerBuilder<S, H = (), T = ()>
333where
334 S: WorkerBuilderState,
335 T: Send + Sync + 'static,
336{
337 client: Client,
338 job_type: String,
339 worker_name: String,
340 timeout: Duration,
341 max_jobs_to_activate: i32,
342 concurrency_limit: u32,
343 worker_callback: Option<Arc<H>>,
344 state: Option<Arc<SharedState<T>>>,
345 fetch_variable: Vec<String>,
346 request_timeout: Duration,
347 tenant_ids: Vec<String>,
348 _state: std::marker::PhantomData<S>,
349}
350
351impl WorkerBuilder<Initial> {
352 pub fn new(client: Client) -> Self {
353 Self {
354 client,
355 job_type: String::new(),
356 worker_name: String::new(),
357 timeout: Duration::default(),
358 max_jobs_to_activate: 0,
359 concurrency_limit: 0,
360 worker_callback: None,
361 state: None,
362 fetch_variable: vec![],
363 request_timeout: Duration::default(),
364 tenant_ids: vec![],
365 _state: std::marker::PhantomData,
366 }
367 }
368
369 /// Sets the request timeout for the worker.
370 ///
371 /// The request will be completed when at least one job is activated or after the specified `request_timeout`.
372 ///
373 /// # Arguments
374 ///
375 /// * `request_timeout` - The duration to wait before the request times out.
376 ///
377 /// # Returns
378 ///
379 /// A `WorkerBuilder<WithRequestTimeout>` instance with the request timeout configured.
380 pub fn with_request_timeout(
381 self,
382 request_timeout: Duration,
383 ) -> WorkerBuilder<WithRequestTimeout> {
384 WorkerBuilder {
385 client: self.client,
386 job_type: self.job_type,
387 worker_name: self.worker_name,
388 timeout: self.timeout,
389 max_jobs_to_activate: self.max_jobs_to_activate,
390 concurrency_limit: self.concurrency_limit,
391 worker_callback: self.worker_callback,
392 state: self.state,
393 fetch_variable: self.fetch_variable,
394 request_timeout,
395 tenant_ids: self.tenant_ids,
396 _state: std::marker::PhantomData,
397 }
398 }
399}
400
401impl WorkerBuilder<WithRequestTimeout> {
402 /// Sets the job timeout for the worker.
403 ///
404 /// A job returned after this call will not be activated by another call until the
405 /// specified timeout (in milliseconds) has been reached. This ensures that the job
406 /// is not picked up by another worker before the timeout expires.
407 ///
408 /// # Parameters
409 ///
410 /// - `timeout`: The duration for which the job should be locked.
411 ///
412 /// # Returns
413 ///
414 /// A `WorkerBuilder<WithJobTimeout>` instance with the job timeout configured.
415 pub fn with_job_timeout(self, timeout: Duration) -> WorkerBuilder<WithJobTimeout> {
416 WorkerBuilder {
417 client: self.client,
418 job_type: self.job_type,
419 worker_name: self.worker_name,
420 timeout,
421 max_jobs_to_activate: self.max_jobs_to_activate,
422 concurrency_limit: self.concurrency_limit,
423 worker_callback: self.worker_callback,
424 state: self.state,
425 fetch_variable: self.fetch_variable,
426 request_timeout: self.request_timeout,
427 tenant_ids: self.tenant_ids,
428 _state: std::marker::PhantomData,
429 }
430 }
431}
432
433impl WorkerBuilder<WithJobTimeout> {
434 /// Sets the maximum number of jobs to activate in a single request.
435 ///
436 /// # Arguments
437 ///
438 /// * `max_jobs_to_activate` - The maximum number of jobs to activate.
439 ///
440 /// # Returns
441 ///
442 /// A `WorkerBuilder<WithMaxJobs>` instance with the `WithMaxJobs` state.
443 pub fn with_max_jobs_to_activate(
444 self,
445 max_jobs_to_activate: i32,
446 ) -> WorkerBuilder<WithMaxJobs> {
447 WorkerBuilder {
448 client: self.client,
449 job_type: self.job_type,
450 worker_name: self.worker_name,
451 timeout: self.timeout,
452 max_jobs_to_activate,
453 concurrency_limit: self.concurrency_limit,
454 worker_callback: self.worker_callback,
455 state: self.state,
456 fetch_variable: self.fetch_variable,
457 request_timeout: self.request_timeout,
458 tenant_ids: self.tenant_ids,
459 _state: std::marker::PhantomData,
460 }
461 }
462}
463
464impl WorkerBuilder<WithMaxJobs> {
465 /// Sets the maximum number of jobs that can be processed concurrently by the worker.
466 ///
467 /// # Arguments
468 ///
469 /// * `concurrency_limit` - The maximum number of jobs that the worker can handle at the same time.
470 ///
471 /// # Returns
472 ///
473 /// A `WorkerBuilder<WithConcurrency>` instance with the concurrency limit set.
474 pub fn with_concurrency_limit(self, concurrency_limit: u32) -> WorkerBuilder<WithConcurrency> {
475 WorkerBuilder {
476 client: self.client,
477 job_type: self.job_type,
478 worker_name: self.worker_name,
479 timeout: self.timeout,
480 max_jobs_to_activate: self.max_jobs_to_activate,
481 concurrency_limit,
482 worker_callback: self.worker_callback,
483 state: self.state,
484 fetch_variable: self.fetch_variable,
485 request_timeout: self.request_timeout,
486 tenant_ids: self.tenant_ids,
487 _state: std::marker::PhantomData,
488 }
489 }
490}
491
492impl WorkerBuilder<WithConcurrency> {
493 /// Sets the job type for the worker.
494 ///
495 /// The job type is defined in the BPMN process, for example:
496 /// `<zeebe:taskDefinition type="payment-service" />`.
497 ///
498 /// # Parameters
499 ///
500 /// - `job_type`: A `String` representing the job type.
501 ///
502 /// # Returns
503 ///
504 /// A `WorkerBuilder<WithJobType>` instance with the job type set.
505 pub fn with_job_type(self, job_type: String) -> WorkerBuilder<WithJobType> {
506 WorkerBuilder {
507 client: self.client,
508 job_type,
509 worker_name: self.worker_name,
510 timeout: self.timeout,
511 max_jobs_to_activate: self.max_jobs_to_activate,
512 concurrency_limit: self.concurrency_limit,
513 worker_callback: self.worker_callback,
514 state: self.state,
515 fetch_variable: self.fetch_variable,
516 request_timeout: self.request_timeout,
517 tenant_ids: self.tenant_ids,
518 _state: std::marker::PhantomData,
519 }
520 }
521}
522
523impl WorkerBuilder<WithJobType> {
524 /// Sets the handler function for the worker.
525 ///
526 /// # Arguments
527 ///
528 /// * `handler` - A function that takes a `Client` and an `ActivatedJob` as arguments and returns a `Future` that resolves to `()`.
529 ///
530 /// # Returns
531 ///
532 /// Returns a `WorkerBuilder` with the `WithHandler` state.
533 ///
534 /// # Examples
535 /// ```ignore
536 ///
537 /// // You can choose to manually handle returning results from handler functions
538 /// async fn example_service(client: Client, job: ActivatedJob) {
539 /// // Your job handling logic here
540 /// // Function has to use the client to return results
541 /// let _ = client.complete_job().with_job_key(job.key()).send().await;
542 /// }
543 ///
544 /// client
545 /// .worker()
546 /// .with_job_timeout(Duration::from_secs(5 * 60))
547 /// .with_request_timeout(Duration::from_secs(10))
548 /// .with_max_jobs_to_activate(4)
549 /// .with_concurrency_limit(2)
550 /// .with_job_type(String::from("example-service"))
551 /// .with_handler(example_service)
552 /// ...
553 /// ```
554 ///
555 /// If the function is defined to return a Result instead the result is used to automatically set the status
556 /// of the job.
557 ///
558 /// ```ignore
559 /// async fn example_service_with_result(_client: Client, job: ActivatedJob) -> Result<(), WorkerError<()>> {
560 /// Ok(())
561 /// }
562 ///
563 /// client
564 /// .worker()
565 /// .with_job_timeout(Duration::from_secs(5 * 60))
566 /// .with_request_timeout(Duration::from_secs(10))
567 /// .with_max_jobs_to_activate(4)
568 /// .with_concurrency_limit(2)
569 /// .with_job_type(String::from("example-service"))
570 /// .with_handler(example_service_with_result)
571 /// ...
572 /// ```
573 /// This works for closures as well but requires them to be type annotated.
574 ///
575 /// ```ignore
576 /// client
577 /// .worker()
578 /// .with_request_timeout(Duration::from_secs(10))
579 /// .with_job_timeout(Duration::from_secs(10))
580 /// .with_max_jobs_to_activate(5)
581 /// .with_concurrency_limit(5)
582 /// .with_job_type(String::from("example_service"))
583 /// .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
584 /// .build();
585 ///
586 /// ```
587 ///
588 /// # Type Parameters
589 ///
590 /// * `F` - The type of the handler function.
591 /// * `Fut` - The return of F
592 /// * `Fut::Output` - Fut return value that must implement WorkerOutputHandler
593 ///
594 /// # Constraints
595 ///
596 /// * `F` must implement `Fn(Client, ActivatedJob) -> Fut` and must be `Send` and `'static`.
597 pub fn with_handler<F, Fut>(self, handler: F) -> WorkerBuilder<WithHandler, F>
598 where
599 F: Fn(Client, ActivatedJob) -> Fut + Send + 'static,
600 Fut: Future + Send + 'static,
601 Fut::Output: WorkerOutputHandler + Send + 'static,
602 {
603 WorkerBuilder {
604 client: self.client,
605 job_type: self.job_type,
606 worker_name: self.worker_name,
607 timeout: self.timeout,
608 max_jobs_to_activate: self.max_jobs_to_activate,
609 concurrency_limit: self.concurrency_limit,
610 worker_callback: Some(Arc::new(handler)),
611 state: self.state,
612 fetch_variable: self.fetch_variable,
613 request_timeout: self.request_timeout,
614 tenant_ids: self.tenant_ids,
615 _state: std::marker::PhantomData,
616 }
617 }
618
619 /// Sets the state that will be shared across all concurrent instances of the worker.
620 ///
621 /// # Arguments
622 ///
623 /// * `shared_state` - An `Arc` containing the shared state.
624 ///
625 /// # Returns
626 ///
627 /// Returns a `WorkerStateBuilder` with the provided shared state.
628 ///
629 /// # Type Parameters
630 ///
631 /// * `T` - The type of the shared state.
632 ///
633 /// # Constraints
634 ///
635 /// * `T` must be `Send`, `Sync`, and `'static`.
636 ///
637 pub fn with_state<T>(self, state: Arc<SharedState<T>>) -> WorkerBuilder<WithState, (), T>
638 where
639 T: Send + Sync + 'static,
640 {
641 WorkerBuilder {
642 client: self.client,
643 job_type: self.job_type,
644 worker_name: self.worker_name,
645 timeout: self.timeout,
646 max_jobs_to_activate: self.max_jobs_to_activate,
647 concurrency_limit: self.concurrency_limit,
648 worker_callback: None,
649 state: Some(state),
650 fetch_variable: self.fetch_variable,
651 request_timeout: self.request_timeout,
652 tenant_ids: self.tenant_ids,
653 _state: std::marker::PhantomData,
654 }
655 }
656}
657
658impl<T> WorkerBuilder<WithState, (), T>
659where
660 T: Send + Sync + 'static,
661{
662 pub fn with_handler<F, Fut>(
663 self,
664 handler: F,
665 ) -> WorkerBuilder<WithHandler, (F, Arc<SharedState<T>>), T>
666 where
667 F: Fn(Client, ActivatedJob, Arc<SharedState<T>>) -> Fut + Send + 'static,
668 Fut: Future + Send + 'static,
669 Fut::Output: WorkerOutputHandler + Send + 'static,
670 {
671 let state = self.state.unwrap();
672 WorkerBuilder {
673 client: self.client,
674 job_type: self.job_type,
675 worker_name: self.worker_name,
676 timeout: self.timeout,
677 max_jobs_to_activate: self.max_jobs_to_activate,
678 concurrency_limit: self.concurrency_limit,
679 worker_callback: Some(Arc::new((handler, state))),
680 state: None,
681 fetch_variable: self.fetch_variable,
682 request_timeout: self.request_timeout,
683 tenant_ids: self.tenant_ids,
684 _state: std::marker::PhantomData,
685 }
686 }
687}
688
689impl<H, T> WorkerBuilder<WithHandler, H, T>
690where
691 H: JobHandler + Send + Sync + 'static,
692 T: Send + Sync + 'static,
693{
694 /// Builds a `Worker` using the collected inputs.
695 ///
696 /// # Returns
697 ///
698 /// * `Worker` - The constructed Worker instance
699 pub fn build(self) -> Worker<H> {
700 let request = proto::ActivateJobsRequest {
701 r#type: self.job_type,
702 worker: self.worker_name,
703 timeout: self.timeout.as_millis() as i64,
704 max_jobs_to_activate: self.max_jobs_to_activate,
705 fetch_variable: self.fetch_variable,
706 request_timeout: self.request_timeout.as_millis() as i64,
707 tenant_ids: self.tenant_ids,
708 };
709
710 Worker::new(
711 self.client,
712 request,
713 self.request_timeout,
714 self.concurrency_limit,
715 self.worker_callback
716 .expect("Don't transition to build without handler"),
717 )
718 }
719
720 /// Sets the worker name.
721 ///
722 /// # Arguments
723 ///
724 /// * `worker_name` - A `String` representing the name of the worker.
725 ///
726 /// # Returns
727 ///
728 /// * `Self` - The updated `WorkerBuilder` instance.
729 pub fn with_worker_name(mut self, worker_name: String) -> Self {
730 self.worker_name = worker_name;
731 self
732 }
733
734 /// Adds a single variable to fetch.
735 ///
736 /// A list of variables to fetch as the job variables; if empty, all visible variables at
737 /// the time of activation for the scope of the job will be returned
738 ///
739 /// # Arguments
740 ///
741 /// * `fetch_variable` - A `String` representing the variable to fetch.
742 ///
743 /// # Returns
744 ///
745 /// * `Self` - The updated `WorkerBuilder` instance.
746 pub fn with_fetch_variable(mut self, fetch_variable: String) -> Self {
747 self.fetch_variable.push(fetch_variable);
748 self
749 }
750
751 /// Adds multiple variables to fetch.
752 ///
753 /// A list of variables to fetch as the job variables; if empty, all visible variables at
754 /// the time of activation for the scope of the job will be returned
755 ///
756 /// # Arguments
757 ///
758 /// * `fetch_variables` - A `Vec<String>` representing the variables to fetch.
759 ///
760 /// # Returns
761 ///
762 /// * `Self` - The updated `WorkerBuilder` instance.
763 pub fn with_fetch_variables(mut self, mut fetch_variables: Vec<String>) -> Self {
764 self.fetch_variable.append(&mut fetch_variables);
765 self
766 }
767
768 /// Adds a single tenant ID.
769 ///
770 /// # Arguments
771 ///
772 /// * `tenant_id` - A `String` representing the tenant ID.
773 ///
774 /// # Returns
775 ///
776 /// * `Self` - The updated `WorkerBuilder` instance.
777 pub fn with_tenant_id(mut self, tenant_id: String) -> Self {
778 self.tenant_ids.push(tenant_id);
779 self
780 }
781
782 /// Adds multiple tenant IDs.
783 ///
784 /// # Arguments
785 ///
786 /// * `tenant_ids` - A `Vec<String>` representing the tenant IDs.
787 ///
788 /// # Returns
789 ///
790 /// * `Self` - The updated `WorkerBuilder` instance.
791 pub fn with_tenant_ids(mut self, mut tenant_ids: Vec<String>) -> Self {
792 self.tenant_ids.append(&mut tenant_ids);
793 self
794 }
795}
796
/// Control messages handled by the producer's polling loop.
enum PollingMessage {
    /// Request to fetch more jobs if there is room (sent on each poll-timer tick).
    FetchJobs,
    /// Number of jobs delivered by an activation request.
    JobsFetched(u32),
    /// The in-flight activation request has ended, successfully or not.
    FetchJobsComplete,
    /// The consumer finished processing one job.
    JobFinished,
}
803
804struct WorkProducer {
805 client: Client,
806 job_tx: Sender<ActivatedJob>,
807 poll_tx: Sender<PollingMessage>,
808 poll_rx: Receiver<PollingMessage>,
809 poll_interval: Interval,
810 request_timeout: Duration,
811 request: proto::ActivateJobsRequest,
812 queued_jobs_count: u32,
813 max_jobs_to_activate: u32,
814}
815
816impl WorkProducer {
817 fn fetch_jobs(&mut self) {
818 let mut client = self.client.clone();
819 let mut request = self.request.clone();
820 let poll_tx = self.poll_tx.clone();
821 let job_tx = self.job_tx.clone();
822 let request_timeout = self.request_timeout;
823
824 request.max_jobs_to_activate = (self.max_jobs_to_activate - self.queued_jobs_count) as i32;
825
826 tokio::spawn(async move {
827 if let Err(_err) = timeout(request_timeout, async {
828 let res = client
829 .gateway_client
830 .activate_jobs(tonic::Request::new(request))
831 .await
832 .map(|response| {
833 debug!("Response {:?}", response);
834 response.into_inner()
835 });
836
837 let mut jobs_fetched = 0;
838 debug!("Res {:?}", res);
839
840 if let Ok(mut stream) = res {
841 while let Ok(Some(activate_job_response)) = stream.message().await {
842 jobs_fetched += activate_job_response.jobs.len() as u32;
843
844 for job in activate_job_response.jobs {
845 let _ = job_tx.send(job.into()).await;
846 }
847 }
848 }
849
850 let _ = poll_tx
851 .send(PollingMessage::JobsFetched(jobs_fetched))
852 .await;
853 })
854 .await
855 {
856 error!("{}", _err);
857 };
858
859 let _ = poll_tx.send(PollingMessage::FetchJobsComplete).await;
860 });
861 }
862
863 async fn run(&mut self) {
864 let mut fetching_jobs = false;
865 loop {
866 tokio::select! {
867 Some(message) = self.poll_rx.recv() => {
868 match message {
869 PollingMessage::JobsFetched(new_job_count) => {
870 self.queued_jobs_count = self.queued_jobs_count.saturating_add(new_job_count);
871 }
872 PollingMessage::JobFinished => {
873 self.queued_jobs_count = self.queued_jobs_count.saturating_sub(1);
874 }
875 PollingMessage::FetchJobs => {
876 if self.queued_jobs_count <= self.max_jobs_to_activate && !fetching_jobs {
877 fetching_jobs = true;
878 self.fetch_jobs();
879 }
880 }
881 PollingMessage::FetchJobsComplete => {
882 fetching_jobs = false;
883 }
884 }
885 },
886 _ = self.poll_interval.tick() => {
887 let _ = self.poll_tx.send(PollingMessage::FetchJobs).await;
888 },
889 else => {
890 break;
891 }
892 }
893 }
894 }
895}
896
897struct WorkConsumer<H>
898where
899 H: JobHandler + Send + Sync + 'static,
900{
901 client: Client,
902 job_rx: Receiver<ActivatedJob>,
903 poll_tx: Sender<PollingMessage>,
904 semaphore: Arc<Semaphore>,
905 worker_callback: Arc<H>,
906}
907
908impl<H> WorkConsumer<H>
909where
910 H: JobHandler + Send + Sync + 'static,
911{
912 async fn run(&mut self) {
913 while let Some(job) = self.job_rx.recv().await {
914 let permit = self.semaphore.clone().acquire_owned().await.unwrap();
915 let poll_tx = self.poll_tx.clone();
916 let client = self.client.clone();
917 let callback = self.worker_callback.clone();
918
919 tokio::spawn(async move {
920 callback.execute(client.clone(), job.clone()).await;
921 let _ = poll_tx.send(PollingMessage::JobFinished).await;
922 drop(permit);
923 });
924 }
925 }
926}
927
928/// The Worker is responsible for fetching jobs from Zeebe and processing them
929/// with the associated handler.
930/// /// A worker implementation for processing Zeebe jobs with configurable concurrency and state management.
931///
932/// The `Worker` is responsible for:
933/// - Polling for new jobs from the Zeebe broker
934/// - Managing job activation and processing
935/// - Handling concurrent job execution
936/// - Maintaining worker state across job executions
937///
938/// The worker consists of two main components:
939/// - `WorkProducer`: Handles job polling and queue management
940/// - `WorkConsumer`: Manages job execution and concurrency
941///
942/// # Architecture
943///
944/// The worker uses a producer-consumer pattern where:
945/// 1. The producer polls for jobs at regular intervals
946/// 2. Jobs are queued in an internal channel
947/// 3. The consumer processes jobs concurrently up to the configured limit
948///
949/// # Concurrency
950///
951/// Job processing is controlled by:
952/// - A semaphore limiting concurrent job executions
953/// - Channel-based communication between components
954/// - Configurable maximum jobs to activate
955///
956/// # Example
957///
958/// ```ignore
959/// let worker = client
960/// .worker()
961/// .with_job_timeout(Duration::from_secs(60))
962/// .with_request_timeout(Duration::from_secs(10))
963/// .with_max_jobs_to_activate(5)
964/// .with_concurrency_limit(3)
965/// .with_job_type("example-service")
966/// .with_handler(|client, job| async move {
967/// // Process job here
968/// client.complete_job().with_job_key(job.key()).send().await;
969/// })
970/// .build();
971///
972/// // Start the worker
973/// worker.run().await?;
974///
975/// ```
976/// # Error Handling
977///
978/// The worker implements automatic error handling for:
979/// - Job activation timeouts
980/// - Network errors during polling
981/// - Job processing failures
982pub struct Worker<H>
983where
984 H: JobHandler + Send + Sync + 'static,
985{
986 poller: WorkProducer,
987 dispatcher: WorkConsumer<H>,
988}
989
990impl<H> Worker<H>
991where
992 H: JobHandler + Send + Sync + 'static,
993{
994 fn new(
995 client: Client,
996 request: proto::ActivateJobsRequest,
997 request_timeout: Duration,
998 concurrency_limit: u32,
999 callback: Arc<H>,
1000 ) -> Worker<H> {
1001 let (job_tx, job_rx) = mpsc::channel(32);
1002 let (poll_tx, poll_rx) = mpsc::channel(32);
1003 let max_jobs_to_activate = request.max_jobs_to_activate as u32;
1004
1005 let poller = WorkProducer {
1006 client: client.clone(),
1007 job_tx,
1008 poll_tx: poll_tx.clone(),
1009 poll_rx,
1010 poll_interval: interval(DEFAULT_POLL_INTERVAL),
1011 request,
1012 request_timeout,
1013 max_jobs_to_activate,
1014 queued_jobs_count: 0,
1015 };
1016
1017 let dispatcher = WorkConsumer {
1018 client: client.clone(),
1019 job_rx,
1020 poll_tx: poll_tx.clone(),
1021 semaphore: Arc::new(Semaphore::new(concurrency_limit as usize)),
1022 worker_callback: callback,
1023 };
1024
1025 Worker { poller, dispatcher }
1026 }
1027
1028 /// Starts the worker by running both the poller and dispatcher concurrently.
1029 ///
1030 /// This method uses `tokio::join!` to run the `poller` and `dispatcher` concurrently.
1031 /// The poller continuously polls the Zeebe broker for new jobs, while the dispatcher
1032 /// processes the jobs using the provided callback.
1033 /// # Example
1034 /// ```ignore
1035 /// #[tokio::main]
1036 /// async fn main() {
1037 /// client
1038 /// .worker()
1039 /// //Worker configuration...
1040 /// .build()
1041 /// .run()
1042 /// .await;
1043 /// }
1044 /// ```
1045 pub async fn run(mut self) {
1046 tokio::join!(self.poller.run(), self.dispatcher.run());
1047 }
1048}