// obeli_sk_executor/executor.rs

1use crate::worker::{Worker, WorkerContext, WorkerError, WorkerResult};
2use assert_matches::assert_matches;
3use chrono::{DateTime, Utc};
4use concepts::storage::{
5    AppendRequest, DbPool, ExecutionLog, JoinSetResponseEvent, JoinSetResponseEventOuter,
6    LockedExecution,
7};
8use concepts::time::ClockFn;
9use concepts::{prefixed_ulid::ExecutorId, ExecutionId, FunctionFqn};
10use concepts::{
11    storage::{DbConnection, DbError, ExecutionEventInner, JoinSetResponse, Version},
12    FinishedExecutionError,
13};
14use concepts::{ComponentId, FinishedExecutionResult, FunctionMetadata, StrVariant};
15use concepts::{JoinSetId, PermanentFailureKind};
16use std::marker::PhantomData;
17use std::{
18    sync::{
19        atomic::{AtomicBool, Ordering},
20        Arc,
21    },
22    time::Duration,
23};
24use tokio::task::{AbortHandle, JoinHandle};
25use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level, Span};
26
/// Configuration of a single executor, shared by every tick of its polling loop.
#[derive(Debug, Clone)]
pub struct ExecConfig {
    /// Added to the tick time to obtain both the lock expiry passed to `lock_pending`
    /// and the execution deadline handed to workers.
    pub lock_expiry: Duration,
    /// Passed to `wait_for_pending` after every tick of the spawned executor loop.
    /// NOTE(review): presumably the maximum time to wait for new pending executions — confirm.
    pub tick_sleep: Duration,
    /// Upper bound on the number of executions locked in a single tick.
    pub batch_size: u32,
    /// Component identifier recorded when locking executions and attached to log spans.
    pub component_id: ComponentId,
    /// Optional semaphore limiting how many worker tasks may run at once. One permit is held
    /// per spawned execution and released when its task finishes; `None` means unlimited.
    pub task_limiter: Option<Arc<tokio::sync::Semaphore>>,
}
35
/// An executor that locks pending executions of the functions in `ffqns` and runs each of
/// them with `worker` on its own tokio task.
pub struct ExecTask<C: ClockFn, DB: DbConnection, P: DbPool<DB>> {
    /// Runs every execution locked by this executor.
    worker: Arc<dyn Worker>,
    config: ExecConfig,
    clock_fn: C, // Used for obtaining current time when the execution finishes.
    /// Source of database connections; cloned into every spawned worker task.
    db_pool: P,
    /// Identity recorded when locking executions and attached to log spans.
    executor_id: ExecutorId,
    /// Carries the `DB` type parameter required by the `P: DbPool<DB>` bound.
    phantom_data: PhantomData<DB>,
    /// Functions this executor handles; passed to `lock_pending` and `wait_for_pending`.
    ffqns: Arc<[FunctionFqn]>,
}
45
46#[derive(derive_more::Debug, Default)]
47pub struct ExecutionProgress {
48    #[debug(skip)]
49    #[allow(dead_code)]
50    executions: Vec<(ExecutionId, JoinHandle<()>)>,
51}
52
53impl ExecutionProgress {
54    #[cfg(feature = "test")]
55    pub async fn wait_for_tasks(self) -> Result<usize, tokio::task::JoinError> {
56        let execs = self.executions.len();
57        for (_, handle) in self.executions {
58            handle.await?;
59        }
60        Ok(execs)
61    }
62}
63
64#[derive(derive_more::Debug)]
65pub struct ExecutorTaskHandle {
66    #[debug(skip)]
67    is_closing: Arc<AtomicBool>,
68    #[debug(skip)]
69    abort_handle: AbortHandle,
70    component_id: ComponentId,
71    executor_id: ExecutorId,
72}
73
74impl ExecutorTaskHandle {
75    #[instrument(level = Level::DEBUG, name = "executor.close", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
76    pub async fn close(&self) {
77        trace!("Gracefully closing");
78        self.is_closing.store(true, Ordering::Relaxed);
79        while !self.abort_handle.is_finished() {
80            tokio::time::sleep(Duration::from_millis(1)).await;
81        }
82        debug!("Gracefully closed");
83    }
84}
85
86impl Drop for ExecutorTaskHandle {
87    #[instrument(level = Level::DEBUG, name = "executor.drop", skip_all, fields(executor_id= %self.executor_id, component_id=%self.component_id))]
88    fn drop(&mut self) {
89        if self.abort_handle.is_finished() {
90            return;
91        }
92        warn!(executor_id= %self.executor_id, component_id=%self.component_id, "Aborting the executor task");
93        self.abort_handle.abort();
94    }
95}
96
97#[cfg(feature = "test")]
98pub fn extract_ffqns_test(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
99    extract_ffqns(worker)
100}
101
102pub(crate) fn extract_ffqns(worker: &dyn Worker) -> Arc<[FunctionFqn]> {
103    worker
104        .exported_functions()
105        .iter()
106        .map(|FunctionMetadata { ffqn, .. }| ffqn.clone())
107        .collect::<Arc<_>>()
108}
109
110impl<C: ClockFn + 'static, DB: DbConnection + 'static, P: DbPool<DB> + 'static> ExecTask<C, DB, P> {
111    #[cfg(feature = "test")]
112    pub fn new(
113        worker: Arc<dyn Worker>,
114        config: ExecConfig,
115        clock_fn: C,
116        db_pool: P,
117        ffqns: Arc<[FunctionFqn]>,
118    ) -> Self {
119        Self {
120            worker,
121            config,
122            executor_id: ExecutorId::generate(),
123            db_pool,
124            phantom_data: PhantomData,
125            ffqns,
126            clock_fn,
127        }
128    }
129
130    pub fn spawn_new(
131        worker: Arc<dyn Worker>,
132        config: ExecConfig,
133        clock_fn: C,
134        db_pool: P,
135        executor_id: ExecutorId,
136    ) -> ExecutorTaskHandle {
137        let is_closing = Arc::new(AtomicBool::default());
138        let is_closing_inner = is_closing.clone();
139        let ffqns = extract_ffqns(worker.as_ref());
140        let component_id = config.component_id.clone();
141        let abort_handle = tokio::spawn(async move {
142            debug!(%executor_id, component_id = %config.component_id, "Spawned executor");
143            let task = Self {
144                worker,
145                config,
146                executor_id,
147                db_pool,
148                phantom_data: PhantomData,
149                ffqns: ffqns.clone(),
150                clock_fn: clock_fn.clone(),
151            };
152            loop {
153                let _ = task.tick(clock_fn.now()).await;
154                let executed_at = clock_fn.now();
155                task.db_pool
156                    .connection()
157                    .wait_for_pending(executed_at, ffqns.clone(), task.config.tick_sleep)
158                    .await;
159                if is_closing_inner.load(Ordering::Relaxed) {
160                    return;
161                }
162            }
163        })
164        .abort_handle();
165        ExecutorTaskHandle {
166            is_closing,
167            abort_handle,
168            component_id,
169            executor_id,
170        }
171    }
172
173    fn acquire_task_permits(&self) -> Vec<Option<tokio::sync::OwnedSemaphorePermit>> {
174        if let Some(task_limiter) = &self.config.task_limiter {
175            let mut locks = Vec::new();
176            for _ in 0..self.config.batch_size {
177                if let Ok(permit) = task_limiter.clone().try_acquire_owned() {
178                    locks.push(Some(permit));
179                } else {
180                    break;
181                }
182            }
183            locks
184        } else {
185            let mut vec = Vec::with_capacity(self.config.batch_size as usize);
186            for _ in 0..self.config.batch_size {
187                vec.push(None);
188            }
189            vec
190        }
191    }
192
193    #[cfg(feature = "test")]
194    pub async fn tick_test(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
195        self.tick(executed_at).await
196    }
197
198    #[instrument(level = Level::TRACE, name = "executor.tick" skip_all, fields(executor_id = %self.executor_id, component_id = %self.config.component_id))]
199    async fn tick(&self, executed_at: DateTime<Utc>) -> Result<ExecutionProgress, ()> {
200        let locked_executions = {
201            let mut permits = self.acquire_task_permits();
202            if permits.is_empty() {
203                return Ok(ExecutionProgress::default());
204            }
205            let db_connection = self.db_pool.connection();
206            let lock_expires_at = executed_at + self.config.lock_expiry;
207            let locked_executions = db_connection
208                .lock_pending(
209                    permits.len(), // batch size
210                    executed_at,   // fetch expiring before now
211                    self.ffqns.clone(),
212                    executed_at, // created at
213                    self.config.component_id.clone(),
214                    self.executor_id,
215                    lock_expires_at,
216                )
217                .await
218                .map_err(|err| {
219                    warn!(executor_id = %self.executor_id, component_id = %self.config.component_id, "lock_pending error {err:?}");
220                })?;
221            // Drop permits if too many were allocated.
222            while permits.len() > locked_executions.len() {
223                permits.pop();
224            }
225            assert_eq!(permits.len(), locked_executions.len());
226            locked_executions.into_iter().zip(permits)
227        };
228        let execution_deadline = executed_at + self.config.lock_expiry;
229
230        let mut executions = Vec::with_capacity(locked_executions.len());
231        for (locked_execution, permit) in locked_executions {
232            let execution_id = locked_execution.execution_id.clone();
233            let join_handle = {
234                let worker = self.worker.clone();
235                let db_pool = self.db_pool.clone();
236                let clock_fn = self.clock_fn.clone();
237                let run_id = locked_execution.run_id;
238                let worker_span = info_span!(parent: None, "worker",
239                    %execution_id, %run_id, ffqn = %locked_execution.ffqn, executor_id = %self.executor_id, component_id = %self.config.component_id);
240                locked_execution.metadata.enrich(&worker_span);
241                tokio::spawn({
242                    let worker_span2 = worker_span.clone();
243                    async move {
244                        let res = Self::run_worker(
245                            worker,
246                            &db_pool,
247                            execution_deadline,
248                            clock_fn,
249                            locked_execution,
250                            worker_span2,
251                        )
252                        .await;
253                        if let Err(db_error) = res {
254                            error!("Execution will be timed out not writing `{db_error:?}`");
255                        }
256                        drop(permit);
257                    }
258                    .instrument(worker_span)
259                })
260            };
261            executions.push((execution_id, join_handle));
262        }
263        Ok(ExecutionProgress { executions })
264    }
265
266    async fn run_worker(
267        worker: Arc<dyn Worker>,
268        db_pool: &P,
269        execution_deadline: DateTime<Utc>,
270        clock_fn: C,
271        locked_execution: LockedExecution,
272        worker_span: Span,
273    ) -> Result<(), DbError> {
274        debug!("Worker::run starting");
275        trace!(
276            version = %locked_execution.version,
277            params = ?locked_execution.params,
278            event_history = ?locked_execution.event_history,
279            "Worker::run starting"
280        );
281        let can_be_retried = ExecutionLog::can_be_retried_after(
282            locked_execution.temporary_event_count + 1,
283            locked_execution.max_retries,
284            locked_execution.retry_exp_backoff,
285        );
286        let unlock_expiry_on_limit_reached =
287            ExecutionLog::compute_retry_duration_when_retrying_forever(
288                locked_execution.temporary_event_count + 1,
289                locked_execution.retry_exp_backoff,
290            );
291        let ctx = WorkerContext {
292            execution_id: locked_execution.execution_id.clone(),
293            metadata: locked_execution.metadata,
294            ffqn: locked_execution.ffqn,
295            params: locked_execution.params,
296            event_history: locked_execution.event_history,
297            responses: locked_execution
298                .responses
299                .into_iter()
300                .map(|outer| outer.event)
301                .collect(),
302            version: locked_execution.version,
303            execution_deadline,
304            can_be_retried: can_be_retried.is_some(),
305            run_id: locked_execution.run_id,
306            worker_span,
307        };
308        let worker_result = worker.run(ctx).await;
309        trace!(?worker_result, "Worker::run finished");
310        let result_obtained_at = clock_fn.now();
311        match Self::worker_result_to_execution_event(
312            locked_execution.execution_id,
313            worker_result,
314            result_obtained_at,
315            locked_execution.parent,
316            can_be_retried,
317            unlock_expiry_on_limit_reached,
318        )? {
319            Some(append) => {
320                let db_connection = db_pool.connection();
321                trace!("Appending {append:?}");
322                append.clone().append(&db_connection).await
323            }
324            None => Ok(()),
325        }
326    }
327
328    // FIXME: On a slow execution: race between `expired_timers_watcher` this if retry_exp_backoff is 0.
329    /// Map the `WorkerError` to an temporary or a permanent failure.
330    #[expect(clippy::too_many_lines)]
331    fn worker_result_to_execution_event(
332        execution_id: ExecutionId,
333        worker_result: WorkerResult,
334        result_obtained_at: DateTime<Utc>,
335        parent: Option<(ExecutionId, JoinSetId)>,
336        can_be_retried: Option<Duration>,
337        unlock_expiry_on_limit_reached: Duration,
338    ) -> Result<Option<Append>, DbError> {
339        Ok(match worker_result {
340            WorkerResult::Ok(result, new_version) => {
341                info!(
342                    "Execution finished: {}",
343                    result.as_pending_state_finished_result()
344                );
345                let child_finished =
346                    parent.map(
347                        |(parent_execution_id, parent_join_set)| ChildFinishedResponse {
348                            parent_execution_id,
349                            parent_join_set,
350                            result: Ok(result.clone()),
351                        },
352                    );
353                let primary_event = AppendRequest {
354                    created_at: result_obtained_at,
355                    event: ExecutionEventInner::Finished { result: Ok(result) },
356                };
357
358                Some(Append {
359                    created_at: result_obtained_at,
360                    primary_event,
361                    execution_id,
362                    version: new_version,
363                    child_finished,
364                })
365            }
366            WorkerResult::DbUpdatedByWorker => None,
367            WorkerResult::Err(err) => {
368                let reason = err.to_string();
369                let (primary_event, child_finished, version) = match err {
370                    WorkerError::TemporaryTimeout => {
371                        info!("Temporary timeout");
372                        // Will be updated by `expired_timers_watcher`.
373                        return Ok(None);
374                    }
375                    WorkerError::DbError(db_error) => {
376                        return Err(db_error);
377                    }
378                    WorkerError::ActivityTrap {
379                        reason: reason_inner,
380                        trap_kind,
381                        detail,
382                        version,
383                    } => {
384                        if let Some(duration) = can_be_retried {
385                            let expires_at = result_obtained_at + duration;
386                            debug!(
387                                "Retrying activity {trap_kind} execution after {duration:?} at {expires_at}"
388                            );
389                            (
390                                ExecutionEventInner::TemporarilyFailed {
391                                    backoff_expires_at: expires_at,
392                                    reason_full: StrVariant::from(reason),
393                                    reason_inner: StrVariant::from(reason_inner),
394                                    detail: Some(detail),
395                                },
396                                None,
397                                version,
398                            )
399                        } else {
400                            info!(
401                                "Activity {trap_kind} marked as permanent failure - {reason_inner}"
402                            );
403                            let result = Err(FinishedExecutionError::PermanentFailure {
404                                reason_inner,
405                                reason_full: reason,
406                                kind: PermanentFailureKind::ActivityTrap,
407                                detail: Some(detail),
408                            });
409                            let child_finished =
410                                parent.map(|(parent_execution_id, parent_join_set)| {
411                                    ChildFinishedResponse {
412                                        parent_execution_id,
413                                        parent_join_set,
414                                        result: result.clone(),
415                                    }
416                                });
417                            (
418                                ExecutionEventInner::Finished { result },
419                                child_finished,
420                                version,
421                            )
422                        }
423                    }
424                    WorkerError::ActivityReturnedError { detail, version } => {
425                        let duration = can_be_retried.expect(
426                            "ActivityReturnedError must not be returned when retries are exhausted",
427                        );
428                        let expires_at = result_obtained_at + duration;
429                        debug!("Retrying ActivityReturnedError after {duration:?} at {expires_at}");
430                        (
431                            ExecutionEventInner::TemporarilyFailed {
432                                backoff_expires_at: expires_at,
433                                reason_full: StrVariant::Static("activity returned error"),
434                                reason_inner: StrVariant::Static("activity returned error"),
435                                detail,
436                            },
437                            None,
438                            version,
439                        )
440                    }
441                    WorkerError::TemporaryWorkflowTrap {
442                        reason: reason_inner,
443                        kind,
444                        detail,
445                        version,
446                    } => {
447                        let duration = can_be_retried.expect("workflows are retried forever");
448                        let expires_at = result_obtained_at + duration;
449                        debug!(
450                            "Retrying workflow {kind} execution after {duration:?} at {expires_at}"
451                        );
452                        (
453                            ExecutionEventInner::TemporarilyFailed {
454                                backoff_expires_at: expires_at,
455                                reason_full: StrVariant::from(reason),
456                                reason_inner: StrVariant::from(reason_inner),
457                                detail,
458                            },
459                            None,
460                            version,
461                        )
462                    }
463                    WorkerError::LimitReached {
464                        reason: inner_reason,
465                        version: new_version,
466                    } => {
467                        let expires_at = result_obtained_at + unlock_expiry_on_limit_reached;
468                        warn!(
469                            "Limit reached: {inner_reason}, unlocking after {unlock_expiry_on_limit_reached:?} at {expires_at}"
470                        );
471                        (
472                            ExecutionEventInner::Unlocked {
473                                backoff_expires_at: expires_at,
474                                reason: StrVariant::from(reason),
475                            },
476                            None,
477                            new_version,
478                        )
479                    }
480                    WorkerError::FatalError(fatal_error, version) => {
481                        info!("Fatal worker error - {fatal_error:?}");
482                        let result = Err(FinishedExecutionError::from(fatal_error));
483                        let child_finished =
484                            parent.map(|(parent_execution_id, parent_join_set)| {
485                                ChildFinishedResponse {
486                                    parent_execution_id,
487                                    parent_join_set,
488                                    result: result.clone(),
489                                }
490                            });
491                        (
492                            ExecutionEventInner::Finished { result },
493                            child_finished,
494                            version,
495                        )
496                    }
497                };
498                Some(Append {
499                    created_at: result_obtained_at,
500                    primary_event: AppendRequest {
501                        created_at: result_obtained_at,
502                        event: primary_event,
503                    },
504                    execution_id,
505                    version,
506                    child_finished,
507                })
508            }
509        })
510    }
511}
512
/// Notification for a parent execution that one of its child executions has finished.
#[derive(Debug, Clone)]
pub(crate) struct ChildFinishedResponse {
    /// Execution to be notified.
    pub(crate) parent_execution_id: ExecutionId,
    /// Join set of the parent that receives the `ChildExecutionFinished` response.
    pub(crate) parent_join_set: JoinSetId,
    /// The child's final result, forwarded to the parent.
    pub(crate) result: FinishedExecutionResult,
}

