1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5 AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6 LockedExecution,
7};
8use concepts::{prefixed_ulid::ExecutorId, ExecutionId, FunctionFqn};
9use concepts::{
10 storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
11 FinishedExecutionError,
12};
13use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
14use concepts::{JoinSetId, PermanentFailureKind};
15use std::marker::PhantomData;
16use std::{
17 sync::{
18 atomic::{AtomicBool, Ordering},
19 Arc,
20 },
21 time::Duration,
22};
23use tokio::task::{AbortHandle, JoinHandle};
24use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level, Span};
25use utils::time::ClockFn;
26
/// Configuration of a single executor task.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick timestamp to obtain both the lock expiry passed to `lock_pending` and
    /// the execution deadline handed to the worker.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` between ticks; presumably the longest time the executor
    /// waits before ticking again — confirm against the `DbConnection` implementation.
    pub tick_sleep: Duration,
    /// Upper bound on the number of task permits acquired, and therefore on the number of
    /// executions requested from the database, in one tick.
    pub batch_size: u32,
    /// Component identifier forwarded to `lock_pending` and attached to log events and spans.
    pub component_id: ComponentId,
    /// Optional semaphore; when set, each spawned execution holds one permit until it ends.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
35
/// Executor that locks pending executions in the database and runs them on a [`Worker`].
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    /// Worker that runs every locked execution.
    worker: Arc<dyn Worker>,
    /// Batch size, lock expiry, tick sleep, component id and optional task limiter.
    config: ExecConfig,
    /// Clock used to timestamp ticks and worker results.
    clock_fn: C, db_pool: P,
    /// Identifier passed to `lock_pending` and recorded in log fields.
    executor_id: ExecutorId,
    /// Ties the otherwise unused `DB` type parameter to the struct.
    phantom_data: PhantomData<DB>,
    /// Function names passed to `lock_pending` and `wait_for_pending`.
    ffqns: Arc<[FunctionFqn]>,
}
45
/// Result of one executor tick: the executions that were locked and the tasks running them.
#[derive(derive_more::Debug, Default)]
pub struct ExecutionProgress {
    /// Pairs of execution id and the join handle of the task spawned for it.
    // NOTE(review): `dead_code` is presumably allowed because the field is only read by
    // test-only code (`wait_for_tasks`, the tests module) — confirm.
    #[debug(skip)]
    #[allow(dead_code)]
    executions: Vec<(ExecutionId, JoinHandle<()>)>,
}
52
53impl ExecutionProgress {
54 #[cfg(feature = "test")]
55 pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56 let execs = self.executions.len();
57 for (_, handle) in self.executions {
58 handle.await?;
59 }
60 Ok(execs)
61 }
62}
63
/// Handle to a spawned executor task, returned by `ExecTask::spawn_new`.
///
/// Dropping the handle aborts the task if it is still running; call `close` first for a
/// graceful shutdown.
#[derive(derive_more::Debug)]
pub struct ExecutorTaskHandle {
    /// Flag shared with the executor loop; set by `close`, checked by the loop after each tick.
    #[debug(skip)]
    is_closing: Arc<AtomicBool>,
    /// Abort handle of the spawned executor task.
    #[debug(skip)]
    abort_handle: AbortHandle,
    component_id: ComponentId,
    executor_id: ExecutorId,
}
73
74impl ExecutorTaskHandle {
75 #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76 pub async fn close(&self) {
77 trace!("Gracefully closing");
78 self.is_closing.store(true, Ordering::Relaxed);
79 while !self.abort_handle.is_finished() {
80 tokio::time::sleep(Duration::from_millis(1)).await;
81 }
82 debug!("Gracefully closed");
83 }
84}
85
86impl Drop for ExecutorTaskHandle {
87 #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88 fn drop(&mut self) {
89 if self.abort_handle.is_finished() {
90 return;
91 }
92 warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93 self.abort_handle.abort();
94 }
95}
96
97pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
98 worker
99 .exported_functions()
100 .iter()
101 .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
102 .collect::<Arc<_>>()
103}
104
105impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
/// Builds an executor with a freshly generated executor id without spawning any task.
///
/// Only available with the `test` feature; production code goes through `spawn_new`.
#[cfg(feature = "test")]
pub fn new(
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C,
    db_pool: P,
    ffqns: Arc<[FunctionFqn]>,
) -> Self {
    let executor_id = ExecutorId::generate();
    Self {
        worker,
        config,
        clock_fn,
        db_pool,
        executor_id,
        phantom_data: PhantomData,
        ffqns,
    }
}
124
125 pub fn spawn_new(
126 worker: Arc<dyn Worker>,
127 config: ExecConfig,
128 clock_fn: C,
129 db_pool: P,
130 executor_id: ExecutorId,
131 ) -> ExecutorTaskHandle {
132 let is_closing = Arc::new(AtomicBool::default());
133 let is_closing_inner = is_closing.clone();
134 let ffqns = extract_ffqns(worker.as_ref());
135 let component_id = config.component_id.clone();
136 let abort_handle = tokio::spawn(async move {
137 debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
138 let task = Self {
139 worker,
140 config,
141 executor_id,
142 db_pool,
143 phantom_data: PhantomData,
144 ffqns: ffqns.clone(),
145 clock_fn: clock_fn.clone(),
146 };
147 loop {
148 let _ = task.tick(clock_fn.now()).await;
149 let executed_at = clock_fn.now();
150 task.db_pool
151 .connection()
152 .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
153 .await;
154 if is_closing_inner.load(Ordering::Relaxed) {
155 return;
156 }
157 }
158 })
159 .abort_handle();
160 ExecutorTaskHandle {
161 is_closing,
162 abort_handle,
163 component_id,
164 executor_id,
165 }
166 }
167
168 fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
169 if let Some(task_limiter) = &self.config.task_limiter {
170 let mut locks = Vec::new();
171 for _ in 0..self.config.batch_size {
172 if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
173 locks.push(Some(permit));
174 } else {
175 break;
176 }
177 }
178 locks
179 } else {
180 let mut vec = Vec::with_capacity(self.config.batch_size as usize);
181 for _ in 0..self.config.batch_size {
182 vec.push(None);
183 }
184 vec
185 }
186 }
187
/// Public, test-only entry point that delegates to the private `tick`.
#[cfg(feature = "test")]
pub async fn tick2(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
    self.tick(executed_at).await
}
192
193 #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
194 async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195 let locked_executions = {
196 let mut permits = self.acquire_task_permits();
197 if permits.is_empty() {
198 return Ok(ExecutionProgress::default());
199 }
200 let db_connection = self.db_pool.connection();
201 let lock_expires_at = executed_at + self.config.lock_expiry;
202 let locked_executions = db_connection
203 .lock_pending(
204 permits.len(), executed_at, self.ffqns.clone(),
207 executed_at, self.config.component_id.clone(),
209 self.executor_id,
210 lock_expires_at,
211 )
212 .await
213 .map_err(|err| {
214 warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
215 })?;
216 while permits.len() > locked_executions.len() {
218 permits.pop();
219 }
220 assert_eq!(permits.len(), locked_executions.len());
221 locked_executions.into_iter().zip(permits)
222 };
223 let execution_deadline = executed_at + self.config.lock_expiry;
224
225 let mut executions = Vec::with_capacity(locked_executions.len());
226 for (locked_execution, permit) in locked_executions {
227 let execution_id = locked_execution.execution_id.clone();
228 let join_handle = {
229 let worker = self.worker.clone();
230 let db_pool = self.db_pool.clone();
231 let clock_fn = self.clock_fn.clone();
232 let run_id = locked_execution.run_id;
233 let worker_span = info_span!(parent: None, "worker",
234 %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
235 locked_execution.metadata.enrich(&worker_span);
236 tokio::spawn({
237 let worker_span2 = worker_span.clone();
238 async move {
239 let res = Self::run_worker(
240 worker,
241 &db_pool,
242 execution_deadline,
243 clock_fn,
244 locked_execution,
245 worker_span2,
246 )
247 .await;
248 if let Err(db_error) = res {
249 error!("Execution will be timed out not writing `{db_error:?}`");
250 }
251 drop(permit);
252 }
253 .instrument(worker_span)
254 })
255 };
256 executions.push((execution_id, join_handle));
257 }
258 Ok(ExecutionProgress { executions })
259 }
260
261 async fn run_worker(
262 worker: Arc<dyn Worker>,
263 db_pool: &P,
264 execution_deadline: DateTime<Utc>,
265 clock_fn: C,
266 locked_execution: LockedExecution,
267 worker_span: Span,
268 ) -> Result<(), DbError> {
269 debug!("Worker::run starting");
270 trace!(
271 version = %locked_execution.version,
272 params = ?locked_execution.params,
273 event_history = ?locked_execution.event_history,
274 "Worker::run starting"
275 );
276 let can_be_retried = ExecutionLog::can_be_retried_after(
277 locked_execution.temporary_event_count + 1,
278 locked_execution.max_retries,
279 locked_execution.retry_exp_backoff,
280 );
281 let unlock_expiry_on_limit_reached =
282 ExecutionLog::compute_retry_duration_when_retrying_forever(
283 locked_execution.temporary_event_count + 1,
284 locked_execution.retry_exp_backoff,
285 );
286 let ctx = WorkerContext {
287 execution_id: locked_execution.execution_id.clone(),
288 metadata: locked_execution.metadata,
289 ffqn: locked_execution.ffqn,
290 params: locked_execution.params,
291 event_history: locked_execution.event_history,
292 responses: locked_execution
293 .responses
294 .into_iter()
295 .map(|outer| outer.event)
296 .collect(),
297 version: locked_execution.version,
298 execution_deadline,
299 can_be_retried: can_be_retried.is_some(),
300 run_id: locked_execution.run_id,
301 worker_span,
302 };
303 let worker_result = worker.run(ctx).await;
304 trace!(?worker_result, "Worker::run finished");
305 let result_obtained_at = clock_fn.now();
306 match Self::worker_result_to_execution_event(
307 locked_execution.execution_id,
308 worker_result,
309 result_obtained_at,
310 locked_execution.parent,
311 can_be_retried,
312 unlock_expiry_on_limit_reached,
313 )? {
314 Some(append) => {
315 let db_connection = db_pool.connection();
316 trace!("Appending {append:?}");
317 append.clone().append(&db_connection).await
318 }
319 None => Ok(()),
320 }
321 }
322
/// Translates the outcome of `Worker::run` into the database write that should follow, if any.
///
/// Returns `Ok(None)` when nothing has to be appended (`WorkerResult::DbUpdatedByWorker` and
/// `WorkerError::TemporaryTimeout`), `Err` when the worker reported `WorkerError::DbError`, and
/// `Ok(Some(append))` otherwise. `child_finished` is only populated when `parent` is set and
/// the primary event is `Finished`.
///
/// # Panics
///
/// Panics if `can_be_retried` is `None` for `WorkerError::ActivityReturnedError` or
/// `WorkerError::TemporaryWorkflowTrap`.
#[expect(clippy::too_many_lines)]
fn worker_result_to_execution_event(
    execution_id: ExecutionId,
    worker_result: WorkerResult,
    result_obtained_at: DateTime<Utc>,
    parent: Option<(ExecutionId, JoinSetId)>,
    can_be_retried: Option<Duration>,
    unlock_expiry_on_limit_reached: Duration,
) -> Result<Option<Append>, DbError> {
    Ok(match worker_result {
        WorkerResult::Ok(result, new_version) => {
            info!(
                "Execution finished: {}",
                result.as_pending_state_finished_result()
            );
            // A child execution also reports its result to the parent's join set.
            let child_finished =
                parent.map(
                    |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
                        parent_execution_id,
                        parent_join_set,
                        result: Ok(result.clone()),
                    },
                );
            let primary_event = AppendRequest {
                created_at: result_obtained_at,
                event: ExecutionEventInner::Finished { result: Ok(result) },
            };

            Some(Append {
                created_at: result_obtained_at,
                primary_event,
                execution_id,
                version: new_version,
                child_finished,
            })
        }
        WorkerResult::DbUpdatedByWorker => None,
        WorkerResult::Err(err) => {
            // Full, formatted error text; the per-variant `reason` fields hold the inner text.
            let reason = err.to_string();
            let (primary_event, child_finished, version) = match err {
                WorkerError::TemporaryTimeout => {
                    info!("Temporary timeout");
                    // Nothing is appended here. NOTE(review): presumably the expired lock is
                    // handled elsewhere (the tests import `expired_timers_watcher`) — confirm.
                    return Ok(None);
                }
                WorkerError::DbError(db_error) => {
                    return Err(db_error);
                }
                WorkerError::ActivityTrap {
                    reason: reason_inner,
                    trap_kind,
                    detail,
                    version,
                } => {
                    if let Some(duration) = can_be_retried {
                        // Retries remain: record a temporary failure with a backoff.
                        let expires_at = result_obtained_at + duration;
                        debug!("Retrying activity {trap_kind} execution after {duration:?} at {expires_at}");
                        (
                            ExecutionEventInner::TemporarilyFailed {
                                backoff_expires_at: expires_at,
                                reason_full: StrVariant::from(reason),
                                reason_inner: StrVariant::from(reason_inner),
                                detail: Some(detail),
                            },
                            None,
                            version,
                        )
                    } else {
                        // Retries exhausted: finish with a permanent failure and notify the parent.
                        info!(
                            "Activity {trap_kind} marked as permanent failure - {reason_inner}"
                        );
                        let result = Err(FinishedExecutionError::PermanentFailure {
                            reason_inner,
                            reason_full: reason,
                            kind: PermanentFailureKind::ActivityTrap,
                            detail: Some(detail),
                        });
                        let child_finished =
                            parent.map(|(parent_execution_id, parent_join_set)| {
                                ChildFinishedResponse {
                                    parent_execution_id,
                                    parent_join_set,
                                    result: result.clone(),
                                }
                            });
                        (
                            ExecutionEventInner::Finished { result },
                            child_finished,
                            version,
                        )
                    }
                }
                WorkerError::ActivityReturnedError { detail, version } => {
                    // The worker is expected to return this variant only while retries remain.
                    let duration = can_be_retried.expect(
                        "ActivityReturnedError must not be returned when retries are exhausted",
                    );
                    let expires_at = result_obtained_at + duration;
                    debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
                    (
                        ExecutionEventInner::TemporarilyFailed {
                            backoff_expires_at: expires_at,
                            reason_full: StrVariant::Static("activity returned error"),
                            reason_inner: StrVariant::Static("activity returned error"),
                            detail,
                        },
                        None,
                        version,
                    )
                }
                WorkerError::TemporaryWorkflowTrap {
                    reason: reason_inner,
                    kind,
                    detail,
                    version,
                } => {
                    let duration = can_be_retried.expect("workflows are retried forever");
                    let expires_at = result_obtained_at + duration;
                    debug!(
                        "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
                    );
                    (
                        ExecutionEventInner::TemporarilyFailed {
                            backoff_expires_at: expires_at,
                            reason_full: StrVariant::from(reason),
                            reason_inner: StrVariant::from(reason_inner),
                            detail,
                        },
                        None,
                        version,
                    )
                }
                WorkerError::LimitReached {
                    reason: inner_reason,
                    version: new_version,
                } => {
                    // Recorded as `Unlocked`, not as a failure; the stored reason is the full
                    // error text, while `inner_reason` is only logged.
                    let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
                    warn!("Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}");
                    (
                        ExecutionEventInner::Unlocked {
                            backoff_expires_at: expires_at,
                            reason: StrVariant::from(reason),
                        },
                        None,
                        new_version,
                    )
                }
                WorkerError::FatalError(fatal_error, version) => {
                    // Fatal errors finish the execution regardless of `can_be_retried`.
                    info!("Fatal worker error - {fatal_error:?}");
                    let result = Err(FinishedExecutionError::from(fatal_error));
                    let child_finished =
                        parent.map(|(parent_execution_id, parent_join_set)| {
                            ChildFinishedResponse {
                                parent_execution_id,
                                parent_join_set,
                                result: result.clone(),
                            }
                        });
                    (
                        ExecutionEventInner::Finished { result },
                        child_finished,
                        version,
                    )
                }
            };
            Some(Append {
                created_at: result_obtained_at,
                primary_event: AppendRequest {
                    created_at: result_obtained_at,
                    event: primary_event,
                },
                execution_id,
                version,
                child_finished,
            })
        }
    })
}
502}
503
/// Data needed to tell a parent execution that one of its children has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution that receives the response.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that the response is written to.
    pub(crate) parent_join_set: JoinSetId,
    /// Final result of the child, successful or failed.
    pub(crate) result: FinishedExecutionResult,
}
510
/// A pending database write produced from a worker result.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp passed to the database call and used for the parent response.
    pub(crate) created_at: DateTime<Utc>,
    /// Event appended to the execution's own log.
    pub(crate) primary_event: AppendRequest,
    pub(crate) execution_id: ExecutionId,
    /// Version passed along with the append.
    pub(crate) version: Version,
    /// When set, `primary_event` must be `Finished` and the parent is notified in the same call.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
