Worker

Struct Worker 

Source
pub struct Worker<H>
where H: JobHandler + Send + Sync + 'static,
{ /* private fields */ }
Expand description

The Worker is responsible for fetching jobs from Zeebe and processing them with the associated handler. /// A worker implementation for processing Zeebe jobs with configurable concurrency and state management.

The Worker is responsible for:

  • Polling for new jobs from the Zeebe broker
  • Managing job activation and processing
  • Handling concurrent job execution
  • Maintaining worker state across job executions

The worker consists of two main components:

  • WorkProducer: Handles job polling and queue management
  • WorkConsumer: Manages job execution and concurrency

§Architecture

The worker uses a producer-consumer pattern where:

  1. The producer polls for jobs at regular intervals
  2. Jobs are queued in an internal channel
  3. The consumer processes jobs concurrently up to the configured limit

§Concurrency

Job processing is controlled by:

  • A semaphore limiting concurrent job executions
  • Channel-based communication between components
  • Configurable maximum jobs to activate

§Example

let worker = client
    .worker()
    .with_job_timeout(Duration::from_secs(60))
    .with_request_timeout(Duration::from_secs(10))
    .with_max_jobs_to_activate(5)
    .with_concurrency_limit(3)
    .with_job_type("example-service")
    .with_handler(|client, job| async move {
        // Process job here
        client.complete_job().with_job_key(job.key()).send().await;
    })
    .build();

// Start the worker
worker.run().await?;

§Error Handling

The worker implements automatic error handling for:

  • Job activation timeouts
  • Network errors during polling
  • Job processing failures

Implementations§

Source§

impl<H> Worker<H>
where H: JobHandler + Send + Sync + 'static,

Source

pub async fn run(self)

Starts the worker by running both the poller and dispatcher concurrently.

This method uses tokio::join! to run the poller and dispatcher concurrently. The poller continuously polls the Zeebe broker for new jobs, while the dispatcher processes the jobs using the provided callback.