/// A pending write of one execution event, optionally paired with a parent notification.
#[derive(Debug, Clone)]
pub(crate) struct Append {
    /// Timestamp of the append batch; also used for the parent response.
    pub(crate) created_at: DateTime<Utc>,
    /// The event appended to `execution_id`'s log.
    pub(crate) primary_event: AppendRequest,
    /// Execution whose log receives `primary_event`.
    pub(crate) execution_id: ExecutionId,
    /// Version passed with the append and reported to the parent as `finished_version`.
    /// NOTE(review): presumably the execution's expected current version — confirm.
    pub(crate) version: Version,
    /// Set when the parent must be told that this execution finished.
    pub(crate) child_finished: Option<ChildFinishedResponse>,
}
528
529impl Append {
530    pub(crate) async fn append(self, db_connection: &impl DbConnection) -> Result<(), DbError> {
531        if let Some(child_finished) = self.child_finished {
532            assert_matches!(
533                &self.primary_event,
534                AppendRequest {
535                    event: ExecutionEventInner::Finished { .. },
536                    ..
537                }
538            );
539            let derived = assert_matches!(self.execution_id.clone(), ExecutionId::Derived(derived) => derived);
540            db_connection
541                .append_batch_respond_to_parent(
542                    derived.clone(),
543                    self.created_at,
544                    vec![self.primary_event],
545                    self.version.clone(),
546                    child_finished.parent_execution_id,
547                    JoinSetResponseEventOuter {
548                        created_at: self.created_at,
549                        event: JoinSetResponseEvent {
550                            join_set_id: child_finished.parent_join_set,
551                            event: JoinSetResponse::ChildExecutionFinished {
552                                child_execution_id: derived,
553                                // Since self.primary_event is a finished event, the version will remain the same.
554                                finished_version: self.version,
555                                result: child_finished.result,
556                            },
557                        },
558                    },
559                )
560                .await?;
561        } else {
562            db_connection
563                .append_batch(
564                    self.created_at,
565                    vec![self.primary_event],
566                    self.execution_id,
567                    self.version,
568                )
569                .await?;
570        }
571        Ok(())
572    }
573}
574
/// Scripted `Worker` for tests: each run pops a pre-recorded result and asserts the
/// version and event history it was invoked with.
#[cfg(any(test, feature = "test"))]
pub mod simple_worker {
    use crate::worker::{Worker, WorkerContext, WorkerResult};
    use async_trait::async_trait;
    use concepts::{
        storage::{HistoryEvent, Version},
        FunctionFqn, FunctionMetadata, ParameterTypes,
    };
    use indexmap::IndexMap;
    use std::sync::Arc;
    use tracing::trace;

