zeebe_rs/worker.rs

use crate::{ActivatedJob, Client, client, proto};
use serde::Serialize;
use std::{
    future::Future,
    ops::{Deref, DerefMut},
    sync::Arc,
    time::Duration,
};
use thiserror::Error;
use tokio::{
    sync::{
        Semaphore,
        mpsc::{self, Receiver, Sender},
    },
    time::{Interval, interval, timeout},
};
use tracing::{debug, error, info};

/// An enum representing possible errors that can occur during job processing.
///
/// This enum provides different error variants that can be returned by a worker
/// when processing jobs, allowing for different types of failure handling.
///
/// # Type Parameters
///
/// * `T` - A serializable type that can be included with error data
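///
/// # Examples
///
/// A minimal sketch of a handler that maps a domain failure onto `WorkerError`;
/// the `charge` helper and its `InsufficientFunds` error are hypothetical:
///
/// ```ignore
/// async fn payment_service(_client: Client, job: ActivatedJob) -> Result<(), WorkerError<()>> {
///     match charge(&job).await {
///         Ok(_) => Ok(()),
///         Err(InsufficientFunds) => Err(WorkerError::ThrowError {
///             error_code: String::from("INSUFFICIENT_FUNDS"),
///             error_message: Some(String::from("Balance too low")),
///         }),
///         Err(other) => Err(WorkerError::FailJob(other.to_string())),
///     }
/// }
/// ```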
#[derive(Debug, Error)]
pub enum WorkerError<T>
where
    T: Serialize + Send + 'static,
{
    #[error("fail job")]
    FailJob(String),

    #[error("fail job with data")]
    FailJobWithData { error_message: String, data: T },

    #[error("throw error")]
    ThrowError {
        error_code: String,
        error_message: Option<String>,
    },

    #[error("throw error with data")]
    ThrowErrorWithData {
        error_code: String,
        error_message: Option<String>,
        data: T,
    },

    #[error(transparent)]
    ClientError(#[from] client::ClientError),
}

const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(15);

/// A wrapper struct that holds a shared state of type `T`.
///
/// This struct is designed to encapsulate shared state that can be accessed
/// and modified by multiple worker instances.
///
/// # Type Parameters
///
/// * `T` - The type of the shared state.
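///
/// # Examples
///
/// A sketch of wrapping mutable state for use across concurrent jobs; the
/// `Mutex` here is `tokio::sync::Mutex` and the counter field is illustrative:
///
/// ```ignore
/// struct Counters {
///     processed: u64,
/// }
///
/// let state = Arc::new(SharedState(Mutex::new(Counters { processed: 0 })));
/// // Pass `state` to `WorkerBuilder::with_state` and it will be handed to the
/// // handler alongside every activated job.
/// ```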
#[derive(Debug)]
pub struct SharedState<T>(pub T);

impl<T> Deref for SharedState<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl<T> DerefMut for SharedState<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

/// A trait for handling the output of job processing.
///
/// This trait defines how different output types should be handled after
/// job processing is complete. It is implemented directly on the type returned
/// by a job handler; built-in implementations are provided for `()` and
/// `Result<Output, WorkerError<T>>`.
///
/// # Examples
///
/// ```ignore
/// impl WorkerOutputHandler for MyStruct {
///     fn handle_result(self, client: Client, job: ActivatedJob) -> impl Future<Output = ()> + Send + 'static {
///         async move {
///             if let Ok(req) = client
///                 .complete_job()
///                 .with_job_key(job.key())
///                 .with_variables(self)
///             {
///                 let _ = req.send().await;
///             }
///         }
///     }
/// }
/// ```
pub trait WorkerOutputHandler {
    fn handle_result(
        self,
        client: Client,
        job: ActivatedJob,
    ) -> impl Future<Output = ()> + Send + 'static;
}

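// Handlers that return `()` are expected to report job completion or failure
// themselves through the `Client`, so there is nothing left to do here.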
impl WorkerOutputHandler for () {
    fn handle_result(
        self,
        _client: Client,
        _job: ActivatedJob,
    ) -> impl Future<Output = ()> + Send + 'static {
        std::future::ready(())
    }
}

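// Handlers that return a `Result` get automatic reporting: `Ok` completes the
// job with its value as variables, while each `WorkerError` variant is mapped
// to the corresponding fail-job or throw-error request.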
impl<Output, T> WorkerOutputHandler for Result<Output, WorkerError<T>>
where
    Output: Serialize + Send + 'static,
    T: Serialize + Send + 'static,
{
    async fn handle_result(self, client: Client, job: ActivatedJob) {
        match self {
            Ok(value) => {
                if let Ok(req) = client
                    .complete_job()
                    .with_job_key(job.key())
                    .with_variables(value)
                {
                    let _ = req.send().await;
                    info!("Completed job {}", job.key());
                }
            }
            Err(error) => match error {
                WorkerError::FailJob(error_message) => {
                    let _ = client
                        .fail_job()
                        .with_job_key(job.key())
                        .with_retries(job.retries() - 1)
                        .with_error_message(error_message.clone())
                        .send()
                        .await;
                    error!("Failed job {} with error {}", job.key(), error_message);
                }
                WorkerError::FailJobWithData {
                    error_message,
                    data,
                } => {
                    if let Ok(req) = client
                        .fail_job()
                        .with_job_key(job.key())
                        .with_retries(job.retries() - 1)
                        .with_error_message(error_message.clone())
                        .with_variables(data)
                    {
                        let _ = req.send().await;
                        error!("Failed job {} with error {}", job.key(), error_message);
                    }
                }
                WorkerError::ThrowError {
                    error_code,
                    error_message,
                } => {
                    let mut builder = client
                        .throw_error()
                        .with_job_key(job.key())
                        .with_error_code(error_code.clone());

                    if let Some(error_message) = error_message.clone() {
                        builder = builder.with_error_message(error_message);
                    }

                    let _ = builder.send().await;
                    error!(
                        "Job {} threw error {} {}",
                        job.key(),
                        error_code,
                        error_message.unwrap_or_default(),
                    );
                }
                WorkerError::ThrowErrorWithData {
                    error_code,
                    error_message,
                    data,
                } => {
                    if let Ok(mut req) = client
                        .throw_error()
                        .with_job_key(job.key())
                        .with_error_code(error_code.clone())
                        .with_variables(data)
                    {
                        if let Some(error_message) = error_message.clone() {
                            req = req.with_error_message(error_message);
                        }
                        let _ = req.send().await;
                        error!(
                            "Job {} threw error {} {}",
                            job.key(),
                            error_code,
                            error_message.unwrap_or_default(),
                        );
                    }
                }
                WorkerError::ClientError(client_error) => {
                    let _ = client
                        .fail_job()
                        .with_job_key(job.key())
                        .with_retries(job.retries() - 1)
                        .with_error_message(client_error.to_string())
                        .send()
                        .await;

                    error!("Failed job {} with error {}", job.key(), client_error);
                }
            },
        }
    }
}

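/// The job-execution contract used internally by [`Worker`].
///
/// A `JobHandler` runs the user-supplied callback for an activated job and
/// forwards its output to [`WorkerOutputHandler`]. Blanket implementations are
/// provided for plain `Fn(Client, ActivatedJob)` functions/closures and for the
/// stateful `(Fn, Arc<SharedState<T>>)` pair produced by
/// `WorkerBuilder::with_state(..).with_handler(..)`.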
pub trait JobHandler {
    type Output: WorkerOutputHandler + Send + 'static;
    fn execute(&self, client: Client, job: ActivatedJob) -> impl Future<Output = ()> + Send;
}

impl<F, Fut> JobHandler for F
where
    F: Fn(Client, ActivatedJob) -> Fut + Send + Sync + 'static,
    Fut: Future + Send + 'static,
    Fut::Output: WorkerOutputHandler + Send + 'static,
{
    type Output = Fut::Output;
    async fn execute(&self, client: Client, job: ActivatedJob) {
        let res = (self)(client.clone(), job.clone()).await;
        res.handle_result(client, job).await;
    }
}

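// The stateful variant: the handler is stored together with its shared state so
// each execution receives a clone of the `Arc<SharedState<T>>`.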
impl<F, T, Fut> JobHandler for (F, Arc<SharedState<T>>)
where
    F: Fn(Client, ActivatedJob, Arc<SharedState<T>>) -> Fut + Send + Sync + 'static,
    Fut: Future + Send + 'static,
    Fut::Output: WorkerOutputHandler + Send + 'static,
    T: Send + Sync + 'static,
{
    type Output = Fut::Output;
    async fn execute(&self, client: Client, job: ActivatedJob) {
        let state = self.1.clone();
        let res = (self.0)(client.clone(), job.clone(), state).await;
        res.handle_result(client, job).await;
    }
}

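// Typestate markers for `WorkerBuilder`. The builder moves through
// `Initial -> WithRequestTimeout -> WithJobTimeout -> WithMaxJobs ->
// WithConcurrency -> WithJobType -> (WithState ->) WithHandler`, so `build()`
// is only reachable once every required field has been set.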
#[derive(Clone)]
pub struct Initial {}

#[derive(Clone)]
pub struct WithJobType {}

#[derive(Clone)]
pub struct WithJobTimeout {}

#[derive(Clone)]
pub struct WithRequestTimeout {}

#[derive(Clone)]
pub struct WithMaxJobs {}

#[derive(Clone)]
pub struct WithConcurrency {}

#[derive(Clone)]
pub struct WithState {}

#[derive(Clone)]
pub struct WithHandler {}

pub trait WorkerBuilderState {}

impl WorkerBuilderState for Initial {}
impl WorkerBuilderState for WithJobType {}
impl WorkerBuilderState for WithJobTimeout {}
impl WorkerBuilderState for WithRequestTimeout {}
impl WorkerBuilderState for WithMaxJobs {}
impl WorkerBuilderState for WithConcurrency {}
impl WorkerBuilderState for WithState {}
impl WorkerBuilderState for WithHandler {}

/// `WorkerBuilder` is a builder pattern struct for constructing a `Worker` instance.
///
/// This builder uses the typestate pattern to ensure that all required parameters
/// are set before a `Worker` can be constructed. The builder enforces proper
/// configuration through its type system.
///
/// # Type Parameters
///
/// * `S` - The current state of the builder (enforces configuration order)
///
/// # Examples
/// ```ignore
/// struct ExampleSharedState {
///     pub increment_me: u32,
/// }
///
/// let state = Arc::new(SharedState(Mutex::new(ExampleSharedState {
///     increment_me: 0,
/// })));
///
/// // Client instantiation
///
/// client
///     .worker()
///     .with_job_timeout(Duration::from_secs(60))
///     .with_request_timeout(Duration::from_secs(10))
///     .with_max_jobs_to_activate(4)
///     .with_concurrency_limit(2)
///     .with_job_type(String::from("demo-service"))
///     .with_state(state)
///     .with_handler(|client, job, state| async move {
///         let mut lock = state.lock().await;
///         lock.increment_me += 1;
///         let _ = client.complete_job().with_job_key(job.key()).send().await;
///     })
///     .build();
/// ```
#[derive(Clone)]
pub struct WorkerBuilder<S, H = (), T = ()>
where
    S: WorkerBuilderState,
    T: Send + Sync + 'static,
{
    client: Client,
    job_type: String,
    worker_name: String,
    timeout: Duration,
    max_jobs_to_activate: i32,
    concurrency_limit: u32,
    worker_callback: Option<Arc<H>>,
    state: Option<Arc<SharedState<T>>>,
    fetch_variable: Vec<String>,
    request_timeout: Duration,
    tenant_ids: Vec<String>,
    _state: std::marker::PhantomData<S>,
}

impl WorkerBuilder<Initial> {
    pub fn new(client: Client) -> Self {
        Self {
            client,
            job_type: String::new(),
            worker_name: String::new(),
            timeout: Duration::default(),
            max_jobs_to_activate: 0,
            concurrency_limit: 0,
            worker_callback: None,
            state: None,
            fetch_variable: vec![],
            request_timeout: Duration::default(),
            tenant_ids: vec![],
            _state: std::marker::PhantomData,
        }
    }

    /// Sets the request timeout for the worker.
    ///
    /// The request will be completed when at least one job is activated or after the specified `request_timeout`.
    ///
    /// # Arguments
    ///
    /// * `request_timeout` - The duration to wait before the request times out.
    ///
    /// # Returns
    ///
    /// A `WorkerBuilder<WithRequestTimeout>` instance with the request timeout configured.
    pub fn with_request_timeout(
        self,
        request_timeout: Duration,
    ) -> WorkerBuilder<WithRequestTimeout> {
        WorkerBuilder {
            client: self.client,
            job_type: self.job_type,
            worker_name: self.worker_name,
            timeout: self.timeout,
            max_jobs_to_activate: self.max_jobs_to_activate,
            concurrency_limit: self.concurrency_limit,
            worker_callback: self.worker_callback,
            state: self.state,
            fetch_variable: self.fetch_variable,
            request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }
}

impl WorkerBuilder<WithRequestTimeout> {
    /// Sets the job timeout for the worker.
    ///
    /// A job activated by this worker will not be activated again, by this or any
    /// other worker, until the specified timeout has elapsed. This ensures that a
    /// job is not picked up elsewhere while it is still being processed.
    ///
    /// # Parameters
    ///
    /// - `timeout`: The duration for which the job should be locked.
    ///
    /// # Returns
    ///
    /// A `WorkerBuilder<WithJobTimeout>` instance with the job timeout configured.
    pub fn with_job_timeout(self, timeout: Duration) -> WorkerBuilder<WithJobTimeout> {
        WorkerBuilder {
            client: self.client,
            job_type: self.job_type,
            worker_name: self.worker_name,
            timeout,
            max_jobs_to_activate: self.max_jobs_to_activate,
            concurrency_limit: self.concurrency_limit,
            worker_callback: self.worker_callback,
            state: self.state,
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }
}

impl WorkerBuilder<WithJobTimeout> {
    /// Sets the maximum number of jobs to activate in a single request.
    ///
    /// # Arguments
    ///
    /// * `max_jobs_to_activate` - The maximum number of jobs to activate.
    ///
    /// # Returns
    ///
    /// A `WorkerBuilder<WithMaxJobs>` instance with the `WithMaxJobs` state.
    pub fn with_max_jobs_to_activate(
        self,
        max_jobs_to_activate: i32,
    ) -> WorkerBuilder<WithMaxJobs> {
        WorkerBuilder {
            client: self.client,
            job_type: self.job_type,
            worker_name: self.worker_name,
            timeout: self.timeout,
            max_jobs_to_activate,
            concurrency_limit: self.concurrency_limit,
            worker_callback: self.worker_callback,
            state: self.state,
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }
}

impl WorkerBuilder<WithMaxJobs> {
    /// Sets the maximum number of jobs that can be processed concurrently by the worker.
    ///
    /// # Arguments
    ///
    /// * `concurrency_limit` - The maximum number of jobs that the worker can handle at the same time.
    ///
    /// # Returns
    ///
    /// A `WorkerBuilder<WithConcurrency>` instance with the concurrency limit set.
    pub fn with_concurrency_limit(self, concurrency_limit: u32) -> WorkerBuilder<WithConcurrency> {
        WorkerBuilder {
            client: self.client,
            job_type: self.job_type,
            worker_name: self.worker_name,
            timeout: self.timeout,
            max_jobs_to_activate: self.max_jobs_to_activate,
            concurrency_limit,
            worker_callback: self.worker_callback,
            state: self.state,
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }
}

impl WorkerBuilder<WithConcurrency> {
    /// Sets the job type for the worker.
    ///
    /// The job type is defined in the BPMN process, for example:
    /// `<zeebe:taskDefinition type="payment-service" />`.
    ///
    /// # Parameters
    ///
    /// - `job_type`: A `String` representing the job type.
    ///
    /// # Returns
    ///
    /// A `WorkerBuilder<WithJobType>` instance with the job type set.
    pub fn with_job_type(self, job_type: String) -> WorkerBuilder<WithJobType> {
        WorkerBuilder {
            client: self.client,
            job_type,
            worker_name: self.worker_name,
            timeout: self.timeout,
            max_jobs_to_activate: self.max_jobs_to_activate,
            concurrency_limit: self.concurrency_limit,
            worker_callback: self.worker_callback,
            state: self.state,
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }
}

impl WorkerBuilder<WithJobType> {
    /// Sets the handler function for the worker.
    ///
    /// # Arguments
    ///
    /// * `handler` - A function that takes a `Client` and an `ActivatedJob` as arguments and returns a `Future` whose output implements `WorkerOutputHandler`.
    ///
    /// # Returns
    ///
    /// Returns a `WorkerBuilder` with the `WithHandler` state.
    ///
    /// # Examples
    /// ```ignore
    ///
    /// // You can choose to manually handle returning results from handler functions
    /// async fn example_service(client: Client, job: ActivatedJob) {
    ///     // Your job handling logic here
    ///     // The function has to use the client to report the result
    ///     let _ = client.complete_job().with_job_key(job.key()).send().await;
    /// }
    ///
    /// client
    ///    .worker()
    ///    .with_job_timeout(Duration::from_secs(5 * 60))
    ///    .with_request_timeout(Duration::from_secs(10))
    ///    .with_max_jobs_to_activate(4)
    ///    .with_concurrency_limit(2)
    ///    .with_job_type(String::from("example-service"))
    ///    .with_handler(example_service)
    ///    ...
    /// ```
    ///
    /// If the function returns a `Result` instead, the result is used to automatically set the
    /// status of the job.
    ///
    /// ```ignore
    /// async fn example_service_with_result(_client: Client, job: ActivatedJob) -> Result<(), WorkerError<()>> {
    ///     Ok(())
    /// }
    ///
    /// client
    ///    .worker()
    ///    .with_job_timeout(Duration::from_secs(5 * 60))
    ///    .with_request_timeout(Duration::from_secs(10))
    ///    .with_max_jobs_to_activate(4)
    ///    .with_concurrency_limit(2)
    ///    .with_job_type(String::from("example-service"))
    ///    .with_handler(example_service_with_result)
    ///    ...
    /// ```
    /// This works for closures as well, but their return type must be annotated.
    ///
    /// ```ignore
    /// client
    ///     .worker()
    ///     .with_request_timeout(Duration::from_secs(10))
    ///     .with_job_timeout(Duration::from_secs(10))
    ///     .with_max_jobs_to_activate(5)
    ///     .with_concurrency_limit(5)
    ///     .with_job_type(String::from("example_service"))
    ///     .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
    ///     .build();
    ///
    /// ```
    ///
    /// # Type Parameters
    ///
    /// * `F` - The type of the handler function.
    /// * `Fut` - The future returned by `F`.
    /// * `Fut::Output` - The future's output, which must implement `WorkerOutputHandler`.
    ///
    /// # Constraints
    ///
    /// * `F` must implement `Fn(Client, ActivatedJob) -> Fut` and must be `Send` and `'static`.
    pub fn with_handler<F, Fut>(self, handler: F) -> WorkerBuilder<WithHandler, F>
    where
        F: Fn(Client, ActivatedJob) -> Fut + Send + 'static,
        Fut: Future + Send + 'static,
        Fut::Output: WorkerOutputHandler + Send + 'static,
    {
        WorkerBuilder {
            client: self.client,
            job_type: self.job_type,
            worker_name: self.worker_name,
            timeout: self.timeout,
            max_jobs_to_activate: self.max_jobs_to_activate,
            concurrency_limit: self.concurrency_limit,
            worker_callback: Some(Arc::new(handler)),
            state: self.state,
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }

    /// Sets the state that will be shared across all concurrent instances of the worker.
    ///
    /// # Arguments
    ///
    /// * `state` - An `Arc` containing the shared state.
    ///
    /// # Returns
    ///
    /// Returns a `WorkerBuilder<WithState>` with the provided shared state.
    ///
    /// # Type Parameters
    ///
    /// * `T` - The type of the shared state.
    ///
    /// # Constraints
    ///
    /// * `T` must be `Send`, `Sync`, and `'static`.
    ///
    pub fn with_state<T>(self, state: Arc<SharedState<T>>) -> WorkerBuilder<WithState, (), T>
    where
        T: Send + Sync + 'static,
    {
        WorkerBuilder {
            client: self.client,
            job_type: self.job_type,
            worker_name: self.worker_name,
            timeout: self.timeout,
            max_jobs_to_activate: self.max_jobs_to_activate,
            concurrency_limit: self.concurrency_limit,
            worker_callback: None,
            state: Some(state),
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }
}

impl<T> WorkerBuilder<WithState, (), T>
where
    T: Send + Sync + 'static,
{
    pub fn with_handler<F, Fut>(
        self,
        handler: F,
    ) -> WorkerBuilder<WithHandler, (F, Arc<SharedState<T>>), T>
    where
        F: Fn(Client, ActivatedJob, Arc<SharedState<T>>) -> Fut + Send + 'static,
        Fut: Future + Send + 'static,
        Fut::Output: WorkerOutputHandler + Send + 'static,
    {
        let state = self.state.unwrap();
        WorkerBuilder {
            client: self.client,
            job_type: self.job_type,
            worker_name: self.worker_name,
            timeout: self.timeout,
            max_jobs_to_activate: self.max_jobs_to_activate,
            concurrency_limit: self.concurrency_limit,
            worker_callback: Some(Arc::new((handler, state))),
            state: None,
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout,
            tenant_ids: self.tenant_ids,
            _state: std::marker::PhantomData,
        }
    }
}

impl<H, T> WorkerBuilder<WithHandler, H, T>
where
    H: JobHandler + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    /// Builds a `Worker` using the collected inputs.
    ///
    /// # Returns
    ///
    /// * `Worker` - The constructed `Worker` instance.
    pub fn build(self) -> Worker<H> {
        let request = proto::ActivateJobsRequest {
            r#type: self.job_type,
            worker: self.worker_name,
            timeout: self.timeout.as_millis() as i64,
            max_jobs_to_activate: self.max_jobs_to_activate,
            fetch_variable: self.fetch_variable,
            request_timeout: self.request_timeout.as_millis() as i64,
            tenant_ids: self.tenant_ids,
        };

        Worker::new(
            self.client,
            request,
            self.request_timeout,
            self.concurrency_limit,
            self.worker_callback
                .expect("Don't transition to build without handler"),
        )
    }

    /// Sets the worker name.
    ///
    /// # Arguments
    ///
    /// * `worker_name` - A `String` representing the name of the worker.
    ///
    /// # Returns
    ///
    /// * `Self` - The updated `WorkerBuilder` instance.
    pub fn with_worker_name(mut self, worker_name: String) -> Self {
        self.worker_name = worker_name;
        self
    }

    /// Adds a single variable to fetch.
    ///
    /// The fetched variables become the job variables; if the list is empty, all
    /// variables visible at the time of activation for the scope of the job will
    /// be returned.
    ///
    /// # Arguments
    ///
    /// * `fetch_variable` - A `String` representing the variable to fetch.
    ///
    /// # Returns
    ///
    /// * `Self` - The updated `WorkerBuilder` instance.
    pub fn with_fetch_variable(mut self, fetch_variable: String) -> Self {
        self.fetch_variable.push(fetch_variable);
        self
    }

    /// Adds multiple variables to fetch.
    ///
    /// The fetched variables become the job variables; if the list is empty, all
    /// variables visible at the time of activation for the scope of the job will
    /// be returned.
    ///
    /// # Arguments
    ///
    /// * `fetch_variables` - A `Vec<String>` representing the variables to fetch.
    ///
    /// # Returns
    ///
    /// * `Self` - The updated `WorkerBuilder` instance.
    pub fn with_fetch_variables(mut self, mut fetch_variables: Vec<String>) -> Self {
        self.fetch_variable.append(&mut fetch_variables);
        self
    }

    /// Adds a single tenant ID.
    ///
    /// # Arguments
    ///
    /// * `tenant_id` - A `String` representing the tenant ID.
    ///
    /// # Returns
    ///
    /// * `Self` - The updated `WorkerBuilder` instance.
    pub fn with_tenant_id(mut self, tenant_id: String) -> Self {
        self.tenant_ids.push(tenant_id);
        self
    }

    /// Adds multiple tenant IDs.
    ///
    /// # Arguments
    ///
    /// * `tenant_ids` - A `Vec<String>` representing the tenant IDs.
    ///
    /// # Returns
    ///
    /// * `Self` - The updated `WorkerBuilder` instance.
    pub fn with_tenant_ids(mut self, mut tenant_ids: Vec<String>) -> Self {
        self.tenant_ids.append(&mut tenant_ids);
        self
    }
}

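/// Messages exchanged on the internal polling channel to track how many jobs are
/// in flight and to decide when to request more work from the gateway.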
enum PollingMessage {
    FetchJobs,
    JobsFetched(u32),
    FetchJobsComplete,
    JobFinished,
}

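/// Polls the gateway for jobs on a fixed interval and forwards activated jobs to
/// the consumer, tracking `queued_jobs_count` so each request only asks for the
/// remaining capacity up to `max_jobs_to_activate`.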
struct WorkProducer {
    client: Client,
    job_tx: Sender<ActivatedJob>,
    poll_tx: Sender<PollingMessage>,
    poll_rx: Receiver<PollingMessage>,
    poll_interval: Interval,
    request_timeout: Duration,
    request: proto::ActivateJobsRequest,
    queued_jobs_count: u32,
    max_jobs_to_activate: u32,
}

impl WorkProducer {
    fn fetch_jobs(&mut self) {
        let mut client = self.client.clone();
        let mut request = self.request.clone();
        let poll_tx = self.poll_tx.clone();
        let job_tx = self.job_tx.clone();
        let request_timeout = self.request_timeout;

        // Only request as many jobs as we have remaining capacity for.
        request.max_jobs_to_activate = (self.max_jobs_to_activate - self.queued_jobs_count) as i32;

        tokio::spawn(async move {
            if let Err(err) = timeout(request_timeout, async {
                let res = client
                    .gateway_client
                    .activate_jobs(tonic::Request::new(request))
                    .await
                    .map(|response| {
                        debug!("Response {:?}", response);
                        response.into_inner()
                    });

                let mut jobs_fetched = 0;
                debug!("Activate jobs result {:?}", res);

                if let Ok(mut stream) = res {
                    while let Ok(Some(activate_job_response)) = stream.message().await {
                        jobs_fetched += activate_job_response.jobs.len() as u32;

                        for job in activate_job_response.jobs {
                            let _ = job_tx.send(job.into()).await;
                        }
                    }
                }

                let _ = poll_tx
                    .send(PollingMessage::JobsFetched(jobs_fetched))
                    .await;
            })
            .await
            {
                error!("{}", err);
            };

            let _ = poll_tx.send(PollingMessage::FetchJobsComplete).await;
        });
    }

    async fn run(&mut self) {
        let mut fetching_jobs = false;
        loop {
            tokio::select! {
                Some(message) = self.poll_rx.recv() => {
                    match message {
                        PollingMessage::JobsFetched(new_job_count) => {
                            self.queued_jobs_count = self.queued_jobs_count.saturating_add(new_job_count);
                        }
                        PollingMessage::JobFinished => {
                            self.queued_jobs_count = self.queued_jobs_count.saturating_sub(1);
                        }
                        PollingMessage::FetchJobs => {
                            if self.queued_jobs_count <= self.max_jobs_to_activate && !fetching_jobs {
                                fetching_jobs = true;
                                self.fetch_jobs();
                            }
                        }
                        PollingMessage::FetchJobsComplete => {
                            fetching_jobs = false;
                        }
                    }
                },
                _ = self.poll_interval.tick() => {
                    let _ = self.poll_tx.send(PollingMessage::FetchJobs).await;
                },
                else => {
                    break;
                }
            }
        }
    }
}

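/// Receives activated jobs from the producer and runs the user handler for each
/// one, using a semaphore to cap concurrent executions and notifying the
/// producer whenever a job finishes.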
struct WorkConsumer<H>
where
    H: JobHandler + Send + Sync + 'static,
{
    client: Client,
    job_rx: Receiver<ActivatedJob>,
    poll_tx: Sender<PollingMessage>,
    semaphore: Arc<Semaphore>,
    worker_callback: Arc<H>,
}

impl<H> WorkConsumer<H>
where
    H: JobHandler + Send + Sync + 'static,
{
    async fn run(&mut self) {
        while let Some(job) = self.job_rx.recv().await {
            let permit = self.semaphore.clone().acquire_owned().await.unwrap();
            let poll_tx = self.poll_tx.clone();
            let client = self.client.clone();
            let callback = self.worker_callback.clone();

            tokio::spawn(async move {
                callback.execute(client.clone(), job.clone()).await;
                let _ = poll_tx.send(PollingMessage::JobFinished).await;
                drop(permit);
            });
        }
    }
}

/// A worker implementation for fetching jobs from Zeebe and processing them with
/// the associated handler, with configurable concurrency and state management.
///
/// The `Worker` is responsible for:
/// - Polling for new jobs from the Zeebe broker
/// - Managing job activation and processing
/// - Handling concurrent job execution
/// - Maintaining worker state across job executions
///
/// The worker consists of two main components:
/// - `WorkProducer`: Handles job polling and queue management
/// - `WorkConsumer`: Manages job execution and concurrency
///
/// # Architecture
///
/// The worker uses a producer-consumer pattern where:
/// 1. The producer polls for jobs at regular intervals
/// 2. Jobs are queued in an internal channel
/// 3. The consumer processes jobs concurrently up to the configured limit
///
/// # Concurrency
///
/// Job processing is controlled by:
/// - A semaphore limiting concurrent job executions
/// - Channel-based communication between components
/// - A configurable maximum number of jobs to activate
///
/// # Example
///
/// ```ignore
/// let worker = client
///     .worker()
///     .with_job_timeout(Duration::from_secs(60))
///     .with_request_timeout(Duration::from_secs(10))
///     .with_max_jobs_to_activate(5)
///     .with_concurrency_limit(3)
///     .with_job_type(String::from("example-service"))
///     .with_handler(|client, job| async move {
///         // Process job here
///         let _ = client.complete_job().with_job_key(job.key()).send().await;
///     })
///     .build();
///
/// // Start the worker
/// worker.run().await;
/// ```
/// # Error Handling
///
/// The worker implements automatic error handling for:
/// - Job activation timeouts
/// - Network errors during polling
/// - Job processing failures
pub struct Worker<H>
where
    H: JobHandler + Send + Sync + 'static,
{
    poller: WorkProducer,
    dispatcher: WorkConsumer<H>,
}

impl<H> Worker<H>
where
    H: JobHandler + Send + Sync + 'static,
{
    fn new(
        client: Client,
        request: proto::ActivateJobsRequest,
        request_timeout: Duration,
        concurrency_limit: u32,
        callback: Arc<H>,
    ) -> Worker<H> {
        let (job_tx, job_rx) = mpsc::channel(32);
        let (poll_tx, poll_rx) = mpsc::channel(32);
        let max_jobs_to_activate = request.max_jobs_to_activate as u32;

        let poller = WorkProducer {
            client: client.clone(),
            job_tx,
            poll_tx: poll_tx.clone(),
            poll_rx,
            poll_interval: interval(DEFAULT_POLL_INTERVAL),
            request,
            request_timeout,
            max_jobs_to_activate,
            queued_jobs_count: 0,
        };

        let dispatcher = WorkConsumer {
            client: client.clone(),
            job_rx,
            poll_tx: poll_tx.clone(),
            semaphore: Arc::new(Semaphore::new(concurrency_limit as usize)),
            worker_callback: callback,
        };

        Worker { poller, dispatcher }
    }

    /// Starts the worker by running both the poller and dispatcher concurrently.
    ///
    /// This method uses `tokio::join!` to run the `poller` and `dispatcher` concurrently.
    /// The poller continuously polls the Zeebe broker for new jobs, while the dispatcher
    /// processes the jobs using the provided callback.
    ///
    /// # Example
    /// ```ignore
    /// #[tokio::main]
    /// async fn main() {
    ///     client
    ///         .worker()
    ///         // Worker configuration...
    ///         .build()
    ///         .run()
    ///         .await;
    /// }
    /// ```
    pub async fn run(mut self) {
        tokio::join!(self.poller.run(), self.dispatcher.run());
    }
}