1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{prefixed_ulid::ExecutorId, ExecutionId, FunctionFqn};
10use concepts::{
11 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
12 FinishedExecutionError,
13};
14use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18 sync::{
19 atomic::{AtomicBool, Ordering},
20 Arc,
21 },
22 time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level, Span};
26
27#[derive(Debug, Clone)]
28pub struct ExecConfig {
29 pub lock_expiry: Duration,
30 pub tick_sleep: Duration,
31 pub batch_size: u32,
32 pub component_id: ComponentId,
33 pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
34}
35
36pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
37 worker: Arc<dyn Worker>,
38 config: ExecConfig,
39 clock_fn: C, db_pool: P,
41 executor_id: ExecutorId,
42 phantom_data: PhantomData<DB>,
43 ffqns: Arc<[FunctionFqn]>,
44}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48 #[debug(skip)]
49 #[allow(dead_code)]
50 executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66 #[debug(skip)]
67 is_closing: Arc<AtomicBool>,
68 #[debug(skip)]
69 abort_handle: AbortHandle,
70 component_id: ComponentId,
71 executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
97#[cfg(feature = "test")]
98pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
99 extract_ffqns(worker)
100}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103 worker
104 .exported_functions()
105 .iter()
106 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107 .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111 #[cfg(feature = "test")]
112 pub fn new(
113 worker: Arc<dyn Worker>,
114 config: ExecConfig,
115 clock_fn: C,
116 db_pool: P,
117 ffqns: Arc<[FunctionFqn]>,
118 ) -> Self {
119 Self {
120 worker,
121 config,
122 executor_id: ExecutorId::generate(),
123 db_pool,
124 phantom_data: PhantomData,
125 ffqns,
126 clock_fn,
127 }
128 }
129
130 pub fn spawn_new(
131 worker: Arc<dyn Worker>,
132 config: ExecConfig,
133 clock_fn: C,
134 db_pool: P,
135 executor_id: ExecutorId,
136 ) -> ExecutorTaskHandle {
137 let is_closing = Arc::new(AtomicBool::default());
138 let is_closing_inner = is_closing.clone();
139 let ffqns = extract_ffqns(worker.as_ref());
140 let component_id = config.component_id.clone();
141 let abort_handle = tokio::spawn(async move {
142 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143 let task = Self {
144 worker,
145 config,
146 executor_id,
147 db_pool,
148 phantom_data: PhantomData,
149 ffqns: ffqns.clone(),
150 clock_fn: clock_fn.clone(),
151 };
152 loop {
153 let _ = task.tick(clock_fn.now()).await;
154 let executed_at = clock_fn.now();
155 task.db_pool
156 .connection()
157 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158 .await;
159 if is_closing_inner.load(Ordering::Relaxed) {
160 return;
161 }
162 }
163 })
164 .abort_handle();
165 ExecutorTaskHandle {
166 is_closing,
167 abort_handle,
168 component_id,
169 executor_id,
170 }
171 }
172
173 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174 if let Some(task_limiter) = &self.config.task_limiter {
175 let mut locks = Vec::new();
176 for _ in 0..self.config.batch_size {
177 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178 locks.push(Some(permit));
179 } else {
180 break;
181 }
182 }
183 locks
184 } else {
185 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186 for _ in 0..self.config.batch_size {
187 vec.push(None);
188 }
189 vec
190 }
191 }
192
193 #[cfg(feature = "test")]
194 pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195 self.tick(executed_at).await
196 }
197
198 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200 let locked_executions = {
201 let mut permits = self.acquire_task_permits();
202 if permits.is_empty() {
203 return Ok(ExecutionProgress::default());
204 }
205 let db_connection = self.db_pool.connection();
206 let lock_expires_at = executed_at + self.config.lock_expiry;
207 let locked_executions = db_connection
208 .lock_pending(
209 permits.len(), executed_at, self.ffqns.clone(),
212 executed_at, self.config.component_id.clone(),
214 self.executor_id,
215 lock_expires_at,
216 )
217 .await
218 .map_err(|err| {
219 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220 })?;
221 while permits.len() > locked_executions.len() {
223 permits.pop();
224 }
225 assert_eq!(permits.len(), locked_executions.len());
226 locked_executions.into_iter().zip(permits)
227 };
228 let execution_deadline = executed_at + self.config.lock_expiry;
229
230 let mut executions = Vec::with_capacity(locked_executions.len());
231 for (locked_execution, permit) in locked_executions {
232 let execution_id = locked_execution.execution_id.clone();
233 let join_handle = {
234 let worker = self.worker.clone();
235 let db_pool = self.db_pool.clone();
236 let clock_fn = self.clock_fn.clone();
237 let run_id = locked_execution.run_id;
238 let worker_span = info_span!(parent: None, "worker",
239 "otel.name" = format!("{}", locked_execution.ffqn),
240 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
241 locked_execution.metadata.enrich(&worker_span);
242 tokio::spawn({
243 let worker_span2 = worker_span.clone();
244 async move {
245 let res = Self::run_worker(
246 worker,
247 &db_pool,
248 execution_deadline,
249 clock_fn,
250 locked_execution,
251 worker_span2,
252 )
253 .await;
254 if let Err(db_error) = res {
255 error!("Execution will be timed out not writing `{db_error:?}`");
256 }
257 drop(permit);
258 }
259 .instrument(worker_span)
260 })
261 };
262 executions.push((execution_id, join_handle));
263 }
264 Ok(ExecutionProgress { executions })
265 }
266
267 async fn run_worker(
268 worker: Arc<dyn Worker>,
269 db_pool: &P,
270 execution_deadline: DateTime<Utc>,
271 clock_fn: C,
272 locked_execution: LockedExecution,
273 worker_span: Span,
274 ) -> Result<(), DbError> {
275 debug!("Worker::run starting");
276 trace!(
277 version = %locked_execution.version,
278 params = ?locked_execution.params,
279 event_history = ?locked_execution.event_history,
280 "Worker::run starting"
281 );
282 let can_be_retried = ExecutionLog::can_be_retried_after(
283 locked_execution.temporary_event_count + 1,
284 locked_execution.max_retries,
285 locked_execution.retry_exp_backoff,
286 );
287 let unlock_expiry_on_limit_reached =
288 ExecutionLog::compute_retry_duration_when_retrying_forever(
289 locked_execution.temporary_event_count + 1,
290 locked_execution.retry_exp_backoff,
291 );
292 let ctx = WorkerContext {
293 execution_id: locked_execution.execution_id.clone(),
294 metadata: locked_execution.metadata,
295 ffqn: locked_execution.ffqn,
296 params: locked_execution.params,
297 event_history: locked_execution.event_history,
298 responses: locked_execution
299 .responses
300 .into_iter()
301 .map(|outer| outer.event)
302 .collect(),
303 version: locked_execution.version,
304 execution_deadline,
305 can_be_retried: can_be_retried.is_some(),
306 run_id: locked_execution.run_id,
307 worker_span,
308 };
309 let worker_result = worker.run(ctx).await;
310 trace!(?worker_result, "Worker::run finished");
311 let result_obtained_at = clock_fn.now();
312 match Self::worker_result_to_execution_event(
313 locked_execution.execution_id,
314 worker_result,
315 result_obtained_at,
316 locked_execution.parent,
317 can_be_retried,
318 unlock_expiry_on_limit_reached,
319 )? {
320 Some(append) => {
321 let db_connection = db_pool.connection();
322 trace!("Appending {append:?}");
323 append.clone().append(&db_connection).await
324 }
325 None => Ok(()),
326 }
327 }
328
329 #[expect(clippy::too_many_lines)]
332 fn worker_result_to_execution_event(
333 execution_id: ExecutionId,
334 worker_result: WorkerResult,
335 result_obtained_at: DateTime<Utc>,
336 parent: Option<(ExecutionId, JoinSetId)>,
337 can_be_retried: Option<Duration>,
338 unlock_expiry_on_limit_reached: Duration,
339 ) -> Result<Option<Append>, DbError> {
340 Ok(match worker_result {
341 WorkerResult::Ok(result, new_version) => {
342 info!(
343 "Execution finished: {}",
344 result.as_pending_state_finished_result()
345 );
346 let child_finished =
347 parent.map(
348 |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
349 parent_execution_id,
350 parent_join_set,
351 result: Ok(result.clone()),
352 },
353 );
354 let primary_event = AppendRequest {
355 created_at: result_obtained_at,
356 event: ExecutionEventInner::Finished { result: Ok(result) },
357 };
358
359 Some(Append {
360 created_at: result_obtained_at,
361 primary_event,
362 execution_id,
363 version: new_version,
364 child_finished,
365 })
366 }
367 WorkerResult::DbUpdatedByWorker => None,
368 WorkerResult::Err(err) => {
369 let reason = err.to_string();
370 let (primary_event, child_finished, version) = match err {
371 WorkerError::TemporaryTimeout => {
372 info!("Temporary timeout");
373 return Ok(None);
375 }
376 WorkerError::DbError(db_error) => {
377 return Err(db_error);
378 }
379 WorkerError::ActivityTrap {
380 reason: reason_inner,
381 trap_kind,
382 detail,
383 version,
384 } => {
385 if let Some(duration) = can_be_retried {
386 let expires_at = result_obtained_at + duration;
387 debug!(
388 "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
389 );
390 (
391 ExecutionEventInner::TemporarilyFailed {
392 backoff_expires_at: expires_at,
393 reason_full: StrVariant::from(reason),
394 reason_inner: StrVariant::from(reason_inner),
395 detail: Some(detail),
396 },
397 None,
398 version,
399 )
400 } else {
401 info!(
402 "Activity {trap_kind} marked as permanent failure - {reason_inner}"
403 );
404 let result = Err(FinishedExecutionError::PermanentFailure {
405 reason_inner,
406 reason_full: reason,
407 kind: PermanentFailureKind::ActivityTrap,
408 detail: Some(detail),
409 });
410 let child_finished =
411 parent.map(|(parent_execution_id, parent_join_set)| {
412 ChildFinishedResponse {
413 parent_execution_id,
414 parent_join_set,
415 result: result.clone(),
416 }
417 });
418 (
419 ExecutionEventInner::Finished { result },
420 child_finished,
421 version,
422 )
423 }
424 }
425 WorkerError::ActivityReturnedError { detail, version } => {
426 let duration = can_be_retried.expect(
427 "ActivityReturnedError must not be returned when retries are exhausted",
428 );
429 let expires_at = result_obtained_at + duration;
430 debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
431 (
432 ExecutionEventInner::TemporarilyFailed {
433 backoff_expires_at: expires_at,
434 reason_full: StrVariant::Static("activity returned error"),
435 reason_inner: StrVariant::Static("activity returned error"),
436 detail,
437 },
438 None,
439 version,
440 )
441 }
442 WorkerError::TemporaryWorkflowTrap {
443 reason: reason_inner,
444 kind,
445 detail,
446 version,
447 } => {
448 let duration = can_be_retried.expect("workflows are retried forever");
449 let expires_at = result_obtained_at + duration;
450 debug!(
451 "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
452 );
453 (
454 ExecutionEventInner::TemporarilyFailed {
455 backoff_expires_at: expires_at,
456 reason_full: StrVariant::from(reason),
457 reason_inner: StrVariant::from(reason_inner),
458 detail,
459 },
460 None,
461 version,
462 )
463 }
464 WorkerError::LimitReached {
465 reason: inner_reason,
466 version: new_version,
467 } => {
468 let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
469 warn!(
470 "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
471 );
472 (
473 ExecutionEventInner::Unlocked {
474 backoff_expires_at: expires_at,
475 reason: StrVariant::from(reason),
476 },
477 None,
478 new_version,
479 )
480 }
481 WorkerError::FatalError(fatal_error, version) => {
482 info!("Fatal worker error - {fatal_error:?}");
483 let result = Err(FinishedExecutionError::from(fatal_error));
484 let child_finished =
485 parent.map(|(parent_execution_id, parent_join_set)| {
486 ChildFinishedResponse {
487 parent_execution_id,
488 parent_join_set,
489 result: result.clone(),
490 }
491 });
492 (
493 ExecutionEventInner::Finished { result },
494 child_finished,
495 version,
496 )
497 }
498 };
499 Some(Append {
500 created_at: result_obtained_at,
501 primary_event: AppendRequest {
502 created_at: result_obtained_at,
503 event: primary_event,
504 },
505 execution_id,
506 version,
507 child_finished,
508 })
509 }
510 })
511 }
512}
513
514#[derive(Debug, Clone)]
515pub(crate) struct ChildFinishedResponse {
516 pub(crate) parent_execution_id: ExecutionId,
517 pub(crate) parent_join_set: JoinSetId,
518 pub(crate) result: FinishedExecutionResult,
519}
520
521#[derive(Debug, Clone)]
522pub(crate) struct Append {
523 pub(crate) created_at: DateTime<Utc>,
524 pub(crate) primary_event: AppendRequest,
525 pub(crate) execution_id: ExecutionId,
526 pub(crate) version: Version,
527 pub(crate) child_finished: Option<ChildFinishedResponse>,
528}
529
530impl Append {
531 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
532 if let Some(child_finished) = self.child_finished {
533 assert_matches!(
534 &self.primary_event,
535 AppendRequest {
536 event: ExecutionEventInner::Finished { .. },
537 ..
538 }
539 );
540 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
541 db_connection
542 .append_batch_respond_to_parent(
543 derived.clone(),
544 self.created_at,
545 vec![self.primary_event],
546 self.version.clone(),
547 child_finished.parent_execution_id,
548 JoinSetResponseEventOuter {
549 created_at: self.created_at,
550 event: JoinSetResponseEvent {
551 join_set_id: child_finished.parent_join_set,
552 event: JoinSetResponse::ChildExecutionFinished {
553 child_execution_id: derived,
554 finished_version: self.version,
556 result: child_finished.result,
557 },
558 },
559 },
560 )
561 .await?;
562 } else {
563 db_connection
564 .append_batch(
565 self.created_at,
566 vec![self.primary_event],
567 self.execution_id,
568 self.version,
569 )
570 .await?;
571 }
572 Ok(())
573 }
574}
575
576#[cfg(any(test, feature = "test"))]
577pub mod simple_worker {
578 use crate::worker::{Worker, WorkerContext, WorkerResult};
579 use async_trait::async_trait;
580 use concepts::{
581 storage::{HistoryEvent, Version},
582 FunctionFqn, FunctionMetadata, ParameterTypes,
583 };
584 use indexmap::IndexMap;
585 use std::sync::Arc;
586 use tracing::trace;
587
588 pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
589 pub type SimpleWorkerResultMap =
590 Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;
591
592 #[derive(Clone, Debug)]
593 pub struct SimpleWorker {
594 pub worker_results_rev: SimpleWorkerResultMap,
595 pub ffqn: FunctionFqn,
596 exported: [FunctionMetadata; 1],
597 }
598
599 impl SimpleWorker {
600 #[must_use]
601 pub fn with_single_result(res: WorkerResult) -> Self {
602 Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
603 Version::new(2),
604 (vec![], res),
605 )]))))
606 }
607
608 #[must_use]
609 pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
610 Self {
611 worker_results_rev: self.worker_results_rev,
612 exported: [FunctionMetadata {
613 ffqn: ffqn.clone(),
614 parameter_types: ParameterTypes::default(),
615 return_type: None,
616 extension: None,
617 submittable: true,
618 }],
619 ffqn,
620 }
621 }
622
623 #[must_use]
624 pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
625 Self {
626 worker_results_rev,
627 ffqn: FFQN_SOME,
628 exported: [FunctionMetadata {
629 ffqn: FFQN_SOME,
630 parameter_types: ParameterTypes::default(),
631 return_type: None,
632 extension: None,
633 submittable: true,
634 }],
635 }
636 }
637 }
638
639 #[async_trait]
640 impl Worker for SimpleWorker {
641 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
642 let (expected_version, (expected_eh, worker_result)) =
643 self.worker_results_rev.lock().unwrap().pop().unwrap();
644 trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
645 assert_eq!(expected_version, ctx.version);
646 assert_eq!(expected_eh, ctx.event_history);
647 worker_result
648 }
649
650 fn exported_functions(&self) -> &[FunctionMetadata] {
651 &self.exported
652 }
653
654 fn imported_functions(&self) -> &[FunctionMetadata] {
655 &[]
656 }
657 }
658}
659
660#[cfg(test)]
661mod tests {
662 use self::simple_worker::SimpleWorker;
663 use super::*;
664 use crate::worker::FatalError;
665 use crate::{expired_timers_watcher, worker::WorkerResult};
666 use assert_matches::assert_matches;
667 use async_trait::async_trait;
668 use concepts::storage::{CreateRequest, JoinSetRequest};
669 use concepts::storage::{
670 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
671 };
672 use concepts::time::Now;
673 use concepts::{
674 ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
675 SupportedFunctionReturnValue, TrapKind,
676 };
677 use db_tests::Database;
678 use indexmap::IndexMap;
679 use simple_worker::FFQN_SOME;
680 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
681 use test_utils::set_up;
682 use test_utils::sim_clock::{ConstClock, SimClock};
683
684 pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
685
686 async fn tick_fn<
687 W: Worker + Debug,
688 C: ClockFn + 'static,
689 DB: DbConnection + 'static,
690 P: DbPool<DB> + 'static,
691 >(
692 config: ExecConfig,
693 clock_fn: C,
694 db_pool: P,
695 worker: Arc<W>,
696 executed_at: DateTime<Utc>,
697 ) -> ExecutionProgress {
698 trace!("Ticking with {worker:?}");
699 let ffqns = super::extract_ffqns(worker.as_ref());
700 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
701 let mut execution_progress = executor.tick(executed_at).await.unwrap();
702 loop {
703 execution_progress
704 .executions
705 .retain(|(_, abort_handle)| !abort_handle.is_finished());
706 if execution_progress.executions.is_empty() {
707 return execution_progress;
708 }
709 tokio::time::sleep(Duration::from_millis(10)).await;
710 }
711 }
712
713 #[tokio::test]
714 async fn execute_simple_lifecycle_tick_based_mem() {
715 let created_at = Now.now();
716 let (_guard, db_pool) = Database::Memory.set_up().await;
717 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
718 db_pool.close().await.unwrap();
719 }
720
721 #[cfg(not(madsim))]
722 #[tokio::test]
723 async fn execute_simple_lifecycle_tick_based_sqlite() {
724 let created_at = Now.now();
725 let (_guard, db_pool) = Database::Sqlite.set_up().await;
726 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
727 db_pool.close().await.unwrap();
728 }
729
730 async fn execute_simple_lifecycle_tick_based<
731 DB: DbConnection + 'static,
732 P: DbPool<DB> + 'static,
733 C: ClockFn + 'static,
734 >(
735 pool: P,
736 clock_fn: C,
737 ) {
738 set_up();
739 let created_at = clock_fn.now();
740 let exec_config = ExecConfig {
741 batch_size: 1,
742 lock_expiry: Duration::from_secs(1),
743 tick_sleep: Duration::from_millis(100),
744 component_id: ComponentId::dummy_activity(),
745 task_limiter: None,
746 };
747
748 let execution_log = create_and_tick(
749 CreateAndTickConfig {
750 execution_id: ExecutionId::generate(),
751 created_at,
752 max_retries: 0,
753 executed_at: created_at,
754 retry_exp_backoff: Duration::ZERO,
755 },
756 clock_fn,
757 pool,
758 exec_config,
759 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
760 SupportedFunctionReturnValue::None,
761 Version::new(2),
762 ))),
763 tick_fn,
764 )
765 .await;
766 assert_matches!(
767 execution_log.events.get(2).unwrap(),
768 ExecutionEvent {
769 event: ExecutionEventInner::Finished {
770 result: Ok(SupportedFunctionReturnValue::None),
771 },
772 created_at: _,
773 }
774 );
775 }
776
777 #[tokio::test]
778 async fn stochastic_execute_simple_lifecycle_task_based_mem() {
779 set_up();
780 let created_at = Now.now();
781 let clock_fn = ConstClock(created_at);
782 let (_guard, db_pool) = Database::Memory.set_up().await;
783 let exec_config = ExecConfig {
784 batch_size: 1,
785 lock_expiry: Duration::from_secs(1),
786 tick_sleep: Duration::ZERO,
787 component_id: ComponentId::dummy_activity(),
788 task_limiter: None,
789 };
790
791 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
792 SupportedFunctionReturnValue::None,
793 Version::new(2),
794 )));
795 let exec_task = ExecTask::spawn_new(
796 worker.clone(),
797 exec_config.clone(),
798 clock_fn,
799 db_pool.clone(),
800 ExecutorId::generate(),
801 );
802
803 let execution_log = create_and_tick(
804 CreateAndTickConfig {
805 execution_id: ExecutionId::generate(),
806 created_at,
807 max_retries: 0,
808 executed_at: created_at,
809 retry_exp_backoff: Duration::ZERO,
810 },
811 clock_fn,
812 db_pool.clone(),
813 exec_config,
814 worker,
815 |_, _, _, _, _| async {
816 tokio::time::sleep(Duration::from_secs(1)).await; ExecutionProgress::default()
818 },
819 )
820 .await;
821 exec_task.close().await;
822 db_pool.close().await.unwrap();
823 assert_matches!(
824 execution_log.events.get(2).unwrap(),
825 ExecutionEvent {
826 event: ExecutionEventInner::Finished {
827 result: Ok(SupportedFunctionReturnValue::None),
828 },
829 created_at: _,
830 }
831 );
832 }
833
834 struct CreateAndTickConfig {
835 execution_id: ExecutionId,
836 created_at: DateTime<Utc>,
837 max_retries: u32,
838 executed_at: DateTime<Utc>,
839 retry_exp_backoff: Duration,
840 }
841
842 async fn create_and_tick<
843 W: Worker,
844 C: ClockFn,
845 DB: DbConnection,
846 P: DbPool<DB>,
847 T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
848 F: Future<Output = ExecutionProgress>,
849 >(
850 config: CreateAndTickConfig,
851 clock_fn: C,
852 db_pool: P,
853 exec_config: ExecConfig,
854 worker: Arc<W>,
855 mut tick: T,
856 ) -> ExecutionLog {
857 let db_connection = db_pool.connection();
859 db_connection
860 .create(CreateRequest {
861 created_at: config.created_at,
862 execution_id: config.execution_id.clone(),
863 ffqn: FFQN_SOME,
864 params: Params::empty(),
865 parent: None,
866 metadata: concepts::ExecutionMetadata::empty(),
867 scheduled_at: config.created_at,
868 retry_exp_backoff: config.retry_exp_backoff,
869 max_retries: config.max_retries,
870 component_id: ComponentId::dummy_activity(),
871 scheduled_by: None,
872 })
873 .await
874 .unwrap();
875 tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
877 let execution_log = db_connection.get(&config.execution_id).await.unwrap();
878 debug!("Execution history after tick: {execution_log:?}");
879 assert_matches!(
881 execution_log.events.first().unwrap(),
882 ExecutionEvent {
883 event: ExecutionEventInner::Created { .. },
884 created_at: actually_created_at,
885 }
886 if config.created_at == *actually_created_at
887 );
888 let locked_at = assert_matches!(
889 execution_log.events.get(1).unwrap(),
890 ExecutionEvent {
891 event: ExecutionEventInner::Locked { .. },
892 created_at: locked_at
893 } if config.created_at <= *locked_at
894 => *locked_at
895 );
896 assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
897 event: _,
898 created_at: executed_at,
899 } if *executed_at >= locked_at);
900 execution_log
901 }
902
903 #[expect(clippy::too_many_lines)]
904 #[tokio::test]
905 async fn activity_trap_should_trigger_an_execution_retry() {
906 set_up();
907 let sim_clock = SimClock::default();
908 let (_guard, db_pool) = Database::Memory.set_up().await;
909 let exec_config = ExecConfig {
910 batch_size: 1,
911 lock_expiry: Duration::from_secs(1),
912 tick_sleep: Duration::ZERO,
913 component_id: ComponentId::dummy_activity(),
914 task_limiter: None,
915 };
916 let expected_reason = "error reason";
917 let expected_detail = "error detail";
918 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
919 WorkerError::ActivityTrap {
920 reason: expected_reason.to_string(),
921 trap_kind: concepts::TrapKind::Trap,
922 detail: expected_detail.to_string(),
923 version: Version::new(2),
924 },
925 )));
926 let retry_exp_backoff = Duration::from_millis(100);
927 debug!(now = %sim_clock.now(), "Creating an execution that should fail");
928 let execution_log = create_and_tick(
929 CreateAndTickConfig {
930 execution_id: ExecutionId::generate(),
931 created_at: sim_clock.now(),
932 max_retries: 1,
933 executed_at: sim_clock.now(),
934 retry_exp_backoff,
935 },
936 sim_clock.clone(),
937 db_pool.clone(),
938 exec_config.clone(),
939 worker,
940 tick_fn,
941 )
942 .await;
943 assert_eq!(3, execution_log.events.len());
944 {
945 let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
946 &execution_log.events.get(2).unwrap(),
947 ExecutionEvent {
948 event: ExecutionEventInner::TemporarilyFailed {
949 reason_inner,
950 reason_full,
951 detail,
952 backoff_expires_at,
953 },
954 created_at: at,
955 }
956 => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
957 );
958 assert_eq!(expected_reason, reason_inner.deref());
959 assert_eq!(
960 format!("activity trap: {expected_reason}"),
961 reason_full.deref()
962 );
963 assert_eq!(Some(expected_detail), detail.as_deref());
964 assert_eq!(at, sim_clock.now());
965 assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
966 }
967 let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
968 std::sync::Mutex::new(IndexMap::from([(
969 Version::new(4),
970 (
971 vec![],
972 WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4)),
973 ),
974 )])),
975 )));
976 assert!(tick_fn(
978 exec_config.clone(),
979 sim_clock.clone(),
980 db_pool.clone(),
981 worker.clone(),
982 sim_clock.now(),
983 )
984 .await
985 .executions
986 .is_empty());
987 sim_clock.move_time_forward(retry_exp_backoff).await;
989 tick_fn(
990 exec_config,
991 sim_clock.clone(),
992 db_pool.clone(),
993 worker,
994 sim_clock.now(),
995 )
996 .await;
997 let execution_log = {
998 let db_connection = db_pool.connection();
999 db_connection
1000 .get(&execution_log.execution_id)
1001 .await
1002 .unwrap()
1003 };
1004 debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
1005 assert_matches!(
1006 execution_log.events.get(3).unwrap(),
1007 ExecutionEvent {
1008 event: ExecutionEventInner::Locked { .. },
1009 created_at: at
1010 } if *at == sim_clock.now()
1011 );
1012 assert_matches!(
1013 execution_log.events.get(4).unwrap(),
1014 ExecutionEvent {
1015 event: ExecutionEventInner::Finished {
1016 result: Ok(SupportedFunctionReturnValue::None),
1017 },
1018 created_at: finished_at,
1019 } if *finished_at == sim_clock.now()
1020 );
1021 db_pool.close().await.unwrap();
1022 }
1023
1024 #[tokio::test]
1025 async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1026 set_up();
1027 let created_at = Now.now();
1028 let clock_fn = ConstClock(created_at);
1029 let (_guard, db_pool) = Database::Memory.set_up().await;
1030 let exec_config = ExecConfig {
1031 batch_size: 1,
1032 lock_expiry: Duration::from_secs(1),
1033 tick_sleep: Duration::ZERO,
1034 component_id: ComponentId::dummy_activity(),
1035 task_limiter: None,
1036 };
1037
1038 let expected_reason = "error reason";
1039 let expected_detail = "error detail";
1040 let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1041 WorkerError::ActivityTrap {
1042 reason: expected_reason.to_string(),
1043 trap_kind: concepts::TrapKind::Trap,
1044 detail: expected_detail.to_string(),
1045 version: Version::new(2),
1046 },
1047 )));
1048 let execution_log = create_and_tick(
1049 CreateAndTickConfig {
1050 execution_id: ExecutionId::generate(),
1051 created_at,
1052 max_retries: 0,
1053 executed_at: created_at,
1054 retry_exp_backoff: Duration::ZERO,
1055 },
1056 clock_fn,
1057 db_pool.clone(),
1058 exec_config.clone(),
1059 worker,
1060 tick_fn,
1061 )
1062 .await;
1063 assert_eq!(3, execution_log.events.len());
1064 let (reason, kind, detail) = assert_matches!(
1065 &execution_log.events.get(2).unwrap(),
1066 ExecutionEvent {
1067 event: ExecutionEventInner::Finished{
1068 result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_})
1069 },
1070 created_at: at,
1071 } if *at == created_at
1072 => (reason_inner, kind, detail)
1073 );
1074 assert_eq!(expected_reason, *reason);
1075 assert_eq!(Some(expected_detail), detail.as_deref());
1076 assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1077
1078 db_pool.close().await.unwrap();
1079 }
1080
1081 #[tokio::test]
1082 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1083 let worker_error = WorkerError::ActivityTrap {
1084 reason: "error reason".to_string(),
1085 trap_kind: TrapKind::Trap,
1086 detail: "detail".to_string(),
1087 version: Version::new(2),
1088 };
1089 let expected_child_err = FinishedExecutionError::PermanentFailure {
1090 reason_full: "activity trap: error reason".to_string(),
1091 reason_inner: "error reason".to_string(),
1092 kind: PermanentFailureKind::ActivityTrap,
1093 detail: Some("detail".to_string()),
1094 };
1095 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1096 .await;
1097 }
1098
1099 #[tokio::test]
1100 async fn child_execution_permanently_failed_should_notify_parent_timeout() {
1101 let worker_error = WorkerError::TemporaryTimeout;
1102 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1103 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1104 .await;
1105 }
1106
1107 #[tokio::test]
1108 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1109 let parent_id = ExecutionId::from_parts(1, 1);
1110 let join_set_id_outer =
1111 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1112 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1113 let join_set_id_inner =
1114 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1115 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1116 let worker_error = WorkerError::FatalError(
1117 FatalError::UnhandledChildExecutionError {
1118 child_execution_id: child_execution_id.clone(),
1119 root_cause_id: root_cause_id.clone(),
1120 },
1121 Version::new(2),
1122 );
1123 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1124 child_execution_id,
1125 root_cause_id,
1126 };
1127 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1128 .await;
1129 }
1130
1131 #[expect(clippy::too_many_lines)]
1132 async fn child_execution_permanently_failed_should_notify_parent(
1133 worker_error: WorkerError,
1134 expected_child_err: FinishedExecutionError,
1135 ) {
1136 use concepts::storage::JoinSetResponseEventOuter;
1137 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1138
1139 set_up();
1140 let sim_clock = SimClock::default();
1141 let (_guard, db_pool) = Database::Memory.set_up().await;
1142
1143 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1144 WorkerResult::DbUpdatedByWorker,
1145 ));
1146 let parent_execution_id = ExecutionId::generate();
1147 db_pool
1148 .connection()
1149 .create(CreateRequest {
1150 created_at: sim_clock.now(),
1151 execution_id: parent_execution_id.clone(),
1152 ffqn: FFQN_SOME,
1153 params: Params::empty(),
1154 parent: None,
1155 metadata: concepts::ExecutionMetadata::empty(),
1156 scheduled_at: sim_clock.now(),
1157 retry_exp_backoff: Duration::ZERO,
1158 max_retries: 0,
1159 component_id: ComponentId::dummy_activity(),
1160 scheduled_by: None,
1161 })
1162 .await
1163 .unwrap();
1164 tick_fn(
1165 ExecConfig {
1166 batch_size: 1,
1167 lock_expiry: LOCK_EXPIRY,
1168 tick_sleep: Duration::ZERO,
1169 component_id: ComponentId::dummy_activity(),
1170 task_limiter: None,
1171 },
1172 sim_clock.clone(),
1173 db_pool.clone(),
1174 parent_worker,
1175 sim_clock.now(),
1176 )
1177 .await;
1178
1179 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1180 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1181 {
1183 let child = CreateRequest {
1184 created_at: sim_clock.now(),
1185 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1186 ffqn: FFQN_CHILD,
1187 params: Params::empty(),
1188 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1189 metadata: concepts::ExecutionMetadata::empty(),
1190 scheduled_at: sim_clock.now(),
1191 retry_exp_backoff: Duration::ZERO,
1192 max_retries: 0,
1193 component_id: ComponentId::dummy_activity(),
1194 scheduled_by: None,
1195 };
1196 let current_time = sim_clock.now();
1197 let join_set = AppendRequest {
1198 created_at: current_time,
1199 event: ExecutionEventInner::HistoryEvent {
1200 event: HistoryEvent::JoinSetCreate {
1201 join_set_id: join_set_id.clone(),
1202 closing_strategy: ClosingStrategy::Complete,
1203 },
1204 },
1205 };
1206 let child_exec_req = AppendRequest {
1207 created_at: current_time,
1208 event: ExecutionEventInner::HistoryEvent {
1209 event: HistoryEvent::JoinSetRequest {
1210 join_set_id: join_set_id.clone(),
1211 request: JoinSetRequest::ChildExecutionRequest {
1212 child_execution_id: child_execution_id.clone(),
1213 },
1214 },
1215 },
1216 };
1217 let join_next = AppendRequest {
1218 created_at: current_time,
1219 event: ExecutionEventInner::HistoryEvent {
1220 event: HistoryEvent::JoinNext {
1221 join_set_id: join_set_id.clone(),
1222 run_expires_at: sim_clock.now(),
1223 closing: false,
1224 },
1225 },
1226 };
1227 db_pool
1228 .connection()
1229 .append_batch_create_new_execution(
1230 current_time,
1231 vec![join_set, child_exec_req, join_next],
1232 parent_execution_id.clone(),
1233 Version::new(2),
1234 vec![child],
1235 )
1236 .await
1237 .unwrap();
1238 }
1239
1240 let child_worker = Arc::new(
1241 SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1242 );
1243
1244 tick_fn(
1246 ExecConfig {
1247 batch_size: 1,
1248 lock_expiry: LOCK_EXPIRY,
1249 tick_sleep: Duration::ZERO,
1250 component_id: ComponentId::dummy_activity(),
1251 task_limiter: None,
1252 },
1253 sim_clock.clone(),
1254 db_pool.clone(),
1255 child_worker,
1256 sim_clock.now(),
1257 )
1258 .await;
1259 if expected_child_err == FinishedExecutionError::PermanentTimeout {
1260 sim_clock.move_time_forward(LOCK_EXPIRY).await;
1262 expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1263 .await
1264 .unwrap();
1265 }
1266 let child_log = db_pool
1267 .connection()
1268 .get(&ExecutionId::Derived(child_execution_id.clone()))
1269 .await
1270 .unwrap();
1271 assert!(child_log.pending_state.is_finished());
1272 assert_eq!(
1273 Version(2),
1274 child_log.next_version,
1275 "created = 0, locked = 1, with_single_result = 2"
1276 );
1277 assert_eq!(
1278 ExecutionEventInner::Finished {
1279 result: Err(expected_child_err)
1280 },
1281 child_log.last_event().event
1282 );
1283 let parent_log = db_pool
1284 .connection()
1285 .get(&parent_execution_id)
1286 .await
1287 .unwrap();
1288 assert_matches!(
1289 parent_log.pending_state,
1290 PendingState::PendingAt {
1291 scheduled_at
1292 } if scheduled_at == sim_clock.now(),
1293 "parent should be back to pending"
1294 );
1295 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1296 parent_log.responses.last(),
1297 Some(JoinSetResponseEventOuter{
1298 created_at: at,
1299 event: JoinSetResponseEvent{
1300 join_set_id: found_join_set_id,
1301 event: JoinSetResponse::ChildExecutionFinished {
1302 child_execution_id: found_child_execution_id,
1303 finished_version,
1304 result: found_result,
1305 }
1306 }
1307 })
1308 if *at == sim_clock.now()
1309 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1310 );
1311 assert_eq!(join_set_id, *found_join_set_id);
1312 assert_eq!(child_execution_id, *found_child_execution_id);
1313 assert_eq!(child_log.next_version, *child_finished_version);
1314 assert!(found_result.is_err());
1315
1316 db_pool.close().await.unwrap();
1317 }
1318
1319 #[derive(Clone, Debug)]
1320 struct SleepyWorker {
1321 duration: Duration,
1322 result: SupportedFunctionReturnValue,
1323 exported: [FunctionMetadata; 1],
1324 }
1325
1326 #[async_trait]
1327 impl Worker for SleepyWorker {
1328 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1329 tokio::time::sleep(self.duration).await;
1330 WorkerResult::Ok(self.result.clone(), ctx.version)
1331 }
1332
1333 fn exported_functions(&self) -> &[FunctionMetadata] {
1334 &self.exported
1335 }
1336
1337 fn imported_functions(&self) -> &[FunctionMetadata] {
1338 &[]
1339 }
1340 }
1341
1342 #[expect(clippy::too_many_lines)]
1343 #[tokio::test]
1344 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1345 set_up();
1346 let sim_clock = SimClock::default();
1347 let (_guard, db_pool) = Database::Memory.set_up().await;
1348 let lock_expiry = Duration::from_millis(100);
1349 let exec_config = ExecConfig {
1350 batch_size: 1,
1351 lock_expiry,
1352 tick_sleep: Duration::ZERO,
1353 component_id: ComponentId::dummy_activity(),
1354 task_limiter: None,
1355 };
1356
1357 let worker = Arc::new(SleepyWorker {
1358 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1360 exported: [FunctionMetadata {
1361 ffqn: FFQN_SOME,
1362 parameter_types: ParameterTypes::default(),
1363 return_type: None,
1364 extension: None,
1365 submittable: true,
1366 }],
1367 });
1368 let execution_id = ExecutionId::generate();
1370 let timeout_duration = Duration::from_millis(300);
1371 let db_connection = db_pool.connection();
1372 db_connection
1373 .create(CreateRequest {
1374 created_at: sim_clock.now(),
1375 execution_id: execution_id.clone(),
1376 ffqn: FFQN_SOME,
1377 params: Params::empty(),
1378 parent: None,
1379 metadata: concepts::ExecutionMetadata::empty(),
1380 scheduled_at: sim_clock.now(),
1381 retry_exp_backoff: timeout_duration,
1382 max_retries: 1,
1383 component_id: ComponentId::dummy_activity(),
1384 scheduled_by: None,
1385 })
1386 .await
1387 .unwrap();
1388
1389 let ffqns = super::extract_ffqns(worker.as_ref());
1390 let executor = ExecTask::new(
1391 worker,
1392 exec_config,
1393 sim_clock.clone(),
1394 db_pool.clone(),
1395 ffqns,
1396 );
1397 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1398 assert_eq!(1, first_execution_progress.executions.len());
1399 sim_clock.move_time_forward(lock_expiry).await;
1401 let now_after_first_lock_expiry = sim_clock.now();
1403 {
1404 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1405 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1406 assert!(cleanup_progress.executions.is_empty());
1407 }
1408 {
1409 let expired_locks =
1410 expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
1411 .await
1412 .unwrap()
1413 .expired_locks;
1414 assert_eq!(1, expired_locks);
1415 }
1416 assert!(!first_execution_progress
1417 .executions
1418 .pop()
1419 .unwrap()
1420 .1
1421 .is_finished());
1422
1423 let execution_log = db_connection.get(&execution_id).await.unwrap();
1424 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1425 assert_matches!(
1426 &execution_log.events.get(2).unwrap(),
1427 ExecutionEvent {
1428 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
1429 created_at: at,
1430 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1431 );
1432 assert_eq!(
1433 PendingState::PendingAt {
1434 scheduled_at: expected_first_timeout_expiry
1435 },
1436 execution_log.pending_state
1437 );
1438 sim_clock.move_time_forward(timeout_duration).await;
1439 let now_after_first_timeout = sim_clock.now();
1440 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1441
1442 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1443 assert_eq!(1, second_execution_progress.executions.len());
1444
1445 sim_clock.move_time_forward(lock_expiry).await;
1447 let now_after_second_lock_expiry = sim_clock.now();
1449 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1450 {
1451 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1452 assert!(cleanup_progress.executions.is_empty());
1453 }
1454 {
1455 let expired_locks =
1456 expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
1457 .await
1458 .unwrap()
1459 .expired_locks;
1460 assert_eq!(1, expired_locks);
1461 }
1462 assert!(!second_execution_progress
1463 .executions
1464 .pop()
1465 .unwrap()
1466 .1
1467 .is_finished());
1468
1469 drop(db_connection);
1470 drop(executor);
1471 db_pool.close().await.unwrap();
1472 }
1473}