    pub(crate) const FFQN_SOME: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn");
    /// Expected version -> (expected event history, result to return); consumed from the back.
    pub type SimpleWorkerResultMap =
        Arc<std::sync::Mutex<IndexMap<Version, (Vec<HistoryEvent>, WorkerResult)>>>;

    #[derive(Clone, Debug)]
    pub struct SimpleWorker {
        pub worker_results_rev: SimpleWorkerResultMap,
        pub ffqn: FunctionFqn,
        exported: [FunctionMetadata; 1],
    }

    /// Metadata advertising `ffqn` as the single, parameterless, submittable export.
    fn single_export(ffqn: FunctionFqn) -> [FunctionMetadata; 1] {
        [FunctionMetadata {
            ffqn,
            parameter_types: ParameterTypes::default(),
            return_type: None,
            extension: None,
            submittable: true,
        }]
    }

    impl SimpleWorker {
        /// A worker that expects one run at version 2 with an empty history and returns `res`.
        #[must_use]
        pub fn with_single_result(res: WorkerResult) -> Self {
            let results = IndexMap::from([(Version::new(2), (vec![], res))]);
            Self::with_worker_results_rev(Arc::new(std::sync::Mutex::new(results)))
        }

        /// Replaces the exported function with `ffqn`, keeping the scripted results.
        #[must_use]
        pub fn with_ffqn(self, ffqn: FunctionFqn) -> Self {
            Self {
                exported: single_export(ffqn.clone()),
                ffqn,
                ..self
            }
        }