519
520impl Append {
521 pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
522 if let Some(child_finished) = self.child_finished {
523 assert_matches!(
524 &self.primary_event,
525 AppendRequest {
526 event: ExecutionEventInner::Finished { .. },
527 ..
528 }
529 );
530 let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
531 db_connection
532 .append_batch_respond_to_parent(
533 derived.clone(),
534 self.created_at,
535 vec![self.primary_event],
536 self.version.clone(),
537 child_finished.parent_execution_id,
538 JoinSetResponseEventOuter {
539 created_at: self.created_at,
540 event: JoinSetResponseEvent {
541 join_set_id: child_finished.parent_join_set,
542 event: JoinSetResponse::ChildExecutionFinished {
543 child_execution_id: derived,
544 finished_version: self.version,
546 result: child_finished.result,
547 },
548 },
549 },
550 )
551 .await?;
552 } else {
553 db_connection
554 .append_batch(
555 self.created_at,
556 vec![self.primary_event],
557 self.execution_id,
558 self.version,
559 )
560 .await?;
561 }
562 Ok(())
563 }
564}
565
566#[cfg(any(test, feature = "test"))]
567pub mod simple_worker {
568 use crate::worker::{Worker, WorkerContext, WorkerResult};
569 use async_trait::async_trait;
570 use concepts::{
571 storage::{HistoryEvent, Version},
572 FunctionFqn, FunctionMetadata, ParameterTypes,
573 };
574 use indexmap::IndexMap;
575 use std::sync::Arc;
576 use tracing::trace;
577
/// Function name exported by a `SimpleWorker` unless overridden with `with_ffqn`.
pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
/// Shared, ordered map from the expected version to the expected event history and the result
/// to return. `SimpleWorker::run` pops entries from the end, hence the `_rev` naming elsewhere.
pub type SimpleWorkerResultMap =
    Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;
581
/// Test worker that replays pre-programmed results instead of executing anything.
#[derive(Clone, Debug)]
pub struct SimpleWorker {
    /// Scripted results; each `run` call pops the last entry.
    pub worker_results_rev: SimpleWorkerResultMap,
    /// Function name this worker claims to export.
    pub ffqn: FunctionFqn,
    /// Metadata returned from `exported_functions`; always a single entry for `ffqn`.
    exported: [FunctionMetadata; 1],
}
588
589 impl SimpleWorker {
590 #[must_use]
591 pub fn with_single_result(res: WorkerResult) -> Self {
592 Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(IndexMap::from([(
593 Version::new(2),
594 (vec![], res),
595 )]))))
596 }
597
598 #[must_use]
599 pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
600 Self {
601 worker_results_rev: self.worker_results_rev,
602 exported: [FunctionMetadata {
603 ffqn: ffqn.clone(),
604 parameter_types: ParameterTypes::default(),
605 return_type: None,
606 extension: None,
607 submittable: true,
608 }],
609 ffqn,
610 }
611 }
612
613 #[must_use]
614 pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
615 Self {
616 worker_results_rev,
617 ffqn: FFQN_SOME,
618 exported: [FunctionMetadata {
619 ffqn: FFQN_SOME,
620 parameter_types: ParameterTypes::default(),
621 return_type: None,
622 extension: None,
623 submittable: true,
624 }],
625 }
626 }
627 }
628
629 #[async_trait]
630 impl Worker for SimpleWorker {
631 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
632 let (expected_version, (expected_eh, worker_result)) =
633 self.worker_results_rev.lock().unwrap().pop().unwrap();
634 trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
635 assert_eq!(expected_version, ctx.version);
636 assert_eq!(expected_eh, ctx.event_history);
637 worker_result
638 }
639
640 fn exported_functions(&self) -> &[FunctionMetadata] {
641 &self.exported
642 }
643
644 fn imported_functions(&self) -> &[FunctionMetadata] {
645 &[]
646 }
647 }
648}
649
650#[cfg(test)]
651mod tests {
652 use self::simple_worker::SimpleWorker;
653 use super::*;
654 use crate::worker::FatalError;
655 use crate::{expired_timers_watcher, worker::WorkerResult};
656 use assert_matches::assert_matches;
657 use async_trait::async_trait;
658 use concepts::storage::{CreateRequest, JoinSetRequest};
659 use concepts::storage::{
660 DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
661 };
662 use concepts::{
663 FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
664 SupportedFunctionReturnValue, TrapKind,
665 };
666 use db_tests::Database;
667 use indexmap::IndexMap;
668 use simple_worker::FFQN_SOME;
669 use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
670 use test_utils::set_up;
671 use test_utils::sim_clock::{ConstClock, SimClock};
672 use utils::time::Now;
673
/// Function name for child executions in tests; its uses are outside the visible part of the file.
pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
675
676 async fn tick_fn<
677 W: Worker + Debug,
678 C: ClockFn + 'static,
679 DB: DbConnection + 'static,
680 P: DbPool<DB> + 'static,
681 >(
682 config: ExecConfig,
683 clock_fn: C,
684 db_pool: P,
685 worker: Arc<W>,
686 executed_at: DateTime<Utc>,
687 ) -> ExecutionProgress {
688 trace!("Ticking with {worker:?}");
689 let ffqns = super::extract_ffqns(worker.as_ref());
690 let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
691 let mut execution_progress = executor.tick(executed_at).await.unwrap();
692 loop {
693 execution_progress
694 .executions
695 .retain(|(_, abort_handle)| !abort_handle.is_finished());
696 if execution_progress.executions.is_empty() {
697 return execution_progress;
698 }
699 tokio::time::sleep(Duration::from_millis(10)).await;
700 }
701 }
702
703 #[tokio::test]
704 async fn execute_simple_lifecycle_tick_based_mem() {
705 let created_at = Now.now();
706 let (_guard, db_pool) = Database::Memory.set_up().await;
707 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
708 db_pool.close().await.unwrap();
709 }
710
711 #[cfg(not(madsim))]
712 #[tokio::test]
713 async fn execute_simple_lifecycle_tick_based_sqlite() {
714 let created_at = Now.now();
715 let (_guard, db_pool) = Database::Sqlite.set_up().await;
716 execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
717 db_pool.close().await.unwrap();
718 }
719
720 async fn execute_simple_lifecycle_tick_based<
721 DB: DbConnection + 'static,
722 P: DbPool<DB> + 'static,
723 C: ClockFn + 'static,
724 >(
725 pool: P,
726 clock_fn: C,
727 ) {
728 set_up();
729 let created_at = clock_fn.now();
730 let exec_config = ExecConfig {
731 batch_size: 1,
732 lock_expiry: Duration::from_secs(1),
733 tick_sleep: Duration::from_millis(100),
734 component_id: ComponentId::dummy_activity(),
735 task_limiter: None,
736 };
737
738 let execution_log = create_and_tick(
739 CreateAndTickConfig {
740 execution_id: ExecutionId::generate(),
741 created_at,
742 max_retries: 0,
743 executed_at: created_at,
744 retry_exp_backoff: Duration::ZERO,
745 },
746 clock_fn,
747 pool,
748 exec_config,
749 Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
750 SupportedFunctionReturnValue::None,
751 Version::new(2),
752 ))),
753 tick_fn,
754 )
755 .await;
756 assert_matches!(
757 execution_log.events.get(2).unwrap(),
758 ExecutionEvent {
759 event: ExecutionEventInner::Finished {
760 result: Ok(SupportedFunctionReturnValue::None),
761 },
762 created_at: _,
763 }
764 );
765 }
766
/// Same lifecycle as the tick-based scenario, but driven by a spawned background executor
/// instead of manual ticks.
#[tokio::test]
async fn stochastic_execute_simple_lifecycle_task_based_mem() {
    set_up();
    let created_at = Now.now();
    let clock_fn = ConstClock(created_at);
    let (_guard, db_pool) = Database::Memory.set_up().await;
    let exec_config = ExecConfig {
        batch_size: 1,
        lock_expiry: Duration::from_secs(1),
        tick_sleep: Duration::ZERO,
        component_id: ComponentId::dummy_activity(),
        task_limiter: None,
    };

    let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
        SupportedFunctionReturnValue::None,
        Version::new(2),
    )));
    // The executor is spawned before the execution exists and picks it up on its own.
    let exec_task = ExecTask::spawn_new(
        worker.clone(),
        exec_config.clone(),
        clock_fn,
        db_pool.clone(),
        ExecutorId::generate(),
    );

    let execution_log = create_and_tick(
        CreateAndTickConfig {
            execution_id: ExecutionId::generate(),
            created_at,
            max_retries: 0,
            executed_at: created_at,
            retry_exp_backoff: Duration::ZERO,
        },
        clock_fn,
        db_pool.clone(),
        exec_config,
        worker,
        // No manual tick: wait one second of wall-clock time for the background executor.
        // NOTE(review): the outcome depends on real timing, hence presumably the
        // `stochastic_` prefix — confirm.
        |_, _, _, _, _| async {
            tokio::time::sleep(Duration::from_secs(1)).await; ExecutionProgress::default()
        },
    )
    .await;
    exec_task.close().await;
    db_pool.close().await.unwrap();
    assert_matches!(
        execution_log.events.get(2).unwrap(),
        ExecutionEvent {
            event: ExecutionEventInner::Finished {
                result: Ok(SupportedFunctionReturnValue::None),
            },
            created_at: _,
        }
    );
}
823
/// Parameters of the execution created by `create_and_tick` and of the tick that follows.
struct CreateAndTickConfig {
    execution_id: ExecutionId,
    /// Used as both `created_at` and `scheduled_at` of the created execution.
    created_at: DateTime<Utc>,
    max_retries: u32,
    /// Timestamp passed to the tick closure.
    executed_at: DateTime<Utc>,
    retry_exp_backoff: Duration,
}
831
/// Creates an execution of `FFQN_SOME`, invokes `tick` once, and returns the resulting log.
///
/// Before returning, asserts that the log starts with `Created` at `config.created_at`,
/// followed by `Locked` no earlier than that, followed by a third event of any kind no earlier
/// than the lock. Panics when any of the three events is missing.
async fn create_and_tick<
    W: Worker,
    C: ClockFn,
    DB: DbConnection,
    P: DbPool<DB>,
    T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
    F: Future<Output = ExecutionProgress>,