§Example
#[tokio::main]
async fn main() {
    client
        .worker()
        //Worker configuration...
        .build()
        .run()
        .await;
}
Examples found in repository?
examples/custom_auto_handler.rs (line 76)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46    // Default configuration using Camunda's docker compose
47    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48    let client = Client::builder()
49        .with_address("http://localhost", 26500)
50        .with_oauth(
51            String::from("zeebe"),
52            String::from("zecret"),
53            String::from(
54                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55            ),
56            String::from("zeebe-api"),
57            Duration::from_secs(30),
58            None,
59        )
60        .build()
61        .await?;
62
63    // Wait until first OAuth token has been retrieved
64    client.auth_initialized().await;
65
66    let worker = client
67        .worker()
68        .with_request_timeout(Duration::from_secs(1))
69        .with_job_timeout(Duration::from_secs(1))
70        .with_max_jobs_to_activate(1)
71        .with_concurrency_limit(1)
72        .with_job_type(String::from("placeholder"))
73        .with_handler(always_fail)
74        .build();
75
76    tokio::join!(worker.run());
77
78    Ok(())
79}
More examples
Hide additional examples
examples/pizza.rs (line 241)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74    unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76    // Default configuration using Camunda's docker compose
77    // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78    let client = Client::builder()
79        .with_address("http://localhost", 26500)
80        .with_oauth(
81            String::from("zeebe"),
82            String::from("zecret"),
83            String::from(
84                "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85            ),
86            String::from("zeebe-api"),
87            Duration::from_secs(30),
88            None,
89        )
90        .build()
91        .await?;
92
93    // Wait until first OAuth token has been retrieved
94    client.auth_initialized().await;
95
96    // Deploy our pizza ordering process
97    // We just discard the result for brevity
98    let deploy_resource_response = client
99        .deploy_resource()
100        .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101        .read_resource_files()?
102        .send()
103        .await?;
104
105    let process_definition_key =
106        if let Some(deployment) = deploy_resource_response.deployments().first() {
107            match deployment.metadata() {
108                Some(metadata) => match metadata {
109                    zeebe_rs::Metadata::Process(process_metadata) => {
110                        process_metadata.process_definition_key()
111                    }
112                    _ => {
113                        unreachable!("We're only deploying a bpmn here");
114                    }
115                },
116                _ => -1,
117            }
118        } else {
119            -1
120        };
121
122    // Let's define our shared state, how much stock we have
123    let mut initial_stock = HashMap::new();
124    initial_stock.insert(String::from("Pepperoni"), 5);
125    initial_stock.insert(String::from("Margherita"), 8);
126    initial_stock.insert(String::from("Hawaiian"), 3);
127    initial_stock.insert(String::from("Quattro Formaggi"), 0);
128    initial_stock.insert(String::from("Vegetarian"), 6);
129
130    let stock = Arc::new(SharedState(Mutex::new(Stock {
131        items: initial_stock,
132    })));
133
134    // This will share the stock among all the workers confirming orders
135    // counting it down for each order.
136    let confirm_worker = client
137        .worker()
138        .with_request_timeout(Duration::from_secs(10))
139        .with_job_timeout(Duration::from_secs(10))
140        .with_max_jobs_to_activate(4)
141        .with_concurrency_limit(2)
142        .with_job_type(String::from("confirm_order"))
143        .with_state(stock)
144        .with_handler(confirm_order)
145        .with_fetch_variable(String::from("items"))
146        .build();
147
148    // We can reuse the worker builder for sharing some configurations
149    let base_worker = client
150        .worker()
151        .with_request_timeout(Duration::from_secs(10))
152        .with_job_timeout(Duration::from_secs(10));
153
154    let bake_worker = base_worker
155        .clone()
156        .with_max_jobs_to_activate(10)
157        .with_concurrency_limit(10)
158        .with_job_type(String::from("bake_pizzas"))
159        .with_handler(bake_pizzas)
160        .build();
161
162    // Handler can be both function callbacks or closures
163    // Handlers have to share return type for this to work
164    let reject_order_worker = client
165        .worker()
166        .with_request_timeout(Duration::from_secs(10))
167        .with_job_timeout(Duration::from_secs(10))
168        .with_max_jobs_to_activate(5)
169        .with_concurrency_limit(5)
170        .with_job_type(String::from("reject_order"))
171        .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172        .build();
173
174    let deliver_worker = client
175        .worker()
176        .with_request_timeout(Duration::from_secs(10))
177        .with_job_timeout(Duration::from_secs(10))
178        .with_max_jobs_to_activate(1)
179        .with_concurrency_limit(1)
180        .with_job_type(String::from("deliver_order"))
181        .with_handler(deliver_order)
182        .build();
183
184    let clarify_address_worker = base_worker
185        .clone()
186        .with_max_jobs_to_activate(5)
187        .with_concurrency_limit(5)
188        .with_job_type(String::from("call_customer"))
189        .with_handler(|client, job| async move {
190            let mut customer = job.data::<Customer>().unwrap();
191
192            //Same guy keeps leaving out his address for some reason
193            customer.address = String::from("1337 Coolsville, USA");
194            if let Ok(req) = client
195                .complete_job()
196                .with_job_key(job.key())
197                .with_variables(customer)
198            {
199                let _ = req.send().await;
200            }
201        })
202        .build();
203
204    tokio::spawn(place_order(
205        client.clone(),
206        process_definition_key,
207        "Jane Doe",
208        "100 Coolsville, USA",
209        true,
210        vec!["Pepperoni"],
211    ));
212
213    tokio::spawn(place_order(
214        client.clone(),
215        process_definition_key,
216        "John Doe",
217        "999 NotCoolsville, USA",
218        false,
219        vec!["Quattro Formaggi"; 100],
220    ));
221
222    tokio::spawn(place_order(
223        client.clone(),
224        process_definition_key,
225        "Lorem Ipsum",
226        "",
227        false,
228        vec!["Hawaiian", "Pepperoni"],
229    ));
230
231    tokio::spawn(place_order(
232        client.clone(),
233        process_definition_key,
234        "George W Bush",
235        "White House",
236        true,
237        vec!["Burger"],
238    ));
239
240    let _ = tokio::join!(
241        confirm_worker.run(),
242        reject_order_worker.run(),
243        bake_worker.run(),
244        deliver_worker.run(),
245        clarify_address_worker.run(),
246    );
247
248    Ok(())
249}

Auto Trait Implementations§

§

impl<H> !Freeze for Worker<H>

§

impl<H> !RefUnwindSafe for Worker<H>

§

impl<H> Send for Worker<H>

§

impl<H> Sync for Worker<H>

§

impl<H> Unpin for Worker<H>

§

impl<H> !UnwindSafe for Worker<H>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,