        /// A worker exporting `FFQN_SOME` that replays `worker_results_rev`.
        #[must_use]
        pub fn with_worker_results_rev(worker_results_rev: SimpleWorkerResultMap) -> Self {
            Self {
                worker_results_rev,
                ffqn: FFQN_SOME,
                exported: single_export(FFQN_SOME),
            }
        }
    }

    #[async_trait]
    impl Worker for SimpleWorker {
        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
            // Panics when no scripted result is left.
            let (expected_version, (expected_eh, worker_result)) =
                self.worker_results_rev.lock().unwrap().pop().unwrap();
            trace!(%expected_version, version = %ctx.version, ?expected_eh, eh = ?ctx.event_history, "Running SimpleWorker");
            assert_eq!(expected_version, ctx.version);
            assert_eq!(expected_eh, ctx.event_history);
            worker_result
        }

        fn exported_functions(&self) -> &[FunctionMetadata] {
            &self.exported
        }

        fn imported_functions(&self) -> &[FunctionMetadata] {
            &[]
        }
    }
}
658
659#[cfg(test)]
660mod tests {
661    use self::simple_worker::SimpleWorker;
662    use super::*;
663    use crate::worker::FatalError;
664    use crate::{expired_timers_watcher, worker::WorkerResult};
665    use assert_matches::assert_matches;
666    use async_trait::async_trait;
667    use concepts::storage::{CreateRequest, JoinSetRequest};
668    use concepts::storage::{
669        DbConnection, ExecutionEvent, ExecutionEventInner, HistoryEvent, PendingState,
670    };
671    use concepts::time::Now;
672    use concepts::{
673        ClosingStrategy, FunctionMetadata, JoinSetKind, ParameterTypes, Params, StrVariant,
674        SupportedFunctionReturnValue, TrapKind,
675    };
676    use db_tests::Database;
677    use indexmap::IndexMap;
678    use simple_worker::FFQN_SOME;
679    use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
680    use test_utils::set_up;
681    use test_utils::sim_clock::{ConstClock, SimClock};
682
    /// A second function name (`"pkg/ifc"`, `"fn-child"`), distinct from `FFQN_SOME`.
    /// NOTE(review): presumably used for child executions in tests of this module — confirm.
    pub(crate) const FFQN_CHILD: FunctionFqn = FunctionFqn::new_static("pkg/ifc", "fn-child");
684
685    async fn tick_fn<
686        W: Worker + Debug,
687        C: ClockFn + 'static,
688        DB: DbConnection + 'static,
689        P: DbPool<DB> + 'static,
690    >(
691        config: ExecConfig,
692        clock_fn: C,
693        db_pool: P,
694        worker: Arc<W>,
695        executed_at: DateTime<Utc>,
696    ) -> ExecutionProgress {
697        trace!("Ticking with {worker:?}");
698        let ffqns = super::extract_ffqns(worker.as_ref());
699        let executor = ExecTask::new(worker, config, clock_fn, db_pool, ffqns);
700        let mut execution_progress = executor.tick(executed_at).await.unwrap();
701        loop {
702            execution_progress
703                .executions
704                .retain(|(_, abort_handle)| !abort_handle.is_finished());
705            if execution_progress.executions.is_empty() {
706                return execution_progress;
707            }
708            tokio::time::sleep(Duration::from_millis(10)).await;
709        }
710    }
711
    /// Runs the tick-based lifecycle scenario against the in-memory database.
    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_mem() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }

    /// Same tick-based lifecycle scenario, backed by SQLite; not compiled under `madsim`.
    #[cfg(not(madsim))]
    #[tokio::test]
    async fn execute_simple_lifecycle_tick_based_sqlite() {
        let created_at = Now.now();
        let (_guard, db_pool) = Database::Sqlite.set_up().await;
        execute_simple_lifecycle_tick_based(db_pool.clone(), ConstClock(created_at)).await;
        db_pool.close().await.unwrap();
    }
