1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{prefixed_ulid::ExecutorId, ExecutionId, FunctionFqn};
10use concepts::{
11 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
12 FinishedExecutionError,
13};
14use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18 sync::{
19 atomic::{AtomicBool, Ordering},
20 Arc,
21 },
22 time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level, Span};
26
27#[derive(Debug, Clone)]
28pub struct ExecConfig {
29 pub lock_expiry: Duration,
30 pub tick_sleep: Duration,
31 pub batch_size: u32,
32 pub component_id: ComponentId,
33 pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
34}
35
36pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
37 worker: Arc<dyn Worker>,
38 config: ExecConfig,
39 clock_fn: C, db_pool: P,
41 executor_id: ExecutorId,
42 phantom_data: PhantomData<DB>,
43 ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48 #[debug(skip)]
49 #[allow(dead_code)]
50 executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66 #[debug(skip)]
67 is_closing: Arc<AtomicBool>,
68 #[debug(skip)]
69 abort_handle: AbortHandle,
70 component_id: ComponentId,
71 executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
/// Public, test-feature-only entry point to the crate-private `extract_ffqns`.
#[cfg(feature = "test")]
pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
    extract_ffqns(worker)
}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
/// Builds an executor with a freshly generated `ExecutorId` without spawning
/// anything; available only with the `test` feature.
#[cfg(feature = "test")]
pub fn new(
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_pool: P,
    ffqns: Arc<[FunctionFqn]>,
) -> Self {
    let executor_id = ExecutorId::generate();
    Self {
        worker,
        config,
        clock_fn,
        db_pool,
        executor_id,
        phantom_data: PhantomData,
        ffqns,
    }
}
129
130 pub fn spawn_new(
131 worker: Arc<dyn Worker>,
132 config: ExecConfig,
133 clock_fn: C,
134 db_pool: P,
135 executor_id: ExecutorId,
136 ) -> ExecutorTaskHandle {
137 let is_closing = Arc::new(AtomicBool::default());
138 let is_closing_inner = is_closing.clone();
139 let ffqns = extract_ffqns(worker.as_ref());
140 let component_id = config.component_id.clone();
141 let abort_handle = tokio::spawn(async move {
142 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143 let task = Self {
144 worker,
145 config,
146 executor_id,
147 db_pool,
148 phantom_data: PhantomData,
149 ffqns: ffqns.clone(),
150 clock_fn: clock_fn.clone(),
151 };
152 loop {
153 let _ = task.tick(clock_fn.now()).await;
154 let executed_at = clock_fn.now();
155 task.db_pool
156 .connection()
157 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158 .await;
159 if is_closing_inner.load(Ordering::Relaxed) {
160 return;
161 }
162 }
163 })
164 .abort_handle();
165 ExecutorTaskHandle {
166 is_closing,
167 abort_handle,
168 component_id,
169 executor_id,
170 }
171 }
172
173 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174 if let Some(task_limiter) = &self.config.task_limiter {
175 let mut locks = Vec::new();
176 for _ in 0..self.config.batch_size {
177 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178 locks.push(Some(permit));
179 } else {
180 break;
181 }
182 }
183 locks
184 } else {
185 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186 for _ in 0..self.config.batch_size {
187 vec.push(None);
188 }
189 vec
190 }
191 }
192
/// Public, test-feature-only wrapper that forwards to the private `tick`.
#[cfg(feature = "test")]
pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
    self.tick(executed_at).await
}
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200 let locked_executions = {
201 let mut permits = self.acquire_task_permits();
202 if permits.is_empty() {
203 return Ok(ExecutionProgress::default());
204 }
205 let db_connection = self.db_pool.connection();
206 let lock_expires_at = executed_at + self.config.lock_expiry;
207 let locked_executions = db_connection
208 .lock_pending(
209 permits.len(), executed_at, self.ffqns.clone(),
212 executed_at, self.config.component_id.clone(),
214 self.executor_id,
215 lock_expires_at,
216 )
217 .await
218 .map_err(|err| {
219 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220 })?;
221 while permits.len() > locked_executions.len() {
223 permits.pop();
224 }
225 assert_eq!(permits.len(), locked_executions.len());
226 locked_executions.into_iter().zip(permits)
227 };
228 let execution_deadline = executed_at + self.config.lock_expiry;
229
230 let mut executions = Vec::with_capacity(locked_executions.len());
231 for (locked_execution, permit) in locked_executions {
232 let execution_id = locked_execution.execution_id.clone();
233 let join_handle = {
234 let worker = self.worker.clone();
235 let db_pool = self.db_pool.clone();
236 let clock_fn = self.clock_fn.clone();
237 let run_id = locked_execution.run_id;
238 let worker_span = info_span!(parent: None, "worker",
239 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
240 locked_execution.metadata.enrich(&worker_span);
241 tokio::spawn({
242 let worker_span2 = worker_span.clone();
243 async move {
244 let res = Self::run_worker(
245 worker,
246 &db_pool,
247 execution_deadline,
248 clock_fn,
249 locked_execution,
250 worker_span2,
251 )
252 .await;
253 if let Err(db_error) = res {
254 error!("Execution will be timed out not writing `{db_error:?}`");
255 }
256 drop(permit);
257 }
258 .instrument(worker_span)
259 })
260 };
261 executions.push((execution_id, join_handle));
262 }
263 Ok(ExecutionProgress { executions })
264 }
265
266 async fn run_worker(
267 worker: Arc<dyn Worker>,
268 db_pool: &P,
269 execution_deadline: DateTime<Utc>,
270 clock_fn: C,
271 locked_execution: LockedExecution,
272 worker_span: Span,
273 ) -> Result<(), DbError> {
274 debug!("Worker::run starting");
275 trace!(
276 version = %locked_execution.version,
277 params = ?locked_execution.params,
278 event_history = ?locked_execution.event_history,
279 "Worker::run starting"
280 );
281 let can_be_retried = ExecutionLog::can_be_retried_after(
282 locked_execution.temporary_event_count + 1,
283 locked_execution.max_retries,
284 locked_execution.retry_exp_backoff,
285 );
286 let unlock_expiry_on_limit_reached =
287 ExecutionLog::compute_retry_duration_when_retrying_forever(
288 locked_execution.temporary_event_count + 1,
289 locked_execution.retry_exp_backoff,
290 );
291 let ctx = WorkerContext {
292 execution_id: locked_execution.execution_id.clone(),
293 metadata: locked_execution.metadata,
294 ffqn: locked_execution.ffqn,
295 params: locked_execution.params,
296 event_history: locked_execution.event_history,
297 responses: locked_execution
298 .responses
299 .into_iter()
300 .map(|outer| outer.event)
301 .collect(),
302 version: locked_execution.version,
303 execution_deadline,
304 can_be_retried: can_be_retried.is_some(),
305 run_id: locked_execution.run_id,
306 worker_span,
307 };
308 let worker_result = worker.run(ctx).await;
309 trace!(?worker_result, "Worker::run finished");
310 let result_obtained_at = clock_fn.now();
311 match Self::worker_result_to_execution_event(
312 locked_execution.execution_id,
313 worker_result,
314 result_obtained_at,
315 locked_execution.parent,
316 can_be_retried,
317 unlock_expiry_on_limit_reached,
318 )? {
319 Some(append) => {
320 let db_connection = db_pool.connection();
321 trace!("Appending {append:?}");
322 append.clone().append(&db_connection).await
323 }
324 None => Ok(()),
325 }
326 }
327
328 #[expect(clippy::too_many_lines)]
331 fn worker_result_to_execution_event(
332 execution_id: ExecutionId,
333 worker_result: WorkerResult,
334 result_obtained_at: DateTime<Utc>,
335 parent: Option<(ExecutionId, JoinSetId)>,
336 can_be_retried: Option<Duration>,
337 unlock_expiry_on_limit_reached: Duration,
338 ) -> Result<Option<Append>, DbError> {
339 Ok(match worker_result {
340 WorkerResult::Ok(result, new_version) => {
341 info!(
342 "Execution finished: {}",
343 result.as_pending_state_finished_result()
344 );
345 let child_finished =
346 parent.map(
347 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
348 parent_execution_id,
349 parent_join_set,
350 result: Ok(result.clone()),
351 },
352 );
353 let primary_event = AppendRequest {
354 created_at: result_obtained_at,
355 event: ExecutionEventInner::Finished { result: Ok(result) },
356 };
357
358 Some(Append {
359 created_at: result_obtained_at,
360 primary_event,
361 execution_id,
362 version: new_version,
363 child_finished,
364 })
365 }
366 WorkerResult::DbUpdatedByWorker => None,
367 WorkerResult::Err(err) => {
368 let reason = err.to_string();
369 let (primary_event, child_finished, version) = match err {
370 WorkerError::TemporaryTimeout => {
371 info!("Temporary timeout");
372 return Ok(None);
374 }
375 WorkerError::DbError(db_error) => {
376 return Err(db_error);
377 }
378 WorkerError::ActivityTrap {
379 reason: reason_inner,
380 trap_kind,
381 detail,
382 version,
383 } => {
384 if let Some(duration) = can_be_retried {
385 let expires_at = result_obtained_at + duration;
386 debug!(
387 "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
388 );
389 (
390 ExecutionEventInner::TemporarilyFailed {
391 backoff_expires_at: expires_at,
392 reason_full: StrVariant::from(reason),
393 reason_inner: StrVariant::from(reason_inner),
394 detail: Some(detail),
395 },
396 None,
397 version,
398 )
399 } else {
400 info!(
401 "Activity {trap_kind} marked as permanent failure - {reason_inner}"
402 );
403 let result = Err(FinishedExecutionError::PermanentFailure {
404 reason_inner,
405 reason_full: reason,
406 kind: PermanentFailureKind::ActivityTrap,
407 detail: Some(detail),
408 });
409 let child_finished =
410 parent.map(|(parent_execution_id, parent_join_set)| {
411 ChildFinishedResponse {
412 parent_execution_id,
413 parent_join_set,
414 result: result.clone(),
415 }
416 });
417 (
418 ExecutionEventInner::Finished { result },
419 child_finished,
420 version,
421 )
422 }
423 }
424 WorkerError::ActivityReturnedError { detail, version } => {
425 let duration = can_be_retried.expect(
426 "ActivityReturnedError must not be returned when retries are exhausted",
427 );
428 let expires_at = result_obtained_at + duration;
429 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
430 (
431 ExecutionEventInner::TemporarilyFailed {
432 backoff_expires_at: expires_at,
433 reason_full: StrVariant::Static("activity returned error"),
434 reason_inner: StrVariant::Static("activity returned error"),
435 detail,
436 },
437 None,
438 version,
439 )
440 }
441 WorkerError::TemporaryWorkflowTrap {
442 reason: reason_inner,
443 kind,
444 detail,
445 version,
446 } => {
447 let duration = can_be_retried.expect("workflows are retried forever");
448 let expires_at = result_obtained_at + duration;
449 debug!(
450 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
451 );
452 (
453 ExecutionEventInner::TemporarilyFailed {
454 backoff_expires_at: expires_at,
455 reason_full: StrVariant::from(reason),
456 reason_inner: StrVariant::from(reason_inner),
457 detail,
458 },
459 None,
460 version,
461 )
462 }
463 WorkerError::LimitReached {
464 reason: inner_reason,
465 version: new_version,
466 } => {
467 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
468 warn!(
469 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
470 );
471 (
472 ExecutionEventInner::Unlocked {
473 backoff_expires_at: expires_at,
474 reason: StrVariant::from(reason),
475 },
476 None,
477 new_version,
478 )
479 }
480 WorkerError::FatalError(fatal_error, version) => {
481 info!("Fatal worker error - {fatal_error:?}");
482 let result = Err(FinishedExecutionError::from(fatal_error));
483 let child_finished =
484 parent.map(|(parent_execution_id, parent_join_set)| {
485 ChildFinishedResponse {
486 parent_execution_id,
487 parent_join_set,
488 result: result.clone(),
489 }
490 });
491 (
492 ExecutionEventInner::Finished { result },
493 child_finished,
494 version,
495 )
496 }
497 };
498 Some(Append {
499 created_at: result_obtained_at,
500 primary_event: AppendRequest {
501 created_at: result_obtained_at,
502 event: primary_event,
503 },
504 execution_id,
505 version,
506 child_finished,
507 })
508 }
509 })
510 }
511}
512
513#[derive(Debug, Clone)]
514pub(crate) struct ChildFinishedResponse {
515 pub(crate) parent_execution_id: ExecutionId,
516 pub(crate) parent_join_set: JoinSetId,
517 pub(crate) result: FinishedExecutionResult,
518}
519
520#[derive(Debug, Clone)]
521pub(crate) struct Append {
522 pub(crate) created_at: DateTime<Utc>,
523 pub(crate) primary_event: AppendRequest,
524 pub(crate) execution_id: ExecutionId,
525 pub(crate) version: Version,
526 pub(crate) child_finished: Option<ChildFinishedResponse>,
527}
528
529impl Append {
530 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
531 if let Some(child_finished) = self.child_finished {
532 assert_matches!(
533 &self.primary_event,
534 AppendRequest {
535 event: ExecutionEventInner::Finished { .. },
536 ..
537 }
538 );
539 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
540 db_connection
541 .append_batch_respond_to_parent(
542 derived.clone(),
543 self.created_at,
544 vec![self.primary_event],
545 self.version.clone(),
546 child_finished.parent_execution_id,
547 JoinSetResponseEventOuter {
548 created_at: self.created_at,
549 event: JoinSetResponseEvent {
550 join_set_id: child_finished.parent_join_set,
551 event: JoinSetResponse::ChildExecutionFinished {
552 child_execution_id: derived,
553 finished_version: self.version,
555 result: child_finished.result,
556 },
557 },
558 },
559 )
560 .await?;
561 } else {
562 db_connection
563 .append_batch(
564 self.created_at,
565 vec![self.primary_event],
566 self.execution_id,
567 self.version,
568 )
569 .await?;
570 }
571 Ok(())
572 }
573}
574
/// Scripted `Worker` implementation for tests.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        storage::{HistoryEvent, Version},
        FunctionFqn, FunctionMetadata, ParameterTypes,
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    /// Function name exported by a `SimpleWorker` unless overridden via `with_ffqn`.
    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");

    /// Scripted runs keyed by the expected version. Entries are consumed from
    /// the back, so they must be inserted in reverse order of execution.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    /// Worker that replays scripted results and asserts the context it is run with.
    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata of the single parameterless, submittable function named `ffqn`.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// Worker scripted with exactly one run that expects version 2 and an
        /// empty event history.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let scripted = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(scripted)))
        }

        /// Replaces the exported function name, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            let Self {
                worker_results_rev, ..
            } = self;
            Self {
                worker_results_rev,
                exported: single_export(ffqn.clone()),
                ffqn,
            }
        }

        /// Worker exporting `FFQN_SOME` that replays the given scripted results.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        /// Pops the last scripted entry, asserts that the version and event history
        /// in `ctx` match it, and returns the scripted result.
        ///
        /// Panics when no scripted entry is left or when an assertion fails.
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
658
659#[cfg(test)]
660mod tests {
661 use self::simple_worker::SimpleWorker;
662 use super::*;
663 use crate::worker::FatalError;
664 use crate::{expired_timers_watcher, worker::WorkerResult};
665 use assert_matches::assert_matches;
666 use async_trait::async_trait;
667 use concepts::storage::{CreateRequest, JoinSetRequest};
668 use concepts::storage::{
669 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
670 };
671 use concepts::time::Now;
672 use concepts::{
673 FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
674 SupportedFunctionReturnValue, TrapKind,
675 };
676 use db_tests::Database;
677 use indexmap::IndexMap;
678 use simple_worker::FFQN_SOME;
679 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
680 use test_utils::set_up;
681 use test_utils::sim_clock::{ConstClock, SimClock};
682
683 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
684
685 async fn tick_fn<
686 W: Worker + Debug,
687 C: ClockFn + 'static,
688 DB: DbConnection + 'static,
689 P: DbPool<DB> + 'static,
690 >(
691 config: ExecConfig,
692 clock_fn: C,
693 db_pool: P,
694 worker: Arc<W>,
695 executed_at: DateTime<Utc>,
696 ) -> ExecutionProgress {
697 trace!("Ticking with {worker:?}");
698 let ffqns = super::extract_ffqns(worker.as_ref());
699 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
700 let mut execution_progress = executor.tick(executed_at).await.unwrap();
701 loop {
702 execution_progress
703 .executions
704 .retain(|(_, abort_handle)| !abort_handle.is_finished());
705 if execution_progress.executions.is_empty() {
706 return execution_progress;
707 }
708 tokio::time::sleep(Duration::from_millis(10)).await;
709 }
710 }
711
712 #[tokio::test]
713 async fn execute_simple_lifecycle_tick_based_mem() {
714 let created_at = Now.now();
715 let (_guard, db_pool) = Database::Memory.set_up().await;
716 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
717 db_pool.close().await.unwrap();
718 }
719
720 #[cfg(not(madsim))]
721 #[tokio::test]
722 async fn execute_simple_lifecycle_tick_based_sqlite() {
723 let created_at = Now.now();
724 let (_guard, db_pool) = Database::Sqlite.set_up().await;
725 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
726 db_pool.close().await.unwrap();
727 }
728
729 async fn execute_simple_lifecycle_tick_based<
730 DB: DbConnection + 'static,
731 P: DbPool<DB> + 'static,
732 C: ClockFn + 'static,
733 >(
734 pool: P,
735 clock_fn: C,
736 ) {
737 set_up();
738 let created_at = clock_fn.now();
739 let exec_config = ExecConfig {
740 batch_size: 1,
741 lock_expiry: Duration::from_secs(1),
742 tick_sleep: Duration::from_millis(100),
743 component_id: ComponentId::dummy_activity(),
744 task_limiter: None,
745 };
746
747 let execution_log = create_and_tick(
748 CreateAndTickConfig {
749 execution_id: ExecutionId::generate(),
750 created_at,
751 max_retries: 0,
752 executed_at: created_at,
753 retry_exp_backoff: Duration::ZERO,
754 },
755 clock_fn,
756 pool,
757 exec_config,
758 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
759 SupportedFunctionReturnValue::None,
760 Version::new(2),
761 ))),
762 tick_fn,
763 )
764 .await;
765 assert_matches!(
766 execution_log.events.get(2).unwrap(),
767 ExecutionEvent {
768 event: ExecutionEventInner::Finished {
769 result: Ok(SupportedFunctionReturnValue::None),
770 },
771 created_at: _,
772 }
773 );
774 }
775
776 #[tokio::test]
777 async fn stochastic_execute_simple_lifecycle_task_based_mem() {
778 set_up();
779 let created_at = Now.now();
780 let clock_fn = ConstClock(created_at);
781 let (_guard, db_pool) = Database::Memory.set_up().await;
782 let exec_config = ExecConfig {
783 batch_size: 1,
784 lock_expiry: Duration::from_secs(1),
785 tick_sleep: Duration::ZERO,
786 component_id: ComponentId::dummy_activity(),
787 task_limiter: None,
788 };
789
790 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
791 SupportedFunctionReturnValue::None,
792 Version::new(2),
793 )));
794 let exec_task = ExecTask::spawn_new(
795 worker.clone(),
796 exec_config.clone(),
797 clock_fn,
798 db_pool.clone(),
799 ExecutorId::generate(),
800 );
801
802 let execution_log = create_and_tick(
803 CreateAndTickConfig {
804 execution_id: ExecutionId::generate(),
805 created_at,
806 max_retries: 0,
807 executed_at: created_at,
808 retry_exp_backoff: Duration::ZERO,
809 },
810 clock_fn,
811 db_pool.clone(),
812 exec_config,
813 worker,
814 |_, _, _, _, _| async {
815 tokio::time::sleep(Duration::from_secs(1)).await; ExecutionProgress::default()
817 },
818 )
819 .await;
820 exec_task.close().await;
821 db_pool.close().await.unwrap();
822 assert_matches!(
823 execution_log.events.get(2).unwrap(),
824 ExecutionEvent {
825 event: ExecutionEventInner::Finished {
826 result: Ok(SupportedFunctionReturnValue::None),
827 },
828 created_at: _,
829 }
830 );
831 }
832
833 struct CreateAndTickConfig {
834 execution_id: ExecutionId,
835 created_at: DateTime<Utc>,
836 max_retries: u32,
837 executed_at: DateTime<Utc>,
838 retry_exp_backoff: Duration,
839 }
840
841 async fn create_and_tick<
842 W: Worker,
843 C: ClockFn,
844 DB: DbConnection,
845 P: DbPool<DB>,
846 T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
847 F: Future<Output = ExecutionProgress>,
848 >(
849 config: CreateAndTickConfig,
850 clock_fn: C,
851 db_pool: P,
852 exec_config: ExecConfig,
853 worker: Arc<W>,
854 mut tick: T,
855 ) -> ExecutionLog {
856 let db_connection = db_pool.connection();
858 db_connection
859 .create(CreateRequest {
860 created_at: config.created_at,
861 execution_id: config.execution_id.clone(),
862 ffqn: FFQN_SOME,
863 params: Params::empty(),
864 parent: None,
865 metadata: concepts::ExecutionMetadata::empty(),
866 scheduled_at: config.created_at,
867 retry_exp_backoff: config.retry_exp_backoff,
868 max_retries: config.max_retries,
869 component_id: ComponentId::dummy_activity(),
870 scheduled_by: None,
871 })
872 .await
873 .unwrap();
874 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
876 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
877 debug!("Execution history after tick: {execution_log:?}");
878 assert_matches!(
880 execution_log.events.first().unwrap(),
881 ExecutionEvent {
882 event: ExecutionEventInner::Created { .. },
883 created_at: actually_created_at,
884 }
885 if config.created_at == *actually_created_at
886 );
887 let locked_at = assert_matches!(
888 execution_log.events.get(1).unwrap(),
889 ExecutionEvent {
890 event: ExecutionEventInner::Locked { .. },
891 created_at: locked_at
892 } if config.created_at <= *locked_at
893 => *locked_at
894 );
895 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
896 event: _,
897 created_at: executed_at,
898 } if *executed_at >= locked_at);
899 execution_log
900 }
901
902 #[expect(clippy::too_many_lines)]
903 #[tokio::test]
904 async fn activity_trap_should_trigger_an_execution_retry() {
905 set_up();
906 let sim_clock = SimClock::default();
907 let (_guard, db_pool) = Database::Memory.set_up().await;
908 let exec_config = ExecConfig {
909 batch_size: 1,
910 lock_expiry: Duration::from_secs(1),
911 tick_sleep: Duration::ZERO,
912 component_id: ComponentId::dummy_activity(),
913 task_limiter: None,
914 };
915 let expected_reason = "error reason";
916 let expected_detail = "error detail";
917 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
918 WorkerError::ActivityTrap {
919 reason: expected_reason.to_string(),
920 trap_kind: concepts::TrapKind::Trap,
921 detail: expected_detail.to_string(),
922 version: Version::new(2),
923 },
924 )));
925 let retry_exp_backoff = Duration::from_millis(100);
926 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
927 let execution_log = create_and_tick(
928 CreateAndTickConfig {
929 execution_id: ExecutionId::generate(),
930 created_at: sim_clock.now(),
931 max_retries: 1,
932 executed_at: sim_clock.now(),
933 retry_exp_backoff,
934 },
935 sim_clock.clone(),
936 db_pool.clone(),
937 exec_config.clone(),
938 worker,
939 tick_fn,
940 )
941 .await;
942 assert_eq!(3, execution_log.events.len());
943 {
944 let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
945 &execution_log.events.get(2).unwrap(),
946 ExecutionEvent {
947 event: ExecutionEventInner::TemporarilyFailed {
948 reason_inner,
949 reason_full,
950 detail,
951 backoff_expires_at,
952 },
953 created_at: at,
954 }
955 => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
956 );
957 assert_eq!(expected_reason, reason_inner.deref());
958 assert_eq!(
959 format!("activity trap: {expected_reason}"),
960 reason_full.deref()
961 );
962 assert_eq!(Some(expected_detail), detail.as_deref());
963 assert_eq!(at, sim_clock.now());
964 assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
965 }
966 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
967 std::sync::Mutex::new(IndexMap::from([(
968 Version::new(4),
969 (
970 vec![],
971 WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4)),
972 ),
973 )])),
974 )));
975 assert!(tick_fn(
977 exec_config.clone(),
978 sim_clock.clone(),
979 db_pool.clone(),
980 worker.clone(),
981 sim_clock.now(),
982 )
983 .await
984 .executions
985 .is_empty());
986 sim_clock.move_time_forward(retry_exp_backoff).await;
988 tick_fn(
989 exec_config,
990 sim_clock.clone(),
991 db_pool.clone(),
992 worker,
993 sim_clock.now(),
994 )
995 .await;
996 let execution_log = {
997 let db_connection = db_pool.connection();
998 db_connection
999 .get(&execution_log.execution_id)
1000 .await
1001 .unwrap()
1002 };
1003 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1004 assert_matches!(
1005 execution_log.events.get(3).unwrap(),
1006 ExecutionEvent {
1007 event: ExecutionEventInner::Locked { .. },
1008 created_at: at
1009 } if *at == sim_clock.now()
1010 );
1011 assert_matches!(
1012 execution_log.events.get(4).unwrap(),
1013 ExecutionEvent {
1014 event: ExecutionEventInner::Finished {
1015 result: Ok(SupportedFunctionReturnValue::None),
1016 },
1017 created_at: finished_at,
1018 } if *finished_at == sim_clock.now()
1019 );
1020 db_pool.close().await.unwrap();
1021 }
1022
1023 #[tokio::test]
1024 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1025 set_up();
1026 let created_at = Now.now();
1027 let clock_fn = ConstClock(created_at);
1028 let (_guard, db_pool) = Database::Memory.set_up().await;
1029 let exec_config = ExecConfig {
1030 batch_size: 1,
1031 lock_expiry: Duration::from_secs(1),
1032 tick_sleep: Duration::ZERO,
1033 component_id: ComponentId::dummy_activity(),
1034 task_limiter: None,
1035 };
1036
1037 let expected_reason = "error reason";
1038 let expected_detail = "error detail";
1039 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1040 WorkerError::ActivityTrap {
1041 reason: expected_reason.to_string(),
1042 trap_kind: concepts::TrapKind::Trap,
1043 detail: expected_detail.to_string(),
1044 version: Version::new(2),
1045 },
1046 )));
1047 let execution_log = create_and_tick(
1048 CreateAndTickConfig {
1049 execution_id: ExecutionId::generate(),
1050 created_at,
1051 max_retries: 0,
1052 executed_at: created_at,
1053 retry_exp_backoff: Duration::ZERO,
1054 },
1055 clock_fn,
1056 db_pool.clone(),
1057 exec_config.clone(),
1058 worker,
1059 tick_fn,
1060 )
1061 .await;
1062 assert_eq!(3, execution_log.events.len());
1063 let (reason, kind, detail) = assert_matches!(
1064 &execution_log.events.get(2).unwrap(),
1065 ExecutionEvent {
1066 event: ExecutionEventInner::Finished{
1067 result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_})
1068 },
1069 created_at: at,
1070 } if *at == created_at
1071 => (reason_inner, kind, detail)
1072 );
1073 assert_eq!(expected_reason, *reason);
1074 assert_eq!(Some(expected_detail), detail.as_deref());
1075 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1076
1077 db_pool.close().await.unwrap();
1078 }
1079
1080 #[tokio::test]
1081 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1082 let worker_error = WorkerError::ActivityTrap {
1083 reason: "error reason".to_string(),
1084 trap_kind: TrapKind::Trap,
1085 detail: "detail".to_string(),
1086 version: Version::new(2),
1087 };
1088 let expected_child_err = FinishedExecutionError::PermanentFailure {
1089 reason_full: "activity trap: error reason".to_string(),
1090 reason_inner: "error reason".to_string(),
1091 kind: PermanentFailureKind::ActivityTrap,
1092 detail: Some("detail".to_string()),
1093 };
1094 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1095 .await;
1096 }
1097
1098 #[tokio::test]
1099 async fn child_execution_permanently_failed_should_notify_parent_timeout() {
1100 let worker_error = WorkerError::TemporaryTimeout;
1101 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1102 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1103 .await;
1104 }
1105
1106 #[tokio::test]
1107 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1108 let parent_id = ExecutionId::from_parts(1, 1);
1109 let join_set_id_outer =
1110 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1111 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1112 let join_set_id_inner =
1113 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1114 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1115 let worker_error = WorkerError::FatalError(
1116 FatalError::UnhandledChildExecutionError {
1117 child_execution_id: child_execution_id.clone(),
1118 root_cause_id: root_cause_id.clone(),
1119 },
1120 Version::new(2),
1121 );
1122 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1123 child_execution_id,
1124 root_cause_id,
1125 };
1126 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1127 .await;
1128 }
1129
1130 #[expect(clippy::too_many_lines)]
1131 async fn child_execution_permanently_failed_should_notify_parent(
1132 worker_error: WorkerError,
1133 expected_child_err: FinishedExecutionError,
1134 ) {
1135 use concepts::storage::JoinSetResponseEventOuter;
1136 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1137
1138 set_up();
1139 let sim_clock = SimClock::default();
1140 let (_guard, db_pool) = Database::Memory.set_up().await;
1141
1142 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1143 WorkerResult::DbUpdatedByWorker,
1144 ));
1145 let parent_execution_id = ExecutionId::generate();
1146 db_pool
1147 .connection()
1148 .create(CreateRequest {
1149 created_at: sim_clock.now(),
1150 execution_id: parent_execution_id.clone(),
1151 ffqn: FFQN_SOME,
1152 params: Params::empty(),
1153 parent: None,
1154 metadata: concepts::ExecutionMetadata::empty(),
1155 scheduled_at: sim_clock.now(),
1156 retry_exp_backoff: Duration::ZERO,
1157 max_retries: 0,
1158 component_id: ComponentId::dummy_activity(),
1159 scheduled_by: None,
1160 })
1161 .await
1162 .unwrap();
1163 tick_fn(
1164 ExecConfig {
1165 batch_size: 1,
1166 lock_expiry: LOCK_EXPIRY,
1167 tick_sleep: Duration::ZERO,
1168 component_id: ComponentId::dummy_activity(),
1169 task_limiter: None,
1170 },
1171 sim_clock.clone(),
1172 db_pool.clone(),
1173 parent_worker,
1174 sim_clock.now(),
1175 )
1176 .await;
1177
1178 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1179 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1180 {
1182 let child = CreateRequest {
1183 created_at: sim_clock.now(),
1184 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1185 ffqn: FFQN_CHILD,
1186 params: Params::empty(),
1187 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1188 metadata: concepts::ExecutionMetadata::empty(),
1189 scheduled_at: sim_clock.now(),
1190 retry_exp_backoff: Duration::ZERO,
1191 max_retries: 0,
1192 component_id: ComponentId::dummy_activity(),
1193 scheduled_by: None,
1194 };
1195 let current_time = sim_clock.now();
1196 let join_set = AppendRequest {
1197 created_at: current_time,
1198 event: ExecutionEventInner::HistoryEvent {
1199 event: HistoryEvent::JoinSet {
1200 join_set_id: join_set_id.clone(),
1201 },
1202 },
1203 };
1204 let child_exec_req = AppendRequest {
1205 created_at: current_time,
1206 event: ExecutionEventInner::HistoryEvent {
1207 event: HistoryEvent::JoinSetRequest {
1208 join_set_id: join_set_id.clone(),
1209 request: JoinSetRequest::ChildExecutionRequest {
1210 child_execution_id: child_execution_id.clone(),
1211 },
1212 },
1213 },
1214 };
1215 let join_next = AppendRequest {
1216 created_at: current_time,
1217 event: ExecutionEventInner::HistoryEvent {
1218 event: HistoryEvent::JoinNext {
1219 join_set_id: join_set_id.clone(),
1220 run_expires_at: sim_clock.now(),
1221 closing: false,
1222 },
1223 },
1224 };
1225 db_pool
1226 .connection()
1227 .append_batch_create_new_execution(
1228 current_time,
1229 vec![join_set, child_exec_req, join_next],
1230 parent_execution_id.clone(),
1231 Version::new(2),
1232 vec![child],
1233 )
1234 .await
1235 .unwrap();
1236 }
1237
1238 let child_worker = Arc::new(
1239 SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1240 );
1241
1242 tick_fn(
1244 ExecConfig {
1245 batch_size: 1,
1246 lock_expiry: LOCK_EXPIRY,
1247 tick_sleep: Duration::ZERO,
1248 component_id: ComponentId::dummy_activity(),
1249 task_limiter: None,
1250 },
1251 sim_clock.clone(),
1252 db_pool.clone(),
1253 child_worker,
1254 sim_clock.now(),
1255 )
1256 .await;
1257 if expected_child_err == FinishedExecutionError::PermanentTimeout {
1258 sim_clock.move_time_forward(LOCK_EXPIRY).await;
1260 expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1261 .await
1262 .unwrap();
1263 }
1264 let child_log = db_pool
1265 .connection()
1266 .get(&ExecutionId::Derived(child_execution_id.clone()))
1267 .await
1268 .unwrap();
1269 assert!(child_log.pending_state.is_finished());
1270 assert_eq!(
1271 Version(2),
1272 child_log.next_version,
1273 "created = 0, locked = 1, with_single_result = 2"
1274 );
1275 assert_eq!(
1276 ExecutionEventInner::Finished {
1277 result: Err(expected_child_err)
1278 },
1279 child_log.last_event().event
1280 );
1281 let parent_log = db_pool
1282 .connection()
1283 .get(&parent_execution_id)
1284 .await
1285 .unwrap();
1286 assert_matches!(
1287 parent_log.pending_state,
1288 PendingState::PendingAt {
1289 scheduled_at
1290 } if scheduled_at == sim_clock.now(),
1291 "parent should be back to pending"
1292 );
1293 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1294 parent_log.responses.last(),
1295 Some(JoinSetResponseEventOuter{
1296 created_at: at,
1297 event: JoinSetResponseEvent{
1298 join_set_id: found_join_set_id,
1299 event: JoinSetResponse::ChildExecutionFinished {
1300 child_execution_id: found_child_execution_id,
1301 finished_version,
1302 result: found_result,
1303 }
1304 }
1305 })
1306 if *at == sim_clock.now()
1307 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1308 );
1309 assert_eq!(join_set_id, *found_join_set_id);
1310 assert_eq!(child_execution_id, *found_child_execution_id);
1311 assert_eq!(child_log.next_version, *child_finished_version);
1312 assert!(found_result.is_err());
1313
1314 db_pool.close().await.unwrap();
1315 }
1316
1317 #[derive(Clone, Debug)]
1318 struct SleepyWorker {
1319 duration: Duration,
1320 result: SupportedFunctionReturnValue,
1321 exported: [FunctionMetadata; 1],
1322 }
1323
1324 #[async_trait]
1325 impl Worker for SleepyWorker {
1326 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1327 tokio::time::sleep(self.duration).await;
1328 WorkerResult::Ok(self.result.clone(), ctx.version)
1329 }
1330
1331 fn exported_functions(&self) -> &[FunctionMetadata] {
1332 &self.exported
1333 }
1334
1335 fn imported_functions(&self) -> &[FunctionMetadata] {
1336 &[]
1337 }
1338 }
1339
1340 #[expect(clippy::too_many_lines)]
1341 #[tokio::test]
1342 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1343 set_up();
1344 let sim_clock = SimClock::default();
1345 let (_guard, db_pool) = Database::Memory.set_up().await;
1346 let lock_expiry = Duration::from_millis(100);
1347 let exec_config = ExecConfig {
1348 batch_size: 1,
1349 lock_expiry,
1350 tick_sleep: Duration::ZERO,
1351 component_id: ComponentId::dummy_activity(),
1352 task_limiter: None,
1353 };
1354
1355 let worker = Arc::new(SleepyWorker {
1356 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1358 exported: [FunctionMetadata {
1359 ffqn: FFQN_SOME,
1360 parameter_types: ParameterTypes::default(),
1361 return_type: None,
1362 extension: None,
1363 submittable: true,
1364 }],
1365 });
1366 let execution_id = ExecutionId::generate();
1368 let timeout_duration = Duration::from_millis(300);
1369 let db_connection = db_pool.connection();
1370 db_connection
1371 .create(CreateRequest {
1372 created_at: sim_clock.now(),
1373 execution_id: execution_id.clone(),
1374 ffqn: FFQN_SOME,
1375 params: Params::empty(),
1376 parent: None,
1377 metadata: concepts::ExecutionMetadata::empty(),
1378 scheduled_at: sim_clock.now(),
1379 retry_exp_backoff: timeout_duration,
1380 max_retries: 1,
1381 component_id: ComponentId::dummy_activity(),
1382 scheduled_by: None,
1383 })
1384 .await
1385 .unwrap();
1386
1387 let ffqns = super::extract_ffqns(worker.as_ref());
1388 let executor = ExecTask::new(
1389 worker,
1390 exec_config,
1391 sim_clock.clone(),
1392 db_pool.clone(),
1393 ffqns,
1394 );
1395 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1396 assert_eq!(1, first_execution_progress.executions.len());
1397 sim_clock.move_time_forward(lock_expiry).await;
1399 let now_after_first_lock_expiry = sim_clock.now();
1401 {
1402 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1403 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1404 assert!(cleanup_progress.executions.is_empty());
1405 }
1406 {
1407 let expired_locks =
1408 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1409 .await
1410 .unwrap()
1411 .expired_locks;
1412 assert_eq!(1, expired_locks);
1413 }
1414 assert!(!first_execution_progress
1415 .executions
1416 .pop()
1417 .unwrap()
1418 .1
1419 .is_finished());
1420
1421 let execution_log = db_connection.get(&execution_id).await.unwrap();
1422 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1423 assert_matches!(
1424 &execution_log.events.get(2).unwrap(),
1425 ExecutionEvent {
1426 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
1427 created_at: at,
1428 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1429 );
1430 assert_eq!(
1431 PendingState::PendingAt {
1432 scheduled_at: expected_first_timeout_expiry
1433 },
1434 execution_log.pending_state
1435 );
1436 sim_clock.move_time_forward(timeout_duration).await;
1437 let now_after_first_timeout = sim_clock.now();
1438 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1439
1440 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1441 assert_eq!(1, second_execution_progress.executions.len());
1442
1443 sim_clock.move_time_forward(lock_expiry).await;
1445 let now_after_second_lock_expiry = sim_clock.now();
1447 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1448 {
1449 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1450 assert!(cleanup_progress.executions.is_empty());
1451 }
1452 {
1453 let expired_locks =
1454 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1455 .await
1456 .unwrap()
1457 .expired_locks;
1458 assert_eq!(1, expired_locks);
1459 }
1460 assert!(!second_execution_progress
1461 .executions
1462 .pop()
1463 .unwrap()
1464 .1
1465 .is_finished());
1466
1467 drop(db_connection);
1468 drop(executor);
1469 db_pool.close().await.unwrap();
1470 }
1471}