pub struct CompleteJobRequest<T: CompleteJobRequestState> { /* private fields */ }Expand description
Implementations§
Source§impl CompleteJobRequest<Initial>
impl CompleteJobRequest<Initial>
Sourcepub fn with_job_key(self, job_key: i64) -> CompleteJobRequest<WithKey>
pub fn with_job_key(self, job_key: i64) -> CompleteJobRequest<WithKey>
Sets the job key to identify which job to complete
§Arguments
job_key- The unique key identifying the job
§Returns
A new instance of CompleteJobRequest<WithKey>
Examples found in repository?
examples/pizza.rs (line 196)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}
250
251async fn confirm_order(
252 _client: Client,
253 job: ActivatedJob,
254 state: Arc<SharedState<Mutex<Stock>>>,
255) -> Result<OrderResult, WorkerError<()>> {
256 let order = job
257 .data::<Order>()
258 .expect("For demonstration purposes we're not handling a missing order here");
259 let mut stock: tokio::sync::MutexGuard<Stock> = state.lock().await;
260
261 let mut order_accepted = true;
262 let mut message = None;
263 for item in order.items {
264 if let Some(quantity) = stock.items.get_mut(&item) {
265 if *quantity > 0 {
266 *quantity -= 1;
267 } else {
268 message = Some(format!("We're out of stock of {}", item));
269 order_accepted = false;
270 }
271 } else {
272 message = Some(format!("We don't serve {}", item));
273 order_accepted = false;
274 }
275 }
276
277 println!("Confirmed order");
278 Ok(OrderResult {
279 order_accepted,
280 message,
281 })
282}
283
284async fn bake_pizzas(client: Client, job: ActivatedJob) {
285 let order = job
286 .data::<Order>()
287 .expect("We shouldn't start baking pizzas if there is no order!");
288
289 println!("Baking pizzas");
290 sleep(Duration::from_secs(10 * order.items.len() as u64)).await;
291 println!("Finished baking {} pizzas", order.items.len());
292
293 let _ = client.complete_job().with_job_key(job.key()).send().await;
294}Source§impl CompleteJobRequest<WithKey>
impl CompleteJobRequest<WithKey>
Sourcepub fn with_variables<T: Serialize>(self, data: T) -> Result<Self, ClientError>
pub fn with_variables<T: Serialize>(self, data: T) -> Result<Self, ClientError>
Sets variables to be included with the job completion
§Arguments
data- The variables to be set when completing the job
§Returns
A result containing the updated CompleteJobRequest<WithKey> instance or a ClientError
Examples found in repository?
examples/pizza.rs (line 197)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}Sourcepub fn with_job_result(self) -> JobResultBuilder
pub fn with_job_result(self) -> JobResultBuilder
Sourcepub async fn send(self) -> Result<CompleteJobResponse, ClientError>
pub async fn send(self) -> Result<CompleteJobResponse, ClientError>
Sends the job completion request to the Zeebe workflow engine
§Returns
A result containing the CompleteJobResponse or a ClientError
Examples found in repository?
examples/pizza.rs (line 199)
73async fn main() -> Result<(), Box<dyn std::error::Error>> {
74 unsafe { std::env::set_var("RUST_BACKTRACE", "1") };
75
76 // Default configuration using Camunda's docker compose
77 // https://github.com/camunda/camunda-platform/blob/5dc74fe71667e18fbb5c8d4694068d662d83ad00/README.md
78 let client = Client::builder()
79 .with_address("http://localhost", 26500)
80 .with_oauth(
81 String::from("zeebe"),
82 String::from("zecret"),
83 String::from(
84 "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token",
85 ),
86 String::from("zeebe-api"),
87 Duration::from_secs(30),
88 None,
89 )
90 .build()
91 .await?;
92
93 // Wait until first OAuth token has been retrieved
94 client.auth_initialized().await;
95
96 // Deploy our pizza ordering process
97 // We just discard the result for brevity
98 let deploy_resource_response = client
99 .deploy_resource()
100 .with_resource_file(PathBuf::from("examples/resources/pizza-order.bpmn"))
101 .read_resource_files()?
102 .send()
103 .await?;
104
105 let process_definition_key =
106 if let Some(deployment) = deploy_resource_response.deployments().first() {
107 match deployment.metadata() {
108 Some(metadata) => match metadata {
109 zeebe_rs::Metadata::Process(process_metadata) => {
110 process_metadata.process_definition_key()
111 }
112 _ => {
113 unreachable!("We're only deploying a bpmn here");
114 }
115 },
116 _ => -1,
117 }
118 } else {
119 -1
120 };
121
122 // Let's define our shared state, how much stock we have
123 let mut initial_stock = HashMap::new();
124 initial_stock.insert(String::from("Pepperoni"), 5);
125 initial_stock.insert(String::from("Margherita"), 8);
126 initial_stock.insert(String::from("Hawaiian"), 3);
127 initial_stock.insert(String::from("Quattro Formaggi"), 0);
128 initial_stock.insert(String::from("Vegetarian"), 6);
129
130 let stock = Arc::new(SharedState(Mutex::new(Stock {
131 items: initial_stock,
132 })));
133
134 // This will share the stock among all the workers confirming orders
135 // counting it down for each order.
136 let confirm_worker = client
137 .worker()
138 .with_request_timeout(Duration::from_secs(10))
139 .with_job_timeout(Duration::from_secs(10))
140 .with_max_jobs_to_activate(4)
141 .with_concurrency_limit(2)
142 .with_job_type(String::from("confirm_order"))
143 .with_state(stock)
144 .with_handler(confirm_order)
145 .with_fetch_variable(String::from("items"))
146 .build();
147
148 // We can reuse the worker builder for sharing some configurations
149 let base_worker = client
150 .worker()
151 .with_request_timeout(Duration::from_secs(10))
152 .with_job_timeout(Duration::from_secs(10));
153
154 let bake_worker = base_worker
155 .clone()
156 .with_max_jobs_to_activate(10)
157 .with_concurrency_limit(10)
158 .with_job_type(String::from("bake_pizzas"))
159 .with_handler(bake_pizzas)
160 .build();
161
162 // Handler can be both function callbacks or closures
163 // Handlers have to share return type for this to work
164 let reject_order_worker = client
165 .worker()
166 .with_request_timeout(Duration::from_secs(10))
167 .with_job_timeout(Duration::from_secs(10))
168 .with_max_jobs_to_activate(5)
169 .with_concurrency_limit(5)
170 .with_job_type(String::from("reject_order"))
171 .with_handler(|_client, _job| async move { Ok::<(), WorkerError<()>>(()) })
172 .build();
173
174 let deliver_worker = client
175 .worker()
176 .with_request_timeout(Duration::from_secs(10))
177 .with_job_timeout(Duration::from_secs(10))
178 .with_max_jobs_to_activate(1)
179 .with_concurrency_limit(1)
180 .with_job_type(String::from("deliver_order"))
181 .with_handler(deliver_order)
182 .build();
183
184 let clarify_address_worker = base_worker
185 .clone()
186 .with_max_jobs_to_activate(5)
187 .with_concurrency_limit(5)
188 .with_job_type(String::from("call_customer"))
189 .with_handler(|client, job| async move {
190 let mut customer = job.data::<Customer>().unwrap();
191
192 //Same guy keeps leaving out his address for some reason
193 customer.address = String::from("1337 Coolsville, USA");
194 if let Ok(req) = client
195 .complete_job()
196 .with_job_key(job.key())
197 .with_variables(customer)
198 {
199 let _ = req.send().await;
200 }
201 })
202 .build();
203
204 tokio::spawn(place_order(
205 client.clone(),
206 process_definition_key,
207 "Jane Doe",
208 "100 Coolsville, USA",
209 true,
210 vec!["Pepperoni"],
211 ));
212
213 tokio::spawn(place_order(
214 client.clone(),
215 process_definition_key,
216 "John Doe",
217 "999 NotCoolsville, USA",
218 false,
219 vec!["Quattro Formaggi"; 100],
220 ));
221
222 tokio::spawn(place_order(
223 client.clone(),
224 process_definition_key,
225 "Lorem Ipsum",
226 "",
227 false,
228 vec!["Hawaiian", "Pepperoni"],
229 ));
230
231 tokio::spawn(place_order(
232 client.clone(),
233 process_definition_key,
234 "George W Bush",
235 "White House",
236 true,
237 vec!["Burger"],
238 ));
239
240 let _ = tokio::join!(
241 confirm_worker.run(),
242 reject_order_worker.run(),
243 bake_worker.run(),
244 deliver_worker.run(),
245 clarify_address_worker.run(),
246 );
247
248 Ok(())
249}
250
251async fn confirm_order(
252 _client: Client,
253 job: ActivatedJob,
254 state: Arc<SharedState<Mutex<Stock>>>,
255) -> Result<OrderResult, WorkerError<()>> {
256 let order = job
257 .data::<Order>()
258 .expect("For demonstration purposes we're not handling a missing order here");
259 let mut stock: tokio::sync::MutexGuard<Stock> = state.lock().await;
260
261 let mut order_accepted = true;
262 let mut message = None;
263 for item in order.items {
264 if let Some(quantity) = stock.items.get_mut(&item) {
265 if *quantity > 0 {
266 *quantity -= 1;
267 } else {
268 message = Some(format!("We're out of stock of {}", item));
269 order_accepted = false;
270 }
271 } else {
272 message = Some(format!("We don't serve {}", item));
273 order_accepted = false;
274 }
275 }
276
277 println!("Confirmed order");
278 Ok(OrderResult {
279 order_accepted,
280 message,
281 })
282}
283
284async fn bake_pizzas(client: Client, job: ActivatedJob) {
285 let order = job
286 .data::<Order>()
287 .expect("We shouldn't start baking pizzas if there is no order!");
288
289 println!("Baking pizzas");
290 sleep(Duration::from_secs(10 * order.items.len() as u64)).await;
291 println!("Finished baking {} pizzas", order.items.len());
292
293 let _ = client.complete_job().with_job_key(job.key()).send().await;
294}Trait Implementations§
Source§impl<T: Clone + CompleteJobRequestState> Clone for CompleteJobRequest<T>
impl<T: Clone + CompleteJobRequestState> Clone for CompleteJobRequest<T>
Source§fn clone(&self) -> CompleteJobRequest<T>
fn clone(&self) -> CompleteJobRequest<T>
Returns a duplicate of the value. Read more
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from
source. Read moreAuto Trait Implementations§
impl<T> !Freeze for CompleteJobRequest<T>
impl<T> !RefUnwindSafe for CompleteJobRequest<T>
impl<T> Send for CompleteJobRequest<T>where
T: Send,
impl<T> Sync for CompleteJobRequest<T>where
T: Sync,
impl<T> Unpin for CompleteJobRequest<T>where
T: Unpin,
impl<T> !UnwindSafe for CompleteJobRequest<T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message
T in a tonic::Request