728
    /// Creates one execution via `create_and_tick`, using `tick_fn` to run a single executor
    /// tick with a worker that succeeds, then checks that the log's third event is a
    /// successful `Finished`.
    async fn execute_simple_lifecycle_tick_based<
        DB: DbConnection + 'static,
        P: DbPool<DB> + 'static,
        C: ClockFn + 'static,
    >(
        pool: P,
        clock_fn: C,
    ) {
        set_up();
        let created_at = clock_fn.now();
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::from_millis(100),
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            pool,
            exec_config,
            // The worker expects to run at version 2 (see `with_single_result`) and succeeds.
            Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
                SupportedFunctionReturnValue::None,
                Version::new(2),
            ))),
            tick_fn,
        )
        .await;
        // NOTE(review): indices 0 and 1 are presumably the creation and lock events — confirm.
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                },
                created_at: _,
            }
        );
    }
775
    /// Same lifecycle as the tick-based test, but driven by a spawned executor task
    /// (`ExecTask::spawn_new`) instead of manual ticks.
    ///
    /// The closure passed to `create_and_tick` does not tick; it only sleeps for one second
    /// to give the background executor time to finish the execution. The outcome therefore
    /// depends on real timing unless run under `madsim`.
    #[tokio::test]
    async fn stochastic_execute_simple_lifecycle_task_based_mem() {
        set_up();
        let created_at = Now.now();
        let clock_fn = ConstClock(created_at);
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Ok(
            SupportedFunctionReturnValue::None,
            Version::new(2),
        )));
        // Background executor that picks up the execution created below.
        let exec_task = ExecTask::spawn_new(
            worker.clone(),
            exec_config.clone(),
            clock_fn,
            db_pool.clone(),
            ExecutorId::generate(),
        );

        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at,
                max_retries: 0,
                executed_at: created_at,
                retry_exp_backoff: Duration::ZERO,
            },
            clock_fn,
            db_pool.clone(),
            exec_config,
            worker,
            |_, _, _, _, _| async {
                tokio::time::sleep(Duration::from_secs(1)).await; // non deterministic if not run in madsim
                ExecutionProgress::default()
            },
        )
        .await;
        // Stop the executor loop before closing the pool it uses.
        exec_task.close().await;
        db_pool.close().await.unwrap();
        assert_matches!(
            execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                },
                created_at: _,
            }
        );
    }
832
833    struct CreateAndTickConfig {
834        execution_id: ExecutionId,
835        created_at: DateTime<Utc>,
836        max_retries: u32,
837        executed_at: DateTime<Utc>,
838        retry_exp_backoff: Duration,
839    }
840
841    async fn create_and_tick<
842        W: Worker,
843        C: ClockFn,
844        DB: DbConnection,
845        P: DbPool<DB>,
846        T: FnMut(ExecConfig, C, P, Arc<W>, DateTime<Utc>) -> F,
847        F: Future<Output = ExecutionProgress>,
848    >(
849        config: CreateAndTickConfig,
850        clock_fn: C,
851        db_pool: P,
852        exec_config: ExecConfig,
853        worker: Arc<W>,
854        mut tick: T,
855    ) -> ExecutionLog {
856        // Create an execution
857        let db_connection = db_pool.connection();
858        db_connection
859            .create(CreateRequest {
860                created_at: config.created_at,
861                execution_id: config.execution_id.clone(),
862                ffqn: FFQN_SOME,
863                params: Params::empty(),
864                parent: None,
865                metadata: concepts::ExecutionMetadata::empty(),
866                scheduled_at: config.created_at,
867                retry_exp_backoff: config.retry_exp_backoff,
868                max_retries: config.max_retries,
869                component_id: ComponentId::dummy_activity(),
870                scheduled_by: None,
871            })
872            .await
873            .unwrap();
874        // execute!
875        tick(exec_config, clock_fn, db_pool, worker, config.executed_at).await;
876        let execution_log = db_connection.get(&config.execution_id).await.unwrap();
877        debug!("Execution history after tick: {execution_log:?}");
878        // check that DB contains Created and Locked events.
879        assert_matches!(
880            execution_log.events.first().unwrap(),
881            ExecutionEvent {
882                event: ExecutionEventInner::Created { .. },
883                created_at: actually_created_at,
884            }
885            if config.created_at == *actually_created_at
886        );
887        let locked_at = assert_matches!(
888            execution_log.events.get(1).unwrap(),
889            ExecutionEvent {
890                event: ExecutionEventInner::Locked { .. },
891                created_at: locked_at
892            } if config.created_at <= *locked_at
893            => *locked_at
894        );
895        assert_matches!(execution_log.events.get(2).unwrap(), ExecutionEvent {
896            event: _,
897            created_at: executed_at,
898        } if *executed_at >= locked_at);
899        execution_log
900    }
901
    /// An activity trap on an execution with `max_retries: 1` must be recorded as
    /// `TemporarilyFailed`, with the backoff expiring `retry_exp_backoff` after the failure.
    /// Ticking before that expiry does nothing. After it, the execution is locked again,
    /// and with a succeeding worker it finishes at the (simulated) current time.
    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn activity_trap_should_trigger_an_execution_retry() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry: Duration::from_secs(1),
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };
        let expected_reason = "error reason";
        let expected_detail = "error detail";
        // First attempt: the worker traps.
        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
            WorkerError::ActivityTrap {
                reason: expected_reason.to_string(),
                trap_kind: concepts::TrapKind::Trap,
                detail: expected_detail.to_string(),
                version: Version::new(2),
            },
        )));
        let retry_exp_backoff = Duration::from_millis(100);
        debug!(now = %sim_clock.now(), "Creating an execution that should fail");
        let execution_log = create_and_tick(
            CreateAndTickConfig {
                execution_id: ExecutionId::generate(),
                created_at: sim_clock.now(),
                max_retries: 1,
                executed_at: sim_clock.now(),
                retry_exp_backoff,
            },
            sim_clock.clone(),
            db_pool.clone(),
            exec_config.clone(),
            worker,
            tick_fn,
        )
        .await;
        // Created, Locked, TemporarilyFailed.
        assert_eq!(3, execution_log.events.len());
        {
            let (reason_full, reason_inner, detail, at, expires_at) = assert_matches!(
                &execution_log.events.get(2).unwrap(),
                ExecutionEvent {
                    event: ExecutionEventInner::TemporarilyFailed {
                        reason_inner,
                        reason_full,
                        detail,
                        backoff_expires_at,
                    },
                    created_at: at,
                }
                => (reason_full, reason_inner, detail, *at, *backoff_expires_at)
            );
            assert_eq!(expected_reason, reason_inner.deref());
            // The full reason is the inner reason prefixed with "activity trap: ".
            assert_eq!(
                format!("activity trap: {expected_reason}"),
                reason_full.deref()
            );
            assert_eq!(Some(expected_detail), detail.as_deref());
            assert_eq!(at, sim_clock.now());
            // The retry becomes eligible exactly one backoff after the failure.
            assert_eq!(sim_clock.now() + retry_exp_backoff, expires_at);
        }
        // Worker used for the retry; it succeeds. Its result is keyed by `Version::new(4)`,
        // presumably the version the retry runs at (Created=0, Locked=1,
        // TemporarilyFailed=2, Locked=3) - TODO confirm against `SimpleWorker`.
        let worker = Arc::new(SimpleWorker::with_worker_results_rev(Arc::new(
            std::sync::Mutex::new(IndexMap::from([(
                Version::new(4),
                (
                    vec![],
                    WorkerResult::Ok(SupportedFunctionReturnValue::None, Version::new(4)),
                ),
            )])),
        )));
        // Noop until `retry_exp_backoff` expires: the tick must not start any execution.
        assert!(tick_fn(
            exec_config.clone(),
            sim_clock.clone(),
            db_pool.clone(),
            worker.clone(),
            sim_clock.now(),
        )
        .await
        .executions
        .is_empty());
        // Move past the backoff and tick again to finish the execution.
        sim_clock.move_time_forward(retry_exp_backoff).await;
        tick_fn(
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            worker,
            sim_clock.now(),
        )
        .await;
        let execution_log = {
            let db_connection = db_pool.connection();
            db_connection
                .get(&execution_log.execution_id)
                .await
                .unwrap()
        };
        debug!(now = %sim_clock.now(), "Execution history after second tick: {execution_log:?}");
        // The retry appends a second Locked event followed by a successful Finished event.
        assert_matches!(
            execution_log.events.get(3).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Locked { .. },
                created_at: at
            } if *at == sim_clock.now()
        );
        assert_matches!(
            execution_log.events.get(4).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::Finished {
                    result: Ok(SupportedFunctionReturnValue::None),
                },
                created_at: finished_at,
            } if *finished_at == sim_clock.now()
        );
        db_pool.close().await.unwrap();
    }