>(
    config: CreateAndTickConfig,
    clock_fn: C,
    db_pool: P,
    exec_config: ExecConfig,
    worker: Arc<W>,
    mut tick: T,
) -> ExecutionLog {
    // Create the execution.
    let db_connection = db_pool.connection();
    db_connection
        .create(CreateRequest {
            created_at: config.created_at,
            execution_id: config.execution_id.clone(),
            ffqn: FFQN_SOME,
            params: Params::empty(),
            parent: None,
            metadata: concepts::ExecutionMetadata::empty(),
            scheduled_at: config.created_at,
            retry_exp_backoff: config.retry_exp_backoff,
            max_retries: config.max_retries,
            component_id: ComponentId::dummy_activity(),
            scheduled_by: None,
        })
        .await
        .unwrap();
    // Let the caller-provided closure drive the execution.
    tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
    let execution_log = db_connection.get(&config.execution_id).await.unwrap();
    debug!("Execution history after tick: {execution_log:?}");
    // The log must contain the Created and Locked events, then whatever the worker produced.
    assert_matches!(
        execution_log.events.first().unwrap(),
        ExecutionEvent {
            event: ExecutionEventInner::Created { .. },
            created_at: actually_created_at,
        }
        if config.created_at == *actually_created_at
    );
    let locked_at = assert_matches!(
        execution_log.events.get(1).unwrap(),
        ExecutionEvent {
            event: ExecutionEventInner::Locked { .. },
            created_at: locked_at
        } if config.created_at <= *locked_at
        => *locked_at
    );
    assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
        event: _,
        created_at: executed_at,
    } if *executed_at >= locked_at);
    execution_log
}
892
/// With one retry allowed, an activity trap is recorded as `TemporarilyFailed`, and the
/// execution is locked and finished by a second worker once the backoff has elapsed.
#[expect(clippy::too_many_lines)]
#[tokio::test]
async fn activity_trap_should_trigger_an_execution_retry() {
    set_up();
    let sim_clock = SimClock::default();
    let (_guard, db_pool) = Database::Memory.set_up().await;
    let exec_config = ExecConfig {
        batch_size: 1,
        lock_expiry: Duration::from_secs(1),
        tick_sleep: Duration::ZERO,
        component_id: ComponentId::dummy_activity(),
        task_limiter: None,
    };
    let expected_reason = "error reason";
    let expected_detail = "error detail";
    // First run: the worker traps.
    let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
        WorkerError::ActivityTrap {
            reason: expected_reason.to_string(),
            trap_kind: concepts::TrapKind::Trap,
            detail: expected_detail.to_string(),
            version: Version::new(2),
        },
    )));
    let retry_exp_backoff = Duration::from_millis(100);
    debug!(now = %sim_clock.now(), "Creating an execution that should fail");
    let execution_log = create_and_tick(
        CreateAndTickConfig {
            execution_id: ExecutionId::generate(),
            created_at: sim_clock.now(),
            max_retries: 1,
            executed_at: sim_clock.now(),
            retry_exp_backoff,
        },
        sim_clock.clone(),
        db_pool.clone(),
        exec_config.clone(),
        worker,
        tick_fn,
    )
    .await;
    assert_eq!(3, execution_log.events.len());
    {
        // The third event is the temporary failure carrying the reasons, detail and backoff.
        let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyFailed {
                    reason_inner,
                    reason_full,
                    detail,
                    backoff_expires_at,
                },
                created_at: at,
            }
            => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
        );
        assert_eq!(expected_reason, reason_inner.deref());
        assert_eq!(
            format!("activity trap: {expected_reason}"),
            reason_full.deref()
        );
        assert_eq!(Some(expected_detail), detail.as_deref());
        assert_eq!(at, sim_clock.now());
        assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
    }
    // Second run: the worker succeeds and expects to be called at version 4.
    let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
        std::sync::Mutex::new(IndexMap::from([(
            Version::new(4),
            (
                vec![],
                WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4)),
            ),
        )])),
    )));
    // Tick before the backoff expires; this is meant to be a no-op.
    // NOTE(review): `tick_fn` only returns once `executions` is empty, so this assertion
    // cannot fail. The `Locked` timestamp checked below is what proves nothing ran early.
    assert!(tick_fn(
        exec_config.clone(),
        sim_clock.clone(),
        db_pool.clone(),
        worker.clone(),
        sim_clock.now(),
    )
    .await
    .executions
    .is_empty());
    // Advance the clock to the end of the backoff and tick again.
    sim_clock.move_time_forward(retry_exp_backoff).await;
    tick_fn(
        exec_config,
        sim_clock.clone(),
        db_pool.clone(),
        worker,
        sim_clock.now(),
    )
    .await;
    let execution_log = {
        let db_connection = db_pool.connection();
        db_connection
            .get(&execution_log.execution_id)
            .await
            .unwrap()
    };
    debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
    // The retry must have been locked and finished at the advanced time.
    assert_matches!(
        execution_log.events.get(3).unwrap(),
        ExecutionEvent {
            event: ExecutionEventInner::Locked { .. },
            created_at: at
        } if *at == sim_clock.now()
    );
    assert_matches!(
        execution_log.events.get(4).unwrap(),
        ExecutionEvent {
            event: ExecutionEventInner::Finished {
                result: Ok(SupportedFunctionReturnValue::None),
            },
            created_at: finished_at,
        } if *finished_at == sim_clock.now()
    );
    db_pool.close().await.unwrap();
}
1013
/// With `max_retries: 0`, an activity trap finishes the execution immediately with
/// `PermanentFailure` of kind `ActivityTrap`.
#[tokio::test]
async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
    set_up();
    let created_at = Now.now();
    let clock_fn = ConstClock(created_at);
    let (_guard, db_pool) = Database::Memory.set_up().await;
    let exec_config = ExecConfig {
        batch_size: 1,
        lock_expiry: Duration::from_secs(1),
        tick_sleep: Duration::ZERO,
        component_id: ComponentId::dummy_activity(),
        task_limiter: None,
    };

    let expected_reason = "error reason";
    let expected_detail = "error detail";
    let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
        WorkerError::ActivityTrap {
            reason: expected_reason.to_string(),
            trap_kind: concepts::TrapKind::Trap,
            detail: expected_detail.to_string(),
            version: Version::new(2),
        },
    )));
    let execution_log = create_and_tick(
        CreateAndTickConfig {
            execution_id: ExecutionId::generate(),
            created_at,
            max_retries: 0,
            executed_at: created_at,
            retry_exp_backoff: Duration::ZERO,
        },
        clock_fn,
        db_pool.clone(),
        exec_config.clone(),
        worker,
        tick_fn,
    )
    .await;
    assert_eq!(3, execution_log.events.len());
    // The clock is constant, so the failure must be stamped with `created_at`.
    let (reason, kind, detail) = assert_matches!(
        &execution_log.events.get(2).unwrap(),
        ExecutionEvent {
            event: ExecutionEventInner::Finished{
                result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_})
            },
            created_at: at,
        } if *at == created_at
        => (reason_inner, kind, detail)
    );
    assert_eq!(expected_reason, *reason);
    assert_eq!(Some(expected_detail), detail.as_deref());
    assert_eq!(PermanentFailureKind::ActivityTrap, *kind);

    db_pool.close().await.unwrap();
}
1070
1071 #[tokio::test]
1072 async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1073 let worker_error = WorkerError::ActivityTrap {
1074 reason: "error reason".to_string(),
1075 trap_kind: TrapKind::Trap,
1076 detail: "detail".to_string(),
1077 version: Version::new(2),
1078 };
1079 let expected_child_err = FinishedExecutionError::PermanentFailure {
1080 reason_full: "activity trap: error reason".to_string(),
1081 reason_inner: "error reason".to_string(),
1082 kind: PermanentFailureKind::ActivityTrap,
1083 detail: Some("detail".to_string()),
1084 };
1085 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1086 .await;
1087 }
1088
1089 #[tokio::test]
1090 async fn child_execution_permanently_failed_should_notify_parent_timeout() {
1091 let worker_error = WorkerError::TemporaryTimeout;
1092 let expected_child_err = FinishedExecutionError::PermanentTimeout;
1093 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1094 .await;
1095 }
1096
1097 #[tokio::test]
1098 async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1099 let parent_id = ExecutionId::from_parts(1, 1);
1100 let join_set_id_outer =
1101 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1102 let root_cause_id = parent_id.next_level(&join_set_id_outer);
1103 let join_set_id_inner =
1104 JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1105 let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1106 let worker_error = WorkerError::FatalError(
1107 FatalError::UnhandledChildExecutionError {
1108 child_execution_id: child_execution_id.clone(),
1109 root_cause_id: root_cause_id.clone(),
1110 },
1111 Version::new(2),
1112 );
1113 let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1114 child_execution_id,
1115 root_cause_id,
1116 };
1117 child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1118 .await;
1119 }
1120
1121 #[expect(clippy::too_many_lines)]
1122 async fn child_execution_permanently_failed_should_notify_parent(
1123 worker_error: WorkerError,
1124 expected_child_err: FinishedExecutionError,
1125 ) {
1126 use concepts::storage::JoinSetResponseEventOuter;
1127 const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1128
1129 set_up();
1130 let sim_clock = SimClock::default();
1131 let (_guard, db_pool) = Database::Memory.set_up().await;
1132
1133 let parent_worker = Arc::new(SimpleWorker::with_single_result(
1134 WorkerResult::DbUpdatedByWorker,
1135 ));
1136 let parent_execution_id = ExecutionId::generate();
1137 db_pool
1138 .connection()
1139 .create(CreateRequest {
1140 created_at: sim_clock.now(),
1141 execution_id: parent_execution_id.clone(),
1142 ffqn: FFQN_SOME,
1143 params: Params::empty(),
1144 parent: None,
1145 metadata: concepts::ExecutionMetadata::empty(),
1146 scheduled_at: sim_clock.now(),
1147 retry_exp_backoff: Duration::ZERO,
1148 max_retries: 0,
1149 component_id: ComponentId::dummy_activity(),
1150 scheduled_by: None,
1151 })
1152 .await
1153 .unwrap();
1154 tick_fn(
1155 ExecConfig {
1156 batch_size: 1,
1157 lock_expiry: LOCK_EXPIRY,
1158 tick_sleep: Duration::ZERO,
1159 component_id: ComponentId::dummy_activity(),
1160 task_limiter: None,
1161 },
1162 sim_clock.clone(),
1163 db_pool.clone(),
1164 parent_worker,
1165 sim_clock.now(),
1166 )
1167 .await;
1168
1169 let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1170 let child_execution_id = parent_execution_id.next_level(&join_set_id);
1171 {
1173 let child = CreateRequest {
1174 created_at: sim_clock.now(),
1175 execution_id: ExecutionId::Derived(child_execution_id.clone()),
1176 ffqn: FFQN_CHILD,
1177 params: Params::empty(),
1178 parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1179 metadata: concepts::ExecutionMetadata::empty(),
1180 scheduled_at: sim_clock.now(),
1181 retry_exp_backoff: Duration::ZERO,
1182 max_retries: 0,
1183 component_id: ComponentId::dummy_activity(),
1184 scheduled_by: None,
1185 };
1186 let current_time = sim_clock.now();
1187 let join_set = AppendRequest {
1188 created_at: current_time,
1189 event: ExecutionEventInner::HistoryEvent {
1190 event: HistoryEvent::JoinSet {
1191 join_set_id: join_set_id.clone(),
1192 },
1193 },
1194 };
1195 let child_exec_req = AppendRequest {
1196 created_at: current_time,
1197 event: ExecutionEventInner::HistoryEvent {
1198 event: HistoryEvent::JoinSetRequest {
1199 join_set_id: join_set_id.clone(),
1200 request: JoinSetRequest::ChildExecutionRequest {
1201 child_execution_id: child_execution_id.clone(),
1202 },
1203 },
1204 },
1205 };
1206 let join_next = AppendRequest {
1207 created_at: current_time,
1208 event: ExecutionEventInner::HistoryEvent {
1209 event: HistoryEvent::JoinNext {
1210 join_set_id: join_set_id.clone(),
1211 run_expires_at: sim_clock.now(),
1212 closing: false,
1213 },
1214 },
1215 };
1216 db_pool
1217 .connection()
1218 .append_batch_create_new_execution(
1219 current_time,
1220 vec![join_set, child_exec_req, join_next],
1221 parent_execution_id.clone(),
1222 Version::new(2),
1223 vec![child],
1224 )
1225 .await
1226 .unwrap();
1227 }
1228
1229 let child_worker = Arc::new(
1230 SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1231 );
1232
1233 tick_fn(
1235 ExecConfig {
1236 batch_size: 1,
1237 lock_expiry: LOCK_EXPIRY,
1238 tick_sleep: Duration::ZERO,
1239 component_id: ComponentId::dummy_activity(),
1240 task_limiter: None,
1241 },
1242 sim_clock.clone(),
1243 db_pool.clone(),
1244 child_worker,
1245 sim_clock.now(),
1246 )
1247 .await;
1248 if expected_child_err == FinishedExecutionError::PermanentTimeout {
1249 sim_clock.move_time_forward(LOCK_EXPIRY).await;
1251 expired_timers_watcher::tick(db_pool.connection(), sim_clock.now())
1252 .await
1253 .unwrap();
1254 }
1255 let child_log = db_pool
1256 .connection()
1257 .get(&ExecutionId::Derived(child_execution_id.clone()))
1258 .await
1259 .unwrap();
1260 assert!(child_log.pending_state.is_finished());
1261 assert_eq!(
1262 Version(2),
1263 child_log.next_version,
1264 "created = 0, locked = 1, with_single_result = 2"
1265 );
1266 assert_eq!(
1267 ExecutionEventInner::Finished {
1268 result: Err(expected_child_err)
1269 },
1270 child_log.last_event().event
1271 );
1272 let parent_log = db_pool
1273 .connection()
1274 .get(&parent_execution_id)
1275 .await
1276 .unwrap();
1277 assert_matches!(
1278 parent_log.pending_state,
1279 PendingState::PendingAt {
1280 scheduled_at
1281 } if scheduled_at == sim_clock.now(),
1282 "parent should be back to pending"
1283 );
1284 let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1285 parent_log.responses.last(),
1286 Some(JoinSetResponseEventOuter{
1287 created_at: at,
1288 event: JoinSetResponseEvent{
1289 join_set_id: found_join_set_id,
1290 event: JoinSetResponse::ChildExecutionFinished {
1291 child_execution_id: found_child_execution_id,
1292 finished_version,
1293 result: found_result,
1294 }
1295 }
1296 })
1297 if *at == sim_clock.now()
1298 => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1299 );
1300 assert_eq!(join_set_id, *found_join_set_id);
1301 assert_eq!(child_execution_id, *found_child_execution_id);
1302 assert_eq!(child_log.next_version, *child_finished_version);
1303 assert!(found_result.is_err());
1304
1305 db_pool.close().await.unwrap();
1306 }
1307
1308 #[derive(Clone, Debug)]
1309 struct SleepyWorker {
1310 duration: Duration,
1311 result: SupportedFunctionReturnValue,
1312 exported: [FunctionMetadata; 1],
1313 }
1314
1315 #[async_trait]
1316 impl Worker for SleepyWorker {
1317 async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1318 tokio::time::sleep(self.duration).await;
1319 WorkerResult::Ok(self.result.clone(), ctx.version)
1320 }
1321
1322 fn exported_functions(&self) -> &[FunctionMetadata] {
1323 &self.exported
1324 }
1325
1326 fn imported_functions(&self) -> &[FunctionMetadata] {
1327 &[]
1328 }
1329 }
1330
1331 #[expect(clippy::too_many_lines)]
1332 #[tokio::test]
1333 async fn hanging_lock_should_be_cleaned_and_execution_retried() {
1334 set_up();
1335 let sim_clock = SimClock::default();
1336 let (_guard, db_pool) = Database::Memory.set_up().await;
1337 let lock_expiry = Duration::from_millis(100);
1338 let exec_config = ExecConfig {
1339 batch_size: 1,
1340 lock_expiry,
1341 tick_sleep: Duration::ZERO,
1342 component_id: ComponentId::dummy_activity(),
1343 task_limiter: None,
1344 };
1345
1346 let worker = Arc::new(SleepyWorker {
1347 duration: lock_expiry + Duration::from_millis(1), result: SupportedFunctionReturnValue::None,
1349 exported: [FunctionMetadata {
1350 ffqn: FFQN_SOME,
1351 parameter_types: ParameterTypes::default(),
1352 return_type: None,
1353 extension: None,
1354 submittable: true,
1355 }],
1356 });
1357 let execution_id = ExecutionId::generate();
1359 let timeout_duration = Duration::from_millis(300);
1360 let db_connection = db_pool.connection();
1361 db_connection
1362 .create(CreateRequest {
1363 created_at: sim_clock.now(),
1364 execution_id: execution_id.clone(),
1365 ffqn: FFQN_SOME,
1366 params: Params::empty(),
1367 parent: None,
1368 metadata: concepts::ExecutionMetadata::empty(),
1369 scheduled_at: sim_clock.now(),
1370 retry_exp_backoff: timeout_duration,
1371 max_retries: 1,
1372 component_id: ComponentId::dummy_activity(),
1373 scheduled_by: None,
1374 })
1375 .await
1376 .unwrap();
1377
1378 let ffqns = super::extract_ffqns(worker.as_ref());
1379 let executor = ExecTask::new(
1380 worker,
1381 exec_config,
1382 sim_clock.clone(),
1383 db_pool.clone(),
1384 ffqns,
1385 );
1386 let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
1387 assert_eq!(1, first_execution_progress.executions.len());
1388 sim_clock.move_time_forward(lock_expiry).await;
1390 let now_after_first_lock_expiry = sim_clock.now();
1392 {
1393 debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
1394 let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
1395 assert!(cleanup_progress.executions.is_empty());
1396 }
1397 {
1398 let expired_locks =
1399 expired_timers_watcher::tick(db_pool.connection(), now_after_first_lock_expiry)
1400 .await
1401 .unwrap()
1402 .expired_locks;
1403 assert_eq!(1, expired_locks);
1404 }
1405 assert!(!first_execution_progress
1406 .executions
1407 .pop()
1408 .unwrap()
1409 .1
1410 .is_finished());
1411
1412 let execution_log = db_connection.get(&execution_id).await.unwrap();
1413 let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
1414 assert_matches!(
1415 &execution_log.events.get(2).unwrap(),
1416 ExecutionEvent {
1417 event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
1418 created_at: at,
1419 } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
1420 );
1421 assert_eq!(
1422 PendingState::PendingAt {
1423 scheduled_at: expected_first_timeout_expiry
1424 },
1425 execution_log.pending_state
1426 );
1427 sim_clock.move_time_forward(timeout_duration).await;
1428 let now_after_first_timeout = sim_clock.now();
1429 debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");
1430
1431 let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
1432 assert_eq!(1, second_execution_progress.executions.len());
1433
1434 sim_clock.move_time_forward(lock_expiry).await;
1436 let now_after_second_lock_expiry = sim_clock.now();
1438 debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
1439 {
1440 let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
1441 assert!(cleanup_progress.executions.is_empty());
1442 }
1443 {
1444 let expired_locks =
1445 expired_timers_watcher::tick(db_pool.connection(), now_after_second_lock_expiry)
1446 .await
1447 .unwrap()
1448 .expired_locks;
1449 assert_eq!(1, expired_locks);
1450 }
1451 assert!(!second_execution_progress
1452 .executions
1453 .pop()
1454 .unwrap()
1455 .1
1456 .is_finished());
1457
1458 drop(db_connection);
1459 drop(executor);
1460 db_pool.close().await.unwrap();
1461 }
1462}