pub struct Worker<H> { /* private fields */ }
Expand description
The Worker is responsible for fetching jobs from Zeebe and processing them with the associated handler.
A worker implementation for processing Zeebe jobs with configurable concurrency and state management.
The Worker is responsible for:
- Polling for new jobs from the Zeebe broker
- Managing job activation and processing
- Handling concurrent job execution
- Maintaining worker state across job executions
The worker consists of two main components:
- WorkProducer: Handles job polling and queue management
- WorkConsumer: Manages job execution and concurrency
§Architecture
The worker uses a producer-consumer pattern where:
- The producer polls for jobs at regular intervals
- Jobs are queued in an internal channel
- The consumer processes jobs concurrently up to the configured limit
§Concurrency
Job processing is controlled by:
- A semaphore limiting concurrent job executions
- Channel-based communication between components
- Configurable maximum jobs to activate
§Example
ⓘ
let worker = client
.worker()
.with_job_timeout(Duration::from_secs(60))
.with_request_timeout(Duration::from_secs(10))
.with_max_jobs_to_activate(5)
.with_concurrency_limit(3)
.with_job_type("example-service")
.with_handler(|client, job| async move {
// Process job here
client.complete_job().with_job_key(job.key()).send().await;
})
.build();
// Start the worker
worker.run().await;
§Error Handling
The worker implements automatic error handling for:
- Job activation timeouts
- Network errors during polling
- Job processing failures
Implementations§
Source§impl<H> Worker<H>
impl<H> Worker<H>
Sourcepub async fn run(self)
pub async fn run(self)
Starts the worker by running both the poller and dispatcher concurrently.
This method uses tokio::join! to run the poller and dispatcher concurrently.
The poller continuously polls the Zeebe broker for new jobs, while the dispatcher
processes the jobs using the provided callback.
§Example
ⓘ
#[tokio::main]
async fn main() {
client
.worker()
//Worker configuration...
.build()
.run()
.await;
}
Examples found in repository?
examples/custom_auto_handler.rs (line 76)
43async fn main() -> Result<(), Box<dyn std::error::Error>> {
44 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
45
46 // Default configuration using Camunda's docker compose
47 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
48 let client = Client::builder()
49 .with_address("http://localhost", 26500)
50 .with_oauth(
51 String::from("zeebe"),
52 String::from("zecret"),
53 String::from(
54 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
55 ),
56 String::from("zeebe-api"),
57 Duration::from_secs(30),
58 None,
59 )
60 .build()
61 .await?;
62
63 // Wait until first OAuth token has been retrieved
64 client.auth_initialized().await;
65
66 let worker = client
67 .worker()
68 .with_request_timeout(Duration::from_secs(1))
69 .with_job_timeout(Duration::from_secs(1))
70 .with_max_jobs_to_activate(1)
71 .with_concurrency_limit(1)
72 .with_job_type(String::from("placeholder"))
73 .with_handler(always_fail)
74 .build();
75
76 tokio::join!(worker.run());
77
78 Ok(())
79}
More examples
examples/pizza.rs (line 241)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}
Auto Trait Implementations§
impl<H> !Freeze for Worker<H>
impl<H> !RefUnwindSafe for Worker<H>
impl<H> Send for Worker<H>
impl<H> Sync for Worker<H>
impl<H> Unpin for Worker<H>
impl<H> !UnwindSafe for Worker<H>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`