1022
1023    #[tokio::test]
1024    async fn activity_trap_should_not_be_retried_if_no_retries_are_set() {
1025        set_up();
1026        let created_at = Now.now();
1027        let clock_fn = ConstClock(created_at);
1028        let (_guard, db_pool) = Database::Memory.set_up().await;
1029        let exec_config = ExecConfig {
1030            batch_size: 1,
1031            lock_expiry: Duration::from_secs(1),
1032            tick_sleep: Duration::ZERO,
1033            component_id: ComponentId::dummy_activity(),
1034            task_limiter: None,
1035        };
1036
1037        let expected_reason = "error reason";
1038        let expected_detail = "error detail";
1039        let worker = Arc::new(SimpleWorker::with_single_result(WorkerResult::Err(
1040            WorkerError::ActivityTrap {
1041                reason: expected_reason.to_string(),
1042                trap_kind: concepts::TrapKind::Trap,
1043                detail: expected_detail.to_string(),
1044                version: Version::new(2),
1045            },
1046        )));
1047        let execution_log = create_and_tick(
1048            CreateAndTickConfig {
1049                execution_id: ExecutionId::generate(),
1050                created_at,
1051                max_retries: 0,
1052                executed_at: created_at,
1053                retry_exp_backoff: Duration::ZERO,
1054            },
1055            clock_fn,
1056            db_pool.clone(),
1057            exec_config.clone(),
1058            worker,
1059            tick_fn,
1060        )
1061        .await;
1062        assert_eq!(3, execution_log.events.len());
1063        let (reason, kind, detail) = assert_matches!(
1064            &execution_log.events.get(2).unwrap(),
1065            ExecutionEvent {
1066                event: ExecutionEventInner::Finished{
1067                    result: Err(FinishedExecutionError::PermanentFailure{reason_inner, kind, detail, reason_full:_})
1068                },
1069                created_at: at,
1070            } if *at == created_at
1071            => (reason_inner, kind, detail)
1072        );
1073        assert_eq!(expected_reason, *reason);
1074        assert_eq!(Some(expected_detail), detail.as_deref());
1075        assert_eq!(PermanentFailureKind::ActivityTrap, *kind);
1076
1077        db_pool.close().await.unwrap();
1078    }
1079
1080    #[tokio::test]
1081    async fn child_execution_permanently_failed_should_notify_parent_permanent_failure() {
1082        let worker_error = WorkerError::ActivityTrap {
1083            reason: "error reason".to_string(),
1084            trap_kind: TrapKind::Trap,
1085            detail: "detail".to_string(),
1086            version: Version::new(2),
1087        };
1088        let expected_child_err = FinishedExecutionError::PermanentFailure {
1089            reason_full: "activity trap: error reason".to_string(),
1090            reason_inner: "error reason".to_string(),
1091            kind: PermanentFailureKind::ActivityTrap,
1092            detail: Some("detail".to_string()),
1093        };
1094        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1095            .await;
1096    }
1097
1098    #[tokio::test]
1099    async fn child_execution_permanently_failed_should_notify_parent_timeout() {
1100        let worker_error = WorkerError::TemporaryTimeout;
1101        let expected_child_err = FinishedExecutionError::PermanentTimeout;
1102        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1103            .await;
1104    }
1105
1106    #[tokio::test]
1107    async fn child_execution_permanently_failed_should_notify_parent_unhandled_child() {
1108        let parent_id = ExecutionId::from_parts(1, 1);
1109        let join_set_id_outer =
1110            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("outer")).unwrap();
1111        let root_cause_id = parent_id.next_level(&join_set_id_outer);
1112        let join_set_id_inner =
1113            JoinSetId::new(JoinSetKind::OneOff, StrVariant::Static("inner")).unwrap();
1114        let child_execution_id = root_cause_id.next_level(&join_set_id_inner);
1115        let worker_error = WorkerError::FatalError(
1116            FatalError::UnhandledChildExecutionError {
1117                child_execution_id: child_execution_id.clone(),
1118                root_cause_id: root_cause_id.clone(),
1119            },
1120            Version::new(2),
1121        );
1122        let expected_child_err = FinishedExecutionError::UnhandledChildExecutionError {
1123            child_execution_id,
1124            root_cause_id,
1125        };
1126        child_execution_permanently_failed_should_notify_parent(worker_error, expected_child_err)
1127            .await;
1128    }
1129
1130    #[expect(clippy::too_many_lines)]
1131    async fn child_execution_permanently_failed_should_notify_parent(
1132        worker_error: WorkerError,
1133        expected_child_err: FinishedExecutionError,
1134    ) {
1135        use concepts::storage::JoinSetResponseEventOuter;
1136        const LOCK_EXPIRY: Duration = Duration::from_secs(1);
1137
1138        set_up();
1139        let sim_clock = SimClock::default();
1140        let (_guard, db_pool) = Database::Memory.set_up().await;
1141
1142        let parent_worker = Arc::new(SimpleWorker::with_single_result(
1143            WorkerResult::DbUpdatedByWorker,
1144        ));
1145        let parent_execution_id = ExecutionId::generate();
1146        db_pool
1147            .connection()
1148            .create(CreateRequest {
1149                created_at: sim_clock.now(),
1150                execution_id: parent_execution_id.clone(),
1151                ffqn: FFQN_SOME,
1152                params: Params::empty(),
1153                parent: None,
1154                metadata: concepts::ExecutionMetadata::empty(),
1155                scheduled_at: sim_clock.now(),
1156                retry_exp_backoff: Duration::ZERO,
1157                max_retries: 0,
1158                component_id: ComponentId::dummy_activity(),
1159                scheduled_by: None,
1160            })
1161            .await
1162            .unwrap();
1163        tick_fn(
1164            ExecConfig {
1165                batch_size: 1,
1166                lock_expiry: LOCK_EXPIRY,
1167                tick_sleep: Duration::ZERO,
1168                component_id: ComponentId::dummy_activity(),
1169                task_limiter: None,
1170            },
1171            sim_clock.clone(),
1172            db_pool.clone(),
1173            parent_worker,
1174            sim_clock.now(),
1175        )
1176        .await;
1177
1178        let join_set_id = JoinSetId::new(JoinSetKind::OneOff, StrVariant::empty()).unwrap();
1179        let child_execution_id = parent_execution_id.next_level(&join_set_id);
1180        // executor does not append anything, this should have been written by the worker:
1181        {
1182            let child = CreateRequest {
1183                created_at: sim_clock.now(),
1184                execution_id: ExecutionId::Derived(child_execution_id.clone()),
1185                ffqn: FFQN_CHILD,
1186                params: Params::empty(),
1187                parent: Some((parent_execution_id.clone(), join_set_id.clone())),
1188                metadata: concepts::ExecutionMetadata::empty(),
1189                scheduled_at: sim_clock.now(),
1190                retry_exp_backoff: Duration::ZERO,
1191                max_retries: 0,
1192                component_id: ComponentId::dummy_activity(),
1193                scheduled_by: None,
1194            };
1195            let current_time = sim_clock.now();
1196            let join_set = AppendRequest {
1197                created_at: current_time,
1198                event: ExecutionEventInner::HistoryEvent {
1199                    event: HistoryEvent::JoinSet {
1200                        join_set_id: join_set_id.clone(),
1201                        closing_strategy: ClosingStrategy::Complete,
1202                    },
1203                },
1204            };
1205            let child_exec_req = AppendRequest {
1206                created_at: current_time,
1207                event: ExecutionEventInner::HistoryEvent {
1208                    event: HistoryEvent::JoinSetRequest {
1209                        join_set_id: join_set_id.clone(),
1210                        request: JoinSetRequest::ChildExecutionRequest {
1211                            child_execution_id: child_execution_id.clone(),
1212                        },
1213                    },
1214                },
1215            };
1216            let join_next = AppendRequest {
1217                created_at: current_time,
1218                event: ExecutionEventInner::HistoryEvent {
1219                    event: HistoryEvent::JoinNext {
1220                        join_set_id: join_set_id.clone(),
1221                        run_expires_at: sim_clock.now(),
1222                        closing: false,
1223                    },
1224                },
1225            };
1226            db_pool
1227                .connection()
1228                .append_batch_create_new_execution(
1229                    current_time,
1230                    vec![join_set, child_exec_req, join_next],
1231                    parent_execution_id.clone(),
1232                    Version::new(2),
1233                    vec![child],
1234                )
1235                .await
1236                .unwrap();
1237        }
1238
1239        let child_worker = Arc::new(
1240            SimpleWorker::with_single_result(WorkerResult::Err(worker_error)).with_ffqn(FFQN_CHILD),
1241        );
1242
1243        // execute the child
1244        tick_fn(
1245            ExecConfig {
1246                batch_size: 1,
1247                lock_expiry: LOCK_EXPIRY,
1248                tick_sleep: Duration::ZERO,
1249                component_id: ComponentId::dummy_activity(),
1250                task_limiter: None,
1251            },
1252            sim_clock.clone(),
1253            db_pool.clone(),
1254            child_worker,
1255            sim_clock.now(),
1256        )
1257        .await;
1258        if expected_child_err == FinishedExecutionError::PermanentTimeout {
1259            // In case of timeout, let the timers watcher handle it
1260            sim_clock.move_time_forward(LOCK_EXPIRY).await;
1261            expired_timers_watcher::tick(&db_pool.connection(), sim_clock.now())
1262                .await
1263                .unwrap();
1264        }
1265        let child_log = db_pool
1266            .connection()
1267            .get(&ExecutionId::Derived(child_execution_id.clone()))
1268            .await
1269            .unwrap();
1270        assert!(child_log.pending_state.is_finished());
1271        assert_eq!(
1272            Version(2),
1273            child_log.next_version,
1274            "created = 0, locked = 1, with_single_result = 2"
1275        );
1276        assert_eq!(
1277            ExecutionEventInner::Finished {
1278                result: Err(expected_child_err)
1279            },
1280            child_log.last_event().event
1281        );
1282        let parent_log = db_pool
1283            .connection()
1284            .get(&parent_execution_id)
1285            .await
1286            .unwrap();
1287        assert_matches!(
1288            parent_log.pending_state,
1289            PendingState::PendingAt {
1290                scheduled_at
1291            } if scheduled_at == sim_clock.now(),
1292            "parent should be back to pending"
1293        );
1294        let (found_join_set_id, found_child_execution_id, child_finished_version, found_result) = assert_matches!(
1295            parent_log.responses.last(),
1296            Some(JoinSetResponseEventOuter{
1297                created_at: at,
1298                event: JoinSetResponseEvent{
1299                    join_set_id: found_join_set_id,
1300                    event: JoinSetResponse::ChildExecutionFinished {
1301                        child_execution_id: found_child_execution_id,
1302                        finished_version,
1303                        result: found_result,
1304                    }
1305                }
1306            })
1307             if *at == sim_clock.now()
1308            => (found_join_set_id, found_child_execution_id, finished_version, found_result)
1309        );
1310        assert_eq!(join_set_id, *found_join_set_id);
1311        assert_eq!(child_execution_id, *found_child_execution_id);
1312        assert_eq!(child_log.next_version, *child_finished_version);
1313        assert!(found_result.is_err());
1314
1315        db_pool.close().await.unwrap();
1316    }
1317
1318    #[derive(Clone, Debug)]
1319    struct SleepyWorker {
1320        duration: Duration,
1321        result: SupportedFunctionReturnValue,
1322        exported: [FunctionMetadata; 1],
1323    }
1324
1325    #[async_trait]
1326    impl Worker for SleepyWorker {
1327        async fn run(&self, ctx: WorkerContext) -> WorkerResult {
1328            tokio::time::sleep(self.duration).await;
1329            WorkerResult::Ok(self.result.clone(), ctx.version)
1330        }
1331
1332        fn exported_functions(&self) -> &[FunctionMetadata] {
1333            &self.exported
1334        }
1335
1336        fn imported_functions(&self) -> &[FunctionMetadata] {
1337            &[]
1338        }
1339    }
1340
    /// An execution whose worker outlives its lock must have that lock expired by the
    /// expired timers watcher (not by the executor), then be retried `retry_exp_backoff`
    /// later. The retry hangs the same way and its lock is expired again.
    ///
    /// NOTE(review): `SleepyWorker` sleeps via `tokio::time::sleep` while this test only
    /// advances a `SimClock`. Whether `move_time_forward` also drives tokio's timer is not
    /// visible here. If it does not, the `!is_finished()` checks assume each check runs within
    /// `lock_expiry + 1ms` of wall-clock time after its tick - TODO confirm.
    #[expect(clippy::too_many_lines)]
    #[tokio::test]
    async fn hanging_lock_should_be_cleaned_and_execution_retried() {
        set_up();
        let sim_clock = SimClock::default();
        let (_guard, db_pool) = Database::Memory.set_up().await;
        let lock_expiry = Duration::from_millis(100);
        let exec_config = ExecConfig {
            batch_size: 1,
            lock_expiry,
            tick_sleep: Duration::ZERO,
            component_id: ComponentId::dummy_activity(),
            task_limiter: None,
        };

        let worker = Arc::new(SleepyWorker {
            duration: lock_expiry + Duration::from_millis(1), // sleep more than allowed by the lock expiry
            result: SupportedFunctionReturnValue::None,
            exported: [FunctionMetadata {
                ffqn: FFQN_SOME,
                parameter_types: ParameterTypes::default(),
                return_type: None,
                extension: None,
                submittable: true,
            }],
        });
        // Create an execution
        let execution_id = ExecutionId::generate();
        // Used as `retry_exp_backoff`: after a lock expiry the execution becomes pending
        // again this long after the expiry (see the `TemporarilyTimedOut` assertion below).
        let timeout_duration = Duration::from_millis(300);
        let db_connection = db_pool.connection();
        db_connection
            .create(CreateRequest {
                created_at: sim_clock.now(),
                execution_id: execution_id.clone(),
                ffqn: FFQN_SOME,
                params: Params::empty(),
                parent: None,
                metadata: concepts::ExecutionMetadata::empty(),
                scheduled_at: sim_clock.now(),
                retry_exp_backoff: timeout_duration,
                max_retries: 1,
                component_id: ComponentId::dummy_activity(),
                scheduled_by: None,
            })
            .await
            .unwrap();

        let ffqns = super::extract_ffqns(worker.as_ref());
        let executor = ExecTask::new(
            worker,
            exec_config,
            sim_clock.clone(),
            db_pool.clone(),
            ffqns,
        );
        // First attempt: exactly one execution is locked and its worker starts sleeping.
        let mut first_execution_progress = executor.tick(sim_clock.now()).await.unwrap();
        assert_eq!(1, first_execution_progress.executions.len());
        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // Cleanup: the executor tick must not start anything; the expired timers watcher
        // must expire exactly one lock.
        let now_after_first_lock_expiry = sim_clock.now();
        {
            debug!(now = %now_after_first_lock_expiry, "Expecting an expired lock");
            let cleanup_progress = executor.tick(now_after_first_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_first_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        // The spawned task of the first attempt is still running.
        assert!(!first_execution_progress
            .executions
            .pop()
            .unwrap()
            .1
            .is_finished());

        // The expiry is recorded as event 2 and the execution is rescheduled one backoff later.
        let execution_log = db_connection.get(&execution_id).await.unwrap();
        let expected_first_timeout_expiry = now_after_first_lock_expiry + timeout_duration;
        assert_matches!(
            &execution_log.events.get(2).unwrap(),
            ExecutionEvent {
                event: ExecutionEventInner::TemporarilyTimedOut { backoff_expires_at },
                created_at: at,
            } if *at == now_after_first_lock_expiry && *backoff_expires_at == expected_first_timeout_expiry
        );
        assert_eq!(
            PendingState::PendingAt {
                scheduled_at: expected_first_timeout_expiry
            },
            execution_log.pending_state
        );
        sim_clock.move_time_forward(timeout_duration).await;
        let now_after_first_timeout = sim_clock.now();
        debug!(now = %now_after_first_timeout, "Second execution should hang again and result in a permanent timeout");

        // Second (and last, since `max_retries` is 1) attempt.
        let mut second_execution_progress = executor.tick(now_after_first_timeout).await.unwrap();
        assert_eq!(1, second_execution_progress.executions.len());

        // Started hanging, wait for lock expiry.
        sim_clock.move_time_forward(lock_expiry).await;
        // Cleanup again: nothing started by the executor, one lock expired by the watcher.
        let now_after_second_lock_expiry = sim_clock.now();
        debug!(now = %now_after_second_lock_expiry, "Expecting the second lock to be expired");
        {
            let cleanup_progress = executor.tick(now_after_second_lock_expiry).await.unwrap();
            assert!(cleanup_progress.executions.is_empty());
        }
        {
            let expired_locks =
                expired_timers_watcher::tick(&db_pool.connection(), now_after_second_lock_expiry)
                    .await
                    .unwrap()
                    .expired_locks;
            assert_eq!(1, expired_locks);
        }
        // NOTE(review): the permanent timeout announced in the debug message above is not
        // asserted on the execution log here.
        assert!(!second_execution_progress
            .executions
            .pop()
            .unwrap()
            .1
            .is_finished());

        drop(db_connection);
        drop(executor);
        db_pool.close().await.unwrap();
    }
1472}